Merged resolution for issue 220:Incorrect use of transaction id in AqlMetadataProvider
into asterix_stabilization (from asterix_stabilization_txn_fix)


git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_stabilization@906 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/FuzzyJoinRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/FuzzyJoinRule.java
index 5961c32..035f1df 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/FuzzyJoinRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/FuzzyJoinRule.java
@@ -263,7 +263,7 @@
         // The translator will compile metadata internally. Run this compilation
         // under the same transaction id as the "outer" compilation.
         AqlPlusExpressionToPlanTranslator translator = new AqlPlusExpressionToPlanTranslator(
-                metadataProvider.getTxnId(), metadataProvider, counter, null, null);
+                metadataProvider.getJobTxnId(), metadataProvider, counter, null, null);
 
         LogicalOperatorDeepCopyVisitor deepCopyVisitor = new LogicalOperatorDeepCopyVisitor(counter);
 
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/APIFramework.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/APIFramework.java
index 5032a48..95ec76c 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/APIFramework.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/APIFramework.java
@@ -261,6 +261,7 @@
 
         OptimizationConfUtil.getPhysicalOptimizationConfig().setFrameSize(frameSize);
         builder.setPhysicalOptimizationConfig(OptimizationConfUtil.getPhysicalOptimizationConfig());
+        
         ICompiler compiler = compilerFactory.createCompiler(plan, queryMetadataProvider, t.getVarCounter());
         if (pc.isOptimize()) {
             compiler.optimize();
@@ -297,13 +298,6 @@
             }
         }
 
-        if (!pc.isGenerateJobSpec()) {
-            // Job spec not requested. Consider transaction against metadata
-            // committed.
-            MetadataManager.INSTANCE.commitTransaction(queryMetadataProvider.getMetadataTxnContext());
-            return null;
-        }
-
         AlgebricksPartitionConstraint clusterLocs = queryMetadataProvider.getClusterLocations();
         builder.setBinaryBooleanInspectorFactory(format.getBinaryBooleanInspectorFactory());
         builder.setBinaryIntegerInspectorFactory(format.getBinaryIntegerInspectorFactory());
@@ -320,7 +314,7 @@
 
         JobSpecification spec = compiler.createJob(AsterixAppContextInfoImpl.INSTANCE);
         // set the job event listener
