Move all the job executions to the handle methods for all type of statements other than query.
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_stabilization_result_distribution@1136 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index 508126f..22f5895 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -29,6 +29,7 @@
import edu.uci.ics.asterix.api.common.Job;
import edu.uci.ics.asterix.api.common.SessionConfig;
import edu.uci.ics.asterix.aql.base.Statement;
+import edu.uci.ics.asterix.aql.base.Statement.Kind;
import edu.uci.ics.asterix.aql.expression.BeginFeedStatement;
import edu.uci.ics.asterix.aql.expression.ControlFeedStatement;
import edu.uci.ics.asterix.aql.expression.CreateDataverseStatement;
@@ -101,6 +102,7 @@
import edu.uci.ics.hyracks.algebricks.data.IAWriterFactory;
import edu.uci.ics.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
@@ -388,7 +390,8 @@
if (dd.getDatasetType() == DatasetType.INTERNAL || dd.getDatasetType() == DatasetType.FEED) {
Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
dataverseName);
- runJob(hcc, DatasetOperations.createDatasetJobSpec(dataverse, datasetName, metadataProvider));
+ JobId jobId = runJob(hcc, DatasetOperations.createDatasetJobSpec(dataverse, datasetName, metadataProvider));
+ hcc.waitForCompletion(jobId);
}
}
@@ -426,7 +429,8 @@
index.getDatasetName(), index.getKeyFieldNames(), index.getGramLength(), index.getIndexType());
JobSpecification loadIndexJobSpec = IndexOperations
.buildSecondaryIndexLoadingJobSpec(cis, metadataProvider);
- runJob(hcc, loadIndexJobSpec);
+ JobId jobId = runJob(hcc, loadIndexJobSpec);
+ hcc.waitForCompletion(jobId);
}
}
@@ -659,9 +663,9 @@
CompiledWriteFromQueryResultStatement clfrqs = new CompiledWriteFromQueryResultStatement(dataverseName, st1
.getDatasetName().getValue(), st1.getQuery(), st1.getVarCounter());
- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
- if (compiled.first != null) {
- jobsToExecute.add(compiled.first);
+ JobSpecification compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
+ if (compiled != null) {
+ jobsToExecute.add(compiled);
}
}
@@ -673,9 +677,10 @@
: activeDefaultDataverse.getDataverseName() : stmtInsert.getDataverseName().getValue();
CompiledInsertStatement clfrqs = new CompiledInsertStatement(dataverseName, stmtInsert.getDatasetName()
.getValue(), stmtInsert.getQuery(), stmtInsert.getVarCounter());
- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
- if (compiled.first != null) {
- jobsToExecute.add(compiled.first);
+
+ JobSpecification compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
+ if (compiled != null) {
+ jobsToExecute.add(compiled);
}
}
@@ -688,13 +693,13 @@
CompiledDeleteStatement clfrqs = new CompiledDeleteStatement(stmtDelete.getVariableExpr(), dataverseName,
stmtDelete.getDatasetName().getValue(), stmtDelete.getCondition(), stmtDelete.getDieClause(),
stmtDelete.getVarCounter(), metadataProvider);
- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
- if (compiled.first != null) {
- jobsToExecute.add(compiled.first);
+ JobSpecification compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
+ if (compiled != null) {
+ jobsToExecute.add(compiled);
}
}
- private Pair<JobSpecification, FileSplit> rewriteCompileQuery(AqlMetadataProvider metadataProvider, Query query,
+ private JobSpecification rewriteCompileQuery(AqlMetadataProvider metadataProvider, Query query,
ICompiledDmlStatement stmt) throws AsterixException, RemoteException, AlgebricksException, JSONException,
ACIDException {
@@ -709,9 +714,7 @@
JobSpecification spec = APIFramework.compileQuery(declaredFunctions, metadataProvider, query,
reWrittenQuery.second, stmt == null ? null : stmt.getDatasetName(), sessionConfig, out, pdf, stmt);
- Pair<JobSpecification, FileSplit> compiled = new Pair<JobSpecification, FileSplit>(spec,
- metadataProvider.getOutputFile());
- return compiled;
+ return spec;
}
@@ -735,9 +738,10 @@
}
bfs.initialize(metadataProvider.getMetadataTxnContext(), dataset);
cbfs.setQuery(bfs.getQuery());
- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, bfs.getQuery(), cbfs);
- if (compiled.first != null) {
- jobsToExecute.add(compiled.first);
+
+ JobSpecification compiled = rewriteCompileQuery(metadataProvider, bfs.getQuery(), cbfs);
+ if (compiled != null) {
+ jobsToExecute.add(compiled);
}
}
@@ -753,10 +757,10 @@
private QueryResult handleQuery(AqlMetadataProvider metadataProvider, Query query, IHyracksClientConnection hcc,
List<JobSpecification> jobsToExecute) throws Exception {
- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, query, null);
- if (compiled.first != null) {
- GlobalConfig.ASTERIX_LOGGER.info(compiled.first.toJSON().toString(1));
- jobsToExecute.add(compiled.first);
+ JobSpecification compiled = rewriteCompileQuery(metadataProvider, query, null);
+ if (compiled != null) {
+ GlobalConfig.ASTERIX_LOGGER.info(compiled.toJSON().toString(1));
+ jobsToExecute.add(compiled);
}
return new QueryResult(query, compiled.second.getLocalFile().getFile().getAbsolutePath());
}
@@ -777,7 +781,8 @@
throw new AsterixException("Failed to create job spec for creating index '"
+ stmtCreateIndex.getDatasetName() + "." + stmtCreateIndex.getIndexName() + "'");
}
- runJob(hcc, spec);
+ JobId jobId = runJob(hcc, spec);
+ hcc.waitForCompletion(jobId);
}
private void handleCreateNodeGroupStatement(AqlMetadataProvider metadataProvider, Statement stmt,
@@ -800,14 +805,16 @@
}
}
- private void runJob(IHyracksClientConnection hcc, JobSpecification spec) throws Exception {
- executeJobArray(hcc, new Job[] { new Job(spec) }, out, pdf);
+ private JobId runJob(IHyracksClientConnection hcc, JobSpecification spec) throws Exception {
+ JobId[] jobIds = executeJobArray(hcc, new Job[] { new Job(spec) }, out, pdf);
+ return jobIds[0];
}
private void compileIndexDropStatement(IHyracksClientConnection hcc, String dataverseName, String datasetName,
String indexName, AqlMetadataProvider metadataProvider) throws Exception {
CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName);
- runJob(hcc, IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
+ JobId jobId = runJob(hcc, IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
+ hcc.waitForCompletion(jobId);
MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
indexName);
}
@@ -819,20 +826,24 @@
Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
JobSpecification[] jobSpecs = DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider);
- for (JobSpecification spec : jobSpecs)
- runJob(hcc, spec);
+ for (JobSpecification spec : jobSpecs) {
+ JobId jobId = runJob(hcc, spec);
+ hcc.waitForCompletion(jobId);
+ }
}
MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
}
- public void executeJobArray(IHyracksClientConnection hcc, Job[] jobs, PrintWriter out, DisplayFormat pdf)
+ public JobId[] executeJobArray(IHyracksClientConnection hcc, Job[] jobs, PrintWriter out, DisplayFormat pdf)
throws Exception {
+ JobId[] startedJobIds = new JobId[jobs.length];
for (int i = 0; i < jobs.length; i++) {
JobSpecification spec = jobs[i].getJobSpec();
spec.setMaxReattempts(0);
JobId jobId = hcc.startJob(GlobalConfig.HYRACKS_APP_NAME, spec);
- hcc.waitForCompletion(jobId);
+ startedJobIds[i] = jobId;
}
+ return startedJobIds;
}
private static IDataFormat getDataFormat(MetadataTransactionContext mdTxnCtx, String dataverseName)