Reverting earlier changes to AqlTranslator; additional changes are required, and they should be handled on a separate branch and merged back after code review.
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_stabilization@900 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index 7a87acb..be6305b 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -241,6 +241,7 @@
case QUERY: {
executionResult.add(handleQuery(metadataProvider, (Query) stmt, hcc));
+ metadataProvider.setWriteTransaction(false);
break;
}
@@ -632,7 +633,6 @@
jobs.add(new Job(jobSpec));
}
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- // Loading from a file does not happen under a transaction!
for (Job j : jobs) {
runJob(hcc, j.getJobSpec());
}
@@ -647,7 +647,6 @@
.getDatasetName().getValue(), st1.getQuery(), st1.getVarCounter());
Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
- MetadataManager.INSTANCE.commitTransaction(metadataProvider.getMetadataTxnContext());
runJob(hcc, compiled.first);
}
@@ -660,7 +659,6 @@
CompiledInsertStatement clfrqs = new CompiledInsertStatement(dataverseName, stmtInsert.getDatasetName()
.getValue(), stmtInsert.getQuery(), stmtInsert.getVarCounter());
Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
- MetadataManager.INSTANCE.commitTransaction(metadataProvider.getMetadataTxnContext());
runJob(hcc, compiled.first);
}
@@ -674,40 +672,39 @@
stmtDelete.getDatasetName().getValue(), stmtDelete.getCondition(), stmtDelete.getDieClause(),
stmtDelete.getVarCounter(), metadataProvider);
Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
- MetadataManager.INSTANCE.commitTransaction(metadataProvider.getMetadataTxnContext());
runJob(hcc, compiled.first);
}
private Pair<JobSpecification, FileSplit> rewriteCompileQuery(AqlMetadataProvider metadataProvider, Query query,
ICompiledDmlStatement stmt) throws AsterixException, RemoteException, AlgebricksException, JSONException,
ACIDException {
-
- // Query Rewriting (happens under the same ongoing metadata transaction)
Pair<Query, Integer> reWrittenQuery = APIFramework.reWriteQuery(declaredFunctions, metadataProvider, query,
sessionConfig, out, pdf);
-
- // Query Compilation (happens under a new transaction, which is committed/aborted by installing a JobEventListener)
- MetadataTransactionContext mdTxnCtxQuery = MetadataManager.INSTANCE.beginTransaction();
- AqlMetadataProvider metadataProviderQueryCompilation = new AqlMetadataProvider(mdTxnCtxQuery,
- activeDefaultDataverse);
- metadataProviderQueryCompilation.setWriterFactory(metadataProvider.getWriterFactory());
- metadataProviderQueryCompilation.setOutputFile(metadataProvider.getOutputFile());
- metadataProviderQueryCompilation.setConfig(metadataProvider.getConfig());
- metadataProviderQueryCompilation.setWriteTransaction(metadataProvider.isWriteTransaction());
-
- sessionConfig.setGenerateJobSpec(true);
- JobSpecification spec = APIFramework.compileQuery(declaredFunctions, metadataProviderQueryCompilation, query,
- reWrittenQuery.second, stmt == null ? null : stmt.getDatasetName(), sessionConfig, out, pdf, stmt);
- sessionConfig.setGenerateJobSpec(false);
-
- Pair<JobSpecification, FileSplit> compiled = new Pair<JobSpecification, FileSplit>(spec,
- metadataProviderQueryCompilation.getOutputFile());
+ MetadataManager.INSTANCE.commitTransaction(metadataProvider.getMetadataTxnContext());
+ Pair<JobSpecification, FileSplit> compiled = compileQuery(sessionConfig, reWrittenQuery.first,
+ metadataProvider, reWrittenQuery.second, stmt);
return compiled;
}
+ private Pair<JobSpecification, FileSplit> compileQuery(SessionConfig sessionConfig, Query query,
+ AqlMetadataProvider metadataProvider, int varCounter, ICompiledDmlStatement statement)
+ throws RemoteException, AsterixException, AlgebricksException, JSONException, ACIDException {
+ sessionConfig.setGenerateJobSpec(true);
+ MetadataTransactionContext mdTxnCtxQuery = MetadataManager.INSTANCE.beginTransaction();
+ AqlMetadataProvider metadataProviderInsert = new AqlMetadataProvider(mdTxnCtxQuery, activeDefaultDataverse);
+ metadataProviderInsert.setWriterFactory(metadataProvider.getWriterFactory());
+ metadataProviderInsert.setOutputFile(metadataProvider.getOutputFile());
+ metadataProviderInsert.setConfig(metadataProvider.getConfig());
+ JobSpecification spec = APIFramework.compileQuery(declaredFunctions, metadataProvider, query, varCounter,
+ statement == null ? null : statement.getDatasetName(), sessionConfig, out, pdf, statement);
+ sessionConfig.setGenerateJobSpec(false);
+ return new Pair<JobSpecification, FileSplit>(spec, metadataProvider.getOutputFile());
+ }
+
private void handleBeginFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc) throws Exception {
+ MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
BeginFeedStatement bfs = (BeginFeedStatement) stmt;
String dataverseName = bfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
: activeDefaultDataverse.getDataverseName() : bfs.getDatasetName().getValue();
@@ -716,17 +713,20 @@
bfs.getDatasetName().getValue(), bfs.getQuery(), bfs.getVarCounter());
Dataset dataset;
- dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName, bfs
- .getDatasetName().getValue());
+ dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, bfs.getDatasetName().getValue());
IDatasetDetails datasetDetails = dataset.getDatasetDetails();
if (datasetDetails.getDatasetType() != DatasetType.FEED) {
throw new IllegalArgumentException("Dataset " + bfs.getDatasetName().getValue() + " is not a feed dataset");
}
- bfs.initialize(metadataProvider.getMetadataTxnContext(), dataset);
+ bfs.initialize(mdTxnCtx, dataset);
cbfs.setQuery(bfs.getQuery());
- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, bfs.getQuery(), cbfs);
- MetadataManager.INSTANCE.commitTransaction(metadataProvider.getMetadataTxnContext());
+ Pair<Query, Integer> reWrittenQuery = APIFramework.reWriteQuery(declaredFunctions, metadataProvider,
+ bfs.getQuery(), sessionConfig, out, pdf);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+
+ Pair<JobSpecification, FileSplit> compiled = compileQuery(sessionConfig, reWrittenQuery.first,
+ metadataProvider, reWrittenQuery.second, cbfs);
runJob(hcc, compiled.first);
}
@@ -746,7 +746,6 @@
private QueryResult handleQuery(AqlMetadataProvider metadataProvider, Query query, IHyracksClientConnection hcc)
throws Exception {
Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, query, null);
- MetadataManager.INSTANCE.commitTransaction(metadataProvider.getMetadataTxnContext());
runJob(hcc, compiled.first);
GlobalConfig.ASTERIX_LOGGER.info(compiled.first.toJSON().toString(1));
return new QueryResult(query, compiled.second.getLocalFile().getFile().getAbsolutePath());