-        spec.setJobletEventListenerFactory(new JobEventListenerFactory(queryMetadataProvider.getTxnId(),
+        spec.setJobletEventListenerFactory(new JobEventListenerFactory(queryMetadataProvider.getJobTxnId(),
                 isWriteTransaction));
         if (pc.isPrintJob()) {
             switch (pdf) {
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index be6305b..08e1c5f 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -27,7 +27,6 @@
 import edu.uci.ics.asterix.api.common.APIFramework;
 import edu.uci.ics.asterix.api.common.APIFramework.DisplayFormat;
 import edu.uci.ics.asterix.api.common.Job;
-import edu.uci.ics.asterix.api.common.Job.SubmissionMode;
 import edu.uci.ics.asterix.api.common.SessionConfig;
 import edu.uci.ics.asterix.aql.base.Statement;
 import edu.uci.ics.asterix.aql.expression.BeginFeedStatement;
@@ -83,6 +82,7 @@
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.asterix.om.types.TypeSignature;
 import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionIDFactory;
 import edu.uci.ics.asterix.translator.AbstractAqlTranslator;
 import edu.uci.ics.asterix.translator.CompiledStatements.CompiledBeginFeedStatement;
 import edu.uci.ics.asterix.translator.CompiledStatements.CompiledControlFeedStatement;
@@ -134,12 +134,12 @@
         return functionDecls;
     }
 
-    public List<QueryResult> compileAndExecute(IHyracksClientConnection hcc) throws AlgebricksException,
-            RemoteException, ACIDException, AsterixException {
+    public List<QueryResult> compileAndExecute(IHyracksClientConnection hcc) throws Exception {
         List<QueryResult> executionResult = new ArrayList<QueryResult>();
         FileSplit outputFile = null;
         IAWriterFactory writerFactory = PrinterBasedWriterFactory.INSTANCE;
         Map<String, String> config = new HashMap<String, String>();
+        List<JobSpecification> jobsToExecute = new ArrayList<JobSpecification>();
 
         for (Statement stmt : aqlStatements) {
             validateOperation(activeDefaultDataverse, stmt);
@@ -148,143 +148,159 @@
             metadataProvider.setWriterFactory(writerFactory);
             metadataProvider.setOutputFile(outputFile);
             metadataProvider.setConfig(config);
+            jobsToExecute.clear();
             try {
                 switch (stmt.getKind()) {
                     case SET: {
-                        SetStatement ss = (SetStatement) stmt;
-                        String pname = ss.getPropName();
-                        String pvalue = ss.getPropValue();
-                        config.put(pname, pvalue);
+                        handleSetStatement(metadataProvider, stmt, config, jobsToExecute);
                         break;
                     }
                     case DATAVERSE_DECL: {
-                        activeDefaultDataverse = handleUseDataverseStatement(metadataProvider, stmt);
+                        activeDefaultDataverse = handleUseDataverseStatement(metadataProvider, stmt, jobsToExecute);
                         break;
                     }
                     case CREATE_DATAVERSE: {
-                        handleCreateDataverseStatement(metadataProvider, stmt);
+                        handleCreateDataverseStatement(metadataProvider, stmt, jobsToExecute);
                         break;
                     }
                     case DATASET_DECL: {
-                        handleCreateDatasetStatement(metadataProvider, stmt, hcc);
+                        handleCreateDatasetStatement(metadataProvider, stmt, hcc, jobsToExecute);
                         break;
                     }
                     case CREATE_INDEX: {
-                        handleCreateIndexStatement(metadataProvider, stmt, hcc);
+                        handleCreateIndexStatement(metadataProvider, stmt, hcc, jobsToExecute);
                         break;
                     }
                     case TYPE_DECL: {
-                        handleCreateTypeStatement(metadataProvider, stmt);
+                        handleCreateTypeStatement(metadataProvider, stmt, jobsToExecute);
                         break;
                     }
                     case NODEGROUP_DECL: {
-                        handleCreateNodeGroupStatement(metadataProvider, stmt);
+                        handleCreateNodeGroupStatement(metadataProvider, stmt, jobsToExecute);
                         break;
                     }
                     case DATAVERSE_DROP: {
-                        handleDataverseDropStatement(metadataProvider, stmt, hcc);
+                        handleDataverseDropStatement(metadataProvider, stmt, hcc, jobsToExecute);
                         break;
                     }
                     case DATASET_DROP: {
-                        handleDatasetDropStatement(metadataProvider, stmt, hcc);
+                        handleDatasetDropStatement(metadataProvider, stmt, hcc, jobsToExecute);
                         break;
                     }
                     case INDEX_DROP: {
-                        handleIndexDropStatement(metadataProvider, stmt, hcc);
+                        handleIndexDropStatement(metadataProvider, stmt, hcc, jobsToExecute);
                         break;
                     }
                     case TYPE_DROP: {
-                        handleTypeDropStatement(metadataProvider, stmt);
+                        handleTypeDropStatement(metadataProvider, stmt, jobsToExecute);
                         break;
                     }
                     case NODEGROUP_DROP: {
-                        handleNodegroupDropStatement(metadataProvider, stmt);
+                        handleNodegroupDropStatement(metadataProvider, stmt, jobsToExecute);
                         break;
                     }
 
                     case CREATE_FUNCTION: {
-                        handleCreateFunctionStatement(metadataProvider, stmt);
+                        handleCreateFunctionStatement(metadataProvider, stmt, jobsToExecute);
                         break;
                     }
 
                     case FUNCTION_DROP: {
-                        handleFunctionDropStatement(metadataProvider, stmt);
+                        handleFunctionDropStatement(metadataProvider, stmt, jobsToExecute);
                         break;
                     }
 
                     case LOAD_FROM_FILE: {
-                        handleLoadFromFileStatement(metadataProvider, stmt, hcc);
+                        handleLoadFromFileStatement(metadataProvider, stmt, hcc, jobsToExecute);
                         break;
                     }
                     case WRITE_FROM_QUERY_RESULT: {
-                        handleWriteFromQueryResultStatement(metadataProvider, stmt, hcc);
+                        handleWriteFromQueryResultStatement(metadataProvider, stmt, hcc, jobsToExecute);
                         break;
                     }
                     case INSERT: {
-                        handleInsertStatement(metadataProvider, stmt, hcc);
+                        handleInsertStatement(metadataProvider, stmt, hcc, jobsToExecute);
                         break;
                     }
                     case DELETE: {
-                        handleDeleteStatement(metadataProvider, stmt, hcc);
+                        handleDeleteStatement(metadataProvider, stmt, hcc, jobsToExecute);
                         break;
                     }
 
                     case BEGIN_FEED: {
-                        handleBeginFeedStatement(metadataProvider, stmt, hcc);
+                        handleBeginFeedStatement(metadataProvider, stmt, hcc, jobsToExecute);
                         break;
                     }
 
                     case CONTROL_FEED: {
-                        handleControlFeedStatement(metadataProvider, stmt, hcc);
+                        handleControlFeedStatement(metadataProvider, stmt, hcc, jobsToExecute);
                         break;
                     }
 
                     case QUERY: {
-                        executionResult.add(handleQuery(metadataProvider, (Query) stmt, hcc));
-                        metadataProvider.setWriteTransaction(false);
+                        executionResult.add(handleQuery(metadataProvider, (Query) stmt, hcc, jobsToExecute));
                         break;
                     }
 
                     case WRITE: {
-                        WriteStatement ws = (WriteStatement) stmt;
-                        File f = new File(ws.getFileName());
-                        outputFile = new FileSplit(ws.getNcName().getValue(), new FileReference(f));
-                        if (ws.getWriterClassName() != null) {
-                            try {
-                                writerFactory = (IAWriterFactory) Class.forName(ws.getWriterClassName()).newInstance();
-                                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                            } catch (Exception e) {
-                                throw new AsterixException(e);
-                            }
+                        Pair<IAWriterFactory, FileSplit> result = handleWriteStatement(metadataProvider, stmt,
+                                jobsToExecute);
+                        if (result.first != null) {
+                            writerFactory = result.first;
                         }
+                        outputFile = result.second;
                         break;
                     }
 
                 }
-
+                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             } catch (Exception e) {
                 MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
                 throw new AlgebricksException(e);
             }
+            // Following jobs are run under a separate transaction, that is committed/aborted by the JobEventListener
+            for (JobSpecification jobspec : jobsToExecute) {
+                runJob(hcc, jobspec);
+            }
         }
         return executionResult;
     }
 
-    private Dataverse handleUseDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt)
-            throws MetadataException, RemoteException, ACIDException {
+    private void handleSetStatement(AqlMetadataProvider metadataProvider, Statement stmt, Map<String, String> config,
+            List<JobSpecification> jobsToExecute) throws RemoteException, ACIDException {
+        SetStatement ss = (SetStatement) stmt;
+        String pname = ss.getPropName();
+        String pvalue = ss.getPropValue();
+        config.put(pname, pvalue);
+    }
+
+    private Pair<IAWriterFactory, FileSplit> handleWriteStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+            List<JobSpecification> jobsToExecute) throws InstantiationException, IllegalAccessException,
+            ClassNotFoundException {
+        WriteStatement ws = (WriteStatement) stmt;
+        File f = new File(ws.getFileName());
+        FileSplit outputFile = new FileSplit(ws.getNcName().getValue(), new FileReference(f));
+        IAWriterFactory writerFactory = null;
+        if (ws.getWriterClassName() != null) {
+            writerFactory = (IAWriterFactory) Class.forName(ws.getWriterClassName()).newInstance();
+        }
+        return new Pair<IAWriterFactory, FileSplit>(writerFactory, outputFile);
+    }
+
+    private Dataverse handleUseDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+            List<JobSpecification> jobsToExecute) throws MetadataException, RemoteException, ACIDException {
         DataverseDecl dvd = (DataverseDecl) stmt;
         String dvName = dvd.getDataverseName().getValue();
         Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
         if (dv == null) {
-            throw new MetadataException(" Unknown dataverse " + dvName);
+            throw new MetadataException("Unknown dataverse " + dvName);
         }
-        MetadataManager.INSTANCE.commitTransaction(metadataProvider.getMetadataTxnContext());
         return dv;
-
     }
 
-    private void handleCreateDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt)
-            throws MetadataException, AlgebricksException, RemoteException, ACIDException {
+    private void handleCreateDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+            List<JobSpecification> jobsToExecute) throws MetadataException, AlgebricksException, RemoteException,
+            ACIDException {
         CreateDataverseStatement stmtCreateDataverse = (CreateDataverseStatement) stmt;
         String dvName = stmtCreateDataverse.getDataverseName().getValue();
         Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
@@ -293,11 +309,10 @@
         }
         MetadataManager.INSTANCE.addDataverse(metadataProvider.getMetadataTxnContext(), new Dataverse(dvName,
                 stmtCreateDataverse.getFormat()));
-        MetadataManager.INSTANCE.commitTransaction(metadataProvider.getMetadataTxnContext());
     }
 
     private void handleCreateDatasetStatement(AqlMetadataProvider metadataProvider, Statement stmt,
-            IHyracksClientConnection hcc) throws AsterixException, Exception {
+            IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws AsterixException, Exception {
         DatasetDecl dd = (DatasetDecl) stmt;
         String dataverseName = dd.getDataverse() != null ? dd.getDataverse().getValue()
                 : activeDefaultDataverse != null ? activeDefaultDataverse.getDataverseName() : null;
@@ -363,14 +378,12 @@
         if (dd.getDatasetType() == DatasetType.INTERNAL || dd.getDatasetType() == DatasetType.FEED) {
             Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
                     dataverseName);
-            runCreateDatasetJob(hcc, dataverse, datasetName, metadataProvider);
+            runJob(hcc, DatasetOperations.createDatasetJobSpec(dataverse, datasetName, metadataProvider));
         }
-        MetadataManager.INSTANCE.commitTransaction(metadataProvider.getMetadataTxnContext());
-
     }
 
     private void handleCreateIndexStatement(AqlMetadataProvider metadataProvider, Statement stmt,
-            IHyracksClientConnection hcc) throws Exception {
+            IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
         CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
         String dataverseName = stmtCreateIndex.getDataverseName() == null ? activeDefaultDataverse == null ? null
                 : activeDefaultDataverse.getDataverseName() : stmtCreateIndex.getDataverseName().getValue();
@@ -405,11 +418,11 @@
                     .buildSecondaryIndexLoadingJobSpec(cis, metadataProvider);
             runJob(hcc, loadIndexJobSpec);
         }
-        MetadataManager.INSTANCE.commitTransaction(metadataProvider.getMetadataTxnContext());
     }
 
