1) Combined query rewriting and compilation into a single method.
2) Removed method compileQuery from AqlTranslator
3) Refactored code around the invocations of the previously separate methods.
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_stabilization@897 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index be6305b..82fba4f 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -633,6 +633,7 @@
jobs.add(new Job(jobSpec));
}
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ // Loading from a file does not happen under a transaction!
for (Job j : jobs) {
runJob(hcc, j.getJobSpec());
}
@@ -647,6 +648,7 @@
.getDatasetName().getValue(), st1.getQuery(), st1.getVarCounter());
Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
+ MetadataManager.INSTANCE.commitTransaction(metadataProvider.getMetadataTxnContext());
runJob(hcc, compiled.first);
}
@@ -659,6 +661,7 @@
CompiledInsertStatement clfrqs = new CompiledInsertStatement(dataverseName, stmtInsert.getDatasetName()
.getValue(), stmtInsert.getQuery(), stmtInsert.getVarCounter());
Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
+ MetadataManager.INSTANCE.commitTransaction(metadataProvider.getMetadataTxnContext());
runJob(hcc, compiled.first);
}
@@ -672,39 +675,39 @@
stmtDelete.getDatasetName().getValue(), stmtDelete.getCondition(), stmtDelete.getDieClause(),
stmtDelete.getVarCounter(), metadataProvider);
Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
+ MetadataManager.INSTANCE.commitTransaction(metadataProvider.getMetadataTxnContext());
runJob(hcc, compiled.first);
}
private Pair<JobSpecification, FileSplit> rewriteCompileQuery(AqlMetadataProvider metadataProvider, Query query,
ICompiledDmlStatement stmt) throws AsterixException, RemoteException, AlgebricksException, JSONException,
ACIDException {
+
+ // Query Rewriting (happens under the same ongoing metadata transaction)
Pair<Query, Integer> reWrittenQuery = APIFramework.reWriteQuery(declaredFunctions, metadataProvider, query,
sessionConfig, out, pdf);
- MetadataManager.INSTANCE.commitTransaction(metadataProvider.getMetadataTxnContext());
- Pair<JobSpecification, FileSplit> compiled = compileQuery(sessionConfig, reWrittenQuery.first,
- metadataProvider, reWrittenQuery.second, stmt);
+
+ // Query Compilation (happens under a new transaction, which is committed/aborted by installing a JobEventListener)
+ MetadataTransactionContext mdTxnCtxQuery = MetadataManager.INSTANCE.beginTransaction();
+ AqlMetadataProvider metadataProviderQueryCompilation = new AqlMetadataProvider(mdTxnCtxQuery,
+ activeDefaultDataverse);
+ metadataProviderQueryCompilation.setWriterFactory(metadataProvider.getWriterFactory());
+ metadataProviderQueryCompilation.setOutputFile(metadataProvider.getOutputFile());
+ metadataProviderQueryCompilation.setConfig(metadataProvider.getConfig());
+
+ sessionConfig.setGenerateJobSpec(true);
+ JobSpecification spec = APIFramework.compileQuery(declaredFunctions, metadataProvider, query,
+ reWrittenQuery.second, stmt == null ? null : stmt.getDatasetName(), sessionConfig, out, pdf, stmt);
+ sessionConfig.setGenerateJobSpec(false);
+
+ Pair<JobSpecification, FileSplit> compiled = new Pair<JobSpecification, FileSplit>(spec,
+ metadataProvider.getOutputFile());
return compiled;
}
- private Pair<JobSpecification, FileSplit> compileQuery(SessionConfig sessionConfig, Query query,
- AqlMetadataProvider metadataProvider, int varCounter, ICompiledDmlStatement statement)
- throws RemoteException, AsterixException, AlgebricksException, JSONException, ACIDException {
- sessionConfig.setGenerateJobSpec(true);
- MetadataTransactionContext mdTxnCtxQuery = MetadataManager.INSTANCE.beginTransaction();
- AqlMetadataProvider metadataProviderInsert = new AqlMetadataProvider(mdTxnCtxQuery, activeDefaultDataverse);
- metadataProviderInsert.setWriterFactory(metadataProvider.getWriterFactory());
- metadataProviderInsert.setOutputFile(metadataProvider.getOutputFile());
- metadataProviderInsert.setConfig(metadataProvider.getConfig());
- JobSpecification spec = APIFramework.compileQuery(declaredFunctions, metadataProvider, query, varCounter,
- statement == null ? null : statement.getDatasetName(), sessionConfig, out, pdf, statement);
- sessionConfig.setGenerateJobSpec(false);
- return new Pair<JobSpecification, FileSplit>(spec, metadataProvider.getOutputFile());
- }
-
private void handleBeginFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc) throws Exception {
- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
BeginFeedStatement bfs = (BeginFeedStatement) stmt;
String dataverseName = bfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
: activeDefaultDataverse.getDataverseName() : bfs.getDatasetName().getValue();
@@ -713,20 +716,17 @@
bfs.getDatasetName().getValue(), bfs.getQuery(), bfs.getVarCounter());
Dataset dataset;
- dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, bfs.getDatasetName().getValue());
+ dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName, bfs
+ .getDatasetName().getValue());
IDatasetDetails datasetDetails = dataset.getDatasetDetails();
if (datasetDetails.getDatasetType() != DatasetType.FEED) {
throw new IllegalArgumentException("Dataset " + bfs.getDatasetName().getValue() + " is not a feed dataset");
}
- bfs.initialize(mdTxnCtx, dataset);
+ bfs.initialize(metadataProvider.getMetadataTxnContext(), dataset);
cbfs.setQuery(bfs.getQuery());
- Pair<Query, Integer> reWrittenQuery = APIFramework.reWriteQuery(declaredFunctions, metadataProvider,
- bfs.getQuery(), sessionConfig, out, pdf);
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-
- Pair<JobSpecification, FileSplit> compiled = compileQuery(sessionConfig, reWrittenQuery.first,
- metadataProvider, reWrittenQuery.second, cbfs);
+ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, bfs.getQuery(), cbfs);
+ MetadataManager.INSTANCE.commitTransaction(metadataProvider.getMetadataTxnContext());
runJob(hcc, compiled.first);
}