added compensating operations into DDL operations during forward processing in order to provide correct(best effort) abort behavior

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization@1131 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index 620fb1a..2504a3c 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -339,14 +339,16 @@
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         acquireWriteLatch();
 
+        String dataverseName = null;
+        String datasetName = null;
         try {
             DatasetDecl dd = (DatasetDecl) stmt;
-            String dataverseName = dd.getDataverse() != null ? dd.getDataverse().getValue()
+            dataverseName = dd.getDataverse() != null ? dd.getDataverse().getValue()
                     : activeDefaultDataverse != null ? activeDefaultDataverse.getDataverseName() : null;
             if (dataverseName == null) {
                 throw new AlgebricksException(" dataverse not specified ");
             }
-            String datasetName = dd.getName().getValue();
+            datasetName = dd.getName().getValue();
             DatasetType dsType = dd.getDatasetType();
             String itemTypeName = dd.getItemTypeName().getValue();
 
@@ -439,6 +441,38 @@
             if (bActiveTxn) {
                 MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
             }
+
+            //#. execute compensation operations
+            //   remove the index in NC
+            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            bActiveTxn = true;
+            metadataProvider.setMetadataTxnContext(mdTxnCtx);
+            CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
+            try {
+                JobSpecification jobSpec = DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider);
+                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                bActiveTxn = false;
+                
+                runJob(hcc, jobSpec);
+            } catch (Exception e3) {
+                if (bActiveTxn) {
+                    MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+                }
+                //do no throw exception since still the metadata needs to be compensated. 
+            }
+
+            //   remove the record from the metadata.
+            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            metadataProvider.setMetadataTxnContext(mdTxnCtx);
+            try {
+                MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
+                        datasetName);
+                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            } catch (Exception e2) {
+                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+                throw new AlgebricksException(e2);
+            }
+
             throw new AlgebricksException(e);
         } finally {
             releaseWriteLatch();
@@ -453,14 +487,17 @@
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         acquireWriteLatch();
 
+        String dataverseName = null;
+        String datasetName = null;
+        String indexName = null;
         try {
             CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
-            String dataverseName = stmtCreateIndex.getDataverseName() == null ? activeDefaultDataverse == null ? null
+            dataverseName = stmtCreateIndex.getDataverseName() == null ? activeDefaultDataverse == null ? null
                     : activeDefaultDataverse.getDataverseName() : stmtCreateIndex.getDataverseName().getValue();
             if (dataverseName == null) {
                 throw new AlgebricksException(" dataverse not specified ");
             }
-            String datasetName = stmtCreateIndex.getDatasetName().getValue();
+            datasetName = stmtCreateIndex.getDatasetName().getValue();
 
             Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
                     datasetName);
@@ -469,7 +506,7 @@
                         + dataverseName);
             }
 
-            String indexName = stmtCreateIndex.getIndexName().getValue();
+            indexName = stmtCreateIndex.getIndexName().getValue();
             Index idx = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
                     datasetName, indexName);
 
@@ -533,6 +570,38 @@
             if (bActiveTxn) {
                 MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
             }