-    private void handleCreateTypeStatement(AqlMetadataProvider metadataProvider, Statement stmt)
-            throws AlgebricksException, RemoteException, ACIDException, MetadataException {
+    private void handleCreateTypeStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+            List<JobSpecification> jobsToExecute) throws AlgebricksException, RemoteException, ACIDException,
+            MetadataException {
         MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
         TypeDecl stmtCreateType = (TypeDecl) stmt;
         String dataverseName = stmtCreateType.getDataverseName() == null ? activeDefaultDataverse == null ? null
@@ -437,11 +450,10 @@
                 MetadataManager.INSTANCE.addDatatype(mdTxnCtx, new Datatype(dataverseName, typeName, type, false));
             }
         }
-        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
     }
 
     private void handleDataverseDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
-            IHyracksClientConnection hcc) throws Exception {
+            IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
         MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
         DataverseDropStatement stmtDelete = (DataverseDropStatement) stmt;
         String dvName = stmtDelete.getDataverseName().getValue();
@@ -473,11 +485,10 @@
                 activeDefaultDataverse = null;
             }
         }
-        MetadataManager.INSTANCE.commitTransaction(metadataProvider.getMetadataTxnContext());
     }
 
     private void handleDatasetDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
-            IHyracksClientConnection hcc) throws Exception {
+            IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
         MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
         DropStatement stmtDelete = (DropStatement) stmt;
         String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
@@ -503,11 +514,10 @@
             }
             compileDatasetDropStatement(hcc, dataverseName, datasetName, metadataProvider);
         }
