Move all job executions into the handle methods for all statement types other than queries.

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_stabilization_result_distribution@1136 eaa15691-b419-025a-1212-ee371bd00084
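
In outline, this patch decouples job submission from job completion: runJob() now only starts a job and returns its JobId, each DDL handle method blocks on hcc.waitForCompletion() itself, and the query path merely queues its compiled job. A minimal sketch of the new pattern, using only the Hyracks client calls that appear in this patch (the class name and the GlobalConfig import path are assumptions):

    import edu.uci.ics.asterix.common.config.GlobalConfig;
    import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
    import edu.uci.ics.hyracks.api.job.JobId;
    import edu.uci.ics.hyracks.api.job.JobSpecification;

    public class JobSubmissionSketch {
        // Start the job and return immediately; the caller decides when to wait.
        static JobId startOnly(IHyracksClientConnection hcc, JobSpecification spec) throws Exception {
            spec.setMaxReattempts(0);
            return hcc.startJob(GlobalConfig.HYRACKS_APP_NAME, spec);
        }

        // What the DDL handle methods now do inline: start, then block.
        static void startAndWait(IHyracksClientConnection hcc, JobSpecification spec) throws Exception {
            JobId jobId = startOnly(hcc, spec);
            hcc.waitForCompletion(jobId);
        }
    }
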
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index 508126f..22f5895 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -29,6 +29,7 @@
 import edu.uci.ics.asterix.api.common.Job;
 import edu.uci.ics.asterix.api.common.SessionConfig;
 import edu.uci.ics.asterix.aql.base.Statement;
+import edu.uci.ics.asterix.aql.base.Statement.Kind;
 import edu.uci.ics.asterix.aql.expression.BeginFeedStatement;
 import edu.uci.ics.asterix.aql.expression.ControlFeedStatement;
 import edu.uci.ics.asterix.aql.expression.CreateDataverseStatement;
@@ -101,6 +102,7 @@
 import edu.uci.ics.hyracks.algebricks.data.IAWriterFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
@@ -388,7 +390,8 @@
         if (dd.getDatasetType() == DatasetType.INTERNAL || dd.getDatasetType() == DatasetType.FEED) {
             Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
                     dataverseName);
-            runJob(hcc, DatasetOperations.createDatasetJobSpec(dataverse, datasetName, metadataProvider));
+            JobId jobId = runJob(hcc, DatasetOperations.createDatasetJobSpec(dataverse, datasetName, metadataProvider));
+            hcc.waitForCompletion(jobId);
         }
     }
 
@@ -426,7 +429,8 @@
                     index.getDatasetName(), index.getKeyFieldNames(), index.getGramLength(), index.getIndexType());
             JobSpecification loadIndexJobSpec = IndexOperations
                     .buildSecondaryIndexLoadingJobSpec(cis, metadataProvider);
-            runJob(hcc, loadIndexJobSpec);
+            JobId jobId = runJob(hcc, loadIndexJobSpec);
+            hcc.waitForCompletion(jobId);
         }
     }
 
@@ -659,9 +663,9 @@
         CompiledWriteFromQueryResultStatement clfrqs = new CompiledWriteFromQueryResultStatement(dataverseName, st1
                 .getDatasetName().getValue(), st1.getQuery(), st1.getVarCounter());
 
-        Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
-        if (compiled.first != null) {
-            jobsToExecute.add(compiled.first);
+        JobSpecification compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
+        if (compiled != null) {
+            jobsToExecute.add(compiled);
         }
     }
 
@@ -673,9 +677,10 @@
                 : activeDefaultDataverse.getDataverseName() : stmtInsert.getDataverseName().getValue();
         CompiledInsertStatement clfrqs = new CompiledInsertStatement(dataverseName, stmtInsert.getDatasetName()
                 .getValue(), stmtInsert.getQuery(), stmtInsert.getVarCounter());
-        Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
-        if (compiled.first != null) {
-            jobsToExecute.add(compiled.first);
+
+        JobSpecification compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
+        if (compiled != null) {
+            jobsToExecute.add(compiled);
         }
     }
 
@@ -688,13 +693,13 @@
         CompiledDeleteStatement clfrqs = new CompiledDeleteStatement(stmtDelete.getVariableExpr(), dataverseName,
                 stmtDelete.getDatasetName().getValue(), stmtDelete.getCondition(), stmtDelete.getDieClause(),
                 stmtDelete.getVarCounter(), metadataProvider);
-        Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
-        if (compiled.first != null) {
-            jobsToExecute.add(compiled.first);
+        JobSpecification compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
+        if (compiled != null) {
+            jobsToExecute.add(compiled);
         }
     }
 
-    private Pair<JobSpecification, FileSplit> rewriteCompileQuery(AqlMetadataProvider metadataProvider, Query query,
+    private JobSpecification rewriteCompileQuery(AqlMetadataProvider metadataProvider, Query query,
             ICompiledDmlStatement stmt) throws AsterixException, RemoteException, AlgebricksException, JSONException,
             ACIDException {
 
@@ -709,9 +714,7 @@
         JobSpecification spec = APIFramework.compileQuery(declaredFunctions, metadataProvider, query,
                 reWrittenQuery.second, stmt == null ? null : stmt.getDatasetName(), sessionConfig, out, pdf, stmt);
 
-        Pair<JobSpecification, FileSplit> compiled = new Pair<JobSpecification, FileSplit>(spec,
-                metadataProvider.getOutputFile());
-        return compiled;
+        return spec;
 
     }
 
@@ -735,9 +738,10 @@
         }
         bfs.initialize(metadataProvider.getMetadataTxnContext(), dataset);
         cbfs.setQuery(bfs.getQuery());
-        Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, bfs.getQuery(), cbfs);
-        if (compiled.first != null) {
-            jobsToExecute.add(compiled.first);
+
+        JobSpecification compiled = rewriteCompileQuery(metadataProvider, bfs.getQuery(), cbfs);
+        if (compiled != null) {
+            jobsToExecute.add(compiled);
         }
     }
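
With rewriteCompileQuery() returning the bare JobSpecification, the four handlers above (write-from-query-result, insert, delete, begin-feed) all reduce to the same compile-then-queue shape; condensed, with the statement-specific Compiled*Statement construction elided:

    JobSpecification compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
    if (compiled != null) {
        jobsToExecute.add(compiled); // started and awaited later, inside the handle method
    }
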
 
@@ -753,10 +757,10 @@
 
     private QueryResult handleQuery(AqlMetadataProvider metadataProvider, Query query, IHyracksClientConnection hcc,
             List<JobSpecification> jobsToExecute) throws Exception {
-        Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, query, null);
-        if (compiled.first != null) {
-            GlobalConfig.ASTERIX_LOGGER.info(compiled.first.toJSON().toString(1));
-            jobsToExecute.add(compiled.first);
+        JobSpecification compiled = rewriteCompileQuery(metadataProvider, query, null);
+        if (compiled != null) {
+            GlobalConfig.ASTERIX_LOGGER.info(compiled.toJSON().toString(1));
+            jobsToExecute.add(compiled);
         }
-        return new QueryResult(query, compiled.second.getLocalFile().getFile().getAbsolutePath());
+        return new QueryResult(query, metadataProvider.getOutputFile().getLocalFile().getFile().getAbsolutePath());
     }
@@ -777,7 +781,8 @@
             throw new AsterixException("Failed to create job spec for creating index '"
                     + stmtCreateIndex.getDatasetName() + "." + stmtCreateIndex.getIndexName() + "'");
         }
-        runJob(hcc, spec);
+        JobId jobId = runJob(hcc, spec);
+        hcc.waitForCompletion(jobId);
     }
 
     private void handleCreateNodeGroupStatement(AqlMetadataProvider metadataProvider, Statement stmt,
@@ -800,14 +805,16 @@
         }
     }
 
-    private void runJob(IHyracksClientConnection hcc, JobSpecification spec) throws Exception {
-        executeJobArray(hcc, new Job[] { new Job(spec) }, out, pdf);
+    private JobId runJob(IHyracksClientConnection hcc, JobSpecification spec) throws Exception {
+        JobId[] jobIds = executeJobArray(hcc, new Job[] { new Job(spec) }, out, pdf);
+        return jobIds[0];
     }
 
     private void compileIndexDropStatement(IHyracksClientConnection hcc, String dataverseName, String datasetName,
             String indexName, AqlMetadataProvider metadataProvider) throws Exception {
         CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName);
-        runJob(hcc, IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
+        JobId jobId = runJob(hcc, IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
+        hcc.waitForCompletion(jobId);
         MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
                 indexName);
     }
@@ -819,20 +826,24 @@
         Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
         if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
             JobSpecification[] jobSpecs = DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider);
-            for (JobSpecification spec : jobSpecs)
-                runJob(hcc, spec);
+            for (JobSpecification spec : jobSpecs) {
+                JobId jobId = runJob(hcc, spec);
+                hcc.waitForCompletion(jobId);
+            }
         }
         MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
     }
 
-    public void executeJobArray(IHyracksClientConnection hcc, Job[] jobs, PrintWriter out, DisplayFormat pdf)
+    public JobId[] executeJobArray(IHyracksClientConnection hcc, Job[] jobs, PrintWriter out, DisplayFormat pdf)
             throws Exception {
+        JobId[] startedJobIds = new JobId[jobs.length];
         for (int i = 0; i < jobs.length; i++) {
             JobSpecification spec = jobs[i].getJobSpec();
             spec.setMaxReattempts(0);
             JobId jobId = hcc.startJob(GlobalConfig.HYRACKS_APP_NAME, spec);
-            hcc.waitForCompletion(jobId);
+            startedJobIds[i] = jobId;
         }
+        return startedJobIds;
     }
 
     private static IDataFormat getDataFormat(MetadataTransactionContext mdTxnCtx, String dataverseName)
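
executeJobArray() now starts every job before anyone waits, and hands the started JobIds back to the caller; a hedged sketch of caller-side use (variable names hypothetical):

    // Start all jobs up front, then block on each; the old version waited for
    // job i to finish before starting job i+1.
    JobId[] jobIds = executeJobArray(hcc, jobs, out, pdf);
    for (JobId jobId : jobIds) {
        hcc.waitForCompletion(jobId);
    }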