reverting earlier changes to AqlTranslator, as there are more changes required and rightly need to be handled through a separate branch and merged back upon code review

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_stabilization@900 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index 7a87acb..be6305b 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -241,6 +241,7 @@
 
                     case QUERY: {
                         executionResult.add(handleQuery(metadataProvider, (Query) stmt, hcc));
+                        metadataProvider.setWriteTransaction(false);
                         break;
                     }
 
@@ -632,7 +633,6 @@
             jobs.add(new Job(jobSpec));
         }
         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-        // Loading from a file does not happen under a transaction!
         for (Job j : jobs) {
             runJob(hcc, j.getJobSpec());
         }
@@ -647,7 +647,6 @@
                 .getDatasetName().getValue(), st1.getQuery(), st1.getVarCounter());
 
         Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
-        MetadataManager.INSTANCE.commitTransaction(metadataProvider.getMetadataTxnContext());
         runJob(hcc, compiled.first);
     }
 
@@ -660,7 +659,6 @@
         CompiledInsertStatement clfrqs = new CompiledInsertStatement(dataverseName, stmtInsert.getDatasetName()
                 .getValue(), stmtInsert.getQuery(), stmtInsert.getVarCounter());
         Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
-        MetadataManager.INSTANCE.commitTransaction(metadataProvider.getMetadataTxnContext());
         runJob(hcc, compiled.first);
     }
 
@@ -674,40 +672,39 @@
                 stmtDelete.getDatasetName().getValue(), stmtDelete.getCondition(), stmtDelete.getDieClause(),
                 stmtDelete.getVarCounter(), metadataProvider);
         Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
-        MetadataManager.INSTANCE.commitTransaction(metadataProvider.getMetadataTxnContext());
         runJob(hcc, compiled.first);
     }
 
     private Pair<JobSpecification, FileSplit> rewriteCompileQuery(AqlMetadataProvider metadataProvider, Query query,
             ICompiledDmlStatement stmt) throws AsterixException, RemoteException, AlgebricksException, JSONException,
             ACIDException {
-
-        // Query Rewriting (happens under the same ongoing metadata transaction)
         Pair<Query, Integer> reWrittenQuery = APIFramework.reWriteQuery(declaredFunctions, metadataProvider, query,
                 sessionConfig, out, pdf);
-
-        // Query Compilation (happens under a new transaction, which is committed/aborted by installing a JobEventListener)
-        MetadataTransactionContext mdTxnCtxQuery = MetadataManager.INSTANCE.beginTransaction();
-        AqlMetadataProvider metadataProviderQueryCompilation = new AqlMetadataProvider(mdTxnCtxQuery,
-                activeDefaultDataverse);
-        metadataProviderQueryCompilation.setWriterFactory(metadataProvider.getWriterFactory());
-        metadataProviderQueryCompilation.setOutputFile(metadataProvider.getOutputFile());
-        metadataProviderQueryCompilation.setConfig(metadataProvider.getConfig());
-        metadataProviderQueryCompilation.setWriteTransaction(metadataProvider.isWriteTransaction());
-
-        sessionConfig.setGenerateJobSpec(true);
-        JobSpecification spec = APIFramework.compileQuery(declaredFunctions, metadataProviderQueryCompilation, query,
-                reWrittenQuery.second, stmt == null ? null : stmt.getDatasetName(), sessionConfig, out, pdf, stmt);
-        sessionConfig.setGenerateJobSpec(false);
-
-        Pair<JobSpecification, FileSplit> compiled = new Pair<JobSpecification, FileSplit>(spec,
-                metadataProviderQueryCompilation.getOutputFile());
+        MetadataManager.INSTANCE.commitTransaction(metadataProvider.getMetadataTxnContext());
+        Pair<JobSpecification, FileSplit> compiled = compileQuery(sessionConfig, reWrittenQuery.first,
+                metadataProvider, reWrittenQuery.second, stmt);
         return compiled;
 
     }
 
+    private Pair<JobSpecification, FileSplit> compileQuery(SessionConfig sessionConfig, Query query,
+            AqlMetadataProvider metadataProvider, int varCounter, ICompiledDmlStatement statement)
+            throws RemoteException, AsterixException, AlgebricksException, JSONException, ACIDException {
+        sessionConfig.setGenerateJobSpec(true);
+        MetadataTransactionContext mdTxnCtxQuery = MetadataManager.INSTANCE.beginTransaction();
+        AqlMetadataProvider metadataProviderInsert = new AqlMetadataProvider(mdTxnCtxQuery, activeDefaultDataverse);
+        metadataProviderInsert.setWriterFactory(metadataProvider.getWriterFactory());
+        metadataProviderInsert.setOutputFile(metadataProvider.getOutputFile());
+        metadataProviderInsert.setConfig(metadataProvider.getConfig());
+        JobSpecification spec = APIFramework.compileQuery(declaredFunctions, metadataProvider, query, varCounter,
+                statement == null ? null : statement.getDatasetName(), sessionConfig, out, pdf, statement);
+        sessionConfig.setGenerateJobSpec(false);
+        return new Pair<JobSpecification, FileSplit>(spec, metadataProvider.getOutputFile());
+    }
+
     private void handleBeginFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc) throws Exception {
+        MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
         BeginFeedStatement bfs = (BeginFeedStatement) stmt;
         String dataverseName = bfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
                 : activeDefaultDataverse.getDataverseName() : bfs.getDatasetName().getValue();
@@ -716,17 +713,20 @@
                 bfs.getDatasetName().getValue(), bfs.getQuery(), bfs.getVarCounter());
 
         Dataset dataset;
-        dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName, bfs
-                .getDatasetName().getValue());
+        dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, bfs.getDatasetName().getValue());
         IDatasetDetails datasetDetails = dataset.getDatasetDetails();
         if (datasetDetails.getDatasetType() != DatasetType.FEED) {
             throw new IllegalArgumentException("Dataset " + bfs.getDatasetName().getValue() + " is not a feed dataset");
         }
-        bfs.initialize(metadataProvider.getMetadataTxnContext(), dataset);
+        bfs.initialize(mdTxnCtx, dataset);
         cbfs.setQuery(bfs.getQuery());
 
-        Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, bfs.getQuery(), cbfs);
-        MetadataManager.INSTANCE.commitTransaction(metadataProvider.getMetadataTxnContext());
+        Pair<Query, Integer> reWrittenQuery = APIFramework.reWriteQuery(declaredFunctions, metadataProvider,
+                bfs.getQuery(), sessionConfig, out, pdf);
+        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+
+        Pair<JobSpecification, FileSplit> compiled = compileQuery(sessionConfig, reWrittenQuery.first,
+                metadataProvider, reWrittenQuery.second, cbfs);
         runJob(hcc, compiled.first);
     }
 
@@ -746,7 +746,6 @@
     private QueryResult handleQuery(AqlMetadataProvider metadataProvider, Query query, IHyracksClientConnection hcc)
             throws Exception {
         Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, query, null);
-        MetadataManager.INSTANCE.commitTransaction(metadataProvider.getMetadataTxnContext());
         runJob(hcc, compiled.first);
         GlobalConfig.ASTERIX_LOGGER.info(compiled.first.toJSON().toString(1));
         return new QueryResult(query, compiled.second.getLocalFile().getFile().getAbsolutePath());