-        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
     }
 
     private void handleIndexDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
-            IHyracksClientConnection hcc) throws Exception {
+            IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
         MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
         IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt;
         String datasetName = stmtIndexDrop.getDatasetName().getValue();
@@ -532,11 +542,11 @@
             throw new AlgebricksException(datasetName
                     + " is an external dataset. Indexes are not maintained for external datasets.");
         }
-        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
     }
 
-    private void handleTypeDropStatement(AqlMetadataProvider metadataProvider, Statement stmt)
-            throws AlgebricksException, MetadataException, RemoteException, ACIDException {
+    private void handleTypeDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+            List<JobSpecification> jobsToExecute) throws AlgebricksException, MetadataException, RemoteException,
+            ACIDException {
         MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
         TypeDropStatement stmtTypeDrop = (TypeDropStatement) stmt;
         String dataverseName = stmtTypeDrop.getDataverseName() == null ? (activeDefaultDataverse == null ? null
@@ -552,12 +562,11 @@
         } else {
             MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, dataverseName, typeName);
         }
-        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-
     }
 
-    private void handleNodegroupDropStatement(AqlMetadataProvider metadataProvider, Statement stmt)
-            throws MetadataException, AlgebricksException, RemoteException, ACIDException {
+    private void handleNodegroupDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+            List<JobSpecification> jobsToExecute) throws MetadataException, AlgebricksException, RemoteException,
+            ACIDException {
         MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
         NodeGroupDropStatement stmtDelete = (NodeGroupDropStatement) stmt;
         String nodegroupName = stmtDelete.getNodeGroupName().getValue();
@@ -568,11 +577,11 @@
         } else {
             MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, nodegroupName);
         }
