added compensating operations into DDL operations during forward processing in order to provide correct(best effort) abort behavior
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization@1131 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index 620fb1a..2504a3c 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -339,14 +339,16 @@
metadataProvider.setMetadataTxnContext(mdTxnCtx);
acquireWriteLatch();
+ String dataverseName = null;
+ String datasetName = null;
try {
DatasetDecl dd = (DatasetDecl) stmt;
- String dataverseName = dd.getDataverse() != null ? dd.getDataverse().getValue()
+ dataverseName = dd.getDataverse() != null ? dd.getDataverse().getValue()
: activeDefaultDataverse != null ? activeDefaultDataverse.getDataverseName() : null;
if (dataverseName == null) {
throw new AlgebricksException(" dataverse not specified ");
}
- String datasetName = dd.getName().getValue();
+ datasetName = dd.getName().getValue();
DatasetType dsType = dd.getDatasetType();
String itemTypeName = dd.getItemTypeName().getValue();
@@ -439,6 +441,38 @@
if (bActiveTxn) {
MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
}
+
+ //#. execute compensation operations
+ // remove the index in NC
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
+ try {
+ JobSpecification jobSpec = DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ bActiveTxn = false;
+
+ runJob(hcc, jobSpec);
+ } catch (Exception e3) {
+ if (bActiveTxn) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ }
+ //do no throw exception since still the metadata needs to be compensated.
+ }
+
+ // remove the record from the metadata.
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ try {
+ MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
+ datasetName);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (Exception e2) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ throw new AlgebricksException(e2);
+ }
+
throw new AlgebricksException(e);
} finally {
releaseWriteLatch();
@@ -453,14 +487,17 @@
metadataProvider.setMetadataTxnContext(mdTxnCtx);
acquireWriteLatch();
+ String dataverseName = null;
+ String datasetName = null;
+ String indexName = null;
try {
CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
- String dataverseName = stmtCreateIndex.getDataverseName() == null ? activeDefaultDataverse == null ? null
+ dataverseName = stmtCreateIndex.getDataverseName() == null ? activeDefaultDataverse == null ? null
: activeDefaultDataverse.getDataverseName() : stmtCreateIndex.getDataverseName().getValue();
if (dataverseName == null) {
throw new AlgebricksException(" dataverse not specified ");
}
- String datasetName = stmtCreateIndex.getDatasetName().getValue();
+ datasetName = stmtCreateIndex.getDatasetName().getValue();
Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
datasetName);
@@ -469,7 +506,7 @@
+ dataverseName);
}
- String indexName = stmtCreateIndex.getIndexName().getValue();
+ indexName = stmtCreateIndex.getIndexName().getValue();
Index idx = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
datasetName, indexName);
@@ -533,6 +570,38 @@
if (bActiveTxn) {
MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
}
+
+ //#. execute compensation operations
+ // remove the index in NC
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName);
+ try {
+ JobSpecification jobSpec = IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ bActiveTxn = false;
+
+ runJob(hcc, jobSpec);
+ } catch (Exception e3) {
+ if (bActiveTxn) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ }
+ //do no throw exception since still the metadata needs to be compensated.
+ }
+
+ // remove the record from the metadata.
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ try {
+ MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
+ datasetName, indexName);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (Exception e2) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ throw new AlgebricksException(e2);
+ }
+
throw new AlgebricksException(e);
} finally {
releaseWriteLatch();
@@ -592,9 +661,10 @@
metadataProvider.setMetadataTxnContext(mdTxnCtx);
acquireWriteLatch();
+ String dvName = null;
try {
DataverseDropStatement stmtDelete = (DataverseDropStatement) stmt;
- String dvName = stmtDelete.getDataverseName().getValue();
+ dvName = stmtDelete.getDataverseName().getValue();
Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dvName);
if (dv == null) {
@@ -655,6 +725,24 @@
if (bActiveTxn) {
MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
}
+
+ //#. execute compensation operations
+ // remove the all indexes in NC
+ for (JobSpecification jobSpec : jobsToExecute) {
+ runJob(hcc, jobSpec);
+ }
+
+ // remove the record from the metadata.
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ try {
+ MetadataManager.INSTANCE.dropDataverse(metadataProvider.getMetadataTxnContext(), dvName);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (Exception e2) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ throw new AlgebricksException(e2);
+ }
+
throw new AlgebricksException(e);
} finally {
releaseWriteLatch();
@@ -669,14 +757,16 @@
metadataProvider.setMetadataTxnContext(mdTxnCtx);
acquireWriteLatch();
+ String dataverseName = null;
+ String datasetName = null;
try {
DropStatement stmtDelete = (DropStatement) stmt;
- String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
+ dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
: activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue();
if (dataverseName == null) {
throw new AlgebricksException(" dataverse not specified ");
}
- String datasetName = stmtDelete.getDatasetName().getValue();
+ datasetName = stmtDelete.getDatasetName().getValue();
Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
if (ds == null) {
@@ -730,6 +820,25 @@
if (bActiveTxn) {
MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
}
+
+ //#. execute compensation operations
+ // remove the all indexes in NC
+ for (JobSpecification jobSpec : jobsToExecute) {
+ runJob(hcc, jobSpec);
+ }
+
+ // remove the record from the metadata.
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ try {
+ MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
+ datasetName);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (Exception e2) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ throw new AlgebricksException(e2);
+ }
+
throw new AlgebricksException(e);
} finally {
releaseWriteLatch();
@@ -744,10 +853,13 @@
metadataProvider.setMetadataTxnContext(mdTxnCtx);
acquireWriteLatch();
+ String dataverseName = null;
+ String datasetName = null;
+ String indexName = null;
try {
IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt;
- String datasetName = stmtIndexDrop.getDatasetName().getValue();
- String dataverseName = stmtIndexDrop.getDataverseName() == null ? activeDefaultDataverse == null ? null
+ datasetName = stmtIndexDrop.getDatasetName().getValue();
+ dataverseName = stmtIndexDrop.getDataverseName() == null ? activeDefaultDataverse == null ? null
: activeDefaultDataverse.getDataverseName() : stmtIndexDrop.getDataverseName().getValue();
if (dataverseName == null) {
throw new AlgebricksException(" dataverse not specified ");
@@ -760,7 +872,7 @@
}
if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
- String indexName = stmtIndexDrop.getIndexName().getValue();
+ indexName = stmtIndexDrop.getIndexName().getValue();
Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
if (index == null) {
if (!stmtIndexDrop.getIfExists()) {
@@ -805,6 +917,25 @@
if (bActiveTxn) {
MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
}
+
+ //#. execute compensation operations
+ // remove the all indexes in NC
+ for (JobSpecification jobSpec : jobsToExecute) {
+ runJob(hcc, jobSpec);
+ }
+
+ // remove the record from the metadata.
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ try {
+ MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
+ datasetName, indexName);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (Exception e2) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ throw new AlgebricksException(e2);
+ }
+
throw new AlgebricksException(e);
} finally {
@@ -972,6 +1103,7 @@
if (bActiveTxn) {
MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
}
+
throw new AlgebricksException(e);
} finally {
releaseReadLatch();