Merged resolution for issue 220: Incorrect use of transaction id in AqlMetadataProvider
into asterix_stabilization (from asterix_stabilization_txn_fix)
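Previously, AqlMetadataProvider handed the metadata transaction's id (mdTxnCtx.getTxnId()) to the runtime index operators and to the JobEventListenerFactory, so a job's commit/abort acted on the metadata transaction. This change gives the provider a dedicated jobTxnId, generated via TransactionIDFactory for write transactions, and restructures AqlTranslator so that queued jobs run only after the metadata transaction has committed.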
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_stabilization@906 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/FuzzyJoinRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/FuzzyJoinRule.java
index 5961c32..035f1df 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/FuzzyJoinRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/FuzzyJoinRule.java
@@ -263,7 +263,7 @@
// The translator will compile metadata internally. Run this compilation
// under the same transaction id as the "outer" compilation.
AqlPlusExpressionToPlanTranslator translator = new AqlPlusExpressionToPlanTranslator(
- metadataProvider.getTxnId(), metadataProvider, counter, null, null);
+ metadataProvider.getJobTxnId(), metadataProvider, counter, null, null);
LogicalOperatorDeepCopyVisitor deepCopyVisitor = new LogicalOperatorDeepCopyVisitor(counter);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/APIFramework.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/APIFramework.java
index 5032a48..95ec76c 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/APIFramework.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/APIFramework.java
@@ -261,6 +261,7 @@
OptimizationConfUtil.getPhysicalOptimizationConfig().setFrameSize(frameSize);
builder.setPhysicalOptimizationConfig(OptimizationConfUtil.getPhysicalOptimizationConfig());
+
ICompiler compiler = compilerFactory.createCompiler(plan, queryMetadataProvider, t.getVarCounter());
if (pc.isOptimize()) {
compiler.optimize();
@@ -297,13 +298,6 @@
}
}
- if (!pc.isGenerateJobSpec()) {
- // Job spec not requested. Consider transaction against metadata
- // committed.
- MetadataManager.INSTANCE.commitTransaction(queryMetadataProvider.getMetadataTxnContext());
- return null;
- }
-
AlgebricksPartitionConstraint clusterLocs = queryMetadataProvider.getClusterLocations();
builder.setBinaryBooleanInspectorFactory(format.getBinaryBooleanInspectorFactory());
builder.setBinaryIntegerInspectorFactory(format.getBinaryIntegerInspectorFactory());
@@ -320,7 +314,7 @@
JobSpecification spec = compiler.createJob(AsterixAppContextInfoImpl.INSTANCE);
// set the job event listener
- spec.setJobletEventListenerFactory(new JobEventListenerFactory(queryMetadataProvider.getTxnId(),
+ spec.setJobletEventListenerFactory(new JobEventListenerFactory(queryMetadataProvider.getJobTxnId(),
isWriteTransaction));
if (pc.isPrintJob()) {
switch (pdf) {
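Taken together with the AqlTranslator change below, the APIFramework hunks above wire the new id into the job. A condensed sketch of the combined flow (not a compilable unit; only names that appear in this diff are used, and surrounding compiler setup is elided):

    // A write job gets its own transaction id before the spec is built...
    if (metadataProvider.isWriteTransaction()) {
        metadataProvider.setJobTxnId(TransactionIDFactory.generateTransactionId());
    }
    JobSpecification spec = compiler.createJob(AsterixAppContextInfoImpl.INSTANCE);
    // ...and the joblet event listener commits/aborts that job transaction id,
    // rather than the metadata transaction id that getTxnId() used to return.
    spec.setJobletEventListenerFactory(
            new JobEventListenerFactory(metadataProvider.getJobTxnId(), isWriteTransaction));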
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index be6305b..08e1c5f 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -27,7 +27,6 @@
import edu.uci.ics.asterix.api.common.APIFramework;
import edu.uci.ics.asterix.api.common.APIFramework.DisplayFormat;
import edu.uci.ics.asterix.api.common.Job;
-import edu.uci.ics.asterix.api.common.Job.SubmissionMode;
import edu.uci.ics.asterix.api.common.SessionConfig;
import edu.uci.ics.asterix.aql.base.Statement;
import edu.uci.ics.asterix.aql.expression.BeginFeedStatement;
@@ -83,6 +82,7 @@
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.om.types.TypeSignature;
import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionIDFactory;
import edu.uci.ics.asterix.translator.AbstractAqlTranslator;
import edu.uci.ics.asterix.translator.CompiledStatements.CompiledBeginFeedStatement;
import edu.uci.ics.asterix.translator.CompiledStatements.CompiledControlFeedStatement;
@@ -134,12 +134,12 @@
return functionDecls;
}
- public List<QueryResult> compileAndExecute(IHyracksClientConnection hcc) throws AlgebricksException,
- RemoteException, ACIDException, AsterixException {
+ public List<QueryResult> compileAndExecute(IHyracksClientConnection hcc) throws Exception {
List<QueryResult> executionResult = new ArrayList<QueryResult>();
FileSplit outputFile = null;
IAWriterFactory writerFactory = PrinterBasedWriterFactory.INSTANCE;
Map<String, String> config = new HashMap<String, String>();
+ List<JobSpecification> jobsToExecute = new ArrayList<JobSpecification>();
for (Statement stmt : aqlStatements) {
validateOperation(activeDefaultDataverse, stmt);
@@ -148,143 +148,159 @@
metadataProvider.setWriterFactory(writerFactory);
metadataProvider.setOutputFile(outputFile);
metadataProvider.setConfig(config);
+ jobsToExecute.clear();
try {
switch (stmt.getKind()) {
case SET: {
- SetStatement ss = (SetStatement) stmt;
- String pname = ss.getPropName();
- String pvalue = ss.getPropValue();
- config.put(pname, pvalue);
+ handleSetStatement(metadataProvider, stmt, config, jobsToExecute);
break;
}
case DATAVERSE_DECL: {
- activeDefaultDataverse = handleUseDataverseStatement(metadataProvider, stmt);
+ activeDefaultDataverse = handleUseDataverseStatement(metadataProvider, stmt, jobsToExecute);
break;
}
case CREATE_DATAVERSE: {
- handleCreateDataverseStatement(metadataProvider, stmt);
+ handleCreateDataverseStatement(metadataProvider, stmt, jobsToExecute);
break;
}
case DATASET_DECL: {
- handleCreateDatasetStatement(metadataProvider, stmt, hcc);
+ handleCreateDatasetStatement(metadataProvider, stmt, hcc, jobsToExecute);
break;
}
case CREATE_INDEX: {
- handleCreateIndexStatement(metadataProvider, stmt, hcc);
+ handleCreateIndexStatement(metadataProvider, stmt, hcc, jobsToExecute);
break;
}
case TYPE_DECL: {
- handleCreateTypeStatement(metadataProvider, stmt);
+ handleCreateTypeStatement(metadataProvider, stmt, jobsToExecute);
break;
}
case NODEGROUP_DECL: {
- handleCreateNodeGroupStatement(metadataProvider, stmt);
+ handleCreateNodeGroupStatement(metadataProvider, stmt, jobsToExecute);
break;
}
case DATAVERSE_DROP: {
- handleDataverseDropStatement(metadataProvider, stmt, hcc);
+ handleDataverseDropStatement(metadataProvider, stmt, hcc, jobsToExecute);
break;
}
case DATASET_DROP: {
- handleDatasetDropStatement(metadataProvider, stmt, hcc);
+ handleDatasetDropStatement(metadataProvider, stmt, hcc, jobsToExecute);
break;
}
case INDEX_DROP: {
- handleIndexDropStatement(metadataProvider, stmt, hcc);
+ handleIndexDropStatement(metadataProvider, stmt, hcc, jobsToExecute);
break;
}
case TYPE_DROP: {
- handleTypeDropStatement(metadataProvider, stmt);
+ handleTypeDropStatement(metadataProvider, stmt, jobsToExecute);
break;
}
case NODEGROUP_DROP: {
- handleNodegroupDropStatement(metadataProvider, stmt);
+ handleNodegroupDropStatement(metadataProvider, stmt, jobsToExecute);
break;
}
case CREATE_FUNCTION: {
- handleCreateFunctionStatement(metadataProvider, stmt);
+ handleCreateFunctionStatement(metadataProvider, stmt, jobsToExecute);
break;
}
case FUNCTION_DROP: {
- handleFunctionDropStatement(metadataProvider, stmt);
+ handleFunctionDropStatement(metadataProvider, stmt, jobsToExecute);
break;
}
case LOAD_FROM_FILE: {
- handleLoadFromFileStatement(metadataProvider, stmt, hcc);
+ handleLoadFromFileStatement(metadataProvider, stmt, hcc, jobsToExecute);
break;
}
case WRITE_FROM_QUERY_RESULT: {
- handleWriteFromQueryResultStatement(metadataProvider, stmt, hcc);
+ handleWriteFromQueryResultStatement(metadataProvider, stmt, hcc, jobsToExecute);
break;
}
case INSERT: {
- handleInsertStatement(metadataProvider, stmt, hcc);
+ handleInsertStatement(metadataProvider, stmt, hcc, jobsToExecute);
break;
}
case DELETE: {
- handleDeleteStatement(metadataProvider, stmt, hcc);
+ handleDeleteStatement(metadataProvider, stmt, hcc, jobsToExecute);
break;
}
case BEGIN_FEED: {
- handleBeginFeedStatement(metadataProvider, stmt, hcc);
+ handleBeginFeedStatement(metadataProvider, stmt, hcc, jobsToExecute);
break;
}
case CONTROL_FEED: {
- handleControlFeedStatement(metadataProvider, stmt, hcc);
+ handleControlFeedStatement(metadataProvider, stmt, hcc, jobsToExecute);
break;
}
case QUERY: {
- executionResult.add(handleQuery(metadataProvider, (Query) stmt, hcc));
- metadataProvider.setWriteTransaction(false);
+ executionResult.add(handleQuery(metadataProvider, (Query) stmt, hcc, jobsToExecute));
break;
}
case WRITE: {
- WriteStatement ws = (WriteStatement) stmt;
- File f = new File(ws.getFileName());
- outputFile = new FileSplit(ws.getNcName().getValue(), new FileReference(f));
- if (ws.getWriterClassName() != null) {
- try {
- writerFactory = (IAWriterFactory) Class.forName(ws.getWriterClassName()).newInstance();
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- } catch (Exception e) {
- throw new AsterixException(e);
- }
+ Pair<IAWriterFactory, FileSplit> result = handleWriteStatement(metadataProvider, stmt,
+ jobsToExecute);
+ if (result.first != null) {
+ writerFactory = result.first;
}
+ outputFile = result.second;
break;
}
}
-
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e) {
MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
throw new AlgebricksException(e);
}
+ // The following jobs run under a separate transaction that is committed/aborted by the JobEventListener.
+ for (JobSpecification jobspec : jobsToExecute) {
+ runJob(hcc, jobspec);
+ }
}
return executionResult;
}
- private Dataverse handleUseDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt)
- throws MetadataException, RemoteException, ACIDException {
+ private void handleSetStatement(AqlMetadataProvider metadataProvider, Statement stmt, Map<String, String> config,
+ List<JobSpecification> jobsToExecute) throws RemoteException, ACIDException {
+ SetStatement ss = (SetStatement) stmt;
+ String pname = ss.getPropName();
+ String pvalue = ss.getPropValue();
+ config.put(pname, pvalue);
+ }
+
+ private Pair<IAWriterFactory, FileSplit> handleWriteStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+ List<JobSpecification> jobsToExecute) throws InstantiationException, IllegalAccessException,
+ ClassNotFoundException {
+ WriteStatement ws = (WriteStatement) stmt;
+ File f = new File(ws.getFileName());
+ FileSplit outputFile = new FileSplit(ws.getNcName().getValue(), new FileReference(f));
+ IAWriterFactory writerFactory = null;
+ if (ws.getWriterClassName() != null) {
+ writerFactory = (IAWriterFactory) Class.forName(ws.getWriterClassName()).newInstance();
+ }
+ return new Pair<IAWriterFactory, FileSplit>(writerFactory, outputFile);
+ }
+
+ private Dataverse handleUseDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+ List<JobSpecification> jobsToExecute) throws MetadataException, RemoteException, ACIDException {
DataverseDecl dvd = (DataverseDecl) stmt;
String dvName = dvd.getDataverseName().getValue();
Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
if (dv == null) {
- throw new MetadataException(" Unknown dataverse " + dvName);
+ throw new MetadataException("Unknown dataverse " + dvName);
}
- MetadataManager.INSTANCE.commitTransaction(metadataProvider.getMetadataTxnContext());
return dv;
-
}
- private void handleCreateDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt)
- throws MetadataException, AlgebricksException, RemoteException, ACIDException {
+ private void handleCreateDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+ List<JobSpecification> jobsToExecute) throws MetadataException, AlgebricksException, RemoteException,
+ ACIDException {
CreateDataverseStatement stmtCreateDataverse = (CreateDataverseStatement) stmt;
String dvName = stmtCreateDataverse.getDataverseName().getValue();
Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
@@ -293,11 +309,10 @@
}
MetadataManager.INSTANCE.addDataverse(metadataProvider.getMetadataTxnContext(), new Dataverse(dvName,
stmtCreateDataverse.getFormat()));
- MetadataManager.INSTANCE.commitTransaction(metadataProvider.getMetadataTxnContext());
}
private void handleCreateDatasetStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- IHyracksClientConnection hcc) throws AsterixException, Exception {
+ IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws AsterixException, Exception {
DatasetDecl dd = (DatasetDecl) stmt;
String dataverseName = dd.getDataverse() != null ? dd.getDataverse().getValue()
: activeDefaultDataverse != null ? activeDefaultDataverse.getDataverseName() : null;
@@ -363,14 +378,12 @@
if (dd.getDatasetType() == DatasetType.INTERNAL || dd.getDatasetType() == DatasetType.FEED) {
Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
dataverseName);
- runCreateDatasetJob(hcc, dataverse, datasetName, metadataProvider);
+ runJob(hcc, DatasetOperations.createDatasetJobSpec(dataverse, datasetName, metadataProvider));
}
- MetadataManager.INSTANCE.commitTransaction(metadataProvider.getMetadataTxnContext());
-
}
private void handleCreateIndexStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- IHyracksClientConnection hcc) throws Exception {
+ IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
String dataverseName = stmtCreateIndex.getDataverseName() == null ? activeDefaultDataverse == null ? null
: activeDefaultDataverse.getDataverseName() : stmtCreateIndex.getDataverseName().getValue();
@@ -405,11 +418,11 @@
.buildSecondaryIndexLoadingJobSpec(cis, metadataProvider);
runJob(hcc, loadIndexJobSpec);
}
- MetadataManager.INSTANCE.commitTransaction(metadataProvider.getMetadataTxnContext());
}
- private void handleCreateTypeStatement(AqlMetadataProvider metadataProvider, Statement stmt)
- throws AlgebricksException, RemoteException, ACIDException, MetadataException {
+ private void handleCreateTypeStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+ List<JobSpecification> jobsToExecute) throws AlgebricksException, RemoteException, ACIDException,
+ MetadataException {
MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
TypeDecl stmtCreateType = (TypeDecl) stmt;
String dataverseName = stmtCreateType.getDataverseName() == null ? activeDefaultDataverse == null ? null
@@ -437,11 +450,10 @@
MetadataManager.INSTANCE.addDatatype(mdTxnCtx, new Datatype(dataverseName, typeName, type, false));
}
}
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
}
private void handleDataverseDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- IHyracksClientConnection hcc) throws Exception {
+ IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
DataverseDropStatement stmtDelete = (DataverseDropStatement) stmt;
String dvName = stmtDelete.getDataverseName().getValue();
@@ -473,11 +485,10 @@
activeDefaultDataverse = null;
}
}
- MetadataManager.INSTANCE.commitTransaction(metadataProvider.getMetadataTxnContext());
}
private void handleDatasetDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- IHyracksClientConnection hcc) throws Exception {
+ IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
DropStatement stmtDelete = (DropStatement) stmt;
String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
@@ -503,11 +514,10 @@
}
compileDatasetDropStatement(hcc, dataverseName, datasetName, metadataProvider);
}
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
}
private void handleIndexDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- IHyracksClientConnection hcc) throws Exception {
+ IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt;
String datasetName = stmtIndexDrop.getDatasetName().getValue();
@@ -532,11 +542,11 @@
throw new AlgebricksException(datasetName
+ " is an external dataset. Indexes are not maintained for external datasets.");
}
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
}
- private void handleTypeDropStatement(AqlMetadataProvider metadataProvider, Statement stmt)
- throws AlgebricksException, MetadataException, RemoteException, ACIDException {
+ private void handleTypeDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+ List<JobSpecification> jobsToExecute) throws AlgebricksException, MetadataException, RemoteException,
+ ACIDException {
MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
TypeDropStatement stmtTypeDrop = (TypeDropStatement) stmt;
String dataverseName = stmtTypeDrop.getDataverseName() == null ? (activeDefaultDataverse == null ? null
@@ -552,12 +562,11 @@
} else {
MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, dataverseName, typeName);
}
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-
}
- private void handleNodegroupDropStatement(AqlMetadataProvider metadataProvider, Statement stmt)
- throws MetadataException, AlgebricksException, RemoteException, ACIDException {
+ private void handleNodegroupDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+ List<JobSpecification> jobsToExecute) throws MetadataException, AlgebricksException, RemoteException,
+ ACIDException {
MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
NodeGroupDropStatement stmtDelete = (NodeGroupDropStatement) stmt;
String nodegroupName = stmtDelete.getNodeGroupName().getValue();
@@ -568,11 +577,11 @@
} else {
MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, nodegroupName);
}
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
}
- private void handleCreateFunctionStatement(AqlMetadataProvider metadataProvider, Statement stmt)
- throws AlgebricksException, MetadataException, RemoteException, ACIDException {
+ private void handleCreateFunctionStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+ List<JobSpecification> jobsToExecute) throws AlgebricksException, MetadataException, RemoteException,
+ ACIDException {
MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
CreateFunctionStatement cfs = (CreateFunctionStatement) stmt;
String dataverse = cfs.getSignature().getNamespace() == null ? activeDefaultDataverse == null ? null
@@ -588,11 +597,11 @@
.getArity(), cfs.getParamList(), Function.RETURNTYPE_VOID, cfs.getFunctionBody(),
Function.LANGUAGE_AQL, FunctionKind.SCALAR.toString());
MetadataManager.INSTANCE.addFunction(mdTxnCtx, function);
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
}
- private void handleFunctionDropStatement(AqlMetadataProvider metadataProvider, Statement stmt)
- throws MetadataException, RemoteException, ACIDException, AlgebricksException {
+ private void handleFunctionDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+ List<JobSpecification> jobsToExecute) throws MetadataException, RemoteException, ACIDException,
+ AlgebricksException {
MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
FunctionDropStatement stmtDropFunction = (FunctionDropStatement) stmt;
FunctionSignature signature = stmtDropFunction.getFunctionSignature();
@@ -603,12 +612,10 @@
} else {
MetadataManager.INSTANCE.dropFunction(mdTxnCtx, signature);
}
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
}
private void handleLoadFromFileStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- IHyracksClientConnection hcc) throws Exception {
- List<Job> jobs = new ArrayList<Job>();
+ IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
LoadFromFileStatement loadStmt = (LoadFromFileStatement) stmt;
String dataverseName = loadStmt.getDataverseName() == null ? activeDefaultDataverse == null ? null
@@ -618,7 +625,7 @@
IDataFormat format = getDataFormat(metadataProvider.getMetadataTxnContext(), dataverseName);
Job job = DatasetOperations.createLoadDatasetJobSpec(metadataProvider, cls, format);
- jobs.add(job);
+ jobsToExecute.add(job.getJobSpec());
// Also load the dataset's secondary indexes.
List<Index> datasetIndexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, loadStmt
.getDatasetName().getValue());
@@ -629,17 +636,13 @@
// Create CompiledCreateIndexStatement from metadata entity 'index'.
CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName,
index.getDatasetName(), index.getKeyFieldNames(), index.getGramLength(), index.getIndexType());
- JobSpecification jobSpec = IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, metadataProvider);
- jobs.add(new Job(jobSpec));
- }
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- for (Job j : jobs) {
- runJob(hcc, j.getJobSpec());
+ jobsToExecute.add(IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, metadataProvider));
}
}
private void handleWriteFromQueryResultStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- IHyracksClientConnection hcc) throws Exception {
+ IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
+ metadataProvider.setWriteTransaction(true);
WriteFromQueryResultStatement st1 = (WriteFromQueryResultStatement) stmt;
String dataverseName = st1.getDataverseName() == null ? activeDefaultDataverse == null ? null
: activeDefaultDataverse.getDataverseName() : st1.getDataverseName().getValue();
@@ -647,11 +650,11 @@
.getDatasetName().getValue(), st1.getQuery(), st1.getVarCounter());
Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
- runJob(hcc, compiled.first);
+ jobsToExecute.add(compiled.first);
}
private void handleInsertStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- IHyracksClientConnection hcc) throws Exception {
+ IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
metadataProvider.setWriteTransaction(true);
InsertStatement stmtInsert = (InsertStatement) stmt;
String dataverseName = stmtInsert.getDataverseName() == null ? activeDefaultDataverse == null ? null
@@ -659,11 +662,11 @@
CompiledInsertStatement clfrqs = new CompiledInsertStatement(dataverseName, stmtInsert.getDatasetName()
.getValue(), stmtInsert.getQuery(), stmtInsert.getVarCounter());
Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
- runJob(hcc, compiled.first);
+ jobsToExecute.add(compiled.first);
}
private void handleDeleteStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- IHyracksClientConnection hcc) throws Exception {
+ IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
metadataProvider.setWriteTransaction(true);
DeleteStatement stmtDelete = (DeleteStatement) stmt;
String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
@@ -672,39 +675,35 @@
stmtDelete.getDatasetName().getValue(), stmtDelete.getCondition(), stmtDelete.getDieClause(),
stmtDelete.getVarCounter(), metadataProvider);
Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
- runJob(hcc, compiled.first);
+ jobsToExecute.add(compiled.first);
}
private Pair<JobSpecification, FileSplit> rewriteCompileQuery(AqlMetadataProvider metadataProvider, Query query,
ICompiledDmlStatement stmt) throws AsterixException, RemoteException, AlgebricksException, JSONException,
ACIDException {
+
+ // Query Rewriting (happens under the same ongoing metadata transaction)
Pair<Query, Integer> reWrittenQuery = APIFramework.reWriteQuery(declaredFunctions, metadataProvider, query,
sessionConfig, out, pdf);
- MetadataManager.INSTANCE.commitTransaction(metadataProvider.getMetadataTxnContext());
- Pair<JobSpecification, FileSplit> compiled = compileQuery(sessionConfig, reWrittenQuery.first,
- metadataProvider, reWrittenQuery.second, stmt);
+
+ // Query Compilation (happens under the same ongoing metadata transaction)
+ sessionConfig.setGenerateJobSpec(true);
+ if (metadataProvider.isWriteTransaction()) {
+ metadataProvider.setJobTxnId(TransactionIDFactory.generateTransactionId());
+ }
+ JobSpecification spec = APIFramework.compileQuery(declaredFunctions, metadataProvider, query,
+ reWrittenQuery.second, stmt == null ? null : stmt.getDatasetName(), sessionConfig, out, pdf, stmt);
+ sessionConfig.setGenerateJobSpec(false);
+
+ Pair<JobSpecification, FileSplit> compiled = new Pair<JobSpecification, FileSplit>(spec,
+ metadataProvider.getOutputFile());
return compiled;
}
- private Pair<JobSpecification, FileSplit> compileQuery(SessionConfig sessionConfig, Query query,
- AqlMetadataProvider metadataProvider, int varCounter, ICompiledDmlStatement statement)
- throws RemoteException, AsterixException, AlgebricksException, JSONException, ACIDException {
- sessionConfig.setGenerateJobSpec(true);
- MetadataTransactionContext mdTxnCtxQuery = MetadataManager.INSTANCE.beginTransaction();
- AqlMetadataProvider metadataProviderInsert = new AqlMetadataProvider(mdTxnCtxQuery, activeDefaultDataverse);
- metadataProviderInsert.setWriterFactory(metadataProvider.getWriterFactory());
- metadataProviderInsert.setOutputFile(metadataProvider.getOutputFile());
- metadataProviderInsert.setConfig(metadataProvider.getConfig());
- JobSpecification spec = APIFramework.compileQuery(declaredFunctions, metadataProvider, query, varCounter,
- statement == null ? null : statement.getDatasetName(), sessionConfig, out, pdf, statement);
- sessionConfig.setGenerateJobSpec(false);
- return new Pair<JobSpecification, FileSplit>(spec, metadataProvider.getOutputFile());
- }
-
private void handleBeginFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- IHyracksClientConnection hcc) throws Exception {
- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
+ IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
BeginFeedStatement bfs = (BeginFeedStatement) stmt;
String dataverseName = bfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
: activeDefaultDataverse.getDataverseName() : bfs.getDatasetName().getValue();
@@ -713,49 +712,36 @@
bfs.getDatasetName().getValue(), bfs.getQuery(), bfs.getVarCounter());
Dataset dataset;
- dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, bfs.getDatasetName().getValue());
+ dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName, bfs
+ .getDatasetName().getValue());
IDatasetDetails datasetDetails = dataset.getDatasetDetails();
if (datasetDetails.getDatasetType() != DatasetType.FEED) {
throw new IllegalArgumentException("Dataset " + bfs.getDatasetName().getValue() + " is not a feed dataset");
}
- bfs.initialize(mdTxnCtx, dataset);
+ bfs.initialize(metadataProvider.getMetadataTxnContext(), dataset);
cbfs.setQuery(bfs.getQuery());
-
- Pair<Query, Integer> reWrittenQuery = APIFramework.reWriteQuery(declaredFunctions, metadataProvider,
- bfs.getQuery(), sessionConfig, out, pdf);
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-
- Pair<JobSpecification, FileSplit> compiled = compileQuery(sessionConfig, reWrittenQuery.first,
- metadataProvider, reWrittenQuery.second, cbfs);
- runJob(hcc, compiled.first);
+ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, bfs.getQuery(), cbfs);
+ jobsToExecute.add(compiled.first);
}
private void handleControlFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- IHyracksClientConnection hcc) throws Exception {
- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
+ IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
ControlFeedStatement cfs = (ControlFeedStatement) stmt;
String dataverseName = cfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
: activeDefaultDataverse.getDataverseName() : cfs.getDatasetName().getValue();
CompiledControlFeedStatement clcfs = new CompiledControlFeedStatement(cfs.getOperationType(), dataverseName,
cfs.getDatasetName().getValue(), cfs.getAlterAdapterConfParams());
- Job job = new Job(FeedOperations.buildControlFeedJobSpec(clcfs, metadataProvider), SubmissionMode.ASYNCHRONOUS);
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- runJob(hcc, job.getJobSpec());
+ jobsToExecute.add(FeedOperations.buildControlFeedJobSpec(clcfs, metadataProvider));
}
- private QueryResult handleQuery(AqlMetadataProvider metadataProvider, Query query, IHyracksClientConnection hcc)
- throws Exception {
+ private QueryResult handleQuery(AqlMetadataProvider metadataProvider, Query query, IHyracksClientConnection hcc,
+ List<JobSpecification> jobsToExecute) throws Exception {
Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, query, null);
- runJob(hcc, compiled.first);
GlobalConfig.ASTERIX_LOGGER.info(compiled.first.toJSON().toString(1));
+ jobsToExecute.add(compiled.first);
return new QueryResult(query, compiled.second.getLocalFile().getFile().getAbsolutePath());
}
- private void runCreateDatasetJob(IHyracksClientConnection hcc, Dataverse dataverse, String datasetName,
- AqlMetadataProvider metadataProvider) throws AsterixException, AlgebricksException, Exception {
- runJob(hcc, DatasetOperations.createDatasetJobSpec(dataverse, datasetName, metadataProvider));
- }
-
private void runCreateIndexJob(IHyracksClientConnection hcc, CreateIndexStatement stmtCreateIndex,
AqlMetadataProvider metadataProvider) throws Exception {
// TODO: Eventually CreateIndexStatement and
@@ -772,11 +758,12 @@
throw new AsterixException("Failed to create job spec for creating index '"
+ stmtCreateIndex.getDatasetName() + "." + stmtCreateIndex.getIndexName() + "'");
}
- runJob(hcc, new Job(spec));
+ runJob(hcc, spec);
}
- private void handleCreateNodeGroupStatement(AqlMetadataProvider metadataProvider, Statement stmt)
- throws MetadataException, AlgebricksException, RemoteException, ACIDException {
+ private void handleCreateNodeGroupStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+ List<JobSpecification> jobsToExecute) throws MetadataException, AlgebricksException, RemoteException,
+ ACIDException {
MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
NodegroupDecl stmtCreateNodegroup = (NodegroupDecl) stmt;
String ngName = stmtCreateNodegroup.getNodegroupName().getValue();
@@ -792,11 +779,6 @@
}
MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(ngName, ncNames));
}
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- }
-
- private void runJob(IHyracksClientConnection hcc, Job job) throws Exception {
- executeJobArray(hcc, new Job[] { job }, out, pdf);
}
private void runJob(IHyracksClientConnection hcc, JobSpecification spec) throws Exception {
@@ -806,7 +788,7 @@
private void compileIndexDropStatement(IHyracksClientConnection hcc, String dataverseName, String datasetName,
String indexName, AqlMetadataProvider metadataProvider) throws Exception {
CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName);
- runJob(hcc, new Job(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider)));
+ runJob(hcc, IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
indexName);
}
@@ -819,7 +801,7 @@
if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
JobSpecification[] jobSpecs = DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider);
for (JobSpecification spec : jobSpecs)
- runJob(hcc, new Job(spec));
+ runJob(hcc, spec);
}
MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
}
@@ -846,20 +828,4 @@
return format;
}
- public List<Statement> getAqlStatements() {
- return aqlStatements;
- }
-
- public PrintWriter getOut() {
- return out;
- }
-
- public SessionConfig getPc() {
- return sessionConfig;
- }
-
- public DisplayFormat getPdf() {
- return pdf;
- }
-
}
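The restructuring of compileAndExecute is the heart of the AqlTranslator change: handlers enqueue job specs instead of running them, the metadata transaction commits, and only then are jobs submitted. A condensed sketch of that control flow, with statement dispatch and all but one handler elided:

    List<JobSpecification> jobsToExecute = new ArrayList<JobSpecification>();
    try {
        // Handlers no longer run jobs themselves; they only enqueue specs.
        handleInsertStatement(metadataProvider, stmt, hcc, jobsToExecute);
        // The metadata transaction commits before any job is submitted.
        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
    } catch (Exception e) {
        MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
        throw new AlgebricksException(e);
    }
    // Each queued job then runs under its own transaction, committed or
    // aborted by the JobEventListener installed on the spec.
    for (JobSpecification jobspec : jobsToExecute) {
        runJob(hcc, jobspec);
    }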
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index a300f9b..4f8e467 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -121,6 +121,7 @@
private Map<String, String> config;
private IAWriterFactory writerFactory;
private FileSplit outputFile;
+ private long jobTxnId;
private final Dataverse defaultDataverse;
@@ -147,6 +148,10 @@
this.defaultDataverse = defaultDataverse;
this.stores = AsterixProperties.INSTANCE.getStores();
}
+
+ public void setJobTxnId(long txnId) {
+ this.jobTxnId = txnId;
+ }
public Dataverse getDefaultDataverse() {
return defaultDataverse;
@@ -654,7 +659,7 @@
TreeIndexInsertUpdateDeleteOperatorDescriptor btreeBulkLoad = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, recordDesc, appContext.getStorageManagerInterface(), appContext.getIndexRegistryProvider(),
splitsAndConstraint.first, typeTraits, comparatorFactories, fieldPermutation, indexOp,
- new BTreeDataflowHelperFactory(), null, NoOpOperationCallbackProvider.INSTANCE, mdTxnCtx.getTxnId());
+ new BTreeDataflowHelperFactory(), null, NoOpOperationCallbackProvider.INSTANCE, jobTxnId);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad,
splitsAndConstraint.second);
} catch (MetadataException me) {
@@ -812,12 +817,12 @@
IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
dataverseName, datasetName, indexName);
- TreeIndexInsertUpdateDeleteOperatorDescriptor btreeBulkLoad = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+ TreeIndexInsertUpdateDeleteOperatorDescriptor btreeInsert = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, recordDesc, appContext.getStorageManagerInterface(), appContext.getIndexRegistryProvider(),
splitsAndConstraint.first, typeTraits, comparatorFactories, fieldPermutation, indexOp,
new BTreeDataflowHelperFactory(), filterFactory, NoOpOperationCallbackProvider.INSTANCE,
- mdTxnCtx.getTxnId());
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad,
+ jobTxnId);
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeInsert,
splitsAndConstraint.second);
} catch (MetadataException e) {
throw new AlgebricksException(e);
@@ -884,15 +889,15 @@
spec, recordDesc, appContext.getStorageManagerInterface(), appContext.getIndexRegistryProvider(),
splitsAndConstraint.first, typeTraits, comparatorFactories, fieldPermutation, indexOp,
new RTreeDataflowHelperFactory(valueProviderFactories), filterFactory,
- NoOpOperationCallbackProvider.INSTANCE, mdTxnCtx.getTxnId());
+ NoOpOperationCallbackProvider.INSTANCE, jobTxnId);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(rtreeUpdate, splitsAndConstraint.second);
} catch (MetadataException me) {
throw new AlgebricksException(me);
}
}
- public long getTxnId() {
- return mdTxnCtx.getTxnId();
+ public long getJobTxnId() {
+ return jobTxnId;
}
public static ITreeIndexFrameFactory createBTreeNSMInteriorFrameFactory(ITypeTraits[] typeTraits) {
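On the provider side, each TreeIndexInsertUpdateDeleteOperatorDescriptor (and its RTree counterpart) is now constructed with jobTxnId instead of mdTxnCtx.getTxnId(), and getTxnId() is replaced by the getJobTxnId()/setJobTxnId(long) pair. An illustrative use of the new accessors (sketch only; the surrounding provider instance is assumed):

    // The id is generated once per write transaction, stored on the
    // provider, and read back wherever the job needs it.
    long jobTxnId = TransactionIDFactory.generateTransactionId();
    metadataProvider.setJobTxnId(jobTxnId);
    // ... build insert/update/delete operators; each receives jobTxnId ...
    assert metadataProvider.getJobTxnId() == jobTxnId;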