-        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
     }
 
-    private void handleCreateFunctionStatement(AqlMetadataProvider metadataProvider, Statement stmt)
-            throws AlgebricksException, MetadataException, RemoteException, ACIDException {
+    private void handleCreateFunctionStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+            List<JobSpecification> jobsToExecute) throws AlgebricksException, MetadataException, RemoteException,
+            ACIDException {
         MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
         CreateFunctionStatement cfs = (CreateFunctionStatement) stmt;
         String dataverse = cfs.getSignature().getNamespace() == null ? activeDefaultDataverse == null ? null
@@ -588,11 +597,11 @@
                 .getArity(), cfs.getParamList(), Function.RETURNTYPE_VOID, cfs.getFunctionBody(),
                 Function.LANGUAGE_AQL, FunctionKind.SCALAR.toString());
         MetadataManager.INSTANCE.addFunction(mdTxnCtx, function);
-        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
     }
 
-    private void handleFunctionDropStatement(AqlMetadataProvider metadataProvider, Statement stmt)
-            throws MetadataException, RemoteException, ACIDException, AlgebricksException {
+    private void handleFunctionDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+            List<JobSpecification> jobsToExecute) throws MetadataException, RemoteException, ACIDException,
+            AlgebricksException {
         MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
         FunctionDropStatement stmtDropFunction = (FunctionDropStatement) stmt;
         FunctionSignature signature = stmtDropFunction.getFunctionSignature();
@@ -603,12 +612,10 @@
         } else {
             MetadataManager.INSTANCE.dropFunction(mdTxnCtx, signature);
         }
-        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
     }
 
     private void handleLoadFromFileStatement(AqlMetadataProvider metadataProvider, Statement stmt,
-            IHyracksClientConnection hcc) throws Exception {
-        List<Job> jobs = new ArrayList<Job>();
+            IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
         MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
         LoadFromFileStatement loadStmt = (LoadFromFileStatement) stmt;
         String dataverseName = loadStmt.getDataverseName() == null ? activeDefaultDataverse == null ? null
@@ -618,7 +625,7 @@
 
         IDataFormat format = getDataFormat(metadataProvider.getMetadataTxnContext(), dataverseName);
         Job job = DatasetOperations.createLoadDatasetJobSpec(metadataProvider, cls, format);
-        jobs.add(job);
+        jobsToExecute.add(job.getJobSpec());
         // Also load the dataset's secondary indexes.
         List<Index> datasetIndexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, loadStmt
                 .getDatasetName().getValue());
@@ -629,17 +636,13 @@
             // Create CompiledCreateIndexStatement from metadata entity 'index'.
             CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName,
                     index.getDatasetName(), index.getKeyFieldNames(), index.getGramLength(), index.getIndexType());
-            JobSpecification jobSpec = IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, metadataProvider);
-            jobs.add(new Job(jobSpec));
-        }
-        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-        for (Job j : jobs) {
-            runJob(hcc, j.getJobSpec());
+            jobsToExecute.add(IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, metadataProvider));
         }
     }
 
     private void handleWriteFromQueryResultStatement(AqlMetadataProvider metadataProvider, Statement stmt,
-            IHyracksClientConnection hcc) throws Exception {
+            IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
+        metadataProvider.setWriteTransaction(true);
         WriteFromQueryResultStatement st1 = (WriteFromQueryResultStatement) stmt;
         String dataverseName = st1.getDataverseName() == null ? activeDefaultDataverse == null ? null
                 : activeDefaultDataverse.getDataverseName() : st1.getDataverseName().getValue();
@@ -647,11 +650,11 @@
                 .getDatasetName().getValue(), st1.getQuery(), st1.getVarCounter());
 
         Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
