Implemented atomic DDL operations and passed existing tests.

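Each DDL handler now runs in its own metadata transaction, guarded by a
read/write latch: read for statements that only read metadata (use, load,
insert, write-from-query), write for statements that modify it. Handlers
that must also run Hyracks jobs follow a pending-op protocol: the metadata
entity is first written with PENDING_ADD_OP (or PENDING_DROP_OP), that
transaction is committed before the job runs, and a second transaction then
replaces the pending record with a PENDING_NO_OP one (or deletes it). If an
exception is raised while a transaction is still active, the transaction is
aborted.

The create-dataset path reduces to roughly the sketch below (condensed from
this patch, not the verbatim code; job-spec construction is elided):

    MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
    boolean bActiveTxn = true;
    acquireWriteLatch();
    try {
        //#. record the new entity with a pending flag
        Dataset dataset = new Dataset(dataverseName, datasetName, itemTypeName,
                datasetDetails, dsType, DatasetIdFactory.generateDatasetId(),
                IMetadataEntity.PENDING_ADD_OP);
        MetadataManager.INSTANCE.addDataset(mdTxnCtx, dataset);

        //#. commit the metadata txn before running the job on the NCs
        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
        bActiveTxn = false;
        runJob(hcc, jobSpec);

        //#. in a fresh txn, replace the pending record with the final one
        mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
        bActiveTxn = true;
        MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
        MetadataManager.INSTANCE.addDataset(mdTxnCtx, new Dataset(dataverseName,
                datasetName, itemTypeName, datasetDetails, dsType,
                dataset.getDatasetId(), IMetadataEntity.PENDING_NO_OP));
        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
    } catch (Exception e) {
        if (bActiveTxn) {
            MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
        }
        throw new AlgebricksException(e);
    } finally {
        releaseWriteLatch();
    }
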
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization@1070 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index a2b2163..620fb1a 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -68,6 +68,7 @@
 import edu.uci.ics.asterix.metadata.MetadataException;
 import edu.uci.ics.asterix.metadata.MetadataManager;
 import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
 import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
 import edu.uci.ics.asterix.metadata.entities.Dataset;
 import edu.uci.ics.asterix.metadata.entities.Datatype;
@@ -143,8 +144,7 @@
 
         for (Statement stmt : aqlStatements) {
             validateOperation(activeDefaultDataverse, stmt);
-            MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-            AqlMetadataProvider metadataProvider = new AqlMetadataProvider(mdTxnCtx, activeDefaultDataverse);
+            AqlMetadataProvider metadataProvider = new AqlMetadataProvider(activeDefaultDataverse);
             metadataProvider.setWriterFactory(writerFactory);
             metadataProvider.setOutputFile(outputFile);
             metadataProvider.setConfig(config);
@@ -253,15 +253,9 @@
                     }
 
                 }
-                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             } catch (Exception e) {
-                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
                 throw new AlgebricksException(e);
             }
-            // Following jobs are run under a separate transaction, that is committed/aborted by the JobEventListener
-            for (JobSpecification jobspec : jobsToExecute) {
-                runJob(hcc, jobspec);
-            }
         }
         return executionResult;
     }
@@ -289,398 +283,802 @@
 
     private Dataverse handleUseDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             List<JobSpecification> jobsToExecute) throws MetadataException, RemoteException, ACIDException {
-        DataverseDecl dvd = (DataverseDecl) stmt;
-        String dvName = dvd.getDataverseName().getValue();
-        Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
-        if (dv == null) {
-            throw new MetadataException("Unknown dataverse " + dvName);
+
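+        //#. read-only statement: begin a metadata txn under the read latch; commit on success, abort on failure.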
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        acquireReadLatch();
+
+        try {
+            DataverseDecl dvd = (DataverseDecl) stmt;
+            String dvName = dvd.getDataverseName().getValue();
+            Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
+            if (dv == null) {
+                throw new MetadataException("Unknown dataverse " + dvName);
+            }
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            return dv;
+        } catch (Exception e) {
+            MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+            throw new MetadataException(e);
+        } finally {
+            releaseReadLatch();
         }
-        return dv;
     }
 
     private void handleCreateDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             List<JobSpecification> jobsToExecute) throws MetadataException, AlgebricksException, RemoteException,
             ACIDException {
-        CreateDataverseStatement stmtCreateDataverse = (CreateDataverseStatement) stmt;
-        String dvName = stmtCreateDataverse.getDataverseName().getValue();
-        Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
-        if (dv != null && !stmtCreateDataverse.getIfNotExists()) {
-            throw new AlgebricksException("A dataverse with this name " + dvName + " already exists.");
+
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        acquireWriteLatch();
+
+        try {
+            CreateDataverseStatement stmtCreateDataverse = (CreateDataverseStatement) stmt;
+            String dvName = stmtCreateDataverse.getDataverseName().getValue();
+            Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
+            if (dv != null && !stmtCreateDataverse.getIfNotExists()) {
+                throw new AlgebricksException("A dataverse with this name " + dvName + " already exists.");
+            }
+            MetadataManager.INSTANCE.addDataverse(metadataProvider.getMetadataTxnContext(), new Dataverse(dvName,
+                    stmtCreateDataverse.getFormat(), IMetadataEntity.PENDING_NO_OP));
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+            throw new AlgebricksException(e);
+        } finally {
+            releaseWriteLatch();
         }
-        MetadataManager.INSTANCE.addDataverse(metadataProvider.getMetadataTxnContext(), new Dataverse(dvName,
-                stmtCreateDataverse.getFormat()));
     }
 
     private void handleCreateDatasetStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws AsterixException, Exception {
-        DatasetDecl dd = (DatasetDecl) stmt;
-        String dataverseName = dd.getDataverse() != null ? dd.getDataverse().getValue()
-                : activeDefaultDataverse != null ? activeDefaultDataverse.getDataverseName() : null;
-        if (dataverseName == null) {
-            throw new AlgebricksException(" dataverse not specified ");
-        }
-        String datasetName = dd.getName().getValue();
-        DatasetType dsType = dd.getDatasetType();
-        String itemTypeName = dd.getItemTypeName().getValue();
 
-        IDatasetDetails datasetDetails = null;
-        Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
-                datasetName);
-        if (ds != null) {
-            if (dd.getIfNotExists()) {
-                return;
-            } else {
-                throw new AlgebricksException("A dataset with this name " + datasetName + " already exists.");
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
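+        //#. bActiveTxn tracks whether a metadata txn is open, so the catch block aborts only a live txn.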
+        boolean bActiveTxn = true;
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        acquireWriteLatch();
+
+        try {
+            DatasetDecl dd = (DatasetDecl) stmt;
+            String dataverseName = dd.getDataverse() != null ? dd.getDataverse().getValue()
+                    : activeDefaultDataverse != null ? activeDefaultDataverse.getDataverseName() : null;
+            if (dataverseName == null) {
+                throw new AlgebricksException(" dataverse not specified ");
             }
-        }
-        Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(), dataverseName,
-                itemTypeName);
-        if (dt == null) {
-            throw new AlgebricksException(": type " + itemTypeName + " could not be found.");
-        }
-        switch (dd.getDatasetType()) {
-            case INTERNAL: {
-                IAType itemType = dt.getDatatype();
-                if (itemType.getTypeTag() != ATypeTag.RECORD) {
-                    throw new AlgebricksException("Can only partition ARecord's.");
+            String datasetName = dd.getName().getValue();
+            DatasetType dsType = dd.getDatasetType();
+            String itemTypeName = dd.getItemTypeName().getValue();
+
+            IDatasetDetails datasetDetails = null;
+            Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
+                    datasetName);
+            if (ds != null) {
+                if (dd.getIfNotExists()) {
+                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                    return;
+                } else {
+                    throw new AlgebricksException("A dataset with this name " + datasetName + " already exists.");
                 }
-                List<String> partitioningExprs = ((InternalDetailsDecl) dd.getDatasetDetailsDecl())
-                        .getPartitioningExprs();
-                String ngName = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue();
-                datasetDetails = new InternalDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
-                        InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs, ngName);
-                break;
             }
-            case EXTERNAL: {
-                String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter();
-                Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties();
-                datasetDetails = new ExternalDatasetDetails(adapter, properties);
-                break;
+            Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(), dataverseName,
+                    itemTypeName);
+            if (dt == null) {
+                throw new AlgebricksException(": type " + itemTypeName + " could not be found.");
             }
-            case FEED: {
-                IAType itemType = dt.getDatatype();
-                if (itemType.getTypeTag() != ATypeTag.RECORD) {
-                    throw new AlgebricksException("Can only partition ARecord's.");
+            switch (dd.getDatasetType()) {
+                case INTERNAL: {
+                    IAType itemType = dt.getDatatype();
+                    if (itemType.getTypeTag() != ATypeTag.RECORD) {
+                        throw new AlgebricksException("Can only partition ARecord's.");
+                    }
+                    List<String> partitioningExprs = ((InternalDetailsDecl) dd.getDatasetDetailsDecl())
+                            .getPartitioningExprs();
+                    String ngName = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue();
+                    datasetDetails = new InternalDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
+                            InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs,
+                            ngName);
+                    break;
                 }
-                List<String> partitioningExprs = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getPartitioningExprs();
-                String ngName = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue();
-                String adapter = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getAdapterFactoryClassname();
-                Map<String, String> configuration = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getConfiguration();
-                FunctionSignature signature = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getFunctionSignature();
-                datasetDetails = new FeedDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
-                        InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs, ngName,
-                        adapter, configuration, signature, FeedDatasetDetails.FeedState.INACTIVE.toString());
-                break;
+                case EXTERNAL: {
+                    String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter();
+                    Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties();
+                    datasetDetails = new ExternalDatasetDetails(adapter, properties);
+                    break;
+                }
+                case FEED: {
+                    IAType itemType = dt.getDatatype();
+                    if (itemType.getTypeTag() != ATypeTag.RECORD) {
+                        throw new AlgebricksException("Can only partition ARecord's.");
+                    }
+                    List<String> partitioningExprs = ((FeedDetailsDecl) dd.getDatasetDetailsDecl())
+                            .getPartitioningExprs();
+                    String ngName = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue();
+                    String adapter = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getAdapterFactoryClassname();
+                    Map<String, String> configuration = ((FeedDetailsDecl) dd.getDatasetDetailsDecl())
+                            .getConfiguration();
+                    FunctionSignature signature = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getFunctionSignature();
+                    datasetDetails = new FeedDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
+                            InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs,
+                            ngName, adapter, configuration, signature, FeedDatasetDetails.FeedState.INACTIVE.toString());
+                    break;
+                }
             }
-        }
-        MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), new Dataset(dataverseName,
-                datasetName, itemTypeName, datasetDetails, dsType, DatasetIdFactory.generateDatasetId()));
-        if (dd.getDatasetType() == DatasetType.INTERNAL || dd.getDatasetType() == DatasetType.FEED) {
-            Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
-                    dataverseName);
-            runJob(hcc, DatasetOperations.createDatasetJobSpec(dataverse, datasetName, metadataProvider));
+
+            //#. add a new dataset with PendingAddOp
+            Dataset dataset = new Dataset(dataverseName, datasetName, itemTypeName, datasetDetails, dsType,
+                    DatasetIdFactory.generateDatasetId(), IMetadataEntity.PENDING_ADD_OP);
+            MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset);
+
+            if (dd.getDatasetType() == DatasetType.INTERNAL || dd.getDatasetType() == DatasetType.FEED) {
+                Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
+                        dataverseName);
+                JobSpecification jobSpec = DatasetOperations.createDatasetJobSpec(dataverse, datasetName,
+                        metadataProvider);
+
+                //#. commit the metadata txn before calling runJob.
+                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                bActiveTxn = false;
+
+                //#. runJob
+                runJob(hcc, jobSpec);
+
+                //#. begin new metadataTxn
+                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+                bActiveTxn = true;
+                metadataProvider.setMetadataTxnContext(mdTxnCtx);
+            }
+
+            //#. add a new dataset with PendingNoOp after deleting the dataset with PendingAddOp
+            MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName);
+            MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), new Dataset(dataverseName,
+                    datasetName, itemTypeName, datasetDetails, dsType, dataset.getDatasetId(),
+                    IMetadataEntity.PENDING_NO_OP));
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            if (bActiveTxn) {
+                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+            }
+            throw new AlgebricksException(e);
+        } finally {
+            releaseWriteLatch();
         }
     }
 
     private void handleCreateIndexStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
-        CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
-        String dataverseName = stmtCreateIndex.getDataverseName() == null ? activeDefaultDataverse == null ? null
-                : activeDefaultDataverse.getDataverseName() : stmtCreateIndex.getDataverseName().getValue();
-        if (dataverseName == null) {
-            throw new AlgebricksException(" dataverse not specified ");
-        }
-        String datasetName = stmtCreateIndex.getDatasetName().getValue();
-        Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
-                datasetName);
-        if (ds == null) {
-            throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
-                    + dataverseName);
-        }
-        String indexName = stmtCreateIndex.getIndexName().getValue();
-        Index idx = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
-                datasetName, indexName);
-        if (idx != null) {
-            if (!stmtCreateIndex.getIfNotExists()) {
-                throw new AlgebricksException("An index with this name " + indexName + " already exists.");
-            } else {
-                stmtCreateIndex.setNeedToCreate(false);
-            }
-        } else {
-            Index index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(),
-                    stmtCreateIndex.getFieldExprs(), stmtCreateIndex.getGramLength(), false);
-            MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
-            runCreateIndexJob(hcc, stmtCreateIndex, metadataProvider);
 
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        boolean bActiveTxn = true;
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        acquireWriteLatch();
+
+        try {
+            CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
+            String dataverseName = stmtCreateIndex.getDataverseName() == null ? activeDefaultDataverse == null ? null
+                    : activeDefaultDataverse.getDataverseName() : stmtCreateIndex.getDataverseName().getValue();
+            if (dataverseName == null) {
+                throw new AlgebricksException(" dataverse not specified ");
+            }
+            String datasetName = stmtCreateIndex.getDatasetName().getValue();
+
+            Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
+                    datasetName);
+            if (ds == null) {
+                throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
+                        + dataverseName);
+            }
+
+            String indexName = stmtCreateIndex.getIndexName().getValue();
+            Index idx = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
+                    datasetName, indexName);
+
+            if (idx != null) {
+                if (!stmtCreateIndex.getIfNotExists()) {
+                    throw new AlgebricksException("An index with this name " + indexName + " already exists.");
+                } else {
+                    stmtCreateIndex.setNeedToCreate(false);
+                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                    return;
+                }
+            }
+
+            //#. add a new index with PendingAddOp
+            Index index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(),
+                    stmtCreateIndex.getFieldExprs(), stmtCreateIndex.getGramLength(), false,
+                    IMetadataEntity.PENDING_ADD_OP);
+            MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
+
+            //#. create the index artifact in NC.
             CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName,
                     index.getDatasetName(), index.getKeyFieldNames(), index.getGramLength(), index.getIndexType());
-            JobSpecification loadIndexJobSpec = IndexOperations
-                    .buildSecondaryIndexLoadingJobSpec(cis, metadataProvider);
-            runJob(hcc, loadIndexJobSpec);
+            JobSpecification spec = IndexOperations.buildSecondaryIndexCreationJobSpec(cis, metadataProvider);
+            if (spec == null) {
+                throw new AsterixException("Failed to create job spec for creating index '"
+                        + stmtCreateIndex.getDatasetName() + "." + stmtCreateIndex.getIndexName() + "'");
+            }
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            bActiveTxn = false;
+
+            runJob(hcc, spec);
+
+            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            bActiveTxn = true;
+            metadataProvider.setMetadataTxnContext(mdTxnCtx);
+
+            //#. load data into the index in NC.
+            cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName, index.getDatasetName(),
+                    index.getKeyFieldNames(), index.getGramLength(), index.getIndexType());
+            spec = IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, metadataProvider);
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            bActiveTxn = false;
+
+            runJob(hcc, spec);
+
+            //#. begin new metadataTxn
+            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            bActiveTxn = true;
+            metadataProvider.setMetadataTxnContext(mdTxnCtx);
+
+            //#. add another new index with PendingNoOp after deleting the index with PendingAddOp
+            MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
+                    indexName);
+            index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(),
+                    stmtCreateIndex.getFieldExprs(), stmtCreateIndex.getGramLength(), false,
+                    IMetadataEntity.PENDING_NO_OP);
+            MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+
+        } catch (Exception e) {
+            if (bActiveTxn) {
+                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+            }
+            throw new AlgebricksException(e);
+        } finally {
+            releaseWriteLatch();
         }
     }
 
     private void handleCreateTypeStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             List<JobSpecification> jobsToExecute) throws AlgebricksException, RemoteException, ACIDException,
             MetadataException {
-        MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
-        TypeDecl stmtCreateType = (TypeDecl) stmt;
-        String dataverseName = stmtCreateType.getDataverseName() == null ? activeDefaultDataverse == null ? null
-                : activeDefaultDataverse.getDataverseName() : stmtCreateType.getDataverseName().getValue();
-        if (dataverseName == null) {
-            throw new AlgebricksException(" dataverse not specified ");
-        }
-        String typeName = stmtCreateType.getIdent().getValue();
-        Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
-        if (dv == null) {
-            throw new AlgebricksException("Unknonw dataverse " + dataverseName);
-        }
-        Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
-        if (dt != null) {
-            if (!stmtCreateType.getIfNotExists())
-                throw new AlgebricksException("A datatype with this name " + typeName + " already exists.");
-        } else {
-            if (builtinTypeMap.get(typeName) != null) {
-                throw new AlgebricksException("Cannot redefine builtin type " + typeName + ".");
-            } else {
-                Map<TypeSignature, IAType> typeMap = TypeTranslator.computeTypes(mdTxnCtx, (TypeDecl) stmt,
-                        dataverseName);
-                TypeSignature typeSignature = new TypeSignature(dataverseName, typeName);
-                IAType type = typeMap.get(typeSignature);
-                MetadataManager.INSTANCE.addDatatype(mdTxnCtx, new Datatype(dataverseName, typeName, type, false));
+
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        acquireWriteLatch();
+
+        try {
+            TypeDecl stmtCreateType = (TypeDecl) stmt;
+            String dataverseName = stmtCreateType.getDataverseName() == null ? activeDefaultDataverse == null ? null
+                    : activeDefaultDataverse.getDataverseName() : stmtCreateType.getDataverseName().getValue();
+            if (dataverseName == null) {
+                throw new AlgebricksException(" dataverse not specified ");
             }
+            String typeName = stmtCreateType.getIdent().getValue();
+            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
+            if (dv == null) {
+                throw new AlgebricksException("Unknonw dataverse " + dataverseName);
+            }
+            Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
+            if (dt != null) {
+                if (!stmtCreateType.getIfNotExists()) {
+                    throw new AlgebricksException("A datatype with this name " + typeName + " already exists.");
+                }
+            } else {
+                if (builtinTypeMap.get(typeName) != null) {
+                    throw new AlgebricksException("Cannot redefine builtin type " + typeName + ".");
+                } else {
+                    Map<TypeSignature, IAType> typeMap = TypeTranslator.computeTypes(mdTxnCtx, (TypeDecl) stmt,
+                            dataverseName);
+                    TypeSignature typeSignature = new TypeSignature(dataverseName, typeName);
+                    IAType type = typeMap.get(typeSignature);
+                    MetadataManager.INSTANCE.addDatatype(mdTxnCtx, new Datatype(dataverseName, typeName, type, false));
+                }
+            }
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+            throw new AlgebricksException(e);
+        } finally {
+            releaseWriteLatch();
         }
     }
 
     private void handleDataverseDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
-        MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
-        DataverseDropStatement stmtDelete = (DataverseDropStatement) stmt;
-        String dvName = stmtDelete.getDataverseName().getValue();
 
-        Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dvName);
-        if (dv == null) {
-            if (!stmtDelete.getIfExists()) {
-                throw new AlgebricksException("There is no dataverse with this name " + dvName + ".");
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        boolean bActiveTxn = true;
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        acquireWriteLatch();
+
+        try {
+            DataverseDropStatement stmtDelete = (DataverseDropStatement) stmt;
+            String dvName = stmtDelete.getDataverseName().getValue();
+
+            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dvName);
+            if (dv == null) {
+                if (!stmtDelete.getIfExists()) {
+                    throw new AlgebricksException("There is no dataverse with this name " + dvName + ".");
+                }
+                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                return;
             }
-        } else {
+
+            //#. prepare jobs that will drop the dataverse's datasets along with their indexes.
             List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dvName);
             for (int j = 0; j < datasets.size(); j++) {
                 String datasetName = datasets.get(j).getDatasetName();
                 DatasetType dsType = datasets.get(j).getDatasetType();
                 if (dsType == DatasetType.INTERNAL || dsType == DatasetType.FEED) {
+
                     List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dvName, datasetName);
                     for (int k = 0; k < indexes.size(); k++) {
                         if (indexes.get(k).isSecondaryIndex()) {
-                            compileIndexDropStatement(hcc, dvName, datasetName, indexes.get(k).getIndexName(),
-                                    metadataProvider);
+                            CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dvName, datasetName,
+                                    indexes.get(k).getIndexName());
+                            jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
                         }
                     }
+
+                    CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dvName, datasetName);
+                    jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider));
                 }
-                compileDatasetDropStatement(hcc, dvName, datasetName, metadataProvider);
             }
 
+            //#. mark PendingDropOp on the dataverse record by
+            //   first, deleting the dataverse record from the DATAVERSE_DATASET, and
+            //   second, inserting the dataverse record with the PendingDropOp value into the DATAVERSE_DATASET
+            MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dvName);
+            MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dvName, dv.getDataFormat(),
+                    IMetadataEntity.PENDING_DROP_OP));
+
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            bActiveTxn = false;
+
+            for (JobSpecification jobSpec : jobsToExecute) {
+                runJob(hcc, jobSpec);
+            }
+
+            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            bActiveTxn = true;
+            metadataProvider.setMetadataTxnContext(mdTxnCtx);
+
+            //#. finally, delete the dataverse.
             MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dvName);
             if (activeDefaultDataverse != null && activeDefaultDataverse.getDataverseName() == dvName) {
                 activeDefaultDataverse = null;
             }
+
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            if (bActiveTxn) {
+                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+            }
+            throw new AlgebricksException(e);
+        } finally {
+            releaseWriteLatch();
         }
     }
 
     private void handleDatasetDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
-        MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
-        DropStatement stmtDelete = (DropStatement) stmt;
-        String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
-                : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue();
-        if (dataverseName == null) {
-            throw new AlgebricksException(" dataverse not specified ");
-        }
-        String datasetName = stmtDelete.getDatasetName().getValue();
-        Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
-        if (ds == null) {
-            if (!stmtDelete.getIfExists())
-                throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
-                        + dataverseName + ".");
-        } else {
+
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        boolean bActiveTxn = true;
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        acquireWriteLatch();
+
+        try {
+            DropStatement stmtDelete = (DropStatement) stmt;
+            String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
+                    : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue();
+            if (dataverseName == null) {
+                throw new AlgebricksException(" dataverse not specified ");
+            }
+            String datasetName = stmtDelete.getDatasetName().getValue();
+
+            Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+            if (ds == null) {
+                if (!stmtDelete.getIfExists()) {
+                    throw new AlgebricksException("There is no dataset with this name " + datasetName
+                            + " in dataverse " + dataverseName + ".");
+                }
+                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                return;
+            }
+
             if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
+
+                //#. prepare jobs to drop the dataset and the indexes in NC
                 List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
                 for (int j = 0; j < indexes.size(); j++) {
-                    if (indexes.get(j).isPrimaryIndex()) {
-                        compileIndexDropStatement(hcc, dataverseName, datasetName, indexes.get(j).getIndexName(),
-                                metadataProvider);
+                    if (indexes.get(j).isSecondaryIndex()) {
+                        CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
+                                indexes.get(j).getIndexName());
+                        jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
                     }
                 }
+                CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
+                jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider));
+
+                //#. mark the existing dataset as PendingDropOp
+                MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
+                MetadataManager.INSTANCE.addDataset(
+                        mdTxnCtx,
+                        new Dataset(dataverseName, datasetName, ds.getItemTypeName(), ds.getDatasetDetails(), ds
+                                .getDatasetType(), ds.getDatasetId(), IMetadataEntity.PENDING_DROP_OP));
+
+                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                bActiveTxn = false;
+
+                //#. run the jobs
+                for (JobSpecification jobSpec : jobsToExecute) {
+                    runJob(hcc, jobSpec);
+                }
+
+                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+                bActiveTxn = true;
+                metadataProvider.setMetadataTxnContext(mdTxnCtx);
             }
-            compileDatasetDropStatement(hcc, dataverseName, datasetName, metadataProvider);
+
+            //#. finally, delete the dataset.
+            MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
+
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            if (bActiveTxn) {
+                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+            }
+            throw new AlgebricksException(e);
+        } finally {
+            releaseWriteLatch();
         }
     }
 
     private void handleIndexDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
-        MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
-        IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt;
-        String datasetName = stmtIndexDrop.getDatasetName().getValue();
-        String dataverseName = stmtIndexDrop.getDataverseName() == null ? activeDefaultDataverse == null ? null
-                : activeDefaultDataverse.getDataverseName() : stmtIndexDrop.getDataverseName().getValue();
-        if (dataverseName == null) {
-            throw new AlgebricksException(" dataverse not specified ");
-        }
-        Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
-        if (ds == null)
-            throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
-                    + dataverseName);
-        if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
-            String indexName = stmtIndexDrop.getIndexName().getValue();
-            Index idx = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
-            if (idx == null) {
-                if (!stmtIndexDrop.getIfExists())
-                    throw new AlgebricksException("There is no index with this name " + indexName + ".");
-            } else
-                compileIndexDropStatement(hcc, dataverseName, datasetName, indexName, metadataProvider);
-        } else {
-            throw new AlgebricksException(datasetName
-                    + " is an external dataset. Indexes are not maintained for external datasets.");
+
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        boolean bActiveTxn = true;
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        acquireWriteLatch();
+
+        try {
+            IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt;
+            String datasetName = stmtIndexDrop.getDatasetName().getValue();
+            String dataverseName = stmtIndexDrop.getDataverseName() == null ? activeDefaultDataverse == null ? null
+                    : activeDefaultDataverse.getDataverseName() : stmtIndexDrop.getDataverseName().getValue();
+            if (dataverseName == null) {
+                throw new AlgebricksException(" dataverse not specified ");
+            }
+
+            Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+            if (ds == null) {
+                throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
+                        + dataverseName);
+            }
+
+            if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
+                String indexName = stmtIndexDrop.getIndexName().getValue();
+                Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
+                if (index == null) {
+                    if (!stmtIndexDrop.getIfExists()) {
+                        throw new AlgebricksException("There is no index with this name " + indexName + ".");
+                    }
+                } else {
+                    //#. prepare a job to drop the index in NC.
+                    CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
+                            indexName);
+                    jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
+
+                    //#. mark PendingDropOp on the existing index
+                    MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
+                    MetadataManager.INSTANCE.addIndex(
+                            mdTxnCtx,
+                            new Index(dataverseName, datasetName, indexName, index.getIndexType(), index
+                                    .getKeyFieldNames(), index.isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
+
+                    //#. commit the existing transaction before calling runJob. 
+                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                    bActiveTxn = false;
+
+                    for (JobSpecification jobSpec : jobsToExecute) {
+                        runJob(hcc, jobSpec);
+                    }
+
+                    //#. begin a new transaction
+                    mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+                    bActiveTxn = true;
+                    metadataProvider.setMetadataTxnContext(mdTxnCtx);
+
+                    //#. finally, delete the existing index
+                    MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
+                }
+            } else {
+                throw new AlgebricksException(datasetName
+                        + " is an external dataset. Indexes are not maintained for external datasets.");
+            }
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+
+        } catch (Exception e) {
+            if (bActiveTxn) {
+                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+            }
+            throw new AlgebricksException(e);
+
+        } finally {
+            releaseWriteLatch();
         }
     }
 
     private void handleTypeDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             List<JobSpecification> jobsToExecute) throws AlgebricksException, MetadataException, RemoteException,
             ACIDException {
-        MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
-        TypeDropStatement stmtTypeDrop = (TypeDropStatement) stmt;
-        String dataverseName = stmtTypeDrop.getDataverseName() == null ? (activeDefaultDataverse == null ? null
-                : activeDefaultDataverse.getDataverseName()) : stmtTypeDrop.getDataverseName().getValue();
-        if (dataverseName == null) {
-            throw new AlgebricksException(" dataverse not specified ");
-        }
-        String typeName = stmtTypeDrop.getTypeName().getValue();
-        Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
-        if (dt == null) {
-            if (!stmtTypeDrop.getIfExists())
-                throw new AlgebricksException("There is no datatype with this name " + typeName + ".");
-        } else {
-            MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, dataverseName, typeName);
+
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        acquireWriteLatch();
+
+        try {
+            TypeDropStatement stmtTypeDrop = (TypeDropStatement) stmt;
+            String dataverseName = stmtTypeDrop.getDataverseName() == null ? (activeDefaultDataverse == null ? null
+                    : activeDefaultDataverse.getDataverseName()) : stmtTypeDrop.getDataverseName().getValue();
+            if (dataverseName == null) {
+                throw new AlgebricksException(" dataverse not specified ");
+            }
+            String typeName = stmtTypeDrop.getTypeName().getValue();
+            Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
+            if (dt == null) {
+                if (!stmtTypeDrop.getIfExists())
+                    throw new AlgebricksException("There is no datatype with this name " + typeName + ".");
+            } else {
+                MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, dataverseName, typeName);
+            }
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+            throw new AlgebricksException(e);
+        } finally {
+            releaseWriteLatch();
         }
     }
 
     private void handleNodegroupDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             List<JobSpecification> jobsToExecute) throws MetadataException, AlgebricksException, RemoteException,
             ACIDException {
-        MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
-        NodeGroupDropStatement stmtDelete = (NodeGroupDropStatement) stmt;
-        String nodegroupName = stmtDelete.getNodeGroupName().getValue();
-        NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodegroupName);
-        if (ng == null) {
-            if (!stmtDelete.getIfExists())
-                throw new AlgebricksException("There is no nodegroup with this name " + nodegroupName + ".");
-        } else {
-            MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, nodegroupName);
+
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        acquireWriteLatch();
+
+        try {
+            NodeGroupDropStatement stmtDelete = (NodeGroupDropStatement) stmt;
+            String nodegroupName = stmtDelete.getNodeGroupName().getValue();
+            NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodegroupName);
+            if (ng == null) {
+                if (!stmtDelete.getIfExists())
+                    throw new AlgebricksException("There is no nodegroup with this name " + nodegroupName + ".");
+            } else {
+                MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, nodegroupName);
+            }
+
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+            throw new AlgebricksException(e);
+        } finally {
+            releaseWriteLatch();
         }
     }
 
     private void handleCreateFunctionStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             List<JobSpecification> jobsToExecute) throws AlgebricksException, MetadataException, RemoteException,
             ACIDException {
-        MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
-        CreateFunctionStatement cfs = (CreateFunctionStatement) stmt;
-        String dataverse = cfs.getSignature().getNamespace() == null ? activeDefaultDataverse == null ? null
-                : activeDefaultDataverse.getDataverseName() : cfs.getSignature().getNamespace();
-        if (dataverse == null) {
-            throw new AlgebricksException(" dataverse not specified ");
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        acquireWriteLatch();
+
+        try {
+            CreateFunctionStatement cfs = (CreateFunctionStatement) stmt;
+            String dataverse = cfs.getSignature().getNamespace() == null ? activeDefaultDataverse == null ? null
+                    : activeDefaultDataverse.getDataverseName() : cfs.getSignature().getNamespace();
+            if (dataverse == null) {
+                throw new AlgebricksException(" dataverse not specified ");
+            }
+            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverse);
+            if (dv == null) {
+                throw new AlgebricksException("There is no dataverse with this name " + dataverse + ".");
+            }
+            Function function = new Function(dataverse, cfs.getaAterixFunction().getName(), cfs.getaAterixFunction()
+                    .getArity(), cfs.getParamList(), Function.RETURNTYPE_VOID, cfs.getFunctionBody(),
+                    Function.LANGUAGE_AQL, FunctionKind.SCALAR.toString());
+            MetadataManager.INSTANCE.addFunction(mdTxnCtx, function);
+
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+            throw new AlgebricksException(e);
+        } finally {
+            releaseWriteLatch();
         }
-        Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverse);
-        if (dv == null) {
-            throw new AlgebricksException("There is no dataverse with this name " + dataverse + ".");
-        }
-        Function function = new Function(dataverse, cfs.getaAterixFunction().getName(), cfs.getaAterixFunction()
-                .getArity(), cfs.getParamList(), Function.RETURNTYPE_VOID, cfs.getFunctionBody(),
-                Function.LANGUAGE_AQL, FunctionKind.SCALAR.toString());
-        MetadataManager.INSTANCE.addFunction(mdTxnCtx, function);
     }
 
     private void handleFunctionDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             List<JobSpecification> jobsToExecute) throws MetadataException, RemoteException, ACIDException,
             AlgebricksException {
-        MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
-        FunctionDropStatement stmtDropFunction = (FunctionDropStatement) stmt;
-        FunctionSignature signature = stmtDropFunction.getFunctionSignature();
-        Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, signature);
-        if (function == null) {
-            if (!stmtDropFunction.getIfExists())
-                throw new AlgebricksException("Unknonw function " + signature);
-        } else {
-            MetadataManager.INSTANCE.dropFunction(mdTxnCtx, signature);
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        acquireWriteLatch();
+
+        try {
+            FunctionDropStatement stmtDropFunction = (FunctionDropStatement) stmt;
+            FunctionSignature signature = stmtDropFunction.getFunctionSignature();
+            Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, signature);
+            if (function == null) {
+                if (!stmtDropFunction.getIfExists())
+                    throw new AlgebricksException("Unknonw function " + signature);
+            } else {
+                MetadataManager.INSTANCE.dropFunction(mdTxnCtx, signature);
+            }
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+            throw new AlgebricksException(e);
+        } finally {
+            releaseWriteLatch();
         }
     }
 
     private void handleLoadFromFileStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
-        MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
-        LoadFromFileStatement loadStmt = (LoadFromFileStatement) stmt;
-        String dataverseName = loadStmt.getDataverseName() == null ? activeDefaultDataverse == null ? null
-                : activeDefaultDataverse.getDataverseName() : loadStmt.getDataverseName().getValue();
-        CompiledLoadFromFileStatement cls = new CompiledLoadFromFileStatement(dataverseName, loadStmt.getDatasetName()
-                .getValue(), loadStmt.getAdapter(), loadStmt.getProperties(), loadStmt.dataIsAlreadySorted());
 
-        IDataFormat format = getDataFormat(metadataProvider.getMetadataTxnContext(), dataverseName);
-        Job job = DatasetOperations.createLoadDatasetJobSpec(metadataProvider, cls, format);
-        jobsToExecute.add(job.getJobSpec());
-        // Also load the dataset's secondary indexes.
-        List<Index> datasetIndexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, loadStmt
-                .getDatasetName().getValue());
-        for (Index index : datasetIndexes) {
-            if (!index.isSecondaryIndex()) {
-                continue;
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        boolean bActiveTxn = true;
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
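+        //#. load reads metadata to build the job specs but does not modify it, so the read latch suffices.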
+        acquireReadLatch();
+
+        try {
+            LoadFromFileStatement loadStmt = (LoadFromFileStatement) stmt;
+            String dataverseName = loadStmt.getDataverseName() == null ? activeDefaultDataverse == null ? null
+                    : activeDefaultDataverse.getDataverseName() : loadStmt.getDataverseName().getValue();
+            CompiledLoadFromFileStatement cls = new CompiledLoadFromFileStatement(dataverseName, loadStmt
+                    .getDatasetName().getValue(), loadStmt.getAdapter(), loadStmt.getProperties(),
+                    loadStmt.dataIsAlreadySorted());
+
+            IDataFormat format = getDataFormat(metadataProvider.getMetadataTxnContext(), dataverseName);
+            Job job = DatasetOperations.createLoadDatasetJobSpec(metadataProvider, cls, format);
+            jobsToExecute.add(job.getJobSpec());
+            // Also load the dataset's secondary indexes.
+            List<Index> datasetIndexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, loadStmt
+                    .getDatasetName().getValue());
+            for (Index index : datasetIndexes) {
+                if (!index.isSecondaryIndex()) {
+                    continue;
+                }
+                // Create CompiledCreateIndexStatement from metadata entity 'index'.
+                CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(),
+                        dataverseName, index.getDatasetName(), index.getKeyFieldNames(), index.getGramLength(),
+                        index.getIndexType());
+                jobsToExecute.add(IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, metadataProvider));
             }
-            // Create CompiledCreateIndexStatement from metadata entity 'index'.
-            CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName,
-                    index.getDatasetName(), index.getKeyFieldNames(), index.getGramLength(), index.getIndexType());
-            jobsToExecute.add(IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, metadataProvider));
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            bActiveTxn = false;
+
+            for (JobSpecification jobspec : jobsToExecute) {
+                runJob(hcc, jobspec);
+            }
+        } catch (Exception e) {
+            if (bActiveTxn) {
+                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+            }
+            throw new AlgebricksException(e);
+        } finally {
+            releaseReadLatch();
         }
     }
 
     private void handleWriteFromQueryResultStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
-        metadataProvider.setWriteTransaction(true);
-        WriteFromQueryResultStatement st1 = (WriteFromQueryResultStatement) stmt;
-        String dataverseName = st1.getDataverseName() == null ? activeDefaultDataverse == null ? null
-                : activeDefaultDataverse.getDataverseName() : st1.getDataverseName().getValue();
-        CompiledWriteFromQueryResultStatement clfrqs = new CompiledWriteFromQueryResultStatement(dataverseName, st1
-                .getDatasetName().getValue(), st1.getQuery(), st1.getVarCounter());
 
-        Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
-        if (compiled.first != null) {
-            jobsToExecute.add(compiled.first);
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        boolean bActiveTxn = true;
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        acquireReadLatch();
+
+        try {
+            metadataProvider.setWriteTransaction(true);
+            WriteFromQueryResultStatement st1 = (WriteFromQueryResultStatement) stmt;
+            String dataverseName = st1.getDataverseName() == null ? activeDefaultDataverse == null ? null
+                    : activeDefaultDataverse.getDataverseName() : st1.getDataverseName().getValue();
+            CompiledWriteFromQueryResultStatement clfrqs = new CompiledWriteFromQueryResultStatement(dataverseName, st1
+                    .getDatasetName().getValue(), st1.getQuery(), st1.getVarCounter());
+
+            Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(),
+                    clfrqs);
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            bActiveTxn = false;
+            if (compiled.first != null) {
+                runJob(hcc, compiled.first);
+            }
+        } catch (Exception e) {
+            if (bActiveTxn) {
+                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+            }
+            throw new AlgebricksException(e);
+        } finally {
+            releaseReadLatch();
         }
     }
 
     private void handleInsertStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
-        metadataProvider.setWriteTransaction(true);
-        InsertStatement stmtInsert = (InsertStatement) stmt;
-        String dataverseName = stmtInsert.getDataverseName() == null ? activeDefaultDataverse == null ? null
-                : activeDefaultDataverse.getDataverseName() : stmtInsert.getDataverseName().getValue();
-        CompiledInsertStatement clfrqs = new CompiledInsertStatement(dataverseName, stmtInsert.getDatasetName()
-                .getValue(), stmtInsert.getQuery(), stmtInsert.getVarCounter());
-        Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
-        if (compiled.first != null) {
-            jobsToExecute.add(compiled.first);
+
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        boolean bActiveTxn = true;
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        acquireReadLatch();
+
+        try {
+            metadataProvider.setWriteTransaction(true);
+            InsertStatement stmtInsert = (InsertStatement) stmt;
+            String dataverseName = stmtInsert.getDataverseName() == null ? activeDefaultDataverse == null ? null
+                    : activeDefaultDataverse.getDataverseName() : stmtInsert.getDataverseName().getValue();
+            CompiledInsertStatement clfrqs = new CompiledInsertStatement(dataverseName, stmtInsert.getDatasetName()
+                    .getValue(), stmtInsert.getQuery(), stmtInsert.getVarCounter());
+            Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(),
+                    clfrqs);
+
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            bActiveTxn = false;
+
+            if (compiled.first != null) {
+                runJob(hcc, compiled.first);
+            }
+
+        } catch (Exception e) {
+            if (bActiveTxn) {
+                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+            }
+            throw new AlgebricksException(e);
+        } finally {
+            releaseReadLatch();
         }
     }
 
     private void handleDeleteStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
-        metadataProvider.setWriteTransaction(true);
-        DeleteStatement stmtDelete = (DeleteStatement) stmt;
-        String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
-                : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue();
-        CompiledDeleteStatement clfrqs = new CompiledDeleteStatement(stmtDelete.getVariableExpr(), dataverseName,
-                stmtDelete.getDatasetName().getValue(), stmtDelete.getCondition(), stmtDelete.getDieClause(),
-                stmtDelete.getVarCounter(), metadataProvider);
-        Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
-        if (compiled.first != null) {
-            jobsToExecute.add(compiled.first);
+
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        boolean bActiveTxn = true;
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        acquireReadLatch();
+
+        try {
+            metadataProvider.setWriteTransaction(true);
+            DeleteStatement stmtDelete = (DeleteStatement) stmt;
+            String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
+                    : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue();
+            CompiledDeleteStatement clfrqs = new CompiledDeleteStatement(stmtDelete.getVariableExpr(), dataverseName,
+                    stmtDelete.getDatasetName().getValue(), stmtDelete.getCondition(), stmtDelete.getDieClause(),
+                    stmtDelete.getVarCounter(), metadataProvider);
+            Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(),
+                    clfrqs);
+
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            bActiveTxn = false;
+
+            if (compiled.first != null) {
+                runJob(hcc, compiled.first);
+            }
+
+        } catch (Exception e) {
+            if (bActiveTxn) {
+                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+            }
+            throw new AlgebricksException(e);
+        } finally {
+            releaseReadLatch();
         }
     }
 
@@ -704,84 +1102,140 @@
 
     private void handleBeginFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
-        BeginFeedStatement bfs = (BeginFeedStatement) stmt;
-        String dataverseName = bfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
-                : activeDefaultDataverse.getDataverseName() : bfs.getDatasetName().getValue();
 
-        CompiledBeginFeedStatement cbfs = new CompiledBeginFeedStatement(dataverseName,
-                bfs.getDatasetName().getValue(), bfs.getQuery(), bfs.getVarCounter());
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        boolean bActiveTxn = true;
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        acquireReadLatch();
 
-        Dataset dataset;
-        dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName, bfs
-                .getDatasetName().getValue());
-        IDatasetDetails datasetDetails = dataset.getDatasetDetails();
-        if (datasetDetails.getDatasetType() != DatasetType.FEED) {
-            throw new IllegalArgumentException("Dataset " + bfs.getDatasetName().getValue() + " is not a feed dataset");
-        }
-        bfs.initialize(metadataProvider.getMetadataTxnContext(), dataset);
-        cbfs.setQuery(bfs.getQuery());
-        Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, bfs.getQuery(), cbfs);
-        if (compiled.first != null) {
-            jobsToExecute.add(compiled.first);
+        try {
+            BeginFeedStatement bfs = (BeginFeedStatement) stmt;
+            String dataverseName = bfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
+                    : activeDefaultDataverse.getDataverseName() : bfs.getDataverseName().getValue();
+
+            CompiledBeginFeedStatement cbfs = new CompiledBeginFeedStatement(dataverseName, bfs.getDatasetName()
+                    .getValue(), bfs.getQuery(), bfs.getVarCounter());
+
+            Dataset dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(),
+                    dataverseName, bfs.getDatasetName().getValue());
+            if (dataset == null) {
+                throw new AlgebricksException("Unknown dataset " + bfs.getDatasetName().getValue());
+            }
+            IDatasetDetails datasetDetails = dataset.getDatasetDetails();
+            if (datasetDetails.getDatasetType() != DatasetType.FEED) {
+                throw new IllegalArgumentException("Dataset " + bfs.getDatasetName().getValue()
+                        + " is not a feed dataset");
+            }
+            bfs.initialize(metadataProvider.getMetadataTxnContext(), dataset);
+            cbfs.setQuery(bfs.getQuery());
+            Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, bfs.getQuery(), cbfs);
+
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            bActiveTxn = false;
+
+            if (compiled.first != null) {
+                runJob(hcc, compiled.first);
+            }
+
+        } catch (Exception e) {
+            if (bActiveTxn) {
+                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+            }
+            throw new AlgebricksException(e);
+        } finally {
+            releaseReadLatch();
         }
     }
 
     private void handleControlFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
-        ControlFeedStatement cfs = (ControlFeedStatement) stmt;
-        String dataverseName = cfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
-                : activeDefaultDataverse.getDataverseName() : cfs.getDatasetName().getValue();
-        CompiledControlFeedStatement clcfs = new CompiledControlFeedStatement(cfs.getOperationType(), dataverseName,
-                cfs.getDatasetName().getValue(), cfs.getAlterAdapterConfParams());
-        jobsToExecute.add(FeedOperations.buildControlFeedJobSpec(clcfs, metadataProvider));
+
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        boolean bActiveTxn = true;
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        acquireReadLatch();
+
+        try {
+            ControlFeedStatement cfs = (ControlFeedStatement) stmt;
+            String dataverseName = cfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
+                    : activeDefaultDataverse.getDataverseName() : cfs.getDataverseName().getValue();
+            CompiledControlFeedStatement clcfs = new CompiledControlFeedStatement(cfs.getOperationType(),
+                    dataverseName, cfs.getDatasetName().getValue(), cfs.getAlterAdapterConfParams());
+            JobSpecification jobSpec = FeedOperations.buildControlFeedJobSpec(clcfs, metadataProvider);
+
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            bActiveTxn = false;
+
+            runJob(hcc, jobSpec);
+
+        } catch (Exception e) {
+            if (bActiveTxn) {
+                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+            }
+            throw new AlgebricksException(e);
+        } finally {
+            releaseReadLatch();
+        }
     }
 
     private QueryResult handleQuery(AqlMetadataProvider metadataProvider, Query query, IHyracksClientConnection hcc,
             List<JobSpecification> jobsToExecute) throws Exception {
-        Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, query, null);
-        if (compiled.first != null) {
-            GlobalConfig.ASTERIX_LOGGER.info(compiled.first.toJSON().toString(1));
-            jobsToExecute.add(compiled.first);
-        }
-        return new QueryResult(query, compiled.second.getLocalFile().getFile().getAbsolutePath());
-    }
 
-    private void runCreateIndexJob(IHyracksClientConnection hcc, CreateIndexStatement stmtCreateIndex,
-            AqlMetadataProvider metadataProvider) throws Exception {
-        // TODO: Eventually CreateIndexStatement and
-        // CompiledCreateIndexStatement should be replaced by the corresponding
-        // metadata entity.
-        // For now we must still convert to a CompiledCreateIndexStatement here.
-        String dataverseName = stmtCreateIndex.getDataverseName() == null ? activeDefaultDataverse == null ? null
-                : activeDefaultDataverse.getDataverseName() : stmtCreateIndex.getDataverseName().getValue();
-        CompiledCreateIndexStatement createIndexStmt = new CompiledCreateIndexStatement(stmtCreateIndex.getIndexName()
-                .getValue(), dataverseName, stmtCreateIndex.getDatasetName().getValue(),
-                stmtCreateIndex.getFieldExprs(), stmtCreateIndex.getGramLength(), stmtCreateIndex.getIndexType());
-        JobSpecification spec = IndexOperations.buildSecondaryIndexCreationJobSpec(createIndexStmt, metadataProvider);
-        if (spec == null) {
-            throw new AsterixException("Failed to create job spec for creating index '"
-                    + stmtCreateIndex.getDatasetName() + "." + stmtCreateIndex.getIndexName() + "'");
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        boolean bActiveTxn = true;
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        acquireReadLatch();
+
+        try {
+            Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, query, null);
+
+            QueryResult queryResult = new QueryResult(query, compiled.second.getLocalFile().getFile().getAbsolutePath());
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            bActiveTxn = false;
+
+            if (compiled.first != null) {
+                GlobalConfig.ASTERIX_LOGGER.info(compiled.first.toJSON().toString(1));
+                runJob(hcc, compiled.first);
+            }
+
+            return queryResult;
+        } catch (Exception e) {
+            if (bActiveTxn) {
+                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+            }
+            throw new AlgebricksException(e);
+        } finally {
+            releaseReadLatch();
         }
-        runJob(hcc, spec);
     }
 
     private void handleCreateNodeGroupStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             List<JobSpecification> jobsToExecute) throws MetadataException, AlgebricksException, RemoteException,
             ACIDException {
-        MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
-        NodegroupDecl stmtCreateNodegroup = (NodegroupDecl) stmt;
-        String ngName = stmtCreateNodegroup.getNodegroupName().getValue();
-        NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, ngName);
-        if (ng != null) {
-            if (!stmtCreateNodegroup.getIfNotExists())
-                throw new AlgebricksException("A nodegroup with this name " + ngName + " already exists.");
-        } else {
-            List<Identifier> ncIdentifiers = stmtCreateNodegroup.getNodeControllerNames();
-            List<String> ncNames = new ArrayList<String>(ncIdentifiers.size());
-            for (Identifier id : ncIdentifiers) {
-                ncNames.add(id.getValue());
+
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        acquireWriteLatch();
+
+        try {
+            NodegroupDecl stmtCreateNodegroup = (NodegroupDecl) stmt;
+            String ngName = stmtCreateNodegroup.getNodegroupName().getValue();
+            NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, ngName);
+            if (ng != null) {
+                if (!stmtCreateNodegroup.getIfNotExists()) {
+                    throw new AlgebricksException("A nodegroup with name " + ngName + " already exists.");
+                }
+            } else {
+                List<Identifier> ncIdentifiers = stmtCreateNodegroup.getNodeControllerNames();
+                List<String> ncNames = new ArrayList<String>(ncIdentifiers.size());
+                for (Identifier id : ncIdentifiers) {
+                    ncNames.add(id.getValue());
+                }
+                MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(ngName, ncNames));
             }
-            MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(ngName, ncNames));
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+            throw new AlgebricksException(e);
+        } finally {
+            releaseWriteLatch();
         }
     }
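Metadata-mutating DDL such as the nodegroup create above uses the write-latch variant: the commit happens inside the latched region and no jobs run afterwards, so the bActiveTxn flag is unnecessary. A condensed sketch, again with a hypothetical callback type:

```java
// Sketch only: MetadataMutation is hypothetical; the latch and transaction
// calls mirror the nodegroup handler above.
interface MetadataMutation {
    void apply(MetadataTransactionContext mdTxnCtx) throws Exception;
}

private void runUnderWriteLatch(AqlMetadataProvider metadataProvider, MetadataMutation mutation)
        throws Exception {
    MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
    metadataProvider.setMetadataTxnContext(mdTxnCtx);
    acquireWriteLatch();
    try {
        mutation.apply(mdTxnCtx); // metadata writes happen here, exclusively
        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
    } catch (Exception e) {
        MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
        throw new AlgebricksException(e);
    } finally {
        releaseWriteLatch();
    }
}
```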
 
@@ -789,27 +1243,6 @@
         executeJobArray(hcc, new Job[] { new Job(spec) }, out, pdf);
     }
 
-    private void compileIndexDropStatement(IHyracksClientConnection hcc, String dataverseName, String datasetName,
-            String indexName, AqlMetadataProvider metadataProvider) throws Exception {
-        CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName);
-        runJob(hcc, IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
-        MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
-                indexName);
-    }
-
-    private void compileDatasetDropStatement(IHyracksClientConnection hcc, String dataverseName, String datasetName,
-            AqlMetadataProvider metadataProvider) throws Exception {
-        MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
-        CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
-        Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
-        if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
-            JobSpecification[] jobSpecs = DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider);
-            for (JobSpecification spec : jobSpecs)
-                runJob(hcc, spec);
-        }
-        MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
-    }
-
     public void executeJobArray(IHyracksClientConnection hcc, Job[] jobs, PrintWriter out, DisplayFormat pdf)
             throws Exception {
         for (int i = 0; i < jobs.length; i++) {
@@ -831,4 +1264,20 @@
         }
         return format;
     }
+
+    private void acquireWriteLatch() {
+        MetadataManager.INSTANCE.acquireWriteLatch();
+    }
+
+    private void releaseWriteLatch() {
+        MetadataManager.INSTANCE.releaseWriteLatch();
+    }
+
+    private void acquireReadLatch() {
+        MetadataManager.INSTANCE.acquireReadLatch();
+    }
+
+    private void releaseReadLatch() {
+        MetadataManager.INSTANCE.releaseReadLatch();
+    }
 }
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
index b32f129..9d20281 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
@@ -90,7 +90,7 @@
 
     private static Logger LOGGER = Logger.getLogger(DatasetOperations.class.getName());
 
-    public static JobSpecification[] createDropDatasetJobSpec(CompiledDatasetDropStatement datasetDropStmt,
+    public static JobSpecification createDropDatasetJobSpec(CompiledDatasetDropStatement datasetDropStmt,
             AqlMetadataProvider metadataProvider) throws AlgebricksException, HyracksDataException, RemoteException,
             ACIDException, AsterixException {
 
@@ -111,67 +111,10 @@
             throw new AlgebricksException("DROP DATASET: No metadata for dataset " + datasetName);
         }
         if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
-            return new JobSpecification[0];
+            // External datasets have no physical artifacts to drop; return an empty job spec.
+            return new JobSpecification();
         }
-
-        List<Index> datasetIndexes = metadataProvider.getDatasetIndexes(dataset.getDataverseName(),
-                dataset.getDatasetName());
-        int numSecondaryIndexes = 0;
-        for (Index index : datasetIndexes) {
-            if (index.isSecondaryIndex()) {
-                numSecondaryIndexes++;
-            }
-        }
-        JobSpecification[] specs;
-        if (numSecondaryIndexes > 0) {
-            specs = new JobSpecification[numSecondaryIndexes + 1];
-            int i = 0;
-            // First, drop secondary indexes.
-            for (Index index : datasetIndexes) {
-                if (index.isSecondaryIndex()) {
-                    specs[i] = new JobSpecification();
-                    Pair<IFileSplitProvider, AlgebricksPartitionConstraint> idxSplitsAndConstraint = metadataProvider
-                            .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataset.getDataverseName(),
-                                    datasetName, index.getIndexName());
-                    IIndexDataflowHelperFactory dfhFactory;
-                    switch (index.getIndexType()) {
-                        case BTREE:
-                            dfhFactory = new LSMBTreeDataflowHelperFactory(
-                                    AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
-                                    AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
-                                    AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER);
-                            break;
-                        case RTREE:
-                            dfhFactory = new LSMRTreeDataflowHelperFactory(
-                                    new IPrimitiveValueProviderFactory[] { null }, RTreePolicyType.RTREE,
-                                    new IBinaryComparatorFactory[] { null },
-                                    AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
-                                    AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
-                                    AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER, null);
-                            break;
-                        case NGRAM_INVIX:
-                        case WORD_INVIX:
-                            dfhFactory = new LSMInvertedIndexDataflowHelperFactory(
-                                    AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
-                                    AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
-                                    AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER);
-                            break;
-                        default:
-                            throw new AsterixException("Unknown index type provided.");
-                    }
-                    IndexDropOperatorDescriptor secondaryBtreeDrop = new IndexDropOperatorDescriptor(specs[i],
-                            AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
-                            AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, idxSplitsAndConstraint.first, dfhFactory);
-                    AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specs[i], secondaryBtreeDrop,
-                            idxSplitsAndConstraint.second);
-                    i++;
-                }
-            }
-        } else {
-            specs = new JobSpecification[1];
-        }
+
         JobSpecification specPrimary = new JobSpecification();
-        specs[specs.length - 1] = specPrimary;
 
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider
                 .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataset.getDataverseName(), datasetName,
@@ -187,7 +130,7 @@
 
         specPrimary.addRoot(primaryBtreeDrop);
 
-        return specs;
+        return specPrimary;
     }
 
     public static JobSpecification createDatasetJobSpec(Dataverse dataverse, String datasetName,
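Since createDropDatasetJobSpec now returns a single specification covering only the primary index, a caller is expected to drop each secondary index through its own job first. A hedged sketch of that caller-side sequence; the loop is illustrative, but the types and factory methods all appear elsewhere in this patch:

```java
// Sketch of the caller-side drop sequence under the new single-spec contract.
List<Index> indexes = metadataProvider.getDatasetIndexes(dataverseName, datasetName);
for (Index index : indexes) {
    if (!index.isSecondaryIndex()) {
        continue;
    }
    CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
            index.getIndexName());
    runJob(hcc, IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
}
CompiledDatasetDropStatement dds = new CompiledDatasetDropStatement(dataverseName, datasetName);
runJob(hcc, DatasetOperations.createDropDatasetJobSpec(dds, metadataProvider));
```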
diff --git a/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java b/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
index 111f76b..8bb6499 100644
--- a/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
+++ b/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
@@ -95,11 +95,11 @@
             File testFile = tcCtx.getTestFile(cUnit);
 
             /*************** to avoid run failure cases ****************
-            if (!testFile.getAbsolutePath().contains("runtimets/queries/failure/")) {
+            if (!testFile.getAbsolutePath().contains("index-selection/")) {
                 continue;
             }
             ************************************************************/
-
+
             File expectedResultFile = tcCtx.getExpectedResultFile(cUnit);
             File actualFile = new File(PATH_ACTUAL + File.separator
                     + tcCtx.getTestCase().getFilePath().replace(File.separator, "_") + "_" + cUnit.getName() + ".adm");
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
index 1fdde73..3989a0f 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
@@ -17,6 +17,8 @@
 
 import java.rmi.RemoteException;
 import java.util.List;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import edu.uci.ics.asterix.common.functions.FunctionSignature;
 import edu.uci.ics.asterix.metadata.api.IAsterixStateProxy;
@@ -79,10 +81,10 @@
 public class MetadataManager implements IMetadataManager {
     // Set in init().
     public static MetadataManager INSTANCE;
-
     private final MetadataCache cache = new MetadataCache();
     private IAsterixStateProxy proxy;
     private IMetadataNode metadataNode;
+    private final ReadWriteLock metadataLatch;
 
     public MetadataManager(IAsterixStateProxy proxy) {
         if (proxy == null) {
@@ -90,6 +92,7 @@
         }
         this.proxy = proxy;
         this.metadataNode = null;
+        this.metadataLatch = new ReentrantReadWriteLock(true);
     }
 
     @Override
@@ -206,11 +209,14 @@
 
     @Override
     public void addDataset(MetadataTransactionContext ctx, Dataset dataset) throws MetadataException {
+        // Add the dataset to the metadata node.
         try {
             metadataNode.addDataset(ctx.getJobId(), dataset);
         } catch (RemoteException e) {
             throw new MetadataException(e);
         }
+
+        // Reflect the new dataset in the metadata cache.
         ctx.addDataset(dataset);
     }
 
@@ -375,7 +381,7 @@
     @Override
     public Index getIndex(MetadataTransactionContext ctx, String dataverseName, String datasetName, String indexName)
             throws MetadataException {
-        
+
         // First look in the context to see if this transaction created the
         // requested index itself (but the index is still uncommitted).
         Index index = ctx.getIndex(dataverseName, datasetName, indexName);
@@ -384,7 +390,7 @@
             // uncommitted.
             return index;
         }
-        
+
         if (ctx.indexIsDropped(dataverseName, datasetName, indexName)) {
             // Index has been dropped by this transaction but could still be
             // in the cache.
@@ -585,4 +591,25 @@
         }
         return adapter;
     }
+
+    @Override
+    public void acquireWriteLatch() {
+        metadataLatch.writeLock().lock();
+    }
+
+    @Override
+    public void releaseWriteLatch() {
+        metadataLatch.writeLock().unlock();
+    }
+
+    @Override
+    public void acquireReadLatch() {
+        metadataLatch.readLock().lock();
+    }
+
+    @Override
+    public void releaseReadLatch() {
+        metadataLatch.readLock().unlock();
+    }
+
 }
\ No newline at end of file
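The latch is a fair ReentrantReadWriteLock: concurrent readers (query and DML compilation) proceed together, a DDL writer waits for them to drain and then runs exclusively, and fairness keeps a waiting writer from being starved by a stream of readers. A standalone illustration of that discipline (not part of the patch):

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Standalone demo of the metadata latch discipline used above.
public class MetadataLatchDemo {
    private static final ReentrantReadWriteLock LATCH = new ReentrantReadWriteLock(true);

    static void compileQuery(String name) {
        LATCH.readLock().lock();
        try {
            System.out.println(name + ": reading metadata (shared)");
        } finally {
            LATCH.readLock().unlock();
        }
    }

    static void runDdl() {
        LATCH.writeLock().lock();
        try {
            System.out.println("DDL: mutating metadata (exclusive)");
        } finally {
            LATCH.writeLock().unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread q1 = new Thread(() -> compileQuery("q1"));
        Thread q2 = new Thread(() -> compileQuery("q2"));
        Thread ddl = new Thread(MetadataLatchDemo::runDdl);
        q1.start(); q2.start(); ddl.start();
        q1.join(); q2.join(); ddl.join();
    }
}
```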
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
index a9406e2..86a8c6e 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
@@ -56,14 +56,18 @@
 import edu.uci.ics.asterix.om.base.AString;
 import edu.uci.ics.asterix.om.types.BuiltinType;
 import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallback;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallback;
+import edu.uci.ics.asterix.transaction.management.resource.TransactionalResourceRepository;
 import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetId;
 import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetIdFactory;
+import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager.ResourceType;
 import edu.uci.ics.asterix.transaction.management.service.transaction.JobId;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
-import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -75,10 +79,13 @@
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
 import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
 
 public class MetadataNode implements IMetadataNode {
     private static final long serialVersionUID = 1L;
@@ -88,7 +95,7 @@
     private static final DatasetId METADATA_DATASET_ID = new DatasetId(MetadataPrimaryIndexes.METADATA_DATASET_ID);
 
     private IIndexLifecycleManager indexLifecycleManager;
-    private TransactionSubsystem transactionProvider;
+    private TransactionSubsystem transactionSubsystem;
 
     public static final MetadataNode INSTANCE = new MetadataNode();
 
@@ -97,26 +104,26 @@
     }
 
     public void initialize(AsterixAppRuntimeContext runtimeContext) {
-        this.transactionProvider = runtimeContext.getTransactionSubsystem();
+        this.transactionSubsystem = runtimeContext.getTransactionSubsystem();
         this.indexLifecycleManager = runtimeContext.getIndexLifecycleManager();
     }
 
     @Override
     public void beginTransaction(JobId transactionId) throws ACIDException, RemoteException {
-        transactionProvider.getTransactionManager().beginTransaction(transactionId);
+        transactionSubsystem.getTransactionManager().beginTransaction(transactionId);
     }
 
     @Override
     public void commitTransaction(JobId jobId) throws RemoteException, ACIDException {
-        TransactionContext txnCtx = transactionProvider.getTransactionManager().getTransactionContext(jobId);
-        transactionProvider.getTransactionManager().commitTransaction(txnCtx, new DatasetId(-1), -1);
+        TransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
+        transactionSubsystem.getTransactionManager().commitTransaction(txnCtx, new DatasetId(-1), -1);
     }
 
     @Override
     public void abortTransaction(JobId jobId) throws RemoteException, ACIDException {
         try {
-            TransactionContext txnCtx = transactionProvider.getTransactionManager().getTransactionContext(jobId);
-            transactionProvider.getTransactionManager().abortTransaction(txnCtx, new DatasetId(-1), -1);
+            TransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
+            transactionSubsystem.getTransactionManager().abortTransaction(txnCtx, new DatasetId(-1), -1);
         } catch (ACIDException e) {
             e.printStackTrace();
             throw e;
@@ -125,14 +132,14 @@
 
     @Override
     public void lock(JobId jobId, byte lockMode) throws ACIDException, RemoteException {
-        TransactionContext txnCtx = transactionProvider.getTransactionManager().getTransactionContext(jobId);
-        transactionProvider.getLockManager().lock(METADATA_DATASET_ID, -1, lockMode, txnCtx);
+        TransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
+        transactionSubsystem.getLockManager().lock(METADATA_DATASET_ID, -1, lockMode, txnCtx);
     }
 
     @Override
     public void unlock(JobId jobId) throws ACIDException, RemoteException {
-        TransactionContext txnCtx = transactionProvider.getTransactionManager().getTransactionContext(jobId);
-        transactionProvider.getLockManager().unlock(METADATA_DATASET_ID, -1, txnCtx);
+        TransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
+        transactionSubsystem.getLockManager().unlock(METADATA_DATASET_ID, -1, txnCtx);
     }
 
     @Override
@@ -160,7 +167,7 @@
                 // Add the primary index for the dataset.
                 InternalDatasetDetails id = (InternalDatasetDetails) dataset.getDatasetDetails();
                 Index primaryIndex = new Index(dataset.getDataverseName(), dataset.getDatasetName(),
-                        dataset.getDatasetName(), IndexType.BTREE, id.getPrimaryKey(), true);
+                        dataset.getDatasetName(), IndexType.BTREE, id.getPrimaryKey(), true, dataset.getPendingOp());
                 addIndex(jobId, primaryIndex);
                 ITupleReference nodeGroupTuple = createTuple(id.getNodeGroupName(), dataset.getDataverseName(),
                         dataset.getDatasetName());
@@ -253,22 +260,64 @@
         insertTupleIntoIndex(jobId, MetadataSecondaryIndexes.DATATYPENAME_ON_DATATYPE_INDEX, tuple);
     }
 
-    private void insertTupleIntoIndex(JobId jobId, IMetadataIndex index, ITupleReference tuple) throws Exception {
-        long resourceID = index.getResourceID();
-        IIndex indexInstance = indexLifecycleManager.getIndex(resourceID);
+    private void insertTupleIntoIndex(JobId jobId, IMetadataIndex metadataIndex, ITupleReference tuple)
+            throws Exception {
+        long resourceID = metadataIndex.getResourceID();
+        ILSMIndex lsmIndex = (ILSMIndex) indexLifecycleManager.getIndex(resourceID);
         indexLifecycleManager.open(resourceID);
-        IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
-                NoOpOperationCallback.INSTANCE);
-        TransactionContext txnCtx = transactionProvider.getTransactionManager().getTransactionContext(jobId);
-        transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx);
+
+        // Prepare a callback that handles transactional locking and logging for this modification.
+        IModificationOperationCallback modCallback = createIndexModificationCallback(jobId, resourceID, metadataIndex,
+                lsmIndex, IndexOperation.INSERT);
+
+        IIndexAccessor indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
+
         // TODO: fix exceptions once new BTree exception model is in hyracks.
         indexAccessor.insert(tuple);
-        //TODO: extract the key from the tuple and get the PKHashValue from the key.
-        //index.getIndexLogger().generateLogRecord(transactionProvider, txnCtx, index.getDatasetId().getId(), null,
-        //        resourceID, IndexOperation.INSERT, tuple, null, null);
+
         indexLifecycleManager.close(resourceID);
     }
 
+    private IModificationOperationCallback createIndexModificationCallback(JobId jobId, long resourceId,
+            IMetadataIndex metadataIndex, ILSMIndex lsmIndex, IndexOperation indexOp) throws Exception {
+        TransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
+        TransactionalResourceRepository txnResourceRepository = transactionSubsystem
+                .getTransactionalResourceRepository();
+
+        if (txnResourceRepository.getTransactionalResource(resourceId) == null) {
+            txnResourceRepository.registerTransactionalResource(resourceId, lsmIndex);
+        }
+
+        int[] primaryKeyFields = metadataIndex.getPrimaryKeyIndexes();
+        int numKeys = primaryKeyFields.length;
+        IBinaryHashFunctionFactory[] primaryKeyHashFunctionFactories = null;
+        if (metadataIndex.isPrimaryIndex()) {
+            primaryKeyHashFunctionFactories = metadataIndex.getKeyBinaryHashFunctionFactory();
+        } else {
+            primaryKeyHashFunctionFactories = new IBinaryHashFunctionFactory[numKeys];
+            IBinaryHashFunctionFactory[] secondaryKeyHashFunctionFactories = metadataIndex
+                    .getKeyBinaryHashFunctionFactory();
+            for (int i = 0; i < numKeys; i++) {
+                primaryKeyHashFunctionFactories[i] = secondaryKeyHashFunctionFactories[primaryKeyFields[i]];
+            }
+        }
+
+        IModificationOperationCallback callback = null;
+        if (metadataIndex.isPrimaryIndex()) {
+            callback = new PrimaryIndexModificationOperationCallback(metadataIndex.getDatasetId().getId(),
+                    primaryKeyFields, primaryKeyHashFunctionFactories, txnCtx, transactionSubsystem.getLockManager(),
+                    transactionSubsystem, resourceId, ResourceType.LSM_BTREE, indexOp);
+        } else {
+            callback = new SecondaryIndexModificationOperationCallback(metadataIndex.getDatasetId().getId(),
+                    primaryKeyFields, primaryKeyHashFunctionFactories, txnCtx, transactionSubsystem.getLockManager(),
+                    transactionSubsystem, resourceId, ResourceType.LSM_BTREE, indexOp);
+        }
+
+        return callback;
+    }
+
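For a secondary metadata index, the loop above projects the primary key's hash function factories out of the secondary key order, so the callback locks and logs against the primary key even when the modification arrives through a secondary index. A worked example using the GroupName index defined in MetadataSecondaryIndexes later in this patch:

```java
// Worked example: the GroupName index orders its keys as
// {GroupName, DataverseName, DatasetName} and declares primaryKeyIndexes = {1, 2},
// i.e. the primary key of the Dataset dataset.
static IBinaryHashFunctionFactory[] projectPrimaryKeyFactories(IMetadataIndex metadataIndex) {
    int[] primaryKeyFields = metadataIndex.getPrimaryKeyIndexes();
    IBinaryHashFunctionFactory[] secondaryFactories = metadataIndex.getKeyBinaryHashFunctionFactory();
    IBinaryHashFunctionFactory[] primaryFactories = new IBinaryHashFunctionFactory[primaryKeyFields.length];
    for (int i = 0; i < primaryKeyFields.length; i++) {
        // i = 0 -> secondaryFactories[1] (DataverseName); i = 1 -> secondaryFactories[2] (DatasetName)
        primaryFactories[i] = secondaryFactories[primaryKeyFields[i]];
    }
    return primaryFactories;
}
```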
     @Override
     public void dropDataverse(JobId jobId, String dataverseName) throws MetadataException, RemoteException {
         try {
@@ -524,24 +573,16 @@
         }
     }
 
-    private void deleteTupleFromIndex(JobId jobId, IMetadataIndex index, ITupleReference tuple) throws Exception {
-        long resourceID = index.getResourceID();
-        IIndex indexInstance = indexLifecycleManager.getIndex(resourceID);
+    private void deleteTupleFromIndex(JobId jobId, IMetadataIndex metadataIndex, ITupleReference tuple)
+            throws Exception {
+        long resourceID = metadataIndex.getResourceID();
+        ILSMIndex lsmIndex = (ILSMIndex) indexLifecycleManager.getIndex(resourceID);
         indexLifecycleManager.open(resourceID);
-        IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
-                NoOpOperationCallback.INSTANCE);
-        TransactionContext txnCtx = transactionProvider.getTransactionManager().getTransactionContext(jobId);
-        // This lock is actually an upgrade, because a deletion must be preceded
-        // by a search, in order to be able to undo an aborted deletion.
-        // The transaction with txnId will have an S lock on the
-        // resource. Note that lock converters have a higher priority than
-        // regular waiters in the LockManager.
-        transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx);
+        // Prepare a callback that handles transactional locking and logging for this modification.
+        IModificationOperationCallback modCallback = createIndexModificationCallback(jobId, resourceID, metadataIndex,
+                lsmIndex, IndexOperation.DELETE);
+        IIndexAccessor indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
         indexAccessor.delete(tuple);
-        //TODO: extract the key from the tuple and get the PKHashValue from the key.
-        //check how to get the oldValue.
-        //index.getIndexLogger().generateLogRecord(transactionProvider, txnCtx, index.getDatasetId().getId(), null,
-        //        resourceID, IndexOperation.DELETE, tuple, operation, null);
         indexLifecycleManager.close(resourceID);
     }
 
@@ -802,8 +843,10 @@
 
     private <ResultType> void searchIndex(JobId jobId, IMetadataIndex index, ITupleReference searchKey,
             IValueExtractor<ResultType> valueExtractor, List<ResultType> results) throws Exception {
-        TransactionContext txnCtx = transactionProvider.getTransactionManager().getTransactionContext(jobId);
-        transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.S, txnCtx);
+        TransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
+        //#. No lock is currently needed to read metadata, since incompatible concurrent
+        //   accesses are always serialized by the latch in the MetadataManager.
+        //transactionSubsystem.getLockManager().lock(index.getDatasetId(), -1, LockMode.S, txnCtx);
         IBinaryComparatorFactory[] comparatorFactories = index.getKeyBinaryComparatorFactory();
         long resourceID = index.getResourceID();
         IIndex indexInstance = indexLifecycleManager.getIndex(resourceID);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java
index 8052f75..5ec1d5a 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java
@@ -19,6 +19,7 @@
 
 import edu.uci.ics.asterix.common.functions.FunctionSignature;
 import edu.uci.ics.asterix.external.dataset.adapter.AdapterIdentifier;
+import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
 import edu.uci.ics.asterix.metadata.entities.Dataset;
 import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
 import edu.uci.ics.asterix.metadata.entities.Datatype;
@@ -104,19 +105,19 @@
     }
 
     public void dropDataset(String dataverseName, String datasetName) {
-        Dataset dataset = new Dataset(dataverseName, datasetName, null, null, null, -1);
+        Dataset dataset = new Dataset(dataverseName, datasetName, null, null, null, -1, IMetadataEntity.PENDING_NO_OP);
         droppedCache.addDatasetIfNotExists(dataset);
         logAndApply(new MetadataLogicalOperation(dataset, false));
     }
 
     public void dropIndex(String dataverseName, String datasetName, String indexName) {
-        Index index = new Index(dataverseName, datasetName, indexName, null, null, false);
+        Index index = new Index(dataverseName, datasetName, indexName, null, null, false, IMetadataEntity.PENDING_NO_OP);
         droppedCache.addIndexIfNotExists(index);
         logAndApply(new MetadataLogicalOperation(index, false));
     }
 
     public void dropDataverse(String dataverseName) {
-        Dataverse dataverse = new Dataverse(dataverseName, null);
+        Dataverse dataverse = new Dataverse(dataverseName, null, IMetadataEntity.PENDING_NO_OP);
         droppedCache.addDataverseIfNotExists(dataverse);
         logAndApply(new MetadataLogicalOperation(dataverse, false));
     }
@@ -162,7 +163,7 @@
         }
         return droppedCache.getDataset(dataverseName, datasetName) != null;
     }
-    
+
     public boolean indexIsDropped(String dataverseName, String datasetName, String indexName) {
         if (droppedCache.getDataverse(dataverseName) != null) {
             return true;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataEntity.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataEntity.java
index 847ec67..43f8dc4 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataEntity.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataEntity.java
@@ -20,6 +20,11 @@
 import edu.uci.ics.asterix.metadata.MetadataCache;
 
 public interface IMetadataEntity extends Serializable {
+
+    public static final int PENDING_NO_OP = 0;
+    public static final int PENDING_ADD_OP = 1;
+    public static final int PENDING_DROP_OP = 2;
+
     Object addToCache(MetadataCache cache);
 
     Object dropFromCache(MetadataCache cache);
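These constants are the heart of the atomic-DDL change: an entity presumably carries PENDING_ADD_OP or PENDING_DROP_OP while the Hyracks jobs that build or destroy its physical storage are in flight, and only fully created entities carry PENDING_NO_OP, which gives recovery a way to identify half-done DDL. A hedged sketch of the create-side lifecycle; the finalize and recovery steps are assumptions, not shown in this hunk:

```java
// Hedged sketch: the constants and the widened Dataset constructor are from this
// patch; the finalize and recovery steps are assumptions about their use.
Dataset pending = new Dataset(dataverseName, datasetName, itemTypeName, datasetDetails,
        DatasetType.INTERNAL, datasetId, IMetadataEntity.PENDING_ADD_OP);
MetadataManager.INSTANCE.addDataset(mdTxnCtx, pending);  // txn 1: record the intent
// ... run the Hyracks job(s) that create the physical files ...
// txn 2 (assumed): re-register the dataset with PENDING_NO_OP once the files exist;
// on restart, recovery can roll back anything still marked PENDING_ADD_OP.
```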
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataIndex.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataIndex.java
index 8554ddd..d9021a5 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataIndex.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataIndex.java
@@ -75,6 +75,11 @@
     public long getResourceID();
 
     public IndexLogger getIndexLogger();
-    
+
     public DatasetId getDatasetId();
+
+    boolean isPrimaryIndex();
+
+    int[] getPrimaryKeyIndexes();
+
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
index fe96a6f..ad08f18 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
@@ -433,4 +433,12 @@
 
 	
     public void initializeDatasetIdFactory(MetadataTransactionContext ctx) throws MetadataException;
+
+    public void acquireWriteLatch();
+
+    public void releaseWriteLatch();
+
+    public void acquireReadLatch();
+
+    public void releaseReadLatch();
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index 838de70..0570c82 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -31,6 +31,7 @@
 import edu.uci.ics.asterix.metadata.IDatasetDetails;
 import edu.uci.ics.asterix.metadata.MetadataManager;
 import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
 import edu.uci.ics.asterix.metadata.api.IMetadataIndex;
 import edu.uci.ics.asterix.metadata.entities.AsterixBuiltinTypeMap;
 import edu.uci.ics.asterix.metadata.entities.Dataset;
@@ -226,7 +227,7 @@
     public static void insertInitialDataverses(MetadataTransactionContext mdTxnCtx) throws Exception {
         String dataverseName = MetadataPrimaryIndexes.DATAVERSE_DATASET.getDataverseName();
         String dataFormat = NonTaggedDataFormat.NON_TAGGED_DATA_FORMAT;
-        MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dataverseName, dataFormat));
+        MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dataverseName, dataFormat, IMetadataEntity.PENDING_NO_OP));
     }
 
     public static void insertInitialDatasets(MetadataTransactionContext mdTxnCtx) throws Exception {
@@ -236,7 +237,7 @@
                     primaryIndexes[i].getNodeGroupName());
             MetadataManager.INSTANCE.addDataset(mdTxnCtx, new Dataset(primaryIndexes[i].getDataverseName(),
                     primaryIndexes[i].getIndexedDatasetName(), primaryIndexes[i].getPayloadRecordType().getTypeName(),
-                    id, DatasetType.INTERNAL, primaryIndexes[i].getDatasetId().getId()));
+                    id, DatasetType.INTERNAL, primaryIndexes[i].getDatasetId().getId(), IMetadataEntity.PENDING_NO_OP));
         }
     }
 
@@ -267,7 +268,7 @@
         for (int i = 0; i < secondaryIndexes.length; i++) {
             MetadataManager.INSTANCE.addIndex(mdTxnCtx, new Index(secondaryIndexes[i].getDataverseName(),
                     secondaryIndexes[i].getIndexedDatasetName(), secondaryIndexes[i].getIndexName(), IndexType.BTREE,
-                    secondaryIndexes[i].getPartitioningExpr(), false));
+                    secondaryIndexes[i].getPartitioningExpr(), false, IMetadataEntity.PENDING_NO_OP));
         }
     }
 
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataIndex.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataIndex.java
index c069d0d..de0061d 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataIndex.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataIndex.java
@@ -73,9 +73,14 @@
     private IndexLogger indexLogger;
     // datasetId
     private final DatasetId datasetId;
+    // Whether this is the dataset's primary index.
+    protected final boolean isPrimaryIndex;
+    // Positions of the primary key fields within this index's key list, used by
+    // secondary-index operations to locate the primary key.
+    protected final int[] primaryKeyIndexes;
 
     public MetadataIndex(String datasetName, String indexName, int numFields, IAType[] keyTypes, String[] keyNames,
-            ARecordType payloadType, int datasetId) throws AsterixRuntimeException {
+            ARecordType payloadType, int datasetId, boolean isPrimaryIndex, int[] primaryKeyIndexes)
+            throws AsterixRuntimeException {
         // Sanity checks.
         if (keyTypes.length != keyNames.length) {
             throw new AsterixRuntimeException("Unequal number of key types and names given.");
@@ -130,6 +135,10 @@
         }
 
         this.datasetId = new DatasetId(datasetId);
+        this.isPrimaryIndex = isPrimaryIndex;
+
+        // Positions of the primary key fields within the key list.
+        this.primaryKeyIndexes = primaryKeyIndexes;
     }
 
     @Override
@@ -249,4 +258,14 @@
     public DatasetId getDatasetId() {
         return datasetId;
     }
+
+    @Override
+    public boolean isPrimaryIndex() {
+        return isPrimaryIndex;
+    }
+
+    @Override
+    public int[] getPrimaryKeyIndexes() {
+        return primaryKeyIndexes;
+    }
 }
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java
index 4061fed..483881c 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java
@@ -61,32 +61,37 @@
         }
 
         DATAVERSE_DATASET = new MetadataIndex("Dataverse", null, 2, new IAType[] { BuiltinType.ASTRING },
-                new String[] { "DataverseName" }, MetadataRecordTypes.DATAVERSE_RECORDTYPE, DATAVERSE_DATASET_ID);
+                new String[] { "DataverseName" }, MetadataRecordTypes.DATAVERSE_RECORDTYPE, DATAVERSE_DATASET_ID, true,
+                new int[] { 0 });
 
         DATASET_DATASET = new MetadataIndex("Dataset", null, 3,
                 new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING }, new String[] { "DataverseName",
-                        "DatasetName" }, MetadataRecordTypes.DATASET_RECORDTYPE, DATASET_DATASET_ID);
+                        "DatasetName" }, MetadataRecordTypes.DATASET_RECORDTYPE, DATASET_DATASET_ID, true, new int[] {
+                        0, 1 });
 
         DATATYPE_DATASET = new MetadataIndex("Datatype", null, 3, new IAType[] { BuiltinType.ASTRING,
                 BuiltinType.ASTRING }, new String[] { "DataverseName", "DatatypeName" },
-                MetadataRecordTypes.DATATYPE_RECORDTYPE, DATATYPE_DATASET_ID);
+                MetadataRecordTypes.DATATYPE_RECORDTYPE, DATATYPE_DATASET_ID, true, new int[] { 0, 1 });
 
         INDEX_DATASET = new MetadataIndex("Index", null, 4, new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING,
                 BuiltinType.ASTRING }, new String[] { "DataverseName", "DatasetName", "IndexName" },
-                MetadataRecordTypes.INDEX_RECORDTYPE, INDEX_DATASET_ID);
+                MetadataRecordTypes.INDEX_RECORDTYPE, INDEX_DATASET_ID, true, new int[] { 0, 1, 2 });
 
         NODE_DATASET = new MetadataIndex("Node", null, 2, new IAType[] { BuiltinType.ASTRING },
-                new String[] { "NodeName" }, MetadataRecordTypes.NODE_RECORDTYPE, NODE_DATASET_ID);
+                new String[] { "NodeName" }, MetadataRecordTypes.NODE_RECORDTYPE, NODE_DATASET_ID, true,
+                new int[] { 0 });
 
         NODEGROUP_DATASET = new MetadataIndex("Nodegroup", null, 2, new IAType[] { BuiltinType.ASTRING },
-                new String[] { "GroupName" }, MetadataRecordTypes.NODEGROUP_RECORDTYPE, NODEGROUP_DATASET_ID);
+                new String[] { "GroupName" }, MetadataRecordTypes.NODEGROUP_RECORDTYPE, NODEGROUP_DATASET_ID, true,
+                new int[] { 0 });
 
         FUNCTION_DATASET = new MetadataIndex("Function", null, 4, new IAType[] { BuiltinType.ASTRING,
                 BuiltinType.ASTRING, BuiltinType.ASTRING }, new String[] { "DataverseName", "Name", "Arity" },
-                MetadataRecordTypes.FUNCTION_RECORDTYPE, FUNCTION_DATASET_ID);
+                MetadataRecordTypes.FUNCTION_RECORDTYPE, FUNCTION_DATASET_ID, true, new int[] { 0, 1, 2 });
 
         DATASOURCE_ADAPTER_DATASET = new MetadataIndex("DatasourceAdapter", null, 3, new IAType[] {
                 BuiltinType.ASTRING, BuiltinType.ASTRING }, new String[] { "DataverseName", "Name" },
-                MetadataRecordTypes.DATASOURCE_ADAPTER_RECORDTYPE, DATASOURCE_ADAPTER_DATASET_ID);
+                MetadataRecordTypes.DATASOURCE_ADAPTER_RECORDTYPE, DATASOURCE_ADAPTER_DATASET_ID, true, new int[] { 0,
+                        1 });
     }
 }
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
index 50ea20a..17924b7 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
@@ -80,10 +80,11 @@
     public static final int DATAVERSE_ARECORD_NAME_FIELD_INDEX = 0;
     public static final int DATAVERSE_ARECORD_FORMAT_FIELD_INDEX = 1;
     public static final int DATAVERSE_ARECORD_TIMESTAMP_FIELD_INDEX = 2;
+    public static final int DATAVERSE_ARECORD_PENDINGOP_FIELD_INDEX = 3;
 
     private static final ARecordType createDataverseRecordType() {
-        return new ARecordType("DataverseRecordType", new String[] { "DataverseName", "DataFormat", "Timestamp" },
-                new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING }, true);
+        return new ARecordType("DataverseRecordType", new String[] { "DataverseName", "DataFormat", "Timestamp", "PendingOp" },
+                new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.AINT32 }, true);
     }
 
     // Helper constants for accessing fields in an ARecord of anonymous type
@@ -158,10 +159,11 @@
     public static final int DATASET_ARECORD_FEEDDETAILS_FIELD_INDEX = 6;
     public static final int DATASET_ARECORD_TIMESTAMP_FIELD_INDEX = 7;
     public static final int DATASET_ARECORD_DATASETID_FIELD_INDEX = 8;
+    public static final int DATASET_ARECORD_PENDINGOP_FIELD_INDEX = 9;
 
     private static final ARecordType createDatasetRecordType() {
         String[] fieldNames = { "DataverseName", "DatasetName", "DataTypeName", "DatasetType", "InternalDetails",
-                "ExternalDetails", "FeedDetails", "Timestamp", "DatasetId" };
+                "ExternalDetails", "FeedDetails", "Timestamp", "DatasetId", "PendingOp" };
 
         List<IAType> internalRecordUnionList = new ArrayList<IAType>();
         internalRecordUnionList.add(BuiltinType.ANULL);
@@ -179,7 +181,8 @@
         AUnionType feedRecordUnion = new AUnionType(feedRecordUnionList, null);
 
         IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
-                internalRecordUnion, externalRecordUnion, feedRecordUnion, BuiltinType.ASTRING, BuiltinType.AINT32 };
+                internalRecordUnion, externalRecordUnion, feedRecordUnion, BuiltinType.ASTRING, BuiltinType.AINT32,
+                BuiltinType.AINT32 };
         return new ARecordType("DatasetRecordType", fieldNames, fieldTypes, true);
     }
 
@@ -264,13 +267,14 @@
     public static final int INDEX_ARECORD_SEARCHKEY_FIELD_INDEX = 4;
     public static final int INDEX_ARECORD_ISPRIMARY_FIELD_INDEX = 5;
     public static final int INDEX_ARECORD_TIMESTAMP_FIELD_INDEX = 6;
+    public static final int INDEX_ARECORD_PENDINGOP_FIELD_INDEX = 7;
 
     private static final ARecordType createIndexRecordType() {
         AOrderedListType olType = new AOrderedListType(BuiltinType.ASTRING, null);
         String[] fieldNames = { "DataverseName", "DatasetName", "IndexName", "IndexStructure", "SearchKey",
-                "IsPrimary", "Timestamp" };
+                "IsPrimary", "Timestamp", "PendingOp" };
         IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
-                olType, BuiltinType.ABOOLEAN, BuiltinType.ASTRING };
+                olType, BuiltinType.ABOOLEAN, BuiltinType.ASTRING, BuiltinType.AINT32 };
         return new ARecordType("IndexRecordType", fieldNames, fieldTypes, true);
     };
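
Each of the three record types above gains a closed "PendingOp" field of type AINT32, appended after the existing fields so the earlier field indices are unchanged. A minimal read sketch using one of the new index constants (record here stands for a deserialized ARecord; the pattern mirrors the tuple translators later in this patch):

    int pendingOp = ((AInt32) record.getValueByPos(
            MetadataRecordTypes.DATASET_ARECORD_PENDINGOP_FIELD_INDEX)).getIntegerValue();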
 
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataSecondaryIndexes.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataSecondaryIndexes.java
index f696714..39b6a7f 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataSecondaryIndexes.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataSecondaryIndexes.java
@@ -44,14 +44,17 @@
 
         GROUPNAME_ON_DATASET_INDEX = new MetadataIndex("Dataset", "GroupName", 3, new IAType[] { BuiltinType.ASTRING,
                 BuiltinType.ASTRING, BuiltinType.ASTRING },
-                new String[] { "GroupName", "DataverseName", "DatasetName" }, null, MetadataPrimaryIndexes.DATASET_DATASET_ID);
+                new String[] { "GroupName", "DataverseName", "DatasetName" }, null,
+                MetadataPrimaryIndexes.DATASET_DATASET_ID, false, new int[] { 1, 2 });
 
         DATATYPENAME_ON_DATASET_INDEX = new MetadataIndex("Dataset", "DatatypeName", 3, new IAType[] {
                 BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING }, new String[] { "DataverseName",
-                "DatatypeName", "DatasetName" }, null, MetadataPrimaryIndexes.DATASET_DATASET_ID);
+                "DatatypeName", "DatasetName" }, null, MetadataPrimaryIndexes.DATASET_DATASET_ID, false, new int[] { 0,
+                2 });
 
         DATATYPENAME_ON_DATATYPE_INDEX = new MetadataIndex("Datatype", "DatatypeName", 3, new IAType[] {
                 BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING }, new String[] { "DataverseName",
-                "NestedDatatypeName", "TopDatatypeName" }, null, MetadataPrimaryIndexes.DATATYPE_DATASET_ID);
+                "NestedDatatypeName", "TopDatatypeName" }, null, MetadataPrimaryIndexes.DATATYPE_DATASET_ID, false,
+                new int[] { 0, 2 });
     }
 }
\ No newline at end of file
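
For the secondary indexes the new trailing arguments are false plus an int[] that, as far as this patch shows, records where the referenced dataset's primary-key fields sit within the secondary key. A worked reading of the first case (the interpretation of the argument is an assumption):

    // GROUPNAME_ON_DATASET_INDEX key: { GroupName, DataverseName, DatasetName }
    //                     positions:       0            1             2
    // The Dataset dataset's primary key is (DataverseName, DatasetName),
    // i.e. positions 1 and 2 of this key:
    int[] pkPositionsInSecondaryKey = new int[] { 1, 2 };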
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index bdd9dc5..3c96825 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -129,7 +129,7 @@
 
 public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, String> {
     private static Logger LOGGER = Logger.getLogger(AqlMetadataProvider.class.getName());
-    private final MetadataTransactionContext mdTxnCtx;
+    private MetadataTransactionContext mdTxnCtx;
     private boolean isWriteTransaction;
     private Map<String, String[]> stores;
     private Map<String, String> config;
@@ -156,8 +156,7 @@
         return config;
     }
 
-    public AqlMetadataProvider(MetadataTransactionContext mdTxnCtx, Dataverse defaultDataverse) {
-        this.mdTxnCtx = mdTxnCtx;
+    public AqlMetadataProvider(Dataverse defaultDataverse) {
         this.defaultDataverse = defaultDataverse;
         this.stores = AsterixProperties.INSTANCE.getStores();
     }
@@ -181,6 +180,10 @@
     public void setWriterFactory(IAWriterFactory writerFactory) {
         this.writerFactory = writerFactory;
     }
+
+    public void setMetadataTxnContext(MetadataTransactionContext mdTxnCtx) {
+        this.mdTxnCtx = mdTxnCtx;
+    }
 
     public MetadataTransactionContext getMetadataTxnContext() {
         return mdTxnCtx;
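
Since the transaction context is no longer fixed at construction, a single AqlMetadataProvider can be pointed at a fresh metadata transaction per statement via the new setter. A minimal lifecycle sketch (variable names illustrative, error handling reduced to the essentials):

    AqlMetadataProvider metadataProvider = new AqlMetadataProvider(defaultDataverse);
    MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
    metadataProvider.setMetadataTxnContext(mdTxnCtx);
    try {
        // ... metadata reads/writes for a single statement ...
        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
    } catch (Exception e) {
        MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
        throw e;
    }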
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java
index 2adf4fc..e710c90 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java
@@ -35,15 +35,18 @@
     private final DatasetType datasetType;
     private IDatasetDetails datasetDetails;
     private final int datasetId;
+    // Type of the pending operation, if any, with respect to atomic DDL operations
+    private final int pendingOp;
 
     public Dataset(String dataverseName, String datasetName, String itemTypeName, IDatasetDetails datasetDetails,
-            DatasetType datasetType, int datasetId) {
+            DatasetType datasetType, int datasetId, int pendingOp) {
         this.dataverseName = dataverseName;
         this.datasetName = datasetName;
         this.itemTypeName = itemTypeName;
         this.datasetType = datasetType;
         this.datasetDetails = datasetDetails;
         this.datasetId = datasetId;
+        this.pendingOp = pendingOp;
     }
 
     public String getDataverseName() {
@@ -73,6 +76,10 @@
     public int getDatasetId() {
         return datasetId;
     }
+
+    public int getPendingOp() {
+        return pendingOp;
+    }
 
     @Override
     public Object addToCache(MetadataCache cache) {
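
Callers constructing a Dataset must now supply the pending-op code as the final argument. A sketch (the value 0 for "no pending operation" is an assumption for illustration; real callers would presumably use a named constant):

    Dataset ds = new Dataset("CustomerDataverse", "Customers", "CustomerType",
            datasetDetails, DatasetType.INTERNAL, datasetId, /* pendingOp */ 0);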
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataverse.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataverse.java
index bacd812..3902282 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataverse.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataverse.java
@@ -27,10 +27,12 @@
     // Enforced to be unique within an Asterix cluster..
     private final String dataverseName;
     private final String dataFormat;
+    private final int pendingOp;
 
-    public Dataverse(String dataverseName, String format) {
+    public Dataverse(String dataverseName, String format, int pendingOp) {
         this.dataverseName = dataverseName;
         this.dataFormat = format;
+        this.pendingOp = pendingOp;
     }
 
     public String getDataverseName() {
@@ -40,6 +42,10 @@
     public String getDataFormat() {
         return dataFormat;
     }
+
+    public int getPendingOp() {
+        return pendingOp;
+    }
 
     @Override
     public Object addToCache(MetadataCache cache) {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Index.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Index.java
index 39f4689..7c23c65 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Index.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Index.java
@@ -45,9 +45,11 @@
     private final boolean isPrimaryIndex;
     // Specific to NGRAM indexes.
     private final int gramLength;
+    // Type of the pending operation, if any, with respect to atomic DDL operations
+    private final int pendingOp;
 
     public Index(String dataverseName, String datasetName, String indexName, IndexType indexType,
-            List<String> keyFieldNames, int gramLength, boolean isPrimaryIndex) {
+            List<String> keyFieldNames, int gramLength, boolean isPrimaryIndex, int pendingOp) {
         this.dataverseName = dataverseName;
         this.datasetName = datasetName;
         this.indexName = indexName;
@@ -55,10 +57,11 @@
         this.keyFieldNames = keyFieldNames;
         this.gramLength = gramLength;
         this.isPrimaryIndex = isPrimaryIndex;
+        this.pendingOp = pendingOp;
     }
 
     public Index(String dataverseName, String datasetName, String indexName, IndexType indexType,
-            List<String> keyFieldNames, boolean isPrimaryIndex) {
+            List<String> keyFieldNames, boolean isPrimaryIndex, int pendingOp) {
         this.dataverseName = dataverseName;
         this.datasetName = datasetName;
         this.indexName = indexName;
@@ -66,6 +69,7 @@
         this.keyFieldNames = keyFieldNames;
         this.gramLength = -1;
         this.isPrimaryIndex = isPrimaryIndex;
+        this.pendingOp = pendingOp;
     }
 
     public String getDataverseName() {
@@ -95,6 +99,10 @@
     public boolean isPrimaryIndex() {
         return isPrimaryIndex;
     }
+
+    public int getPendingOp() {
+        return pendingOp;
+    }
 
     public boolean isSecondaryIndex() {
         return !isPrimaryIndex();
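
Both Index constructors now take pendingOp as their final argument; only the first form carries a gramLength, while the second defaults it to -1. A sketch (the IndexType values and the 0 pending-op code are illustrative assumptions):

    Index ngramIdx = new Index("dv", "ds", "title_ngram", IndexType.NGRAM_INVIX,
            keyFields, /* gramLength */ 3, /* isPrimaryIndex */ false, /* pendingOp */ 0);
    Index btreeIdx = new Index("dv", "ds", "id_btree", IndexType.BTREE,
            keyFields, /* isPrimaryIndex */ false, /* pendingOp */ 0);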
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
index 0cbd392..7af5aae 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
@@ -77,9 +77,9 @@
     protected ISerializerDeserializer<AInt32> aInt32Serde;
 
     @SuppressWarnings("unchecked")
-	public DatasetTupleTranslator(boolean getTuple) {
+    public DatasetTupleTranslator(boolean getTuple) {
         super(getTuple, MetadataPrimaryIndexes.DATASET_DATASET.getFieldCount());
-        aInt32 = new AMutableInt32(-1); 
+        aInt32 = new AMutableInt32(-1);
         aInt32Serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
     }
 
@@ -104,8 +104,10 @@
                 .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_DATATYPENAME_FIELD_INDEX)).getStringValue();
         DatasetType datasetType = DatasetType.valueOf(((AString) datasetRecord.getValueByPos(3)).getStringValue());
         IDatasetDetails datasetDetails = null;
-    	int datasetId = ((AInt32) datasetRecord
+        int datasetId = ((AInt32) datasetRecord
                 .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_DATASETID_FIELD_INDEX)).getIntegerValue();
+        int pendingOp = ((AInt32) datasetRecord
+                .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_PENDINGOP_FIELD_INDEX)).getIntegerValue();
         switch (datasetType) {
             case FEED:
             case INTERNAL: {
@@ -197,7 +199,7 @@
                 }
                 datasetDetails = new ExternalDatasetDetails(adapter, properties);
         }
-        return new Dataset(dataverseName, datasetName, typeName, datasetDetails, datasetType, datasetId);
+        return new Dataset(dataverseName, datasetName, typeName, datasetDetails, datasetType, datasetId, pendingOp);
     }
 
     @Override
@@ -248,13 +250,19 @@
         aString.setValue(Calendar.getInstance().getTime().toString());
         stringSerde.serialize(aString, fieldValue.getDataOutput());
         recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_TIMESTAMP_FIELD_INDEX, fieldValue);
-        
+
         // write field 8
         fieldValue.reset();
         aInt32.setValue(dataset.getDatasetId());
         aInt32Serde.serialize(aInt32, fieldValue.getDataOutput());
         recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_DATASETID_FIELD_INDEX, fieldValue);
-        
+
+        // write field 9
+        fieldValue.reset();
+        aInt32.setValue(dataset.getPendingOp());
+        aInt32Serde.serialize(aInt32, fieldValue.getDataOutput());
+        recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_PENDINGOP_FIELD_INDEX, fieldValue);
+
         // write record
         recordBuilder.write(tupleBuilder.getDataOutput(), true);
         tupleBuilder.addFieldEndOffset();
@@ -290,13 +298,15 @@
         fieldValue.reset();
         aString.setValue(name);
         stringSerde.serialize(aString, fieldValue.getDataOutput());
-        propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_NAME_FIELD_INDEX, fieldValue);
+        propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_NAME_FIELD_INDEX,
+                fieldValue);
 
         // write field 1
         fieldValue.reset();
         aString.setValue(value);
         stringSerde.serialize(aString, fieldValue.getDataOutput());
-        propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_VALUE_FIELD_INDEX, fieldValue);
+        propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_VALUE_FIELD_INDEX,
+                fieldValue);
 
         propertyRecordBuilder.write(out, true);
     }
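
The new "write field 9" block above only stays correct while the position constant matches the field order declared in createDatasetRecordType(). A one-line sanity sketch of that invariant:

    // PendingOp is the tenth closed field (index 9) of DatasetRecordType.
    assert MetadataRecordTypes.DATASET_ARECORD_PENDINGOP_FIELD_INDEX == 9;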
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DataverseTupleTranslator.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DataverseTupleTranslator.java
index 271bf90..f9eaf5a 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DataverseTupleTranslator.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DataverseTupleTranslator.java
@@ -25,8 +25,11 @@
 import edu.uci.ics.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
 import edu.uci.ics.asterix.metadata.bootstrap.MetadataRecordTypes;
 import edu.uci.ics.asterix.metadata.entities.Dataverse;
+import edu.uci.ics.asterix.om.base.AInt32;
+import edu.uci.ics.asterix.om.base.AMutableInt32;
 import edu.uci.ics.asterix.om.base.ARecord;
 import edu.uci.ics.asterix.om.base.AString;
+import edu.uci.ics.asterix.om.types.BuiltinType;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 
@@ -40,12 +43,18 @@
     // Payload field containing serialized Dataverse.
     public static final int DATAVERSE_PAYLOAD_TUPLE_FIELD_INDEX = 1;
 
+    private AMutableInt32 aInt32;
+    protected ISerializerDeserializer<AInt32> aInt32Serde;
+
     @SuppressWarnings("unchecked")
     private ISerializerDeserializer<ARecord> recordSerDes = AqlSerializerDeserializerProvider.INSTANCE
             .getSerializerDeserializer(MetadataRecordTypes.DATAVERSE_RECORDTYPE);
 
+    @SuppressWarnings("unchecked")
     public DataverseTupleTranslator(boolean getTuple) {
         super(getTuple, MetadataPrimaryIndexes.DATAVERSE_DATASET.getFieldCount());
+        aInt32 = new AMutableInt32(-1);
+        aInt32Serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
     }
 
     @Override
@@ -57,7 +66,8 @@
         DataInput in = new DataInputStream(stream);
         ARecord dataverseRecord = recordSerDes.deserialize(in);
         return new Dataverse(((AString) dataverseRecord.getValueByPos(0)).getStringValue(),
-                ((AString) dataverseRecord.getValueByPos(1)).getStringValue());
+                ((AString) dataverseRecord.getValueByPos(1)).getStringValue(),
+                ((AInt32) dataverseRecord.getValueByPos(3)).getIntegerValue());
     }
 
     @Override
@@ -88,6 +98,12 @@
         stringSerde.serialize(aString, fieldValue.getDataOutput());
         recordBuilder.addField(MetadataRecordTypes.DATAVERSE_ARECORD_TIMESTAMP_FIELD_INDEX, fieldValue);
 
+        // write field 3
+        fieldValue.reset();
+        aInt32.setValue(instance.getPendingOp());
+        aInt32Serde.serialize(aInt32, fieldValue.getDataOutput());
+        recordBuilder.addField(MetadataRecordTypes.DATAVERSE_ARECORD_PENDINGOP_FIELD_INDEX, fieldValue);
+
         recordBuilder.write(tupleBuilder.getDataOutput(), true);
         tupleBuilder.addFieldEndOffset();
 
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java
index d71480f..c75b7b2 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java
@@ -96,13 +96,15 @@
         }
         Boolean isPrimaryIndex = ((ABoolean) rec.getValueByPos(MetadataRecordTypes.INDEX_ARECORD_ISPRIMARY_FIELD_INDEX))
                 .getBoolean();
+        int pendingOp = ((AInt32) rec.getValueByPos(MetadataRecordTypes.INDEX_ARECORD_PENDINGOP_FIELD_INDEX))
+                .getIntegerValue();
         // Check if there is a gram length as well.
         int gramLength = -1;
         int gramLenPos = rec.getType().findFieldPosition(GRAM_LENGTH_FIELD_NAME);
         if (gramLenPos >= 0) {
             gramLength = ((AInt32) rec.getValueByPos(gramLenPos)).getIntegerValue();
         }
-        return new Index(dvName, dsName, indexName, indexStructure, searchKey, gramLength, isPrimaryIndex);
+        return new Index(dvName, dsName, indexName, indexStructure, searchKey, gramLength, isPrimaryIndex, pendingOp);
     }
 
     @Override
@@ -174,7 +176,12 @@
         stringSerde.serialize(aString, fieldValue.getDataOutput());
         recordBuilder.addField(MetadataRecordTypes.INDEX_ARECORD_TIMESTAMP_FIELD_INDEX, fieldValue);
 
-        // write optional field 7        
+        // write field 7
+        fieldValue.reset();
+        intSerde.serialize(new AInt32(instance.getPendingOp()), fieldValue.getDataOutput());
+        recordBuilder.addField(MetadataRecordTypes.INDEX_ARECORD_PENDINGOP_FIELD_INDEX, fieldValue);
+
+        // write optional field 8
         if (instance.getGramLength() > 0) {
             fieldValue.reset();
             nameValue.reset();
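
One small difference from the other translators: this write path allocates a fresh AInt32 per record via intSerde.serialize(new AInt32(...)), while DatasetTupleTranslator and DataverseTupleTranslator reuse a mutable instance. A reuse-based equivalent, assuming the same aInt32/aInt32Serde pair were added to this class:

    fieldValue.reset();
    aInt32.setValue(instance.getPendingOp());
    aInt32Serde.serialize(aInt32, fieldValue.getDataOutput());
    recordBuilder.addField(MetadataRecordTypes.INDEX_ARECORD_PENDINGOP_FIELD_INDEX, fieldValue);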
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexLogger.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexLogger.java
index 1eeadff..d69a36e 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexLogger.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexLogger.java
@@ -167,7 +167,7 @@
 
                 if (tupleSize != 0) {
                     //old tuple
-                    tupleWriter.writeTuple(reusableLogContentObject.getNewValue(), logicalLogLocator.getBuffer()
+                    tupleWriter.writeTuple(reusableLogContentObject.getOldValue(), logicalLogLocator.getBuffer()
                             .getArray(), logicalLogLocator.getMemoryOffset() + offset);
                 }
             }
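
The one-line IndexLogger fix matters for recovery correctness: the branch guarded by the "//old tuple" comment writes the before-image, which undo uses to restore the pre-update state, so writing the new value there would make rollback reinstall the post-update tuple. Schematically (names from this hunk, buffer/offset arguments abbreviated):

    // before-image: consulted by undo to restore the pre-update tuple
    tupleWriter.writeTuple(reusableLogContentObject.getOldValue(), buffer, offset);
    // after-image: consulted by redo to reapply the update
    tupleWriter.writeTuple(reusableLogContentObject.getNewValue(), buffer, offset);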