changes to 1) fix Issue 379: DDL transaction model broken and 2) cleanup and deliver all thrown exception to the caller(reviewed by Raman)
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index 9900f71..bd8d2b8 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -123,6 +123,11 @@
*/
public class AqlTranslator extends AbstractAqlTranslator {
+ private enum ProgressState {
+ NO_PROGRESS,
+ ADDED_PENDINGOP_RECORD_TO_METADATA
+ }
+
private final List<Statement> aqlStatements;
private final PrintWriter out;
private final SessionConfig sessionConfig;
@@ -311,7 +316,7 @@
}
private Dataverse handleUseDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt)
- throws MetadataException, RemoteException, ACIDException {
+ throws Exception {
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -327,15 +332,14 @@
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
return dv;
} catch (Exception e) {
- MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ abort(e, e, mdTxnCtx);
throw new MetadataException(e);
} finally {
releaseReadLatch();
}
}
- private void handleCreateDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt)
- throws MetadataException, AlgebricksException, RemoteException, ACIDException {
+ private void handleCreateDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception {
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -345,14 +349,19 @@
CreateDataverseStatement stmtCreateDataverse = (CreateDataverseStatement) stmt;
String dvName = stmtCreateDataverse.getDataverseName().getValue();
Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
- if (dv != null && !stmtCreateDataverse.getIfNotExists()) {
- throw new AlgebricksException("A dataverse with this name " + dvName + " already exists.");
+ if (dv != null) {
+ if (stmtCreateDataverse.getIfNotExists()) {
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ return;
+ } else {
+ throw new AlgebricksException("A dataverse with this name " + dvName + " already exists.");
+ }
}
MetadataManager.INSTANCE.addDataverse(metadataProvider.getMetadataTxnContext(), new Dataverse(dvName,
stmtCreateDataverse.getFormat(), IMetadataEntity.PENDING_NO_OP));
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e) {
- MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ abort(e, e, mdTxnCtx);
throw new AlgebricksException(e);
} finally {
releaseWriteLatch();
@@ -362,6 +371,7 @@
private void handleCreateDatasetStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc) throws AsterixException, Exception {
+ ProgressState progress = ProgressState.NO_PROGRESS;
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -440,7 +450,7 @@
break;
}
}
-
+
//#. initialize DatasetIdFactory if it is not initialized.
if (!DatasetIdFactory.isInitialized()) {
DatasetIdFactory.initialize(MetadataManager.INSTANCE.getMostRecentDatasetId());
@@ -460,6 +470,7 @@
//#. make metadataTxn commit before calling runJob.
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
+ progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
//#. runJob
runJob(hcc, jobSpec, true);
@@ -472,18 +483,21 @@
//#. add a new dataset with PendingNoOp after deleting the dataset with PendingAddOp
MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName);
- MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), new Dataset(dataverseName,
- datasetName, itemTypeName, datasetDetails, dd.getHints(), dsType, dataset.getDatasetId(),
- IMetadataEntity.PENDING_NO_OP));
+ dataset.setPendingOp(IMetadataEntity.PENDING_NO_OP);
+ MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e) {
if (bActiveTxn) {
- MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ abort(e, e, mdTxnCtx);
}
- if (dataset != null) {
+ if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
+
//#. execute compensation operations
// remove the index in NC
+ // [Notice]
+ // As long as we updated(and committed) metadata, we should remove any effect of the job
+ // because an exception occurs during runJob.
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -494,11 +508,11 @@
bActiveTxn = false;
runJob(hcc, jobSpec, true);
- } catch (Exception e3) {
+ } catch (Exception e2) {
+ e.addSuppressed(e2);
if (bActiveTxn) {
- MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ abort(e, e2, mdTxnCtx);
}
- //do no throw exception since still the metadata needs to be compensated.
}
// remove the record from the metadata.
@@ -509,8 +523,10 @@
datasetName);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e2) {
- MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
- throw new AlgebricksException(e2);
+ e.addSuppressed(e2);
+ abort(e, e2, mdTxnCtx);
+ throw new IllegalStateException("System is inconsistent state: pending dataset(" + dataverseName
+ + "." + datasetName + ") couldn't be removed from the metadata", e);
}
}
@@ -523,6 +539,7 @@
private void handleCreateIndexStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc) throws Exception {
+ ProgressState progress = ProgressState.NO_PROGRESS;
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -560,12 +577,11 @@
aRecordType.validateKeyFields(stmtCreateIndex.getFieldExprs(), stmtCreateIndex.getIndexType());
if (idx != null) {
- if (!stmtCreateIndex.getIfNotExists()) {
- throw new AlgebricksException("An index with this name " + indexName + " already exists.");
- } else {
- stmtCreateIndex.setNeedToCreate(false);
+ if (stmtCreateIndex.getIfNotExists()) {
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
return;
+ } else {
+ throw new AlgebricksException("An index with this name " + indexName + " already exists.");
}
}
@@ -575,7 +591,7 @@
IMetadataEntity.PENDING_ADD_OP);
MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
- //#. create the index artifact in NC.
+ //#. prepare to create the index artifact in NC.
CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName,
index.getDatasetName(), index.getKeyFieldNames(), index.getGramLength(), index.getIndexType());
spec = IndexOperations.buildSecondaryIndexCreationJobSpec(cis, metadataProvider);
@@ -585,7 +601,9 @@
}
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
+ progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
+ //#. create the index artifact in NC.
runJob(hcc, spec, true);
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -609,18 +627,16 @@
//#. add another new index with PendingNoOp after deleting the index with PendingAddOp
MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
indexName);
- index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(),
- stmtCreateIndex.getFieldExprs(), stmtCreateIndex.getGramLength(), false,
- IMetadataEntity.PENDING_NO_OP);
+ index.setPendingOp(IMetadataEntity.PENDING_NO_OP);
MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e) {
if (bActiveTxn) {
- MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ abort(e, e, mdTxnCtx);
}
- if (spec != null) {
+ if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
//#. execute compensation operations
// remove the index in NC
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -633,11 +649,11 @@
bActiveTxn = false;
runJob(hcc, jobSpec, true);
- } catch (Exception e3) {
+ } catch (Exception e2) {
+ e.addSuppressed(e2);
if (bActiveTxn) {
- MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ abort(e, e2, mdTxnCtx);
}
- //do no throw exception since still the metadata needs to be compensated.
}
// remove the record from the metadata.
@@ -648,8 +664,10 @@
datasetName, indexName);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e2) {
- MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
- throw new AlgebricksException(e2);
+ e.addSuppressed(e2);
+ abort(e, e2, mdTxnCtx);
+ throw new IllegalStateException("System is inconsistent state: pending index(" + dataverseName
+ + "." + datasetName + "." + indexName + ") couldn't be removed from the metadata", e);
}
}
throw new AlgebricksException(e);
@@ -658,8 +676,7 @@
}
}
- private void handleCreateTypeStatement(AqlMetadataProvider metadataProvider, Statement stmt)
- throws AlgebricksException, RemoteException, ACIDException, MetadataException {
+ private void handleCreateTypeStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception {
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -695,7 +712,7 @@
}
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e) {
- MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ abort(e, e, mdTxnCtx);
throw new AlgebricksException(e);
} finally {
releaseWriteLatch();
@@ -705,43 +722,46 @@
private void handleDataverseDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc) throws Exception {
+ ProgressState progress = ProgressState.NO_PROGRESS;
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
acquireWriteLatch();
- String dvName = null;
+ String dataverseName = null;
List<JobSpecification> jobsToExecute = new ArrayList<JobSpecification>();
try {
DataverseDropStatement stmtDelete = (DataverseDropStatement) stmt;
- dvName = stmtDelete.getDataverseName().getValue();
+ dataverseName = stmtDelete.getDataverseName().getValue();
- Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dvName);
+ Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
if (dv == null) {
- if (!stmtDelete.getIfExists()) {
- throw new AlgebricksException("There is no dataverse with this name " + dvName + ".");
+ if (stmtDelete.getIfExists()) {
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ return;
+ } else {
+ throw new AlgebricksException("There is no dataverse with this name " + dataverseName + ".");
}
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- return;
}
//#. prepare jobs which will drop corresponding datasets with indexes.
- List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dvName);
+ List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dataverseName);
for (int j = 0; j < datasets.size(); j++) {
String datasetName = datasets.get(j).getDatasetName();
DatasetType dsType = datasets.get(j).getDatasetType();
if (dsType == DatasetType.INTERNAL || dsType == DatasetType.FEED) {
- List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dvName, datasetName);
+ List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName,
+ datasetName);
for (int k = 0; k < indexes.size(); k++) {
if (indexes.get(k).isSecondaryIndex()) {
- CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dvName, datasetName,
+ CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
indexes.get(k).getIndexName());
jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
}
}
- CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dvName, datasetName);
+ CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider));
}
}
@@ -749,12 +769,13 @@
//#. mark PendingDropOp on the dataverse record by
// first, deleting the dataverse record from the DATAVERSE_DATASET
// second, inserting the dataverse record with the PendingDropOp value into the DATAVERSE_DATASET
- MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dvName);
- MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dvName, dv.getDataFormat(),
+ MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
+ MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dataverseName, dv.getDataFormat(),
IMetadataEntity.PENDING_DROP_OP));
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
+ progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
for (JobSpecification jobSpec : jobsToExecute) {
runJob(hcc, jobSpec, true);
@@ -765,32 +786,44 @@
metadataProvider.setMetadataTxnContext(mdTxnCtx);
//#. finally, delete the dataverse.
- MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dvName);
- if (activeDefaultDataverse != null && activeDefaultDataverse.getDataverseName() == dvName) {
+ MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
+ if (activeDefaultDataverse != null && activeDefaultDataverse.getDataverseName() == dataverseName) {
activeDefaultDataverse = null;
}
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e) {
if (bActiveTxn) {
- MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ abort(e, e, mdTxnCtx);
}
- //#. execute compensation operations
- // remove the all indexes in NC
- for (JobSpecification jobSpec : jobsToExecute) {
- runJob(hcc, jobSpec, true);
- }
+ if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
+ if (activeDefaultDataverse != null && activeDefaultDataverse.getDataverseName() == dataverseName) {
+ activeDefaultDataverse = null;
+ }
- // remove the record from the metadata.
- mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
- metadataProvider.setMetadataTxnContext(mdTxnCtx);
- try {
- MetadataManager.INSTANCE.dropDataverse(metadataProvider.getMetadataTxnContext(), dvName);
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- } catch (Exception e2) {
- MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
- throw new AlgebricksException(e2);
+ //#. execute compensation operations
+ // remove the all indexes in NC
+ try {
+ for (JobSpecification jobSpec : jobsToExecute) {
+ runJob(hcc, jobSpec, true);
+ }
+ } catch (Exception e2) {
+ //do no throw exception since still the metadata needs to be compensated.
+ e.addSuppressed(e2);
+ }
+
+ // remove the record from the metadata.
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ try {
+ MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (Exception e2) {
+ e.addSuppressed(e2);
+ abort(e, e2, mdTxnCtx);
+ throw new IllegalStateException("System is inconsistent state: pending dataverse(" + dataverseName
+ + ") couldn't be removed from the metadata", e);
+ }
}
throw new AlgebricksException(e);
@@ -802,6 +835,7 @@
private void handleDatasetDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc) throws Exception {
+ ProgressState progress = ProgressState.NO_PROGRESS;
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -821,12 +855,13 @@
Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
if (ds == null) {
- if (!stmtDelete.getIfExists()) {
+ if (stmtDelete.getIfExists()) {
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ return;
+ } else {
throw new AlgebricksException("There is no dataset with this name " + datasetName
+ " in dataverse " + dataverseName + ".");
}
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- return;
}
if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
@@ -852,6 +887,7 @@
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
+ progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
//#. run the jobs
for (JobSpecification jobSpec : jobsToExecute) {
@@ -869,25 +905,34 @@
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e) {
if (bActiveTxn) {
- MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ abort(e, e, mdTxnCtx);
}
- //#. execute compensation operations
- // remove the all indexes in NC
- for (JobSpecification jobSpec : jobsToExecute) {
- runJob(hcc, jobSpec, true);
- }
+ if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
+ //#. execute compensation operations
+ // remove the all indexes in NC
+ try {
+ for (JobSpecification jobSpec : jobsToExecute) {
+ runJob(hcc, jobSpec, true);
+ }
+ } catch (Exception e2) {
+ //do no throw exception since still the metadata needs to be compensated.
+ e.addSuppressed(e2);
+ }
- // remove the record from the metadata.
- mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
- metadataProvider.setMetadataTxnContext(mdTxnCtx);
- try {
- MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
- datasetName);
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- } catch (Exception e2) {
- MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
- throw new AlgebricksException(e2);
+ // remove the record from the metadata.
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ try {
+ MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
+ datasetName);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (Exception e2) {
+ e.addSuppressed(e2);
+ abort(e, e2, mdTxnCtx);
+ throw new IllegalStateException("System is inconsistent state: pending dataset(" + dataverseName
+ + "." + datasetName + ") couldn't be removed from the metadata", e);
+ }
}
throw new AlgebricksException(e);
@@ -899,6 +944,7 @@
private void handleIndexDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc) throws Exception {
+ ProgressState progress = ProgressState.NO_PROGRESS;
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -927,38 +973,39 @@
indexName = stmtIndexDrop.getIndexName().getValue();
Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
if (index == null) {
- if (!stmtIndexDrop.getIfExists()) {
+ if (stmtIndexDrop.getIfExists()) {
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ return;
+ } else {
throw new AlgebricksException("There is no index with this name " + indexName + ".");
}
- } else {
- //#. prepare a job to drop the index in NC.
- CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
- indexName);
- jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
-
- //#. mark PendingDropOp on the existing index
- MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
- MetadataManager.INSTANCE.addIndex(
- mdTxnCtx,
- new Index(dataverseName, datasetName, indexName, index.getIndexType(), index
- .getKeyFieldNames(), index.isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
-
- //#. commit the existing transaction before calling runJob.
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- bActiveTxn = false;
-
- for (JobSpecification jobSpec : jobsToExecute) {
- runJob(hcc, jobSpec, true);
- }
-
- //#. begin a new transaction
- mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
- bActiveTxn = true;
- metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
- //#. finally, delete the existing index
- MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
}
+ //#. prepare a job to drop the index in NC.
+ CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName);
+ jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
+
+ //#. mark PendingDropOp on the existing index
+ MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
+ MetadataManager.INSTANCE.addIndex(mdTxnCtx,
+ new Index(dataverseName, datasetName, indexName, index.getIndexType(),
+ index.getKeyFieldNames(), index.isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
+
+ //#. commit the existing transaction before calling runJob.
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ bActiveTxn = false;
+ progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
+
+ for (JobSpecification jobSpec : jobsToExecute) {
+ runJob(hcc, jobSpec, true);
+ }
+
+ //#. begin a new transaction
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+
+ //#. finally, delete the existing index
+ MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
} else {
throw new AlgebricksException(datasetName
+ " is an external dataset. Indexes are not maintained for external datasets.");
@@ -967,25 +1014,34 @@
} catch (Exception e) {
if (bActiveTxn) {
- MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ abort(e, e, mdTxnCtx);
}
- //#. execute compensation operations
- // remove the all indexes in NC
- for (JobSpecification jobSpec : jobsToExecute) {
- runJob(hcc, jobSpec, true);
- }
+ if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
+ //#. execute compensation operations
+ // remove the all indexes in NC
+ try {
+ for (JobSpecification jobSpec : jobsToExecute) {
+ runJob(hcc, jobSpec, true);
+ }
+ } catch (Exception e2) {
+ //do no throw exception since still the metadata needs to be compensated.
+ e.addSuppressed(e2);
+ }
- // remove the record from the metadata.
- mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
- metadataProvider.setMetadataTxnContext(mdTxnCtx);
- try {
- MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
- datasetName, indexName);
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- } catch (Exception e2) {
- MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
- throw new AlgebricksException(e2);
+ // remove the record from the metadata.
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ try {
+ MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
+ datasetName, indexName);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (Exception e2) {
+ e.addSuppressed(e2);
+ abort(e, e2, mdTxnCtx);
+ throw new IllegalStateException("System is inconsistent state: pending index(" + dataverseName
+ + "." + datasetName + "." + indexName + ") couldn't be removed from the metadata", e);
+ }
}
throw new AlgebricksException(e);
@@ -995,8 +1051,7 @@
}
}
- private void handleTypeDropStatement(AqlMetadataProvider metadataProvider, Statement stmt)
- throws AlgebricksException, MetadataException, RemoteException, ACIDException {
+ private void handleTypeDropStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception {
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -1019,15 +1074,14 @@
}
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e) {
- MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ abort(e, e, mdTxnCtx);
throw new AlgebricksException(e);
} finally {
releaseWriteLatch();
}
}
- private void handleNodegroupDropStatement(AqlMetadataProvider metadataProvider, Statement stmt)
- throws MetadataException, AlgebricksException, RemoteException, ACIDException {
+ private void handleNodegroupDropStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception {
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -1046,15 +1100,14 @@
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e) {
- MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ abort(e, e, mdTxnCtx);
throw new AlgebricksException(e);
} finally {
releaseWriteLatch();
}
}
- private void handleCreateFunctionStatement(AqlMetadataProvider metadataProvider, Statement stmt)
- throws AlgebricksException, MetadataException, RemoteException, ACIDException {
+ private void handleCreateFunctionStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception {
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
acquireWriteLatch();
@@ -1077,15 +1130,14 @@
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e) {
- MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ abort(e, e, mdTxnCtx);
throw new AlgebricksException(e);
} finally {
releaseWriteLatch();
}
}
- private void handleFunctionDropStatement(AqlMetadataProvider metadataProvider, Statement stmt)
- throws MetadataException, RemoteException, ACIDException, AlgebricksException {
+ private void handleFunctionDropStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception {
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
acquireWriteLatch();
@@ -1102,7 +1154,7 @@
}
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e) {
- MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ abort(e, e, mdTxnCtx);
throw new AlgebricksException(e);
} finally {
releaseWriteLatch();
@@ -1149,7 +1201,7 @@
}
} catch (Exception e) {
if (bActiveTxn) {
- MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ abort(e, e, mdTxnCtx);
}
throw new AlgebricksException(e);
@@ -1181,7 +1233,7 @@
}
} catch (Exception e) {
if (bActiveTxn) {
- MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ abort(e, e, mdTxnCtx);
}
throw new AlgebricksException(e);
} finally {
@@ -1215,7 +1267,7 @@
} catch (Exception e) {
if (bActiveTxn) {
- MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ abort(e, e, mdTxnCtx);
}
throw new AlgebricksException(e);
} finally {
@@ -1250,7 +1302,7 @@
} catch (Exception e) {
if (bActiveTxn) {
- MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ abort(e, e, mdTxnCtx);
}
throw new AlgebricksException(e);
} finally {
@@ -1315,7 +1367,7 @@
} catch (Exception e) {
if (bActiveTxn) {
- MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ abort(e, e, mdTxnCtx);
}
throw new AlgebricksException(e);
} finally {
@@ -1345,7 +1397,7 @@
} catch (Exception e) {
if (bActiveTxn) {
- MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ abort(e, e, mdTxnCtx);
}
throw new AlgebricksException(e);
} finally {
@@ -1407,7 +1459,7 @@
return queryResult;
} catch (Exception e) {
if (bActiveTxn) {
- MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ abort(e, e, mdTxnCtx);
}
throw new AlgebricksException(e);
} finally {
@@ -1415,8 +1467,7 @@
}
}
- private void handleCreateNodeGroupStatement(AqlMetadataProvider metadataProvider, Statement stmt)
- throws MetadataException, AlgebricksException, RemoteException, ACIDException {
+ private void handleCreateNodeGroupStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception {
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -1439,7 +1490,7 @@
}
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e) {
- MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ abort(e, e, mdTxnCtx);
throw new AlgebricksException(e);
} finally {
releaseWriteLatch();
@@ -1494,4 +1545,13 @@
private void releaseReadLatch() {
MetadataManager.INSTANCE.releaseReadLatch();
}
+
+ private void abort(Exception rootE, Exception parentE, MetadataTransactionContext mdTxnCtx) {
+ try {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ } catch (Exception e2) {
+ parentE.addSuppressed(e2);
+ throw new IllegalStateException(rootE);
+ }
+ }
}
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreateIndexStatement.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreateIndexStatement.java
index ffd0534..7a764ae 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreateIndexStatement.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreateIndexStatement.java
@@ -12,7 +12,6 @@
public class CreateIndexStatement implements Statement {
private Identifier indexName;
- private boolean needToCreate = true;
private Identifier dataverseName;
private Identifier datasetName;
private List<String> fieldExprs = new ArrayList<String>();
@@ -33,14 +32,6 @@
return gramLength;
}
- public void setNeedToCreate(boolean needToCreate) {
- this.needToCreate = needToCreate;
- }
-
- public boolean getNeedToCreate() {
- return needToCreate;
- }
-
public Identifier getIndexName() {
return indexName;
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java
index 91a18f3..070e6ea 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java
@@ -40,7 +40,7 @@
private final Map<String, String> hints;
private final int datasetId;
// Type of pending operations with respect to atomic DDL operation
- private final int pendingOp;
+ private int pendingOp;
public Dataset(String dataverseName, String datasetName, String itemTypeName, IDatasetDetails datasetDetails,
Map<String, String> hints, DatasetType datasetType, int datasetId, int pendingOp) {
@@ -86,6 +86,10 @@
return pendingOp;
}
+ public void setPendingOp(int pendingOp) {
+ this.pendingOp = pendingOp;
+ }
+
@Override
public Object addToCache(MetadataCache cache) {
return cache.addDatasetIfNotExists(this);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Index.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Index.java
index 6d65730..aa29e5b 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Index.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Index.java
@@ -15,7 +15,6 @@
package edu.uci.ics.asterix.metadata.entities;
-import java.io.Serializable;
import java.util.List;
import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
@@ -46,7 +45,7 @@
// Specific to NGRAM indexes.
private final int gramLength;
// Type of pending operations with respect to atomic DDL operation
- private final int pendingOp;
+ private int pendingOp;
public Index(String dataverseName, String datasetName, String indexName, IndexType indexType,
List<String> keyFieldNames, int gramLength, boolean isPrimaryIndex, int pendingOp) {
@@ -103,6 +102,10 @@
public int getPendingOp() {
return pendingOp;
}
+
+ public void setPendingOp(int pendingOp) {
+ this.pendingOp = pendingOp;
+ }
public boolean isSecondaryIndex() {
return !isPrimaryIndex();