-        runJob(hcc, compiled.first);
+        jobsToExecute.add(compiled.first);
     }
 
     private void handleInsertStatement(AqlMetadataProvider metadataProvider, Statement stmt,
-            IHyracksClientConnection hcc) throws Exception {
+            IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
         metadataProvider.setWriteTransaction(true);
         InsertStatement stmtInsert = (InsertStatement) stmt;
         String dataverseName = stmtInsert.getDataverseName() == null ? activeDefaultDataverse == null ? null
@@ -659,11 +662,11 @@
         CompiledInsertStatement clfrqs = new CompiledInsertStatement(dataverseName, stmtInsert.getDatasetName()
                 .getValue(), stmtInsert.getQuery(), stmtInsert.getVarCounter());
         Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
-        runJob(hcc, compiled.first);
+        jobsToExecute.add(compiled.first);
     }
 
     private void handleDeleteStatement(AqlMetadataProvider metadataProvider, Statement stmt,
-            IHyracksClientConnection hcc) throws Exception {
+            IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
         metadataProvider.setWriteTransaction(true);
         DeleteStatement stmtDelete = (DeleteStatement) stmt;
         String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
@@ -672,39 +675,35 @@
                 stmtDelete.getDatasetName().getValue(), stmtDelete.getCondition(), stmtDelete.getDieClause(),
                 stmtDelete.getVarCounter(), metadataProvider);
         Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
-        runJob(hcc, compiled.first);
+        jobsToExecute.add(compiled.first);
     }
 
     private Pair<JobSpecification, FileSplit> rewriteCompileQuery(AqlMetadataProvider metadataProvider, Query query,
             ICompiledDmlStatement stmt) throws AsterixException, RemoteException, AlgebricksException, JSONException,
             ACIDException {
+
+        // Query Rewriting (happens under the same ongoing metadata transaction)
         Pair<Query, Integer> reWrittenQuery = APIFramework.reWriteQuery(declaredFunctions, metadataProvider, query,
                 sessionConfig, out, pdf);
-        MetadataManager.INSTANCE.commitTransaction(metadataProvider.getMetadataTxnContext());
-        Pair<JobSpecification, FileSplit> compiled = compileQuery(sessionConfig, reWrittenQuery.first,
-                metadataProvider, reWrittenQuery.second, stmt);
+
+        // Query Compilation (happens under the same ongoing metadata
+        // transaction)
+        sessionConfig.setGenerateJobSpec(true);
+        if (metadataProvider.isWriteTransaction()) {
+            metadataProvider.setJobTxnId(TransactionIDFactory.generateTransactionId());
+        }
+        JobSpecification spec = APIFramework.compileQuery(declaredFunctions, metadataProvider, query,
+                reWrittenQuery.second, stmt == null ? null : stmt.getDatasetName(), sessionConfig, out, pdf, stmt);
+        sessionConfig.setGenerateJobSpec(false);
+
+        Pair<JobSpecification, FileSplit> compiled = new Pair<JobSpecification, FileSplit>(spec,
+                metadataProvider.getOutputFile());
         return compiled;
 
     }
 
-    private Pair<JobSpecification, FileSplit> compileQuery(SessionConfig sessionConfig, Query query,
-            AqlMetadataProvider metadataProvider, int varCounter, ICompiledDmlStatement statement)
-            throws RemoteException, AsterixException, AlgebricksException, JSONException, ACIDException {
-        sessionConfig.setGenerateJobSpec(true);
-        MetadataTransactionContext mdTxnCtxQuery = MetadataManager.INSTANCE.beginTransaction();
-        AqlMetadataProvider metadataProviderInsert = new AqlMetadataProvider(mdTxnCtxQuery, activeDefaultDataverse);
-        metadataProviderInsert.setWriterFactory(metadataProvider.getWriterFactory());
-        metadataProviderInsert.setOutputFile(metadataProvider.getOutputFile());
-        metadataProviderInsert.setConfig(metadataProvider.getConfig());
-        JobSpecification spec = APIFramework.compileQuery(declaredFunctions, metadataProvider, query, varCounter,
-                statement == null ? null : statement.getDatasetName(), sessionConfig, out, pdf, statement);
-        sessionConfig.setGenerateJobSpec(false);
-        return new Pair<JobSpecification, FileSplit>(spec, metadataProvider.getOutputFile());
-    }
-
     private void handleBeginFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