+
+            //#. execute compensation operations
+            //   remove the index in NC
+            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            bActiveTxn = true;
+            metadataProvider.setMetadataTxnContext(mdTxnCtx);
+            CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName);
+            try {
+                JobSpecification jobSpec = IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider);
+                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                bActiveTxn = false;
+                
+                runJob(hcc, jobSpec);
+            } catch (Exception e3) {
+                if (bActiveTxn) {
+                    MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+                }
+                //do no throw exception since still the metadata needs to be compensated. 
+            }
+
+            //   remove the record from the metadata.
+            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            metadataProvider.setMetadataTxnContext(mdTxnCtx);
+            try {
+                MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
+                        datasetName, indexName);
+                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            } catch (Exception e2) {
+                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+                throw new AlgebricksException(e2);
+            }
+            
             throw new AlgebricksException(e);
         } finally {
             releaseWriteLatch();
@@ -592,9 +661,10 @@
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         acquireWriteLatch();
 
+        String dvName = null;
         try {
             DataverseDropStatement stmtDelete = (DataverseDropStatement) stmt;
-            String dvName = stmtDelete.getDataverseName().getValue();
+            dvName = stmtDelete.getDataverseName().getValue();
 
             Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dvName);
             if (dv == null) {
@@ -655,6 +725,24 @@
             if (bActiveTxn) {
                 MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
             }
+
+            //#. execute compensation operations
+            //   remove the all indexes in NC
+            for (JobSpecification jobSpec : jobsToExecute) {
+                runJob(hcc, jobSpec);
+            }
+
+            //   remove the record from the metadata.
+            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            metadataProvider.setMetadataTxnContext(mdTxnCtx);
+            try {
+                MetadataManager.INSTANCE.dropDataverse(metadataProvider.getMetadataTxnContext(), dvName);
+                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            } catch (Exception e2) {
+                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+                throw new AlgebricksException(e2);
+            }
+
             throw new AlgebricksException(e);
         } finally {
             releaseWriteLatch();
@@ -669,14 +757,16 @@
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         acquireWriteLatch();
 
+        String dataverseName = null;
+        String datasetName = null;
         try {
             DropStatement stmtDelete = (DropStatement) stmt;
-            String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
+            dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
                     : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue();
             if (dataverseName == null) {
                 throw new AlgebricksException(" dataverse not specified ");
             }
-            String datasetName = stmtDelete.getDatasetName().getValue();
+            datasetName = stmtDelete.getDatasetName().getValue();
 
             Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
             if (ds == null) {
@@ -730,6 +820,25 @@
             if (bActiveTxn) {
                 MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
             }
+
+            //#. execute compensation operations
+            //   remove the all indexes in NC
+            for (JobSpecification jobSpec : jobsToExecute) {
+                runJob(hcc, jobSpec);
+            }
+
+            //   remove the record from the metadata.
+            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            metadataProvider.setMetadataTxnContext(mdTxnCtx);
+            try {
+                MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
+                        datasetName);
+                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            } catch (Exception e2) {
+                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+                throw new AlgebricksException(e2);
+            }
+
             throw new AlgebricksException(e);
         } finally {
             releaseWriteLatch();
@@ -744,10 +853,13 @@
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         acquireWriteLatch();
 
+        String dataverseName = null;
+        String datasetName = null;
+        String indexName = null;
         try {
             IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt;
-            String datasetName = stmtIndexDrop.getDatasetName().getValue();
-            String dataverseName = stmtIndexDrop.getDataverseName() == null ? activeDefaultDataverse == null ? null
+            datasetName = stmtIndexDrop.getDatasetName().getValue();
+            dataverseName = stmtIndexDrop.getDataverseName() == null ? activeDefaultDataverse == null ? null
                     : activeDefaultDataverse.getDataverseName() : stmtIndexDrop.getDataverseName().getValue();
             if (dataverseName == null) {
                 throw new AlgebricksException(" dataverse not specified ");
@@ -760,7 +872,7 @@
             }
 
             if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
-                String indexName = stmtIndexDrop.getIndexName().getValue();
+                indexName = stmtIndexDrop.getIndexName().getValue();
                 Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
                 if (index == null) {
                     if (!stmtIndexDrop.getIfExists()) {
@@ -805,6 +917,25 @@
             if (bActiveTxn) {
                 MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
             }
+
+            //#. execute compensation operations
+            //   remove the all indexes in NC
+            for (JobSpecification jobSpec : jobsToExecute) {
+                runJob(hcc, jobSpec);
+            }
+
+            //   remove the record from the metadata.
+            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            metadataProvider.setMetadataTxnContext(mdTxnCtx);
+            try {
+                MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
+                        datasetName, indexName);
+                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            } catch (Exception e2) {
+                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+                throw new AlgebricksException(e2);
+            }
+
             throw new AlgebricksException(e);
 
         } finally {
@@ -972,6 +1103,7 @@
             if (bActiveTxn) {
                 MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
             }
+            
             throw new AlgebricksException(e);
         } finally {
             releaseReadLatch();