Changes to 1) fix Issue 379 (DDL transaction model broken) and 2) clean up exception handling and deliver all thrown exceptions to the caller (reviewed by Raman)
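
The core of this change is a pending-op protocol for DDL: a pending record is committed
to the metadata before the Hyracks job runs, and a ProgressState flag tells the catch
block whether compensation is required. The sketch below is a simplified, hypothetical
illustration of the flow every handle*Statement method now follows (helpers such as
addPendingRecord/flipToNoOp are placeholders, not actual AsterixDB APIs):

    ProgressState progress = ProgressState.NO_PROGRESS;
    MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
    boolean bActiveTxn = true;
    try {
        //#. record the artifact with PENDING_ADD_OP and commit, so a failure (or crash)
        //   before the job finishes leaves a recoverable pending record behind.
        addPendingRecord(mdTxnCtx, IMetadataEntity.PENDING_ADD_OP); // hypothetical helper
        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
        bActiveTxn = false;
        progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;

        //#. create/drop the physical artifact in the NCs, outside any metadata txn.
        runJob(hcc, jobSpec, true);

        //#. in a new transaction, replace the pending record with a PENDING_NO_OP one.
        mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
        bActiveTxn = true;
        flipToNoOp(mdTxnCtx, IMetadataEntity.PENDING_NO_OP); // hypothetical helper
        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
    } catch (Exception e) {
        if (bActiveTxn) {
            abort(e, e, mdTxnCtx);
        }
        if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
            //compensate: undo the NC-side job and remove the pending record, attaching
            //any secondary failure to e via addSuppressed so the caller sees everything.
        }
        throw new AlgebricksException(e);
    }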
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index 9900f71..bd8d2b8 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -123,6 +123,11 @@
  */
 public class AqlTranslator extends AbstractAqlTranslator {
 
+    private enum ProgressState {
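+        //NO_PROGRESS: nothing durable has happened yet; aborting the metadata txn suffices.
+        //ADDED_PENDINGOP_RECORD_TO_METADATA: a pending-op record was committed, so any later
+        //failure must be compensated (undo the NC job and remove the pending record).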
+        NO_PROGRESS,
+        ADDED_PENDINGOP_RECORD_TO_METADATA
+    }
+
     private final List<Statement> aqlStatements;
     private final PrintWriter out;
     private final SessionConfig sessionConfig;
@@ -311,7 +316,7 @@
     }
 
     private Dataverse handleUseDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt)
-            throws MetadataException, RemoteException, ACIDException {
+            throws Exception {
 
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -327,15 +332,14 @@
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             return dv;
         } catch (Exception e) {
-            MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+            abort(e, e, mdTxnCtx);
             throw new MetadataException(e);
         } finally {
             releaseReadLatch();
         }
     }
 
-    private void handleCreateDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt)
-            throws MetadataException, AlgebricksException, RemoteException, ACIDException {
+    private void handleCreateDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception {
 
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -345,14 +349,19 @@
             CreateDataverseStatement stmtCreateDataverse = (CreateDataverseStatement) stmt;
             String dvName = stmtCreateDataverse.getDataverseName().getValue();
             Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
-            if (dv != null && !stmtCreateDataverse.getIfNotExists()) {
-                throw new AlgebricksException("A dataverse with this name " + dvName + " already exists.");
+            if (dv != null) {
+                if (stmtCreateDataverse.getIfNotExists()) {
+                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                    return;
+                } else {
+                    throw new AlgebricksException("A dataverse with this name " + dvName + " already exists.");
+                }
             }
             MetadataManager.INSTANCE.addDataverse(metadataProvider.getMetadataTxnContext(), new Dataverse(dvName,
                     stmtCreateDataverse.getFormat(), IMetadataEntity.PENDING_NO_OP));
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         } catch (Exception e) {
-            MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+            abort(e, e, mdTxnCtx);
             throw new AlgebricksException(e);
         } finally {
             releaseWriteLatch();
@@ -362,6 +371,7 @@
     private void handleCreateDatasetStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc) throws AsterixException, Exception {
 
+        ProgressState progress = ProgressState.NO_PROGRESS;
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -440,7 +450,7 @@
                     break;
                 }
             }
-            
+
             //#. initialize DatasetIdFactory if it is not initialized.
             if (!DatasetIdFactory.isInitialized()) {
                 DatasetIdFactory.initialize(MetadataManager.INSTANCE.getMostRecentDatasetId());
@@ -460,6 +470,7 @@
                 //#. make metadataTxn commit before calling runJob.
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 bActiveTxn = false;
+                progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
 
                 //#. runJob
                 runJob(hcc, jobSpec, true);
@@ -472,18 +483,21 @@
 
             //#. add a new dataset with PendingNoOp after deleting the dataset with PendingAddOp
             MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName);
-            MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), new Dataset(dataverseName,
-                    datasetName, itemTypeName, datasetDetails, dd.getHints(), dsType, dataset.getDatasetId(),
-                    IMetadataEntity.PENDING_NO_OP));
+            dataset.setPendingOp(IMetadataEntity.PENDING_NO_OP);
+            MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         } catch (Exception e) {
             if (bActiveTxn) {
-                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+                abort(e, e, mdTxnCtx);
             }
 
-            if (dataset != null) {
+            if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
+
                 //#. execute compensation operations
                 //   remove the index in NC
+                //   [Notice]
+                //   As long as we have updated (and committed) the metadata, we should remove
+                //   any effect of the job, because an exception occurred during runJob.
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 bActiveTxn = true;
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -494,11 +508,11 @@
                     bActiveTxn = false;
 
                     runJob(hcc, jobSpec, true);
-                } catch (Exception e3) {
+                } catch (Exception e2) {
+                    e.addSuppressed(e2);
                     if (bActiveTxn) {
-                        MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+                        abort(e, e2, mdTxnCtx);
                     }
-                    //do no throw exception since still the metadata needs to be compensated. 
                 }
 
                 //   remove the record from the metadata.
@@ -509,8 +523,10 @@
                             datasetName);
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 } catch (Exception e2) {
-                    MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-                    throw new AlgebricksException(e2);
+                    e.addSuppressed(e2);
+                    abort(e, e2, mdTxnCtx);
+                    throw new IllegalStateException("System is in an inconsistent state: pending dataset("
+                            + dataverseName + "." + datasetName + ") couldn't be removed from the metadata", e);
                 }
             }
 
@@ -523,6 +539,7 @@
     private void handleCreateIndexStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc) throws Exception {
 
+        ProgressState progress = ProgressState.NO_PROGRESS;
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -560,12 +577,11 @@
             aRecordType.validateKeyFields(stmtCreateIndex.getFieldExprs(), stmtCreateIndex.getIndexType());
 
             if (idx != null) {
-                if (!stmtCreateIndex.getIfNotExists()) {
-                    throw new AlgebricksException("An index with this name " + indexName + " already exists.");
-                } else {
-                    stmtCreateIndex.setNeedToCreate(false);
+                if (stmtCreateIndex.getIfNotExists()) {
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                     return;
+                } else {
+                    throw new AlgebricksException("An index with this name " + indexName + " already exists.");
                 }
             }
 
@@ -575,7 +591,7 @@
                     IMetadataEntity.PENDING_ADD_OP);
             MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
 
-            //#. create the index artifact in NC.
+            //#. prepare to create the index artifact in NC.
             CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName,
                     index.getDatasetName(), index.getKeyFieldNames(), index.getGramLength(), index.getIndexType());
             spec = IndexOperations.buildSecondaryIndexCreationJobSpec(cis, metadataProvider);
@@ -585,7 +601,9 @@
             }
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
+            progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
 
+            //#. create the index artifact in NC.
             runJob(hcc, spec, true);
 
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -609,18 +627,16 @@
             //#. add another new index with PendingNoOp after deleting the index with PendingAddOp
             MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
                     indexName);
-            index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(),
-                    stmtCreateIndex.getFieldExprs(), stmtCreateIndex.getGramLength(), false,
-                    IMetadataEntity.PENDING_NO_OP);
+            index.setPendingOp(IMetadataEntity.PENDING_NO_OP);
             MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
 
         } catch (Exception e) {
             if (bActiveTxn) {
-                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+                abort(e, e, mdTxnCtx);
             }
 
-            if (spec != null) {
+            if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
                 //#. execute compensation operations
                 //   remove the index in NC
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -633,11 +649,11 @@
                     bActiveTxn = false;
 
                     runJob(hcc, jobSpec, true);
-                } catch (Exception e3) {
+                } catch (Exception e2) {
+                    e.addSuppressed(e2);
                     if (bActiveTxn) {
-                        MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+                        abort(e, e2, mdTxnCtx);
                     }
-                    //do no throw exception since still the metadata needs to be compensated. 
                 }
 
                 //   remove the record from the metadata.
@@ -648,8 +664,10 @@
                             datasetName, indexName);
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 } catch (Exception e2) {
-                    MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-                    throw new AlgebricksException(e2);
+                    e.addSuppressed(e2);
+                    abort(e, e2, mdTxnCtx);
+                    throw new IllegalStateException("System is in an inconsistent state: pending index("
+                            + dataverseName + "." + datasetName + "." + indexName + ") couldn't be removed from the metadata", e);
                 }
             }
             throw new AlgebricksException(e);
@@ -658,8 +676,7 @@
         }
     }
 
-    private void handleCreateTypeStatement(AqlMetadataProvider metadataProvider, Statement stmt)
-            throws AlgebricksException, RemoteException, ACIDException, MetadataException {
+    private void handleCreateTypeStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception {
 
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -695,7 +712,7 @@
             }
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         } catch (Exception e) {
-            MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+            abort(e, e, mdTxnCtx);
             throw new AlgebricksException(e);
         } finally {
             releaseWriteLatch();
@@ -705,43 +722,46 @@
     private void handleDataverseDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc) throws Exception {
 
+        ProgressState progress = ProgressState.NO_PROGRESS;
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         acquireWriteLatch();
 
-        String dvName = null;
+        String dataverseName = null;
         List<JobSpecification> jobsToExecute = new ArrayList<JobSpecification>();
         try {
             DataverseDropStatement stmtDelete = (DataverseDropStatement) stmt;
-            dvName = stmtDelete.getDataverseName().getValue();
+            dataverseName = stmtDelete.getDataverseName().getValue();
 
-            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dvName);
+            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
             if (dv == null) {
-                if (!stmtDelete.getIfExists()) {
-                    throw new AlgebricksException("There is no dataverse with this name " + dvName + ".");
+                if (stmtDelete.getIfExists()) {
+                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                    return;
+                } else {
+                    throw new AlgebricksException("There is no dataverse with this name " + dataverseName + ".");
                 }
-                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                return;
             }
 
             //#. prepare jobs which will drop corresponding datasets with indexes. 
-            List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dvName);
+            List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dataverseName);
             for (int j = 0; j < datasets.size(); j++) {
                 String datasetName = datasets.get(j).getDatasetName();
                 DatasetType dsType = datasets.get(j).getDatasetType();
                 if (dsType == DatasetType.INTERNAL || dsType == DatasetType.FEED) {
 
-                    List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dvName, datasetName);
+                    List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName,
+                            datasetName);
                     for (int k = 0; k < indexes.size(); k++) {
                         if (indexes.get(k).isSecondaryIndex()) {
-                            CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dvName, datasetName,
+                            CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
                                     indexes.get(k).getIndexName());
                             jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
                         }
                     }
 
-                    CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dvName, datasetName);
+                    CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
                     jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider));
                 }
             }
@@ -749,12 +769,13 @@
             //#. mark PendingDropOp on the dataverse record by 
             //   first, deleting the dataverse record from the DATAVERSE_DATASET
             //   second, inserting the dataverse record with the PendingDropOp value into the DATAVERSE_DATASET
-            MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dvName);
-            MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dvName, dv.getDataFormat(),
+            MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
+            MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dataverseName, dv.getDataFormat(),
                     IMetadataEntity.PENDING_DROP_OP));
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
+            progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
 
             for (JobSpecification jobSpec : jobsToExecute) {
                 runJob(hcc, jobSpec, true);
@@ -765,32 +786,44 @@
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
 
             //#. finally, delete the dataverse.
-            MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dvName);
-            if (activeDefaultDataverse != null && activeDefaultDataverse.getDataverseName() == dvName) {
+            MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
+            if (activeDefaultDataverse != null && activeDefaultDataverse.getDataverseName().equals(dataverseName)) {
                 activeDefaultDataverse = null;
             }
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         } catch (Exception e) {
             if (bActiveTxn) {
-                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+                abort(e, e, mdTxnCtx);
             }
 
-            //#. execute compensation operations
-            //   remove the all indexes in NC
-            for (JobSpecification jobSpec : jobsToExecute) {
-                runJob(hcc, jobSpec, true);
-            }
+            if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
+                if (activeDefaultDataverse != null && activeDefaultDataverse.getDataverseName().equals(dataverseName)) {
+                    activeDefaultDataverse = null;
+                }
 
-            //   remove the record from the metadata.
-            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-            metadataProvider.setMetadataTxnContext(mdTxnCtx);
-            try {
-                MetadataManager.INSTANCE.dropDataverse(metadataProvider.getMetadataTxnContext(), dvName);
-                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-            } catch (Exception e2) {
-                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-                throw new AlgebricksException(e2);
+                //#. execute compensation operations
+                //   remove all the indexes in the NCs
+                try {
+                    for (JobSpecification jobSpec : jobsToExecute) {
+                        runJob(hcc, jobSpec, true);
+                    }
+                } catch (Exception e2) {
+                    //do not throw the exception since the metadata still needs to be compensated.
+                    e.addSuppressed(e2);
+                }
+
+                //   remove the record from the metadata.
+                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+                try {
+                    MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
+                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                } catch (Exception e2) {
+                    e.addSuppressed(e2);
+                    abort(e, e2, mdTxnCtx);
+                    throw new IllegalStateException("System is in an inconsistent state: pending dataverse("
+                            + dataverseName + ") couldn't be removed from the metadata", e);
+                }
             }
 
             throw new AlgebricksException(e);
@@ -802,6 +835,7 @@
     private void handleDatasetDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc) throws Exception {
 
+        ProgressState progress = ProgressState.NO_PROGRESS;
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -821,12 +855,13 @@
 
             Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
             if (ds == null) {
-                if (!stmtDelete.getIfExists()) {
+                if (stmtDelete.getIfExists()) {
+                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                    return;
+                } else {
                     throw new AlgebricksException("There is no dataset with this name " + datasetName
                             + " in dataverse " + dataverseName + ".");
                 }
-                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                return;
             }
 
             if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
@@ -852,6 +887,7 @@
 
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 bActiveTxn = false;
+                progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
 
                 //#. run the jobs
                 for (JobSpecification jobSpec : jobsToExecute) {
@@ -869,25 +905,34 @@
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         } catch (Exception e) {
             if (bActiveTxn) {
-                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+                abort(e, e, mdTxnCtx);
             }
 
-            //#. execute compensation operations
-            //   remove the all indexes in NC
-            for (JobSpecification jobSpec : jobsToExecute) {
-                runJob(hcc, jobSpec, true);
-            }
+            if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
+                //#. execute compensation operations
+                //   remove all the indexes in the NCs
+                try {
+                    for (JobSpecification jobSpec : jobsToExecute) {
+                        runJob(hcc, jobSpec, true);
+                    }
+                } catch (Exception e2) {
+                    //do not throw the exception since the metadata still needs to be compensated.
+                    e.addSuppressed(e2);
+                }
 
-            //   remove the record from the metadata.
-            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-            metadataProvider.setMetadataTxnContext(mdTxnCtx);
-            try {
-                MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
-                        datasetName);
-                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-            } catch (Exception e2) {
-                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-                throw new AlgebricksException(e2);
+                //   remove the record from the metadata.
+                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+                metadataProvider.setMetadataTxnContext(mdTxnCtx);
+                try {
+                    MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
+                            datasetName);
+                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                } catch (Exception e2) {
+                    e.addSuppressed(e2);
+                    abort(e, e2, mdTxnCtx);
+                    throw new IllegalStateException("System is in an inconsistent state: pending dataset("
+                            + dataverseName + "." + datasetName + ") couldn't be removed from the metadata", e);
+                }
             }
 
             throw new AlgebricksException(e);
@@ -899,6 +944,7 @@
     private void handleIndexDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc) throws Exception {
 
+        ProgressState progress = ProgressState.NO_PROGRESS;
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -927,38 +973,39 @@
                 indexName = stmtIndexDrop.getIndexName().getValue();
                 Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
                 if (index == null) {
-                    if (!stmtIndexDrop.getIfExists()) {
+                    if (stmtIndexDrop.getIfExists()) {
+                        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                        return;
+                    } else {
                         throw new AlgebricksException("There is no index with this name " + indexName + ".");
                     }
-                } else {
-                    //#. prepare a job to drop the index in NC.
-                    CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
-                            indexName);
-                    jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
-
-                    //#. mark PendingDropOp on the existing index
-                    MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
-                    MetadataManager.INSTANCE.addIndex(
-                            mdTxnCtx,
-                            new Index(dataverseName, datasetName, indexName, index.getIndexType(), index
-                                    .getKeyFieldNames(), index.isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
-
-                    //#. commit the existing transaction before calling runJob. 
-                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                    bActiveTxn = false;
-
-                    for (JobSpecification jobSpec : jobsToExecute) {
-                        runJob(hcc, jobSpec, true);
-                    }
-
-                    //#. begin a new transaction
-                    mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-                    bActiveTxn = true;
-                    metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
-                    //#. finally, delete the existing index
-                    MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
                 }
+                //#. prepare a job to drop the index in NC.
+                CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName);
+                jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
+
+                //#. mark PendingDropOp on the existing index
+                MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
+                MetadataManager.INSTANCE.addIndex(mdTxnCtx,
+                        new Index(dataverseName, datasetName, indexName, index.getIndexType(),
+                                index.getKeyFieldNames(), index.isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
+
+                //#. commit the existing transaction before calling runJob. 
+                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                bActiveTxn = false;
+                progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
+
+                for (JobSpecification jobSpec : jobsToExecute) {
+                    runJob(hcc, jobSpec, true);
+                }
+
+                //#. begin a new transaction
+                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+                bActiveTxn = true;
+                metadataProvider.setMetadataTxnContext(mdTxnCtx);
+
+                //#. finally, delete the existing index
+                MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
             } else {
                 throw new AlgebricksException(datasetName
                         + " is an external dataset. Indexes are not maintained for external datasets.");
@@ -967,25 +1014,34 @@
 
         } catch (Exception e) {
             if (bActiveTxn) {
-                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+                abort(e, e, mdTxnCtx);
             }
 
-            //#. execute compensation operations
-            //   remove the all indexes in NC
-            for (JobSpecification jobSpec : jobsToExecute) {
-                runJob(hcc, jobSpec, true);
-            }
+            if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
+                //#. execute compensation operations
+                //   remove all the indexes in the NCs
+                try {
+                    for (JobSpecification jobSpec : jobsToExecute) {
+                        runJob(hcc, jobSpec, true);
+                    }
+                } catch (Exception e2) {
+                    //do not throw the exception since the metadata still needs to be compensated.
+                    e.addSuppressed(e2);
+                }
 
-            //   remove the record from the metadata.
-            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-            metadataProvider.setMetadataTxnContext(mdTxnCtx);
-            try {
-                MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
-                        datasetName, indexName);
-                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-            } catch (Exception e2) {
-                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-                throw new AlgebricksException(e2);
+                //   remove the record from the metadata.
+                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+                metadataProvider.setMetadataTxnContext(mdTxnCtx);
+                try {
+                    MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
+                            datasetName, indexName);
+                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                } catch (Exception e2) {
+                    e.addSuppressed(e2);
+                    abort(e, e2, mdTxnCtx);
+                    throw new IllegalStateException("System is in an inconsistent state: pending index("
+                            + dataverseName + "." + datasetName + "." + indexName + ") couldn't be removed from the metadata", e);
+                }
             }
 
             throw new AlgebricksException(e);
@@ -995,8 +1051,7 @@
         }
     }
 
-    private void handleTypeDropStatement(AqlMetadataProvider metadataProvider, Statement stmt)
-            throws AlgebricksException, MetadataException, RemoteException, ACIDException {
+    private void handleTypeDropStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception {
 
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -1019,15 +1074,14 @@
             }
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         } catch (Exception e) {
-            MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+            abort(e, e, mdTxnCtx);
             throw new AlgebricksException(e);
         } finally {
             releaseWriteLatch();
         }
     }
 
-    private void handleNodegroupDropStatement(AqlMetadataProvider metadataProvider, Statement stmt)
-            throws MetadataException, AlgebricksException, RemoteException, ACIDException {
+    private void handleNodegroupDropStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception {
 
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -1046,15 +1100,14 @@
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         } catch (Exception e) {
-            MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+            abort(e, e, mdTxnCtx);
             throw new AlgebricksException(e);
         } finally {
             releaseWriteLatch();
         }
     }
 
-    private void handleCreateFunctionStatement(AqlMetadataProvider metadataProvider, Statement stmt)
-            throws AlgebricksException, MetadataException, RemoteException, ACIDException {
+    private void handleCreateFunctionStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception {
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         acquireWriteLatch();
@@ -1077,15 +1130,14 @@
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         } catch (Exception e) {
-            MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+            abort(e, e, mdTxnCtx);
             throw new AlgebricksException(e);
         } finally {
             releaseWriteLatch();
         }
     }
 
-    private void handleFunctionDropStatement(AqlMetadataProvider metadataProvider, Statement stmt)
-            throws MetadataException, RemoteException, ACIDException, AlgebricksException {
+    private void handleFunctionDropStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception {
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         acquireWriteLatch();
@@ -1102,7 +1154,7 @@
             }
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         } catch (Exception e) {
-            MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+            abort(e, e, mdTxnCtx);
             throw new AlgebricksException(e);
         } finally {
             releaseWriteLatch();
@@ -1149,7 +1201,7 @@
             }
         } catch (Exception e) {
             if (bActiveTxn) {
-                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+                abort(e, e, mdTxnCtx);
             }
 
             throw new AlgebricksException(e);
@@ -1181,7 +1233,7 @@
             }
         } catch (Exception e) {
             if (bActiveTxn) {
-                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+                abort(e, e, mdTxnCtx);
             }
             throw new AlgebricksException(e);
         } finally {
@@ -1215,7 +1267,7 @@
 
         } catch (Exception e) {
             if (bActiveTxn) {
-                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+                abort(e, e, mdTxnCtx);
             }
             throw new AlgebricksException(e);
         } finally {
@@ -1250,7 +1302,7 @@
 
         } catch (Exception e) {
             if (bActiveTxn) {
-                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+                abort(e, e, mdTxnCtx);
             }
             throw new AlgebricksException(e);
         } finally {
@@ -1315,7 +1367,7 @@
 
         } catch (Exception e) {
             if (bActiveTxn) {
-                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+                abort(e, e, mdTxnCtx);
             }
             throw new AlgebricksException(e);
         } finally {
@@ -1345,7 +1397,7 @@
 
         } catch (Exception e) {
             if (bActiveTxn) {
-                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+                abort(e, e, mdTxnCtx);
             }
             throw new AlgebricksException(e);
         } finally {
@@ -1407,7 +1459,7 @@
             return queryResult;
         } catch (Exception e) {
             if (bActiveTxn) {
-                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+                abort(e, e, mdTxnCtx);
             }
             throw new AlgebricksException(e);
         } finally {
@@ -1415,8 +1467,7 @@
         }
     }
 
-    private void handleCreateNodeGroupStatement(AqlMetadataProvider metadataProvider, Statement stmt)
-            throws MetadataException, AlgebricksException, RemoteException, ACIDException {
+    private void handleCreateNodeGroupStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception {
 
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -1439,7 +1490,7 @@
             }
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         } catch (Exception e) {
-            MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+            abort(e, e, mdTxnCtx);
             throw new AlgebricksException(e);
         } finally {
             releaseWriteLatch();
@@ -1494,4 +1545,13 @@
     private void releaseReadLatch() {
         MetadataManager.INSTANCE.releaseReadLatch();
     }
+
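+    //Aborts the given metadata transaction. If the abort itself fails, that failure is
+    //attached to parentE as a suppressed exception and rootE is rethrown wrapped in an
+    //IllegalStateException, so nothing is swallowed on the way to the caller. (rootE and
+    //parentE are the same object at call sites that pass abort(e, e, mdTxnCtx).)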
+    private void abort(Exception rootE, Exception parentE, MetadataTransactionContext mdTxnCtx) {
+        try {
+            MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+        } catch (Exception e2) {
+            parentE.addSuppressed(e2);
+            throw new IllegalStateException(rootE);
+        }
+    }
 }
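
With the abort() helper above, the error-handling contract is: the root failure always
propagates to the caller, and any secondary failure (a failed compensation job, a failed
metadata abort) rides along via Throwable.addSuppressed(). A minimal sketch of how a
caller can observe the whole story (compileAndExecute stands in for whatever entry point
invokes the translator):

    try {
        aqlTranslator.compileAndExecute(hcc); // representative entry point
    } catch (Exception e) {
        log(e);                               // the root cause
        for (Throwable suppressed : e.getSuppressed()) {
            log(suppressed);                  // compensation/abort failures, if any
        }
    }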
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreateIndexStatement.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreateIndexStatement.java
index ffd0534..7a764ae 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreateIndexStatement.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreateIndexStatement.java
@@ -12,7 +12,6 @@
 public class CreateIndexStatement implements Statement {
 
     private Identifier indexName;
-    private boolean needToCreate = true;
     private Identifier dataverseName;
     private Identifier datasetName;
     private List<String> fieldExprs = new ArrayList<String>();
@@ -33,14 +32,6 @@
         return gramLength;
     }
 
-    public void setNeedToCreate(boolean needToCreate) {
-        this.needToCreate = needToCreate;
-    }
-
-    public boolean getNeedToCreate() {
-        return needToCreate;
-    }
-
     public Identifier getIndexName() {
         return indexName;
     }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java
index 91a18f3..070e6ea 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java
@@ -40,7 +40,7 @@
     private final Map<String, String> hints;
     private final int datasetId;
     // Type of pending operations with respect to atomic DDL operation
-    private final int pendingOp;
+    private int pendingOp;
 
     public Dataset(String dataverseName, String datasetName, String itemTypeName, IDatasetDetails datasetDetails,
             Map<String, String> hints, DatasetType datasetType, int datasetId, int pendingOp) {
@@ -86,6 +86,10 @@
         return pendingOp;
     }
 
+    public void setPendingOp(int pendingOp) {
+        this.pendingOp = pendingOp;
+    }
+
     @Override
     public Object addToCache(MetadataCache cache) {
         return cache.addDatasetIfNotExists(this);
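
The pendingOp field of Dataset (and of Index, below) is no longer final, so the
translator can flip the flag on the entity it already fetched instead of rebuilding
the object field by field, as the AqlTranslator hunks above now do:

    dataset.setPendingOp(IMetadataEntity.PENDING_NO_OP);
    MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset);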
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Index.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Index.java
index 6d65730..aa29e5b 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Index.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Index.java
@@ -15,7 +15,6 @@
 
 package edu.uci.ics.asterix.metadata.entities;
 
-import java.io.Serializable;
 import java.util.List;
 
 import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
@@ -46,7 +45,7 @@
     // Specific to NGRAM indexes.
     private final int gramLength;
     // Type of pending operations with respect to atomic DDL operation
-    private final int pendingOp;
+    private int pendingOp;
 
     public Index(String dataverseName, String datasetName, String indexName, IndexType indexType,
             List<String> keyFieldNames, int gramLength, boolean isPrimaryIndex, int pendingOp) {
@@ -103,6 +102,10 @@
     public int getPendingOp() {
         return pendingOp;
     }
+
+    public void setPendingOp(int pendingOp) {
+        this.pendingOp = pendingOp;
+    }
 
     public boolean isSecondaryIndex() {
         return !isPrimaryIndex();