-            IHyracksClientConnection hcc) throws Exception {
-        MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
+            IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
         BeginFeedStatement bfs = (BeginFeedStatement) stmt;
         String dataverseName = bfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
                 : activeDefaultDataverse.getDataverseName() : bfs.getDatasetName().getValue();
@@ -713,49 +712,36 @@
                 bfs.getDatasetName().getValue(), bfs.getQuery(), bfs.getVarCounter());
 
         Dataset dataset;
-        dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, bfs.getDatasetName().getValue());
+        dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName, bfs
+                .getDatasetName().getValue());
         IDatasetDetails datasetDetails = dataset.getDatasetDetails();
         if (datasetDetails.getDatasetType() != DatasetType.FEED) {
             throw new IllegalArgumentException("Dataset " + bfs.getDatasetName().getValue() + " is not a feed dataset");
         }
-        bfs.initialize(mdTxnCtx, dataset);
+        bfs.initialize(metadataProvider.getMetadataTxnContext(), dataset);
         cbfs.setQuery(bfs.getQuery());
-
-        Pair<Query, Integer> reWrittenQuery = APIFramework.reWriteQuery(declaredFunctions, metadataProvider,
-                bfs.getQuery(), sessionConfig, out, pdf);
-        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-
-        Pair<JobSpecification, FileSplit> compiled = compileQuery(sessionConfig, reWrittenQuery.first,
-                metadataProvider, reWrittenQuery.second, cbfs);
-        runJob(hcc, compiled.first);
+        Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, bfs.getQuery(), cbfs);
+        jobsToExecute.add(compiled.first);
     }
 
     private void handleControlFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
-            IHyracksClientConnection hcc) throws Exception {
-        MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
+            IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
         ControlFeedStatement cfs = (ControlFeedStatement) stmt;
         String dataverseName = cfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
                 : activeDefaultDataverse.getDataverseName() : cfs.getDatasetName().getValue();
         CompiledControlFeedStatement clcfs = new CompiledControlFeedStatement(cfs.getOperationType(), dataverseName,
                 cfs.getDatasetName().getValue(), cfs.getAlterAdapterConfParams());
-        Job job = new Job(FeedOperations.buildControlFeedJobSpec(clcfs, metadataProvider), SubmissionMode.ASYNCHRONOUS);
-        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-        runJob(hcc, job.getJobSpec());
+        jobsToExecute.add(FeedOperations.buildControlFeedJobSpec(clcfs, metadataProvider));
     }
 
-    private QueryResult handleQuery(AqlMetadataProvider metadataProvider, Query query, IHyracksClientConnection hcc)
-            throws Exception {
+    private QueryResult handleQuery(AqlMetadataProvider metadataProvider, Query query, IHyracksClientConnection hcc,
+            List<JobSpecification> jobsToExecute) throws Exception {
         Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, query, null);
-        runJob(hcc, compiled.first);
         GlobalConfig.ASTERIX_LOGGER.info(compiled.first.toJSON().toString(1));
+        jobsToExecute.add(compiled.first);
         return new QueryResult(query, compiled.second.getLocalFile().getFile().getAbsolutePath());
     }
 
-    private void runCreateDatasetJob(IHyracksClientConnection hcc, Dataverse dataverse, String datasetName,
-            AqlMetadataProvider metadataProvider) throws AsterixException, AlgebricksException, Exception {
-        runJob(hcc, DatasetOperations.createDatasetJobSpec(dataverse, datasetName, metadataProvider));
-    }
-
     private void runCreateIndexJob(IHyracksClientConnection hcc, CreateIndexStatement stmtCreateIndex,
             AqlMetadataProvider metadataProvider) throws Exception {
         // TODO: Eventually CreateIndexStatement and
@@ -772,11 +758,12 @@
             throw new AsterixException("Failed to create job spec for creating index '"
                     + stmtCreateIndex.getDatasetName() + "." + stmtCreateIndex.getIndexName() + "'");
         }
-        runJob(hcc, new Job(spec));
+        runJob(hcc, spec);
     }
 
-    private void handleCreateNodeGroupStatement(AqlMetadataProvider metadataProvider, Statement stmt)
-            throws MetadataException, AlgebricksException, RemoteException, ACIDException {
+    private void handleCreateNodeGroupStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+            List<JobSpecification> jobsToExecute) throws MetadataException, AlgebricksException, RemoteException,
+            ACIDException {
         MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
         NodegroupDecl stmtCreateNodegroup = (NodegroupDecl) stmt;
         String ngName = stmtCreateNodegroup.getNodegroupName().getValue();
@@ -792,11 +779,6 @@
             }
             MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(ngName, ncNames));
         }
-        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-    }
-
-    private void runJob(IHyracksClientConnection hcc, Job job) throws Exception {
-        executeJobArray(hcc, new Job[] { job }, out, pdf);
     }
 
     private void runJob(IHyracksClientConnection hcc, JobSpecification spec) throws Exception {
@@ -806,7 +788,7 @@
     private void compileIndexDropStatement(IHyracksClientConnection hcc, String dataverseName, String datasetName,
             String indexName, AqlMetadataProvider metadataProvider) throws Exception {
         CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName);
-        runJob(hcc, new Job(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider)));
+        runJob(hcc, IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
         MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
                 indexName);
     }
@@ -819,7 +801,7 @@
         if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
             JobSpecification[] jobSpecs = DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider);
             for (JobSpecification spec : jobSpecs)
-                runJob(hcc, new Job(spec));
+                runJob(hcc, spec);
         }
         MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
     }
@@ -846,20 +828,4 @@
         return format;
     }
 
-    public List<Statement> getAqlStatements() {
-        return aqlStatements;
-    }
-
-    public PrintWriter getOut() {
-        return out;
-    }
-
-    public SessionConfig getPc() {
-        return sessionConfig;
-    }
-
-    public DisplayFormat getPdf() {
-        return pdf;
-    }
-
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index a300f9b..4f8e467 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -121,6 +121,7 @@
     private Map<String, String> config;
     private IAWriterFactory writerFactory;
     private FileSplit outputFile;
+    private long jobTxnId;
 
     private final Dataverse defaultDataverse;
 
@@ -147,6 +148,10 @@
         this.defaultDataverse = defaultDataverse;
         this.stores = AsterixProperties.INSTANCE.getStores();
     }
+    
+    public void setJobTxnId(long txnId){
+    	this.jobTxnId = txnId;
+    }
 
     public Dataverse getDefaultDataverse() {
         return defaultDataverse;
@@ -654,7 +659,7 @@
             TreeIndexInsertUpdateDeleteOperatorDescriptor btreeBulkLoad = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                     spec, recordDesc, appContext.getStorageManagerInterface(), appContext.getIndexRegistryProvider(),
                     splitsAndConstraint.first, typeTraits, comparatorFactories, fieldPermutation, indexOp,
-                    new BTreeDataflowHelperFactory(), null, NoOpOperationCallbackProvider.INSTANCE, mdTxnCtx.getTxnId());
+                    new BTreeDataflowHelperFactory(), null, NoOpOperationCallbackProvider.INSTANCE, jobTxnId);
             return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad,
                     splitsAndConstraint.second);
         } catch (MetadataException me) {
@@ -812,12 +817,12 @@
             IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
                     dataverseName, datasetName, indexName);
-            TreeIndexInsertUpdateDeleteOperatorDescriptor btreeBulkLoad = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+            TreeIndexInsertUpdateDeleteOperatorDescriptor btreeInsert = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                     spec, recordDesc, appContext.getStorageManagerInterface(), appContext.getIndexRegistryProvider(),
                     splitsAndConstraint.first, typeTraits, comparatorFactories, fieldPermutation, indexOp,
                     new BTreeDataflowHelperFactory(), filterFactory, NoOpOperationCallbackProvider.INSTANCE,
-                    mdTxnCtx.getTxnId());
-            return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad,
+                    jobTxnId);
+            return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeInsert,
                     splitsAndConstraint.second);
         } catch (MetadataException e) {
             throw new AlgebricksException(e);
@@ -884,15 +889,15 @@
                     spec, recordDesc, appContext.getStorageManagerInterface(), appContext.getIndexRegistryProvider(),
                     splitsAndConstraint.first, typeTraits, comparatorFactories, fieldPermutation, indexOp,
                     new RTreeDataflowHelperFactory(valueProviderFactories), filterFactory,
-                    NoOpOperationCallbackProvider.INSTANCE, mdTxnCtx.getTxnId());
+                    NoOpOperationCallbackProvider.INSTANCE, jobTxnId);
             return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(rtreeUpdate, splitsAndConstraint.second);
         } catch (MetadataException me) {
             throw new AlgebricksException(me);
         }
     }
 
-    public long getTxnId() {
-        return mdTxnCtx.getTxnId();
+    public long getJobTxnId() {
+        return jobTxnId;
     }
 
     public static ITreeIndexFrameFactory createBTreeNSMInteriorFrameFactory(ITypeTraits[] typeTraits) {