changes towards recovery

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization@1098 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/diff_file b/diff_file
new file mode 100644
index 0000000..8efd61f
--- /dev/null
+++ b/diff_file
@@ -0,0 +1,2098 @@
+Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DataverseTupleTranslator.java
+===================================================================
+--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DataverseTupleTranslator.java	(revision 1061)
++++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DataverseTupleTranslator.java	(working copy)
+@@ -25,8 +25,11 @@
+ import edu.uci.ics.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
+ import edu.uci.ics.asterix.metadata.bootstrap.MetadataRecordTypes;
+ import edu.uci.ics.asterix.metadata.entities.Dataverse;
++import edu.uci.ics.asterix.om.base.AInt32;
++import edu.uci.ics.asterix.om.base.AMutableInt32;
+ import edu.uci.ics.asterix.om.base.ARecord;
+ import edu.uci.ics.asterix.om.base.AString;
++import edu.uci.ics.asterix.om.types.BuiltinType;
+ import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+ import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+ 
+@@ -40,12 +43,18 @@
+     // Payload field containing serialized Dataverse.
+     public static final int DATAVERSE_PAYLOAD_TUPLE_FIELD_INDEX = 1;
+ 
++    private AMutableInt32 aInt32;
++    protected ISerializerDeserializer<AInt32> aInt32Serde;
++
+     @SuppressWarnings("unchecked")
+     private ISerializerDeserializer<ARecord> recordSerDes = AqlSerializerDeserializerProvider.INSTANCE
+             .getSerializerDeserializer(MetadataRecordTypes.DATAVERSE_RECORDTYPE);
+ 
++    @SuppressWarnings("unchecked")
+     public DataverseTupleTranslator(boolean getTuple) {
+         super(getTuple, MetadataPrimaryIndexes.DATAVERSE_DATASET.getFieldCount());
++        aInt32 = new AMutableInt32(-1);
++        aInt32Serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
+     }
+ 
+     @Override
+@@ -57,7 +66,8 @@
+         DataInput in = new DataInputStream(stream);
+         ARecord dataverseRecord = recordSerDes.deserialize(in);
+         return new Dataverse(((AString) dataverseRecord.getValueByPos(0)).getStringValue(),
+-                ((AString) dataverseRecord.getValueByPos(1)).getStringValue());
++                ((AString) dataverseRecord.getValueByPos(1)).getStringValue(),
++                ((AInt32) dataverseRecord.getValueByPos(2)).getIntegerValue());
+     }
+ 
+     @Override
+@@ -88,6 +98,12 @@
+         stringSerde.serialize(aString, fieldValue.getDataOutput());
+         recordBuilder.addField(MetadataRecordTypes.DATAVERSE_ARECORD_TIMESTAMP_FIELD_INDEX, fieldValue);
+ 
++        // write field 3
++        fieldValue.reset();
++        aInt32.setValue(instance.getPendingOp());
++        aInt32Serde.serialize(aInt32, fieldValue.getDataOutput());
++        recordBuilder.addField(MetadataRecordTypes.DATAVERSE_ARECORD_PENDINGOP_FIELD_INDEX, fieldValue);
++
+         recordBuilder.write(tupleBuilder.getDataOutput(), true);
+         tupleBuilder.addFieldEndOffset();
+ 
+Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
+===================================================================
+--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java	(revision 1061)
++++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java	(working copy)
+@@ -77,9 +77,9 @@
+     protected ISerializerDeserializer<AInt32> aInt32Serde;
+ 
+     @SuppressWarnings("unchecked")
+-	public DatasetTupleTranslator(boolean getTuple) {
++    public DatasetTupleTranslator(boolean getTuple) {
+         super(getTuple, MetadataPrimaryIndexes.DATASET_DATASET.getFieldCount());
+-        aInt32 = new AMutableInt32(-1); 
++        aInt32 = new AMutableInt32(-1);
+         aInt32Serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
+     }
+ 
+@@ -104,8 +104,10 @@
+                 .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_DATATYPENAME_FIELD_INDEX)).getStringValue();
+         DatasetType datasetType = DatasetType.valueOf(((AString) datasetRecord.getValueByPos(3)).getStringValue());
+         IDatasetDetails datasetDetails = null;
+-    	int datasetId = ((AInt32) datasetRecord
++        int datasetId = ((AInt32) datasetRecord
+                 .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_DATASETID_FIELD_INDEX)).getIntegerValue();
++        int pendingOp = ((AInt32) datasetRecord
++                .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_PENDINGOP_FIELD_INDEX)).getIntegerValue();
+         switch (datasetType) {
+             case FEED:
+             case INTERNAL: {
+@@ -197,7 +199,7 @@
+                 }
+                 datasetDetails = new ExternalDatasetDetails(adapter, properties);
+         }
+-        return new Dataset(dataverseName, datasetName, typeName, datasetDetails, datasetType, datasetId);
++        return new Dataset(dataverseName, datasetName, typeName, datasetDetails, datasetType, datasetId, pendingOp);
+     }
+ 
+     @Override
+@@ -248,13 +250,19 @@
+         aString.setValue(Calendar.getInstance().getTime().toString());
+         stringSerde.serialize(aString, fieldValue.getDataOutput());
+         recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_TIMESTAMP_FIELD_INDEX, fieldValue);
+-        
++
+         // write field 8
+         fieldValue.reset();
+         aInt32.setValue(dataset.getDatasetId());
+         aInt32Serde.serialize(aInt32, fieldValue.getDataOutput());
+         recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_DATASETID_FIELD_INDEX, fieldValue);
+-        
++
++        // write field 9
++        fieldValue.reset();
++        aInt32.setValue(dataset.getPendingOp());
++        aInt32Serde.serialize(aInt32, fieldValue.getDataOutput());
++        recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_PENDINGOP_FIELD_INDEX, fieldValue);
++
+         // write record
+         recordBuilder.write(tupleBuilder.getDataOutput(), true);
+         tupleBuilder.addFieldEndOffset();
+@@ -290,13 +298,15 @@
+         fieldValue.reset();
+         aString.setValue(name);
+         stringSerde.serialize(aString, fieldValue.getDataOutput());
+-        propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_NAME_FIELD_INDEX, fieldValue);
++        propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_NAME_FIELD_INDEX,
++                fieldValue);
+ 
+         // write field 1
+         fieldValue.reset();
+         aString.setValue(value);
+         stringSerde.serialize(aString, fieldValue.getDataOutput());
+-        propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_VALUE_FIELD_INDEX, fieldValue);
++        propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_VALUE_FIELD_INDEX,
++                fieldValue);
+ 
+         propertyRecordBuilder.write(out, true);
+     }
+Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java
+===================================================================
+--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java	(revision 1061)
++++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java	(working copy)
+@@ -96,13 +96,15 @@
+         }
+         Boolean isPrimaryIndex = ((ABoolean) rec.getValueByPos(MetadataRecordTypes.INDEX_ARECORD_ISPRIMARY_FIELD_INDEX))
+                 .getBoolean();
++        int pendingOp = ((AInt32) rec.getValueByPos(MetadataRecordTypes.INDEX_ARECORD_PENDINGOP_FIELD_INDEX))
++                .getIntegerValue();
+         // Check if there is a gram length as well.
+         int gramLength = -1;
+         int gramLenPos = rec.getType().findFieldPosition(GRAM_LENGTH_FIELD_NAME);
+         if (gramLenPos >= 0) {
+             gramLength = ((AInt32) rec.getValueByPos(gramLenPos)).getIntegerValue();
+         }
+-        return new Index(dvName, dsName, indexName, indexStructure, searchKey, gramLength, isPrimaryIndex);
++        return new Index(dvName, dsName, indexName, indexStructure, searchKey, gramLength, isPrimaryIndex, pendingOp);
+     }
+ 
+     @Override
+@@ -174,7 +176,12 @@
+         stringSerde.serialize(aString, fieldValue.getDataOutput());
+         recordBuilder.addField(MetadataRecordTypes.INDEX_ARECORD_TIMESTAMP_FIELD_INDEX, fieldValue);
+ 
+-        // write optional field 7        
++        // write field 7
++        fieldValue.reset();
++        intSerde.serialize(new AInt32(instance.getPendingOp()), fieldValue.getDataOutput());
++        recordBuilder.addField(MetadataRecordTypes.INDEX_ARECORD_PENDINGOP_FIELD_INDEX, fieldValue);
++
++        // write optional field 8 (gram length, only when it is positive)
+         if (instance.getGramLength() > 0) {
+             fieldValue.reset();
+             nameValue.reset();
+Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+===================================================================
+--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java	(revision 1061)
++++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java	(working copy)
+@@ -129,7 +129,7 @@
+ 
+ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, String> {
+     private static Logger LOGGER = Logger.getLogger(AqlMetadataProvider.class.getName());
+-    private final MetadataTransactionContext mdTxnCtx;
++    private MetadataTransactionContext mdTxnCtx;
+     private boolean isWriteTransaction;
+     private Map<String, String[]> stores;
+     private Map<String, String> config;
+@@ -156,8 +156,7 @@
+         return config;
+     }
+ 
+-    public AqlMetadataProvider(MetadataTransactionContext mdTxnCtx, Dataverse defaultDataverse) {
+-        this.mdTxnCtx = mdTxnCtx;
++    public AqlMetadataProvider(Dataverse defaultDataverse) {
+         this.defaultDataverse = defaultDataverse;
+         this.stores = AsterixProperties.INSTANCE.getStores();
+     }
+@@ -181,6 +180,10 @@
+     public void setWriterFactory(IAWriterFactory writerFactory) {
+         this.writerFactory = writerFactory;
+     }
++    
++    public void setMetadataTxnContext(MetadataTransactionContext mdTxnCtx) {
++        this.mdTxnCtx = mdTxnCtx;
++    }
+ 
+     public MetadataTransactionContext getMetadataTxnContext() {
+         return mdTxnCtx;
+Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java
+===================================================================
+--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java	(revision 1061)
++++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java	(working copy)
+@@ -35,15 +35,18 @@
+     private final DatasetType datasetType;
+     private IDatasetDetails datasetDetails;
+     private final int datasetId;
++    // Pending operation type (an IMetadataEntity.PENDING_* constant) used for atomic DDL operations.
++    private final int pendingOp;
+ 
+     public Dataset(String dataverseName, String datasetName, String itemTypeName, IDatasetDetails datasetDetails,
+-            DatasetType datasetType, int datasetId) {
++            DatasetType datasetType, int datasetId, int pendingOp) {
+         this.dataverseName = dataverseName;
+         this.datasetName = datasetName;
+         this.itemTypeName = itemTypeName;
+         this.datasetType = datasetType;
+         this.datasetDetails = datasetDetails;
+         this.datasetId = datasetId;
++        this.pendingOp = pendingOp;
+     }
+ 
+     public String getDataverseName() {
+@@ -73,6 +76,10 @@
+     public int getDatasetId() {
+         return datasetId;
+     }
++    
++    public int getPendingOp() {
++        return pendingOp;
++    }
+ 
+     @Override
+     public Object addToCache(MetadataCache cache) {
+Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Index.java
+===================================================================
+--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Index.java	(revision 1061)
++++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Index.java	(working copy)
+@@ -45,9 +45,11 @@
+     private final boolean isPrimaryIndex;
+     // Specific to NGRAM indexes.
+     private final int gramLength;
++    // Pending operation type (an IMetadataEntity.PENDING_* constant) used for atomic DDL operations.
++    private final int pendingOp;
+ 
+     public Index(String dataverseName, String datasetName, String indexName, IndexType indexType,
+-            List<String> keyFieldNames, int gramLength, boolean isPrimaryIndex) {
++            List<String> keyFieldNames, int gramLength, boolean isPrimaryIndex, int pendingOp) {
+         this.dataverseName = dataverseName;
+         this.datasetName = datasetName;
+         this.indexName = indexName;
+@@ -55,10 +57,11 @@
+         this.keyFieldNames = keyFieldNames;
+         this.gramLength = gramLength;
+         this.isPrimaryIndex = isPrimaryIndex;
++        this.pendingOp = pendingOp;
+     }
+ 
+     public Index(String dataverseName, String datasetName, String indexName, IndexType indexType,
+-            List<String> keyFieldNames, boolean isPrimaryIndex) {
++            List<String> keyFieldNames, boolean isPrimaryIndex, int pendingOp) {
+         this.dataverseName = dataverseName;
+         this.datasetName = datasetName;
+         this.indexName = indexName;
+@@ -66,6 +69,7 @@
+         this.keyFieldNames = keyFieldNames;
+         this.gramLength = -1;
+         this.isPrimaryIndex = isPrimaryIndex;
++        this.pendingOp = pendingOp;
+     }
+ 
+     public String getDataverseName() {
+@@ -95,6 +99,10 @@
+     public boolean isPrimaryIndex() {
+         return isPrimaryIndex;
+     }
++    
++    public int getPendingOp() {
++        return pendingOp;
++    }
+ 
+     public boolean isSecondaryIndex() {
+         return !isPrimaryIndex();
+Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataverse.java
+===================================================================
+--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataverse.java	(revision 1061)
++++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataverse.java	(working copy)
+@@ -27,10 +27,12 @@
+     // Enforced to be unique within an Asterix cluster..
+     private final String dataverseName;
+     private final String dataFormat;
++    private final int pendingOp;
+ 
+-    public Dataverse(String dataverseName, String format) {
++    public Dataverse(String dataverseName, String format, int pendingOp) {
+         this.dataverseName = dataverseName;
+         this.dataFormat = format;
++        this.pendingOp = pendingOp;
+     }
+ 
+     public String getDataverseName() {
+@@ -40,6 +42,10 @@
+     public String getDataFormat() {
+         return dataFormat;
+     }
++    
++    public int getPendingOp() {
++        return pendingOp;
++    }
+ 
+     @Override
+     public Object addToCache(MetadataCache cache) {
+Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+===================================================================
+--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java	(revision 1061)
++++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java	(working copy)
+@@ -25,6 +25,7 @@
+ import edu.uci.ics.asterix.common.exceptions.AsterixException;
+ import edu.uci.ics.asterix.common.functions.FunctionSignature;
+ import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
++import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
+ import edu.uci.ics.asterix.metadata.api.IMetadataIndex;
+ import edu.uci.ics.asterix.metadata.api.IMetadataNode;
+ import edu.uci.ics.asterix.metadata.api.IValueExtractor;
+@@ -160,7 +161,7 @@
+                 // Add the primary index for the dataset.
+                 InternalDatasetDetails id = (InternalDatasetDetails) dataset.getDatasetDetails();
+                 Index primaryIndex = new Index(dataset.getDataverseName(), dataset.getDatasetName(),
+-                        dataset.getDatasetName(), IndexType.BTREE, id.getPrimaryKey(), true);
++                        dataset.getDatasetName(), IndexType.BTREE, id.getPrimaryKey(), true, dataset.getPendingOp());
+                 addIndex(jobId, primaryIndex);
+                 ITupleReference nodeGroupTuple = createTuple(id.getNodeGroupName(), dataset.getDataverseName(),
+                         dataset.getDatasetName());
+@@ -260,7 +261,7 @@
+         IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
+                 NoOpOperationCallback.INSTANCE);
+         TransactionContext txnCtx = transactionProvider.getTransactionManager().getTransactionContext(jobId);
+-        transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx);
++        //transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx);
+         // TODO: fix exceptions once new BTree exception model is in hyracks.
+         indexAccessor.insert(tuple);
+         //TODO: extract the key from the tuple and get the PKHashValue from the key.
+@@ -536,7 +537,7 @@
+         // The transaction with txnId will have an S lock on the
+         // resource. Note that lock converters have a higher priority than
+         // regular waiters in the LockManager.
+-        transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx);
++        //transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx);
+         indexAccessor.delete(tuple);
+         //TODO: extract the key from the tuple and get the PKHashValue from the key.
+         //check how to get the oldValue.
+@@ -803,7 +804,9 @@
+     private <ResultType> void searchIndex(JobId jobId, IMetadataIndex index, ITupleReference searchKey,
+             IValueExtractor<ResultType> valueExtractor, List<ResultType> results) throws Exception {
+         TransactionContext txnCtx = transactionProvider.getTransactionManager().getTransactionContext(jobId);
+-        transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.S, txnCtx);
++        // Currently no lock is needed to access metadata, since incompatible concurrent access
++        // is always guarded by the latch in the MetadataManager.
++        //transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.S, txnCtx);
+         IBinaryComparatorFactory[] comparatorFactories = index.getKeyBinaryComparatorFactory();
+         long resourceID = index.getResourceID();
+         IIndex indexInstance = indexLifecycleManager.getIndex(resourceID);
+Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataEntity.java
+===================================================================
+--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataEntity.java	(revision 1061)
++++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataEntity.java	(working copy)
+@@ -20,6 +20,11 @@
+ import edu.uci.ics.asterix.metadata.MetadataCache;
+ 
+ public interface IMetadataEntity extends Serializable {
++    
++    public static final int PENDING_NO_OP = 0;
++    public static final int PENDING_ADD_OP = 1;
++    public static final int PENDING_DROP_OP = 2;
++    
+     Object addToCache(MetadataCache cache);
+ 
+     Object dropFromCache(MetadataCache cache);
+Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
+===================================================================
+--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java	(revision 1061)
++++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java	(working copy)
+@@ -17,6 +17,8 @@
+ 
+ import java.rmi.RemoteException;
+ import java.util.List;
++import java.util.concurrent.locks.ReadWriteLock;
++import java.util.concurrent.locks.ReentrantReadWriteLock;
+ 
+ import edu.uci.ics.asterix.common.functions.FunctionSignature;
+ import edu.uci.ics.asterix.metadata.api.IAsterixStateProxy;
+@@ -79,11 +81,10 @@
+ public class MetadataManager implements IMetadataManager {
+     // Set in init().
+     public static MetadataManager INSTANCE;
+-
+     private final MetadataCache cache = new MetadataCache();
+     private IAsterixStateProxy proxy;
+     private IMetadataNode metadataNode;
+-
++    
+     public MetadataManager(IAsterixStateProxy proxy) {
+         if (proxy == null) {
+             throw new Error("Null proxy given to MetadataManager.");
+@@ -206,11 +207,14 @@
+ 
+     @Override
+     public void addDataset(MetadataTransactionContext ctx, Dataset dataset) throws MetadataException {
++        // add dataset into metadataNode 
+         try {
+             metadataNode.addDataset(ctx.getJobId(), dataset);
+         } catch (RemoteException e) {
+             throw new MetadataException(e);
+         }
++        
++        // reflect the dataset into the cache
+         ctx.addDataset(dataset);
+     }
+ 
+@@ -585,4 +589,5 @@
+         }
+         return adapter;
+     }
++
+ }
+\ No newline at end of file
+Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java
+===================================================================
+--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java	(revision 1061)
++++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java	(working copy)
+@@ -19,6 +19,7 @@
+ 
+ import edu.uci.ics.asterix.common.functions.FunctionSignature;
+ import edu.uci.ics.asterix.external.dataset.adapter.AdapterIdentifier;
++import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
+ import edu.uci.ics.asterix.metadata.entities.Dataset;
+ import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
+ import edu.uci.ics.asterix.metadata.entities.Datatype;
+@@ -104,19 +105,19 @@
+     }
+ 
+     public void dropDataset(String dataverseName, String datasetName) {
+-        Dataset dataset = new Dataset(dataverseName, datasetName, null, null, null, -1);
++        Dataset dataset = new Dataset(dataverseName, datasetName, null, null, null, -1, IMetadataEntity.PENDING_NO_OP);
+         droppedCache.addDatasetIfNotExists(dataset);
+         logAndApply(new MetadataLogicalOperation(dataset, false));
+     }
+ 
+     public void dropIndex(String dataverseName, String datasetName, String indexName) {
+-        Index index = new Index(dataverseName, datasetName, indexName, null, null, false);
++        Index index = new Index(dataverseName, datasetName, indexName, null, null, false, IMetadataEntity.PENDING_NO_OP);
+         droppedCache.addIndexIfNotExists(index);
+         logAndApply(new MetadataLogicalOperation(index, false));
+     }
+ 
+     public void dropDataverse(String dataverseName) {
+-        Dataverse dataverse = new Dataverse(dataverseName, null);
++        Dataverse dataverse = new Dataverse(dataverseName, null, IMetadataEntity.PENDING_NO_OP);
+         droppedCache.addDataverseIfNotExists(dataverse);
+         logAndApply(new MetadataLogicalOperation(dataverse, false));
+     }
+@@ -162,7 +163,7 @@
+         }
+         return droppedCache.getDataset(dataverseName, datasetName) != null;
+     }
+-    
++
+     public boolean indexIsDropped(String dataverseName, String datasetName, String indexName) {
+         if (droppedCache.getDataverse(dataverseName) != null) {
+             return true;
+Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
+===================================================================
+--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java	(revision 1061)
++++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java	(working copy)
+@@ -80,10 +80,11 @@
+     public static final int DATAVERSE_ARECORD_NAME_FIELD_INDEX = 0;
+     public static final int DATAVERSE_ARECORD_FORMAT_FIELD_INDEX = 1;
+     public static final int DATAVERSE_ARECORD_TIMESTAMP_FIELD_INDEX = 2;
++    public static final int DATAVERSE_ARECORD_PENDINGOP_FIELD_INDEX = 3;
+ 
+     private static final ARecordType createDataverseRecordType() {
+-        return new ARecordType("DataverseRecordType", new String[] { "DataverseName", "DataFormat", "Timestamp" },
+-                new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING }, true);
++        return new ARecordType("DataverseRecordType", new String[] { "DataverseName", "DataFormat", "Timestamp", "PendingOp" },
++                new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.AINT32 }, true);
+     }
+ 
+     // Helper constants for accessing fields in an ARecord of anonymous type
+@@ -158,10 +159,11 @@
+     public static final int DATASET_ARECORD_FEEDDETAILS_FIELD_INDEX = 6;
+     public static final int DATASET_ARECORD_TIMESTAMP_FIELD_INDEX = 7;
+     public static final int DATASET_ARECORD_DATASETID_FIELD_INDEX = 8;
++    public static final int DATASET_ARECORD_PENDINGOP_FIELD_INDEX = 9;
+ 
+     private static final ARecordType createDatasetRecordType() {
+         String[] fieldNames = { "DataverseName", "DatasetName", "DataTypeName", "DatasetType", "InternalDetails",
+-                "ExternalDetails", "FeedDetails", "Timestamp", "DatasetId" };
++                "ExternalDetails", "FeedDetails", "Timestamp", "DatasetId", "PendingOp" };
+ 
+         List<IAType> internalRecordUnionList = new ArrayList<IAType>();
+         internalRecordUnionList.add(BuiltinType.ANULL);
+@@ -179,7 +181,8 @@
+         AUnionType feedRecordUnion = new AUnionType(feedRecordUnionList, null);
+ 
+         IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
+-                internalRecordUnion, externalRecordUnion, feedRecordUnion, BuiltinType.ASTRING, BuiltinType.AINT32 };
++                internalRecordUnion, externalRecordUnion, feedRecordUnion, BuiltinType.ASTRING, BuiltinType.AINT32,
++                BuiltinType.AINT32 };
+         return new ARecordType("DatasetRecordType", fieldNames, fieldTypes, true);
+     }
+ 
+@@ -264,13 +267,14 @@
+     public static final int INDEX_ARECORD_SEARCHKEY_FIELD_INDEX = 4;
+     public static final int INDEX_ARECORD_ISPRIMARY_FIELD_INDEX = 5;
+     public static final int INDEX_ARECORD_TIMESTAMP_FIELD_INDEX = 6;
++    public static final int INDEX_ARECORD_PENDINGOP_FIELD_INDEX = 7;
+ 
+     private static final ARecordType createIndexRecordType() {
+         AOrderedListType olType = new AOrderedListType(BuiltinType.ASTRING, null);
+         String[] fieldNames = { "DataverseName", "DatasetName", "IndexName", "IndexStructure", "SearchKey",
+-                "IsPrimary", "Timestamp" };
++                "IsPrimary", "Timestamp", "PendingOp" };
+         IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
+-                olType, BuiltinType.ABOOLEAN, BuiltinType.ASTRING };
++                olType, BuiltinType.ABOOLEAN, BuiltinType.ASTRING, BuiltinType.AINT32 };
+         return new ARecordType("IndexRecordType", fieldNames, fieldTypes, true);
+     };
+ 
+Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+===================================================================
+--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java	(revision 1061)
++++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java	(working copy)
+@@ -31,6 +31,7 @@
+ import edu.uci.ics.asterix.metadata.IDatasetDetails;
+ import edu.uci.ics.asterix.metadata.MetadataManager;
+ import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
++import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
+ import edu.uci.ics.asterix.metadata.api.IMetadataIndex;
+ import edu.uci.ics.asterix.metadata.entities.AsterixBuiltinTypeMap;
+ import edu.uci.ics.asterix.metadata.entities.Dataset;
+@@ -226,7 +227,7 @@
+     public static void insertInitialDataverses(MetadataTransactionContext mdTxnCtx) throws Exception {
+         String dataverseName = MetadataPrimaryIndexes.DATAVERSE_DATASET.getDataverseName();
+         String dataFormat = NonTaggedDataFormat.NON_TAGGED_DATA_FORMAT;
+-        MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dataverseName, dataFormat));
++        MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dataverseName, dataFormat, IMetadataEntity.PENDING_NO_OP));
+     }
+ 
+     public static void insertInitialDatasets(MetadataTransactionContext mdTxnCtx) throws Exception {
+@@ -236,7 +237,7 @@
+                     primaryIndexes[i].getNodeGroupName());
+             MetadataManager.INSTANCE.addDataset(mdTxnCtx, new Dataset(primaryIndexes[i].getDataverseName(),
+                     primaryIndexes[i].getIndexedDatasetName(), primaryIndexes[i].getPayloadRecordType().getTypeName(),
+-                    id, DatasetType.INTERNAL, primaryIndexes[i].getDatasetId().getId()));
++                    id, DatasetType.INTERNAL, primaryIndexes[i].getDatasetId().getId(), IMetadataEntity.PENDING_NO_OP));
+         }
+     }
+ 
+@@ -267,7 +268,7 @@
+         for (int i = 0; i < secondaryIndexes.length; i++) {
+             MetadataManager.INSTANCE.addIndex(mdTxnCtx, new Index(secondaryIndexes[i].getDataverseName(),
+                     secondaryIndexes[i].getIndexedDatasetName(), secondaryIndexes[i].getIndexName(), IndexType.BTREE,
+-                    secondaryIndexes[i].getPartitioningExpr(), false));
++                    secondaryIndexes[i].getPartitioningExpr(), false, IMetadataEntity.PENDING_NO_OP));
+         }
+     }
+ 
+Index: asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
+===================================================================
+--- asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java	(revision 1061)
++++ asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java	(working copy)
+@@ -95,11 +95,11 @@
+             File testFile = tcCtx.getTestFile(cUnit);
+ 
+             /*************** to avoid run failure cases ****************
+-            if (!testFile.getAbsolutePath().contains("runtimets/queries/failure/")) {
++            if (!testFile.getAbsolutePath().contains("index-selection/")) {
+                 continue;
+             }
+             ************************************************************/
+-
++            
+             File expectedResultFile = tcCtx.getExpectedResultFile(cUnit);
+             File actualFile = new File(PATH_ACTUAL + File.separator
+                     + tcCtx.getTestCase().getFilePath().replace(File.separator, "_") + "_" + cUnit.getName() + ".adm");
+Index: asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
+===================================================================
+--- asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java	(revision 1061)
++++ asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java	(working copy)
+@@ -90,7 +90,7 @@
+ 
+     private static Logger LOGGER = Logger.getLogger(DatasetOperations.class.getName());
+ 
+-    public static JobSpecification[] createDropDatasetJobSpec(CompiledDatasetDropStatement datasetDropStmt,
++    public static JobSpecification createDropDatasetJobSpec(CompiledDatasetDropStatement datasetDropStmt,
+             AqlMetadataProvider metadataProvider) throws AlgebricksException, HyracksDataException, RemoteException,
+             ACIDException, AsterixException {
+ 
+@@ -111,67 +111,10 @@
+             throw new AlgebricksException("DROP DATASET: No metadata for dataset " + datasetName);
+         }
+         if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+-            return new JobSpecification[0];
++            return new JobSpecification();
+         }
+-
+-        List<Index> datasetIndexes = metadataProvider.getDatasetIndexes(dataset.getDataverseName(),
+-                dataset.getDatasetName());
+-        int numSecondaryIndexes = 0;
+-        for (Index index : datasetIndexes) {
+-            if (index.isSecondaryIndex()) {
+-                numSecondaryIndexes++;
+-            }
+-        }
+-        JobSpecification[] specs;
+-        if (numSecondaryIndexes > 0) {
+-            specs = new JobSpecification[numSecondaryIndexes + 1];
+-            int i = 0;
+-            // First, drop secondary indexes.
+-            for (Index index : datasetIndexes) {
+-                if (index.isSecondaryIndex()) {
+-                    specs[i] = new JobSpecification();
+-                    Pair<IFileSplitProvider, AlgebricksPartitionConstraint> idxSplitsAndConstraint = metadataProvider
+-                            .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataset.getDataverseName(),
+-                                    datasetName, index.getIndexName());
+-                    IIndexDataflowHelperFactory dfhFactory;
+-                    switch (index.getIndexType()) {
+-                        case BTREE:
+-                            dfhFactory = new LSMBTreeDataflowHelperFactory(
+-                                    AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+-                                    AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+-                                    AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER);
+-                            break;
+-                        case RTREE:
+-                            dfhFactory = new LSMRTreeDataflowHelperFactory(
+-                                    new IPrimitiveValueProviderFactory[] { null }, RTreePolicyType.RTREE,
+-                                    new IBinaryComparatorFactory[] { null },
+-                                    AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
+-                                    AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
+-                                    AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER, null);
+-                            break;
+-                        case NGRAM_INVIX:
+-                        case WORD_INVIX:
+-                            dfhFactory = new LSMInvertedIndexDataflowHelperFactory(
+-                                    AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
+-                                    AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
+-                                    AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER);
+-                            break;
+-                        default:
+-                            throw new AsterixException("Unknown index type provided.");
+-                    }
+-                    IndexDropOperatorDescriptor secondaryBtreeDrop = new IndexDropOperatorDescriptor(specs[i],
+-                            AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
+-                            AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, idxSplitsAndConstraint.first, dfhFactory);
+-                    AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specs[i], secondaryBtreeDrop,
+-                            idxSplitsAndConstraint.second);
+-                    i++;
+-                }
+-            }
+-        } else {
+-            specs = new JobSpecification[1];
+-        }
++        
+         JobSpecification specPrimary = new JobSpecification();
+-        specs[specs.length - 1] = specPrimary;
+ 
+         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider
+                 .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataset.getDataverseName(), datasetName,
+@@ -187,7 +130,7 @@
+ 
+         specPrimary.addRoot(primaryBtreeDrop);
+ 
+-        return specs;
++        return specPrimary;
+     }
+ 
+     public static JobSpecification createDatasetJobSpec(Dataverse dataverse, String datasetName,
+Index: asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+===================================================================
+--- asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java	(revision 1061)
++++ asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java	(working copy)
+@@ -21,6 +21,8 @@
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
++import java.util.concurrent.locks.ReadWriteLock;
++import java.util.concurrent.locks.ReentrantReadWriteLock;
+ 
+ import org.json.JSONException;
+ 
+@@ -68,6 +70,7 @@
+ import edu.uci.ics.asterix.metadata.MetadataException;
+ import edu.uci.ics.asterix.metadata.MetadataManager;
+ import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
++import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
+ import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+ import edu.uci.ics.asterix.metadata.entities.Dataset;
+ import edu.uci.ics.asterix.metadata.entities.Datatype;
+@@ -112,6 +115,7 @@
+     private final PrintWriter out;
+     private final SessionConfig sessionConfig;
+     private final DisplayFormat pdf;
++    private final ReadWriteLock cacheLatch;
+     private Dataverse activeDefaultDataverse;
+     private List<FunctionDecl> declaredFunctions;
+ 
+@@ -121,6 +125,7 @@
+         this.out = out;
+         this.sessionConfig = pc;
+         this.pdf = pdf;
++        this.cacheLatch = new ReentrantReadWriteLock(true);
+         declaredFunctions = getDeclaredFunctions(aqlStatements);
+     }
+ 
+@@ -143,8 +148,7 @@
+ 
+         for (Statement stmt : aqlStatements) {
+             validateOperation(activeDefaultDataverse, stmt);
+-            MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+-            AqlMetadataProvider metadataProvider = new AqlMetadataProvider(mdTxnCtx, activeDefaultDataverse);
++            AqlMetadataProvider metadataProvider = new AqlMetadataProvider(activeDefaultDataverse);
+             metadataProvider.setWriterFactory(writerFactory);
+             metadataProvider.setOutputFile(outputFile);
+             metadataProvider.setConfig(config);
+@@ -253,15 +257,9 @@
+                     }
+ 
+                 }
+-                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+             } catch (Exception e) {
+-                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+                 throw new AlgebricksException(e);
+             }
+-            // Following jobs are run under a separate transaction, that is committed/aborted by the JobEventListener
+-            for (JobSpecification jobspec : jobsToExecute) {
+-                runJob(hcc, jobspec);
+-            }
+         }
+         return executionResult;
+     }
+@@ -289,398 +287,802 @@
+ 
+     private Dataverse handleUseDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+             List<JobSpecification> jobsToExecute) throws MetadataException, RemoteException, ACIDException {
+-        DataverseDecl dvd = (DataverseDecl) stmt;
+-        String dvName = dvd.getDataverseName().getValue();
+-        Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
+-        if (dv == null) {
+-            throw new MetadataException("Unknown dataverse " + dvName);
++
++        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++        metadataProvider.setMetadataTxnContext(mdTxnCtx);
++        acquireReadLatch();
++
++        try {
++            DataverseDecl dvd = (DataverseDecl) stmt;
++            String dvName = dvd.getDataverseName().getValue();
++            Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
++            if (dv == null) {
++                throw new MetadataException("Unknown dataverse " + dvName);
++            }
++            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++            return dv;
++        } catch (Exception e) {
++            MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
++            throw new MetadataException(e);
++        } finally {
++            releaseReadLatch();
+         }
+-        return dv;
+     }
+ 
+     private void handleCreateDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+             List<JobSpecification> jobsToExecute) throws MetadataException, AlgebricksException, RemoteException,
+             ACIDException {
+-        CreateDataverseStatement stmtCreateDataverse = (CreateDataverseStatement) stmt;
+-        String dvName = stmtCreateDataverse.getDataverseName().getValue();
+-        Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
+-        if (dv != null && !stmtCreateDataverse.getIfNotExists()) {
+-            throw new AlgebricksException("A dataverse with this name " + dvName + " already exists.");
++
++        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++        metadataProvider.setMetadataTxnContext(mdTxnCtx);
++        acquireWriteLatch();
++
++        try {
++            CreateDataverseStatement stmtCreateDataverse = (CreateDataverseStatement) stmt;
++            String dvName = stmtCreateDataverse.getDataverseName().getValue();
++            Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
++            if (dv != null && !stmtCreateDataverse.getIfNotExists()) {
++                throw new AlgebricksException("A dataverse with this name " + dvName + " already exists.");
++            }
++            MetadataManager.INSTANCE.addDataverse(metadataProvider.getMetadataTxnContext(), new Dataverse(dvName,
++                    stmtCreateDataverse.getFormat(), IMetadataEntity.PENDING_NO_OP));
++            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++        } catch (Exception e) {
++            MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
++            throw new AlgebricksException(e);
++        } finally {
++            releaseWriteLatch();
+         }
+-        MetadataManager.INSTANCE.addDataverse(metadataProvider.getMetadataTxnContext(), new Dataverse(dvName,
+-                stmtCreateDataverse.getFormat()));
+     }
+ 
+     private void handleCreateDatasetStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+             IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws AsterixException, Exception {
+-        DatasetDecl dd = (DatasetDecl) stmt;
+-        String dataverseName = dd.getDataverse() != null ? dd.getDataverse().getValue()
+-                : activeDefaultDataverse != null ? activeDefaultDataverse.getDataverseName() : null;
+-        if (dataverseName == null) {
+-            throw new AlgebricksException(" dataverse not specified ");
+-        }
+-        String datasetName = dd.getName().getValue();
+-        DatasetType dsType = dd.getDatasetType();
+-        String itemTypeName = dd.getItemTypeName().getValue();
+ 
+-        IDatasetDetails datasetDetails = null;
+-        Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
+-                datasetName);
+-        if (ds != null) {
+-            if (dd.getIfNotExists()) {
+-                return;
+-            } else {
+-                throw new AlgebricksException("A dataset with this name " + datasetName + " already exists.");
++        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++        boolean bActiveTxn = true;
++        metadataProvider.setMetadataTxnContext(mdTxnCtx);
++        acquireWriteLatch();
++
++        try {
++            DatasetDecl dd = (DatasetDecl) stmt;
++            String dataverseName = dd.getDataverse() != null ? dd.getDataverse().getValue()
++                    : activeDefaultDataverse != null ? activeDefaultDataverse.getDataverseName() : null;
++            if (dataverseName == null) {
++                throw new AlgebricksException(" dataverse not specified ");
+             }
+-        }
+-        Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(), dataverseName,
+-                itemTypeName);
+-        if (dt == null) {
+-            throw new AlgebricksException(": type " + itemTypeName + " could not be found.");
+-        }
+-        switch (dd.getDatasetType()) {
+-            case INTERNAL: {
+-                IAType itemType = dt.getDatatype();
+-                if (itemType.getTypeTag() != ATypeTag.RECORD) {
+-                    throw new AlgebricksException("Can only partition ARecord's.");
++            String datasetName = dd.getName().getValue();
++            DatasetType dsType = dd.getDatasetType();
++            String itemTypeName = dd.getItemTypeName().getValue();
++
++            IDatasetDetails datasetDetails = null;
++            Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
++                    datasetName);
++            if (ds != null) {
++                if (dd.getIfNotExists()) {
++                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++                    return;
++                } else {
++                    throw new AlgebricksException("A dataset with this name " + datasetName + " already exists.");
+                 }
+-                List<String> partitioningExprs = ((InternalDetailsDecl) dd.getDatasetDetailsDecl())
+-                        .getPartitioningExprs();
+-                String ngName = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue();
+-                datasetDetails = new InternalDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
+-                        InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs, ngName);
+-                break;
+             }
+-            case EXTERNAL: {
+-                String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter();
+-                Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties();
+-                datasetDetails = new ExternalDatasetDetails(adapter, properties);
+-                break;
++            Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(), dataverseName,
++                    itemTypeName);
++            if (dt == null) {
++                throw new AlgebricksException(": type " + itemTypeName + " could not be found.");
+             }
+-            case FEED: {
+-                IAType itemType = dt.getDatatype();
+-                if (itemType.getTypeTag() != ATypeTag.RECORD) {
+-                    throw new AlgebricksException("Can only partition ARecord's.");
++            switch (dd.getDatasetType()) {
++                case INTERNAL: {
++                    IAType itemType = dt.getDatatype();
++                    if (itemType.getTypeTag() != ATypeTag.RECORD) {
++                        throw new AlgebricksException("Can only partition ARecord's.");
++                    }
++                    List<String> partitioningExprs = ((InternalDetailsDecl) dd.getDatasetDetailsDecl())
++                            .getPartitioningExprs();
++                    String ngName = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue();
++                    datasetDetails = new InternalDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
++                            InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs,
++                            ngName);
++                    break;
+                 }
+-                List<String> partitioningExprs = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getPartitioningExprs();
+-                String ngName = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue();
+-                String adapter = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getAdapterFactoryClassname();
+-                Map<String, String> configuration = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getConfiguration();
+-                FunctionSignature signature = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getFunctionSignature();
+-                datasetDetails = new FeedDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
+-                        InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs, ngName,
+-                        adapter, configuration, signature, FeedDatasetDetails.FeedState.INACTIVE.toString());
+-                break;
++                case EXTERNAL: {
++                    String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter();
++                    Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties();
++                    datasetDetails = new ExternalDatasetDetails(adapter, properties);
++                    break;
++                }
++                case FEED: {
++                    IAType itemType = dt.getDatatype();
++                    if (itemType.getTypeTag() != ATypeTag.RECORD) {
++                        throw new AlgebricksException("Can only partition ARecord's.");
++                    }
++                    List<String> partitioningExprs = ((FeedDetailsDecl) dd.getDatasetDetailsDecl())
++                            .getPartitioningExprs();
++                    String ngName = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue();
++                    String adapter = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getAdapterFactoryClassname();
++                    Map<String, String> configuration = ((FeedDetailsDecl) dd.getDatasetDetailsDecl())
++                            .getConfiguration();
++                    FunctionSignature signature = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getFunctionSignature();
++                    datasetDetails = new FeedDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
++                            InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs,
++                            ngName, adapter, configuration, signature, FeedDatasetDetails.FeedState.INACTIVE.toString());
++                    break;
++                }
+             }
++
++            //#. add a new dataset with PendingAddOp
++            Dataset dataset = new Dataset(dataverseName, datasetName, itemTypeName, datasetDetails, dsType,
++                    DatasetIdFactory.generateDatasetId(), IMetadataEntity.PENDING_ADD_OP);
++            MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset);
++
++            if (dd.getDatasetType() == DatasetType.INTERNAL || dd.getDatasetType() == DatasetType.FEED) {
++                Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
++                        dataverseName);
++                JobSpecification jobSpec = DatasetOperations.createDatasetJobSpec(dataverse, datasetName,
++                        metadataProvider);
++
++                //#. make metadataTxn commit before calling runJob.
++                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++                bActiveTxn = false;
++
++                //#. runJob
++                runJob(hcc, jobSpec);
++
++                //#. begin new metadataTxn
++                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++                bActiveTxn = true;
++                metadataProvider.setMetadataTxnContext(mdTxnCtx);
++            }
++
++            //#. add a new dataset with PendingNoOp after deleting the dataset with PendingAddOp
++            MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName);
++            MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), new Dataset(dataverseName,
++                    datasetName, itemTypeName, datasetDetails, dsType, dataset.getDatasetId(),
++                    IMetadataEntity.PENDING_NO_OP));
++            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++        } catch (Exception e) {
++            if (bActiveTxn) {
++                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
++            }
++            throw new AlgebricksException(e);
++        } finally {
++            releaseWriteLatch();
+         }
+-        MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), new Dataset(dataverseName,
+-                datasetName, itemTypeName, datasetDetails, dsType, DatasetIdFactory.generateDatasetId()));
+-        if (dd.getDatasetType() == DatasetType.INTERNAL || dd.getDatasetType() == DatasetType.FEED) {
+-            Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
+-                    dataverseName);
+-            runJob(hcc, DatasetOperations.createDatasetJobSpec(dataverse, datasetName, metadataProvider));
+-        }
+     }
+ 
+     private void handleCreateIndexStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+             IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
+-        CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
+-        String dataverseName = stmtCreateIndex.getDataverseName() == null ? activeDefaultDataverse == null ? null
+-                : activeDefaultDataverse.getDataverseName() : stmtCreateIndex.getDataverseName().getValue();
+-        if (dataverseName == null) {
+-            throw new AlgebricksException(" dataverse not specified ");
+-        }
+-        String datasetName = stmtCreateIndex.getDatasetName().getValue();
+-        Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
+-                datasetName);
+-        if (ds == null) {
+-            throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
+-                    + dataverseName);
+-        }
+-        String indexName = stmtCreateIndex.getIndexName().getValue();
+-        Index idx = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
+-                datasetName, indexName);
+-        if (idx != null) {
+-            if (!stmtCreateIndex.getIfNotExists()) {
+-                throw new AlgebricksException("An index with this name " + indexName + " already exists.");
+-            } else {
+-                stmtCreateIndex.setNeedToCreate(false);
++
++        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++        boolean bActiveTxn = true;
++        metadataProvider.setMetadataTxnContext(mdTxnCtx);
++        acquireWriteLatch();
++
++        try {
++            CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
++            String dataverseName = stmtCreateIndex.getDataverseName() == null ? activeDefaultDataverse == null ? null
++                    : activeDefaultDataverse.getDataverseName() : stmtCreateIndex.getDataverseName().getValue();
++            if (dataverseName == null) {
++                throw new AlgebricksException(" dataverse not specified ");
+             }
+-        } else {
++            String datasetName = stmtCreateIndex.getDatasetName().getValue();
++
++            Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
++                    datasetName);
++            if (ds == null) {
++                throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
++                        + dataverseName);
++            }
++
++            String indexName = stmtCreateIndex.getIndexName().getValue();
++            Index idx = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
++                    datasetName, indexName);
++
++            if (idx != null) {
++                if (!stmtCreateIndex.getIfNotExists()) {
++                    throw new AlgebricksException("An index with this name " + indexName + " already exists.");
++                } else {
++                    stmtCreateIndex.setNeedToCreate(false);
++                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++                    return;
++                }
++            }
++
++            //#. add a new index with PendingAddOp
+             Index index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(),
+-                    stmtCreateIndex.getFieldExprs(), stmtCreateIndex.getGramLength(), false);
++                    stmtCreateIndex.getFieldExprs(), stmtCreateIndex.getGramLength(), false,
++                    IMetadataEntity.PENDING_ADD_OP);
+             MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
+-            runCreateIndexJob(hcc, stmtCreateIndex, metadataProvider);
+ 
++            //#. create the index artifact in NC.
+             CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName,
+                     index.getDatasetName(), index.getKeyFieldNames(), index.getGramLength(), index.getIndexType());
+-            JobSpecification loadIndexJobSpec = IndexOperations
+-                    .buildSecondaryIndexLoadingJobSpec(cis, metadataProvider);
+-            runJob(hcc, loadIndexJobSpec);
++            JobSpecification spec = IndexOperations.buildSecondaryIndexCreationJobSpec(cis, metadataProvider);
++            if (spec == null) {
++                throw new AsterixException("Failed to create job spec for creating index '"
++                        + stmtCreateIndex.getDatasetName() + "." + stmtCreateIndex.getIndexName() + "'");
++            }
++            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++            bActiveTxn = false;
++
++            runJob(hcc, spec);
++
++            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++            bActiveTxn = true;
++            metadataProvider.setMetadataTxnContext(mdTxnCtx);
++
++            //#. load data into the index in NC.
++            cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName, index.getDatasetName(),
++                    index.getKeyFieldNames(), index.getGramLength(), index.getIndexType());
++            spec = IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, metadataProvider);
++            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++            bActiveTxn = false;
++
++            runJob(hcc, spec);
++
++            //#. begin new metadataTxn
++            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++            bActiveTxn = true;
++            metadataProvider.setMetadataTxnContext(mdTxnCtx);
++
++            //#. add another new index with PendingNoOp after deleting the index with PendingAddOp
++            MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
++                    indexName);
++            index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(),
++                    stmtCreateIndex.getFieldExprs(), stmtCreateIndex.getGramLength(), false,
++                    IMetadataEntity.PENDING_NO_OP);
++            MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
++            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++
++        } catch (Exception e) {
++            if (bActiveTxn) {
++                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
++            }
++            throw new AlgebricksException(e);
++        } finally {
++            releaseWriteLatch();
+         }
+     }
+ 
+     private void handleCreateTypeStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+             List<JobSpecification> jobsToExecute) throws AlgebricksException, RemoteException, ACIDException,
+             MetadataException {
+-        MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
+-        TypeDecl stmtCreateType = (TypeDecl) stmt;
+-        String dataverseName = stmtCreateType.getDataverseName() == null ? activeDefaultDataverse == null ? null
+-                : activeDefaultDataverse.getDataverseName() : stmtCreateType.getDataverseName().getValue();
+-        if (dataverseName == null) {
+-            throw new AlgebricksException(" dataverse not specified ");
+-        }
+-        String typeName = stmtCreateType.getIdent().getValue();
+-        Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
+-        if (dv == null) {
+-            throw new AlgebricksException("Unknonw dataverse " + dataverseName);
+-        }
+-        Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
+-        if (dt != null) {
+-            if (!stmtCreateType.getIfNotExists())
+-                throw new AlgebricksException("A datatype with this name " + typeName + " already exists.");
+-        } else {
+-            if (builtinTypeMap.get(typeName) != null) {
+-                throw new AlgebricksException("Cannot redefine builtin type " + typeName + ".");
++
++        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++        metadataProvider.setMetadataTxnContext(mdTxnCtx);
++        acquireWriteLatch();
++
++        try {
++            TypeDecl stmtCreateType = (TypeDecl) stmt;
++            String dataverseName = stmtCreateType.getDataverseName() == null ? activeDefaultDataverse == null ? null
++                    : activeDefaultDataverse.getDataverseName() : stmtCreateType.getDataverseName().getValue();
++            if (dataverseName == null) {
++                throw new AlgebricksException(" dataverse not specified ");
++            }
++            String typeName = stmtCreateType.getIdent().getValue();
++            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
++            if (dv == null) {
++                throw new AlgebricksException("Unknonw dataverse " + dataverseName);
++            }
++            Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
++            if (dt != null) {
++                if (!stmtCreateType.getIfNotExists()) {
++                    throw new AlgebricksException("A datatype with this name " + typeName + " already exists.");
++                }
+             } else {
+-                Map<TypeSignature, IAType> typeMap = TypeTranslator.computeTypes(mdTxnCtx, (TypeDecl) stmt,
+-                        dataverseName);
+-                TypeSignature typeSignature = new TypeSignature(dataverseName, typeName);
+-                IAType type = typeMap.get(typeSignature);
+-                MetadataManager.INSTANCE.addDatatype(mdTxnCtx, new Datatype(dataverseName, typeName, type, false));
++                if (builtinTypeMap.get(typeName) != null) {
++                    throw new AlgebricksException("Cannot redefine builtin type " + typeName + ".");
++                } else {
++                    Map<TypeSignature, IAType> typeMap = TypeTranslator.computeTypes(mdTxnCtx, (TypeDecl) stmt,
++                            dataverseName);
++                    TypeSignature typeSignature = new TypeSignature(dataverseName, typeName);
++                    IAType type = typeMap.get(typeSignature);
++                    MetadataManager.INSTANCE.addDatatype(mdTxnCtx, new Datatype(dataverseName, typeName, type, false));
++                }
+             }
++            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++        } catch (Exception e) {
++            MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
++            throw new AlgebricksException(e);
++        } finally {
++            releaseWriteLatch();
+         }
+     }
+ 
+     private void handleDataverseDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+             IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
+-        MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
+-        DataverseDropStatement stmtDelete = (DataverseDropStatement) stmt;
+-        String dvName = stmtDelete.getDataverseName().getValue();
+ 
+-        Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dvName);
+-        if (dv == null) {
+-            if (!stmtDelete.getIfExists()) {
+-                throw new AlgebricksException("There is no dataverse with this name " + dvName + ".");
++        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++        boolean bActiveTxn = true;
++        metadataProvider.setMetadataTxnContext(mdTxnCtx);
++        acquireWriteLatch();
++
++        try {
++            DataverseDropStatement stmtDelete = (DataverseDropStatement) stmt;
++            String dvName = stmtDelete.getDataverseName().getValue();
++
++            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dvName);
++            if (dv == null) {
++                if (!stmtDelete.getIfExists()) {
++                    throw new AlgebricksException("There is no dataverse with this name " + dvName + ".");
++                }
++                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++                return;
+             }
+-        } else {
++
++            //#. prepare jobs that will drop the dataverse's datasets along with their secondary indexes.
+             List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dvName);
+             for (int j = 0; j < datasets.size(); j++) {
+                 String datasetName = datasets.get(j).getDatasetName();
+                 DatasetType dsType = datasets.get(j).getDatasetType();
+                 if (dsType == DatasetType.INTERNAL || dsType == DatasetType.FEED) {
++
+                     List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dvName, datasetName);
+                     for (int k = 0; k < indexes.size(); k++) {
+                         if (indexes.get(k).isSecondaryIndex()) {
+-                            compileIndexDropStatement(hcc, dvName, datasetName, indexes.get(k).getIndexName(),
+-                                    metadataProvider);
++                            CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dvName, datasetName,
++                                    indexes.get(k).getIndexName());
++                            jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
+                         }
+                     }
++
++                    CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dvName, datasetName);
++                    jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider));
+                 }
+-                compileDatasetDropStatement(hcc, dvName, datasetName, metadataProvider);
+             }
+ 
++            //#. mark PendingDropOp on the dataverse record by 
++            //   first, deleting the dataverse record from the DATAVERSE_DATASET
++            //   second, inserting the dataverse record with the PendingDropOp value into the DATAVERSE_DATASET
+             MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dvName);
++            MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dvName, dv.getDataFormat(),
++                    IMetadataEntity.PENDING_DROP_OP));
++
++            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++            bActiveTxn = false;
++
++            for (JobSpecification jobSpec : jobsToExecute) {
++                runJob(hcc, jobSpec);
++            }
++
++            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++            bActiveTxn = true;
++            metadataProvider.setMetadataTxnContext(mdTxnCtx);
++
++            //#. finally, delete the dataverse.
++            MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dvName);
+             if (activeDefaultDataverse != null && activeDefaultDataverse.getDataverseName() == dvName) {
+                 activeDefaultDataverse = null;
+             }
++
++            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++        } catch (Exception e) {
++            if (bActiveTxn) {
++                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
++            }
++            throw new AlgebricksException(e);
++        } finally {
++            releaseWriteLatch();
+         }
+     }
+ 
+     private void handleDatasetDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+             IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
+-        MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
+-        DropStatement stmtDelete = (DropStatement) stmt;
+-        String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
+-                : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue();
+-        if (dataverseName == null) {
+-            throw new AlgebricksException(" dataverse not specified ");
+-        }
+-        String datasetName = stmtDelete.getDatasetName().getValue();
+-        Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+-        if (ds == null) {
+-            if (!stmtDelete.getIfExists())
+-                throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
+-                        + dataverseName + ".");
+-        } else {
++
++        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++        boolean bActiveTxn = true;
++        metadataProvider.setMetadataTxnContext(mdTxnCtx);
++        acquireWriteLatch();
++
++        try {
++            DropStatement stmtDelete = (DropStatement) stmt;
++            String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
++                    : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue();
++            if (dataverseName == null) {
++                throw new AlgebricksException(" dataverse not specified ");
++            }
++            String datasetName = stmtDelete.getDatasetName().getValue();
++
++            Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
++            if (ds == null) {
++                if (!stmtDelete.getIfExists()) {
++                    throw new AlgebricksException("There is no dataset with this name " + datasetName
++                            + " in dataverse " + dataverseName + ".");
++                }
++                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++                return;
++            }
++
+             if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
++
++                //#. prepare jobs to drop the dataset and the indexes in NC
+                 List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
+                 for (int j = 0; j < indexes.size(); j++) {
+-                    if (indexes.get(j).isPrimaryIndex()) {
+-                        compileIndexDropStatement(hcc, dataverseName, datasetName, indexes.get(j).getIndexName(),
+-                                metadataProvider);
++                    if (indexes.get(j).isSecondaryIndex()) {
++                        CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
++                                indexes.get(j).getIndexName());
++                        jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
+                     }
+                 }
++                CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
++                jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider));
++
++                //#. mark the existing dataset as PendingDropOp
++                MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
++                MetadataManager.INSTANCE.addDataset(
++                        mdTxnCtx,
++                        new Dataset(dataverseName, datasetName, ds.getItemTypeName(), ds.getDatasetDetails(), ds
++                                .getDatasetType(), ds.getDatasetId(), IMetadataEntity.PENDING_DROP_OP));
++
++                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++                bActiveTxn = false;
++
++                //#. run the jobs
++                for (JobSpecification jobSpec : jobsToExecute) {
++                    runJob(hcc, jobSpec);
++                }
++
++                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++                bActiveTxn = true;
++                metadataProvider.setMetadataTxnContext(mdTxnCtx);
+             }
+-            compileDatasetDropStatement(hcc, dataverseName, datasetName, metadataProvider);
++
++            //#. finally, delete the dataset.
++            MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
++
++            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++        } catch (Exception e) {
++            if (bActiveTxn) {
++                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
++            }
++            throw new AlgebricksException(e);
++        } finally {
++            releaseWriteLatch();
+         }
+     }
+ 
+     private void handleIndexDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+             IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
+-        MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
+-        IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt;
+-        String datasetName = stmtIndexDrop.getDatasetName().getValue();
+-        String dataverseName = stmtIndexDrop.getDataverseName() == null ? activeDefaultDataverse == null ? null
+-                : activeDefaultDataverse.getDataverseName() : stmtIndexDrop.getDataverseName().getValue();
+-        if (dataverseName == null) {
+-            throw new AlgebricksException(" dataverse not specified ");
++
++        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++        boolean bActiveTxn = true;
++        metadataProvider.setMetadataTxnContext(mdTxnCtx);
++        acquireWriteLatch();
++
++        try {
++            IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt;
++            String datasetName = stmtIndexDrop.getDatasetName().getValue();
++            String dataverseName = stmtIndexDrop.getDataverseName() == null ? activeDefaultDataverse == null ? null
++                    : activeDefaultDataverse.getDataverseName() : stmtIndexDrop.getDataverseName().getValue();
++            if (dataverseName == null) {
++                throw new AlgebricksException(" dataverse not specified ");
++            }
++
++            Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
++            if (ds == null) {
++                throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
++                        + dataverseName);
++            }
++
++            if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
++                String indexName = stmtIndexDrop.getIndexName().getValue();
++                Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
++                if (index == null) {
++                    if (!stmtIndexDrop.getIfExists()) {
++                        throw new AlgebricksException("There is no index with this name " + indexName + ".");
++                    }
++                } else {
++                    //#. prepare a job to drop the index in NC.
++                    CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
++                            indexName);
++                    jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
++
++                    //#. mark PendingDropOp on the existing index
++                    MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
++                    MetadataManager.INSTANCE.addIndex(
++                            mdTxnCtx,
++                            new Index(dataverseName, datasetName, indexName, index.getIndexType(), index
++                                    .getKeyFieldNames(), index.isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
++
++                    //#. commit the existing transaction before calling runJob. 
++                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++                    bActiveTxn = false;
++
++                    for (JobSpecification jobSpec : jobsToExecute) {
++                        runJob(hcc, jobSpec);
++                    }
++
++                    //#. begin a new transaction
++                    mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++                    bActiveTxn = true;
++                    metadataProvider.setMetadataTxnContext(mdTxnCtx);
++
++                    //#. finally, delete the existing index
++                    MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
++                }
++            } else {
++                throw new AlgebricksException(datasetName
++                        + " is an external dataset. Indexes are not maintained for external datasets.");
++            }
++            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++
++        } catch (Exception e) {
++            if (bActiveTxn) {
++                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
++            }
++            throw new AlgebricksException(e);
++
++        } finally {
++            releaseWriteLatch();
+         }
+-        Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+-        if (ds == null)
+-            throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
+-                    + dataverseName);
+-        if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
+-            String indexName = stmtIndexDrop.getIndexName().getValue();
+-            Index idx = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
+-            if (idx == null) {
+-                if (!stmtIndexDrop.getIfExists())
+-                    throw new AlgebricksException("There is no index with this name " + indexName + ".");
+-            } else
+-                compileIndexDropStatement(hcc, dataverseName, datasetName, indexName, metadataProvider);
+-        } else {
+-            throw new AlgebricksException(datasetName
+-                    + " is an external dataset. Indexes are not maintained for external datasets.");
+-        }
+     }
+ 
+     private void handleTypeDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+             List<JobSpecification> jobsToExecute) throws AlgebricksException, MetadataException, RemoteException,
+             ACIDException {
+-        MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
+-        TypeDropStatement stmtTypeDrop = (TypeDropStatement) stmt;
+-        String dataverseName = stmtTypeDrop.getDataverseName() == null ? (activeDefaultDataverse == null ? null
+-                : activeDefaultDataverse.getDataverseName()) : stmtTypeDrop.getDataverseName().getValue();
+-        if (dataverseName == null) {
+-            throw new AlgebricksException(" dataverse not specified ");
++
++        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++        metadataProvider.setMetadataTxnContext(mdTxnCtx);
++        acquireWriteLatch();
++
++        try {
++            TypeDropStatement stmtTypeDrop = (TypeDropStatement) stmt;
++            String dataverseName = stmtTypeDrop.getDataverseName() == null ? (activeDefaultDataverse == null ? null
++                    : activeDefaultDataverse.getDataverseName()) : stmtTypeDrop.getDataverseName().getValue();
++            if (dataverseName == null) {
++                throw new AlgebricksException(" dataverse not specified ");
++            }
++            String typeName = stmtTypeDrop.getTypeName().getValue();
++            Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
++            if (dt == null) {
++                if (!stmtTypeDrop.getIfExists())
++                    throw new AlgebricksException("There is no datatype with this name " + typeName + ".");
++            } else {
++                MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, dataverseName, typeName);
++            }
++            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++        } catch (Exception e) {
++            MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
++            throw new AlgebricksException(e);
++        } finally {
++            releaseWriteLatch();
+         }
+-        String typeName = stmtTypeDrop.getTypeName().getValue();
+-        Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
+-        if (dt == null) {
+-            if (!stmtTypeDrop.getIfExists())
+-                throw new AlgebricksException("There is no datatype with this name " + typeName + ".");
+-        } else {
+-            MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, dataverseName, typeName);
+-        }
+     }
+ 
+     private void handleNodegroupDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+             List<JobSpecification> jobsToExecute) throws MetadataException, AlgebricksException, RemoteException,
+             ACIDException {
+-        MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
+-        NodeGroupDropStatement stmtDelete = (NodeGroupDropStatement) stmt;
+-        String nodegroupName = stmtDelete.getNodeGroupName().getValue();
+-        NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodegroupName);
+-        if (ng == null) {
+-            if (!stmtDelete.getIfExists())
+-                throw new AlgebricksException("There is no nodegroup with this name " + nodegroupName + ".");
+-        } else {
+-            MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, nodegroupName);
++
++        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++        metadataProvider.setMetadataTxnContext(mdTxnCtx);
++        acquireWriteLatch();
++
++        try {
++            NodeGroupDropStatement stmtDelete = (NodeGroupDropStatement) stmt;
++            String nodegroupName = stmtDelete.getNodeGroupName().getValue();
++            NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodegroupName);
++            if (ng == null) {
++                if (!stmtDelete.getIfExists())
++                    throw new AlgebricksException("There is no nodegroup with this name " + nodegroupName + ".");
++            } else {
++                MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, nodegroupName);
++            }
++
++            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++        } catch (Exception e) {
++            MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
++            throw new AlgebricksException(e);
++        } finally {
++            releaseWriteLatch();
+         }
+     }
+ 
+     private void handleCreateFunctionStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+             List<JobSpecification> jobsToExecute) throws AlgebricksException, MetadataException, RemoteException,
+             ACIDException {
+-        MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
+-        CreateFunctionStatement cfs = (CreateFunctionStatement) stmt;
+-        String dataverse = cfs.getSignature().getNamespace() == null ? activeDefaultDataverse == null ? null
+-                : activeDefaultDataverse.getDataverseName() : cfs.getSignature().getNamespace();
+-        if (dataverse == null) {
+-            throw new AlgebricksException(" dataverse not specified ");
++        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++        metadataProvider.setMetadataTxnContext(mdTxnCtx);
++        acquireWriteLatch();
++
++        try {
++            CreateFunctionStatement cfs = (CreateFunctionStatement) stmt;
++            String dataverse = cfs.getSignature().getNamespace() == null ? activeDefaultDataverse == null ? null
++                    : activeDefaultDataverse.getDataverseName() : cfs.getSignature().getNamespace();
++            if (dataverse == null) {
++                throw new AlgebricksException(" dataverse not specified ");
++            }
++            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverse);
++            if (dv == null) {
++                throw new AlgebricksException("There is no dataverse with this name " + dataverse + ".");
++            }
++            Function function = new Function(dataverse, cfs.getaAterixFunction().getName(), cfs.getaAterixFunction()
++                    .getArity(), cfs.getParamList(), Function.RETURNTYPE_VOID, cfs.getFunctionBody(),
++                    Function.LANGUAGE_AQL, FunctionKind.SCALAR.toString());
++            MetadataManager.INSTANCE.addFunction(mdTxnCtx, function);
++
++            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++        } catch (Exception e) {
++            MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
++            throw new AlgebricksException(e);
++        } finally {
++            releaseWriteLatch();
+         }
+-        Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverse);
+-        if (dv == null) {
+-            throw new AlgebricksException("There is no dataverse with this name " + dataverse + ".");
+-        }
+-        Function function = new Function(dataverse, cfs.getaAterixFunction().getName(), cfs.getaAterixFunction()
+-                .getArity(), cfs.getParamList(), Function.RETURNTYPE_VOID, cfs.getFunctionBody(),
+-                Function.LANGUAGE_AQL, FunctionKind.SCALAR.toString());
+-        MetadataManager.INSTANCE.addFunction(mdTxnCtx, function);
+     }
+ 
+     private void handleFunctionDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+             List<JobSpecification> jobsToExecute) throws MetadataException, RemoteException, ACIDException,
+             AlgebricksException {
+-        MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
+-        FunctionDropStatement stmtDropFunction = (FunctionDropStatement) stmt;
+-        FunctionSignature signature = stmtDropFunction.getFunctionSignature();
+-        Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, signature);
+-        if (function == null) {
+-            if (!stmtDropFunction.getIfExists())
+-                throw new AlgebricksException("Unknonw function " + signature);
+-        } else {
+-            MetadataManager.INSTANCE.dropFunction(mdTxnCtx, signature);
++        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++        metadataProvider.setMetadataTxnContext(mdTxnCtx);
++        acquireWriteLatch();
++
++        try {
++            FunctionDropStatement stmtDropFunction = (FunctionDropStatement) stmt;
++            FunctionSignature signature = stmtDropFunction.getFunctionSignature();
++            Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, signature);
++            if (function == null) {
++                if (!stmtDropFunction.getIfExists())
++                    throw new AlgebricksException("Unknonw function " + signature);
++            } else {
++                MetadataManager.INSTANCE.dropFunction(mdTxnCtx, signature);
++            }
++            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++        } catch (Exception e) {
++            MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
++            throw new AlgebricksException(e);
++        } finally {
++            releaseWriteLatch();
+         }
+     }
+ 
+     private void handleLoadFromFileStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+             IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
+-        MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
+-        LoadFromFileStatement loadStmt = (LoadFromFileStatement) stmt;
+-        String dataverseName = loadStmt.getDataverseName() == null ? activeDefaultDataverse == null ? null
+-                : activeDefaultDataverse.getDataverseName() : loadStmt.getDataverseName().getValue();
+-        CompiledLoadFromFileStatement cls = new CompiledLoadFromFileStatement(dataverseName, loadStmt.getDatasetName()
+-                .getValue(), loadStmt.getAdapter(), loadStmt.getProperties(), loadStmt.dataIsAlreadySorted());
+ 
+-        IDataFormat format = getDataFormat(metadataProvider.getMetadataTxnContext(), dataverseName);
+-        Job job = DatasetOperations.createLoadDatasetJobSpec(metadataProvider, cls, format);
+-        jobsToExecute.add(job.getJobSpec());
+-        // Also load the dataset's secondary indexes.
+-        List<Index> datasetIndexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, loadStmt
+-                .getDatasetName().getValue());
+-        for (Index index : datasetIndexes) {
+-            if (!index.isSecondaryIndex()) {
+-                continue;
++        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++        boolean bActiveTxn = true;
++        metadataProvider.setMetadataTxnContext(mdTxnCtx);
++        acquireReadLatch();
++
++        try {
++            LoadFromFileStatement loadStmt = (LoadFromFileStatement) stmt;
++            String dataverseName = loadStmt.getDataverseName() == null ? activeDefaultDataverse == null ? null
++                    : activeDefaultDataverse.getDataverseName() : loadStmt.getDataverseName().getValue();
++            CompiledLoadFromFileStatement cls = new CompiledLoadFromFileStatement(dataverseName, loadStmt
++                    .getDatasetName().getValue(), loadStmt.getAdapter(), loadStmt.getProperties(),
++                    loadStmt.dataIsAlreadySorted());
++
++            IDataFormat format = getDataFormat(metadataProvider.getMetadataTxnContext(), dataverseName);
++            Job job = DatasetOperations.createLoadDatasetJobSpec(metadataProvider, cls, format);
++            jobsToExecute.add(job.getJobSpec());
++            // Also load the dataset's secondary indexes.
++            List<Index> datasetIndexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, loadStmt
++                    .getDatasetName().getValue());
++            for (Index index : datasetIndexes) {
++                if (!index.isSecondaryIndex()) {
++                    continue;
++                }
++                // Create CompiledCreateIndexStatement from metadata entity 'index'.
++                CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(),
++                        dataverseName, index.getDatasetName(), index.getKeyFieldNames(), index.getGramLength(),
++                        index.getIndexType());
++                jobsToExecute.add(IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, metadataProvider));
+             }
+-            // Create CompiledCreateIndexStatement from metadata entity 'index'.
+-            CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName,
+-                    index.getDatasetName(), index.getKeyFieldNames(), index.getGramLength(), index.getIndexType());
+-            jobsToExecute.add(IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, metadataProvider));
++            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++            bActiveTxn = false;
++
++            for (JobSpecification jobspec : jobsToExecute) {
++                runJob(hcc, jobspec);
++            }
++        } catch (Exception e) {
++            if (bActiveTxn) {
++                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
++            }
++            throw new AlgebricksException(e);
++        } finally {
++            releaseReadLatch();
+         }
+     }
+ 
+     private void handleWriteFromQueryResultStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+             IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
+-        metadataProvider.setWriteTransaction(true);
+-        WriteFromQueryResultStatement st1 = (WriteFromQueryResultStatement) stmt;
+-        String dataverseName = st1.getDataverseName() == null ? activeDefaultDataverse == null ? null
+-                : activeDefaultDataverse.getDataverseName() : st1.getDataverseName().getValue();
+-        CompiledWriteFromQueryResultStatement clfrqs = new CompiledWriteFromQueryResultStatement(dataverseName, st1
+-                .getDatasetName().getValue(), st1.getQuery(), st1.getVarCounter());
+ 
+-        Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
+-        if (compiled.first != null) {
+-            jobsToExecute.add(compiled.first);
++        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++        boolean bActiveTxn = true; // cleared right after commit so the catch block never aborts a committed txn
++        metadataProvider.setMetadataTxnContext(mdTxnCtx);
++        acquireReadLatch();
++        // Write-from-query-result: compile inside the metadata txn (read latch held), commit, then run the job.
++        try {
++            metadataProvider.setWriteTransaction(true);
++            WriteFromQueryResultStatement st1 = (WriteFromQueryResultStatement) stmt;
++            String dataverseName = st1.getDataverseName() == null ? activeDefaultDataverse == null ? null
++                    : activeDefaultDataverse.getDataverseName() : st1.getDataverseName().getValue();
++            CompiledWriteFromQueryResultStatement clfrqs = new CompiledWriteFromQueryResultStatement(dataverseName, st1
++                    .getDatasetName().getValue(), st1.getQuery(), st1.getVarCounter());
++            // The job is run directly below via runJob; the added code no longer appends to jobsToExecute.
++            Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(),
++                    clfrqs);
++            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++            bActiveTxn = false;
++            if (compiled.first != null) {
++                runJob(hcc, compiled.first);
++            }
++        } catch (Exception e) {
++            if (bActiveTxn) {
++                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
++            }
++            throw new AlgebricksException(e); // every failure, including one from runJob, is rethrown wrapped
++        } finally {
++            releaseReadLatch(); // released on both the success and the failure path
+         }
+     }
+ 
+     private void handleInsertStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+             IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
+-        metadataProvider.setWriteTransaction(true);
+-        InsertStatement stmtInsert = (InsertStatement) stmt;
+-        String dataverseName = stmtInsert.getDataverseName() == null ? activeDefaultDataverse == null ? null
+-                : activeDefaultDataverse.getDataverseName() : stmtInsert.getDataverseName().getValue();
+-        CompiledInsertStatement clfrqs = new CompiledInsertStatement(dataverseName, stmtInsert.getDatasetName()
+-                .getValue(), stmtInsert.getQuery(), stmtInsert.getVarCounter());
+-        Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
+-        if (compiled.first != null) {
+-            jobsToExecute.add(compiled.first);
++        // Insert statement: compile inside a metadata txn (read latch held), commit, then run the job.
++        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++        boolean bActiveTxn = true; // cleared right after commit so the catch block never aborts a committed txn
++        metadataProvider.setMetadataTxnContext(mdTxnCtx);
++        acquireReadLatch();
++
++        try {
++            metadataProvider.setWriteTransaction(true);
++            InsertStatement stmtInsert = (InsertStatement) stmt;
++            String dataverseName = stmtInsert.getDataverseName() == null ? activeDefaultDataverse == null ? null
++                    : activeDefaultDataverse.getDataverseName() : stmtInsert.getDataverseName().getValue();
++            CompiledInsertStatement clfrqs = new CompiledInsertStatement(dataverseName, stmtInsert.getDatasetName()
++                    .getValue(), stmtInsert.getQuery(), stmtInsert.getVarCounter());
++            Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(),
++                    clfrqs);
++            // Commit the metadata txn first; the job is run directly and jobsToExecute is not touched.
++            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++            bActiveTxn = false;
++
++            if (compiled.first != null) {
++                runJob(hcc, compiled.first);
++            }
++
++        } catch (Exception e) {
++            if (bActiveTxn) {
++                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
++            }
++            throw new AlgebricksException(e); // every failure, including one from runJob, is rethrown wrapped
++        } finally {
++            releaseReadLatch(); // released on both the success and the failure path
+         }
+     }
+ 
+     private void handleDeleteStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+             IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
+-        metadataProvider.setWriteTransaction(true);
+-        DeleteStatement stmtDelete = (DeleteStatement) stmt;
+-        String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
+-                : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue();
+-        CompiledDeleteStatement clfrqs = new CompiledDeleteStatement(stmtDelete.getVariableExpr(), dataverseName,
+-                stmtDelete.getDatasetName().getValue(), stmtDelete.getCondition(), stmtDelete.getDieClause(),
+-                stmtDelete.getVarCounter(), metadataProvider);
+-        Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
+-        if (compiled.first != null) {
+-            jobsToExecute.add(compiled.first);
++        // Delete statement: compile inside a metadata txn (read latch held), commit, then run the job.
++        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++        boolean bActiveTxn = true; // cleared right after commit so the catch block never aborts a committed txn
++        metadataProvider.setMetadataTxnContext(mdTxnCtx);
++        acquireReadLatch();
++
++        try {
++            metadataProvider.setWriteTransaction(true);
++            DeleteStatement stmtDelete = (DeleteStatement) stmt;
++            String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
++                    : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue();
++            CompiledDeleteStatement clfrqs = new CompiledDeleteStatement(stmtDelete.getVariableExpr(), dataverseName,
++                    stmtDelete.getDatasetName().getValue(), stmtDelete.getCondition(), stmtDelete.getDieClause(),
++                    stmtDelete.getVarCounter(), metadataProvider);
++            Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(),
++                    clfrqs);
++            // Commit the metadata txn first; the job is run directly and jobsToExecute is not touched.
++            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++            bActiveTxn = false;
++
++            if (compiled.first != null) {
++                runJob(hcc, compiled.first);
++            }
++
++        } catch (Exception e) {
++            if (bActiveTxn) {
++                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
++            }
++            throw new AlgebricksException(e); // every failure, including one from runJob, is rethrown wrapped
++        } finally {
++            releaseReadLatch(); // released on both the success and the failure path
+         }
+     }
+ 
+@@ -704,46 +1106,109 @@
+ 
+     private void handleBeginFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+             IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
+-        BeginFeedStatement bfs = (BeginFeedStatement) stmt;
+-        String dataverseName = bfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
+-                : activeDefaultDataverse.getDataverseName() : bfs.getDatasetName().getValue();
+ 
+-        CompiledBeginFeedStatement cbfs = new CompiledBeginFeedStatement(dataverseName,
+-                bfs.getDatasetName().getValue(), bfs.getQuery(), bfs.getVarCounter());
++        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++        boolean bActiveTxn = true; // cleared right after commit so the catch block never aborts a committed txn
++        metadataProvider.setMetadataTxnContext(mdTxnCtx);
++        acquireReadLatch();
+ 
+-        Dataset dataset;
+-        dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName, bfs
+-                .getDatasetName().getValue());
+-        IDatasetDetails datasetDetails = dataset.getDatasetDetails();
+-        if (datasetDetails.getDatasetType() != DatasetType.FEED) {
+-            throw new IllegalArgumentException("Dataset " + bfs.getDatasetName().getValue() + " is not a feed dataset");
++        try {
++            BeginFeedStatement bfs = (BeginFeedStatement) stmt;
++            String dataverseName = bfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
++                    : activeDefaultDataverse.getDataverseName() : bfs.getDataverseName().getValue();
++            // An explicitly named dataverse wins; otherwise fall back to the active default dataverse (or null).
++            CompiledBeginFeedStatement cbfs = new CompiledBeginFeedStatement(dataverseName, bfs.getDatasetName()
++                    .getValue(), bfs.getQuery(), bfs.getVarCounter());
++            // Look the dataset up under the metadata txn and insist that it is a FEED dataset.
++            Dataset dataset;
++            dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName, bfs
++                    .getDatasetName().getValue());
++            IDatasetDetails datasetDetails = dataset.getDatasetDetails();
++            if (datasetDetails.getDatasetType() != DatasetType.FEED) {
++                throw new IllegalArgumentException("Dataset " + bfs.getDatasetName().getValue()
++                        + " is not a feed dataset");
++            }
++            bfs.initialize(metadataProvider.getMetadataTxnContext(), dataset);
++            cbfs.setQuery(bfs.getQuery()); // re-read the query after initialize() before compiling it
++            Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, bfs.getQuery(), cbfs);
++            // Commit the metadata txn first; the job is run directly and jobsToExecute is not touched.
++            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++            bActiveTxn = false;
++
++            if (compiled.first != null) {
++                runJob(hcc, compiled.first);
++            }
++
++        } catch (Exception e) {
++            if (bActiveTxn) {
++                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
++            }
++            throw new AlgebricksException(e); // also wraps the IllegalArgumentException thrown above
++        } finally {
++            releaseReadLatch(); // released on both the success and the failure path
+         }
+-        bfs.initialize(metadataProvider.getMetadataTxnContext(), dataset);
+-        cbfs.setQuery(bfs.getQuery());
+-        Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, bfs.getQuery(), cbfs);
+-        if (compiled.first != null) {
+-            jobsToExecute.add(compiled.first);
+-        }
+     }
+ 
+     private void handleControlFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+             IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
+-        ControlFeedStatement cfs = (ControlFeedStatement) stmt;
+-        String dataverseName = cfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
+-                : activeDefaultDataverse.getDataverseName() : cfs.getDatasetName().getValue();
+-        CompiledControlFeedStatement clcfs = new CompiledControlFeedStatement(cfs.getOperationType(), dataverseName,
+-                cfs.getDatasetName().getValue(), cfs.getAlterAdapterConfParams());
+-        jobsToExecute.add(FeedOperations.buildControlFeedJobSpec(clcfs, metadataProvider));
++        // Control-feed statement: build the job spec inside a metadata txn (read latch held), commit, then run it.
++        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++        boolean bActiveTxn = true; // cleared right after commit so the catch block never aborts a committed txn
++        metadataProvider.setMetadataTxnContext(mdTxnCtx);
++        acquireReadLatch();
++
++        try {
++            ControlFeedStatement cfs = (ControlFeedStatement) stmt;
++            String dataverseName = cfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
++                    : activeDefaultDataverse.getDataverseName() : cfs.getDataverseName().getValue();
++            CompiledControlFeedStatement clcfs = new CompiledControlFeedStatement(cfs.getOperationType(),
++                    dataverseName, cfs.getDatasetName().getValue(), cfs.getAlterAdapterConfParams());
++            JobSpecification jobSpec = FeedOperations.buildControlFeedJobSpec(clcfs, metadataProvider);
++            // Commit the metadata txn first; the job is run directly and jobsToExecute is not touched.
++            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++            bActiveTxn = false;
++
++            runJob(hcc, jobSpec);
++
++        } catch (Exception e) {
++            if (bActiveTxn) {
++                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
++            }
++            throw new AlgebricksException(e); // every failure, including one from runJob, is rethrown wrapped
++        } finally {
++            releaseReadLatch(); // released on both the success and the failure path
++        }
+     }
+ 
+     private QueryResult handleQuery(AqlMetadataProvider metadataProvider, Query query, IHyracksClientConnection hcc,
+             List<JobSpecification> jobsToExecute) throws Exception {
+-        Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, query, null);
+-        if (compiled.first != null) {
+-            GlobalConfig.ASTERIX_LOGGER.info(compiled.first.toJSON().toString(1));
+-            jobsToExecute.add(compiled.first);
++        // Query: compile inside a metadata txn (read latch held), commit, run the job if any, return the QueryResult.
++        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++        boolean bActiveTxn = true; // cleared right after commit so the catch block never aborts a committed txn
++        metadataProvider.setMetadataTxnContext(mdTxnCtx);
++        acquireReadLatch();
++
++        try {
++            Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, query, null);
++            // compiled.second is dereferenced unconditionally here, unlike compiled.first which is null-checked below.
++            QueryResult queryResult = new QueryResult(query, compiled.second.getLocalFile().getFile().getAbsolutePath());
++            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++            bActiveTxn = false;
++
++            if (compiled.first != null) {
++                GlobalConfig.ASTERIX_LOGGER.info(compiled.first.toJSON().toString(1));
++                runJob(hcc, compiled.first); // run directly; jobsToExecute is not touched by the added code
++            }
++
++            return queryResult;
++        } catch (Exception e) {
++            if (bActiveTxn) {
++                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
++            }
++            throw new AlgebricksException(e); // every failure, including one from runJob, is rethrown wrapped
++        } finally {
++            releaseReadLatch(); // released on both the return and the failure path
+         }
+-        return new QueryResult(query, compiled.second.getLocalFile().getFile().getAbsolutePath());
+     }
+ 
+     private void runCreateIndexJob(IHyracksClientConnection hcc, CreateIndexStatement stmtCreateIndex,
+@@ -768,20 +1233,32 @@
+     private void handleCreateNodeGroupStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+             List<JobSpecification> jobsToExecute) throws MetadataException, AlgebricksException, RemoteException,
+             ACIDException {
+-        MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
+-        NodegroupDecl stmtCreateNodegroup = (NodegroupDecl) stmt;
+-        String ngName = stmtCreateNodegroup.getNodegroupName().getValue();
+-        NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, ngName);
+-        if (ng != null) {
+-            if (!stmtCreateNodegroup.getIfNotExists())
+-                throw new AlgebricksException("A nodegroup with this name " + ngName + " already exists.");
+-        } else {
+-            List<Identifier> ncIdentifiers = stmtCreateNodegroup.getNodeControllerNames();
+-            List<String> ncNames = new ArrayList<String>(ncIdentifiers.size());
+-            for (Identifier id : ncIdentifiers) {
+-                ncNames.add(id.getValue());
++        // Create nodegroup: runs in its own metadata txn under the write latch; no job is run.
++        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++        metadataProvider.setMetadataTxnContext(mdTxnCtx);
++        acquireWriteLatch();
++        // An existing nodegroup is an error unless IF NOT EXISTS was given, in which case nothing is added.
++        try {
++            NodegroupDecl stmtCreateNodegroup = (NodegroupDecl) stmt;
++            String ngName = stmtCreateNodegroup.getNodegroupName().getValue();
++            NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, ngName);
++            if (ng != null) {
++                if (!stmtCreateNodegroup.getIfNotExists())
++                    throw new AlgebricksException("A nodegroup with this name " + ngName + " already exists.");
++            } else {
++                List<Identifier> ncIdentifiers = stmtCreateNodegroup.getNodeControllerNames();
++                List<String> ncNames = new ArrayList<String>(ncIdentifiers.size());
++                for (Identifier id : ncIdentifiers) {
++                    ncNames.add(id.getValue());
++                }
++                MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(ngName, ncNames));
+             }
+-            MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(ngName, ncNames));
++            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++        } catch (Exception e) {
++            MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); // unconditional: nothing follows the commit in the try
++            throw new AlgebricksException(e); // also re-wraps the "already exists" AlgebricksException thrown above
++        } finally {
++            releaseWriteLatch(); // released on both the success and the failure path
+         }
+     }
+ 
+@@ -791,10 +1268,37 @@
+ 
+     private void compileIndexDropStatement(IHyracksClientConnection hcc, String dataverseName, String datasetName,
+             String indexName, AqlMetadataProvider metadataProvider) throws Exception {
++        MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
++        Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
++        // Three steps: flag the index as pending-drop and commit, run the drop job, then delete the entry in a new txn.
++        //#. mark PendingDropOp on the existing index by replacing its entry with a PENDING_DROP_OP copy
++        MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
++        MetadataManager.INSTANCE.addIndex(
++                mdTxnCtx,
++                new Index(dataverseName, datasetName, indexName, index.getIndexType(), index.getKeyFieldNames(), index
++                        .isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
+         CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName);
+-        runJob(hcc, IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
+-        MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
+-                indexName);
++        JobSpecification jobSpec = IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider);
++
++        //#. commit the existing transaction before calling runJob.
++        //   The caller must have begun that transaction before calling this function.
++        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++
++        try {
++            runJob(hcc, jobSpec);
++        } catch (Exception e) {
++            //begin a fresh mdTxnCtx so that the caller still has an active transaction to abort
++            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++            metadataProvider.setMetadataTxnContext(mdTxnCtx);
++            throw e;
++        }
++
++        //#. begin a new transaction; it is not committed here and is left on metadataProvider
++        mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++        metadataProvider.setMetadataTxnContext(mdTxnCtx);
++
++        //#. finally, delete the existing (pending-drop) index entry
++        MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
+     }
+ 
+     private void compileDatasetDropStatement(IHyracksClientConnection hcc, String dataverseName, String datasetName,
+@@ -803,10 +1307,32 @@
+         CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
+         Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+         if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
+-            JobSpecification[] jobSpecs = DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider);
+-            for (JobSpecification spec : jobSpecs)
+-                runJob(hcc, spec);
++            JobSpecification jobSpec = DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider);
++
++            //#. mark PendingDropOp on the existing dataset 
++            MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
++            MetadataManager.INSTANCE.addDataset(mdTxnCtx, new Dataset(dataverseName, datasetName, ds.getItemTypeName(),
++                    ds.getDatasetDetails(), ds.getDatasetType(), ds.getDatasetId(), IMetadataEntity.PENDING_DROP_OP));
++
++            //#. commit the transaction
++            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++
++            //#. run the job
++            try {
++                runJob(hcc, jobSpec);
++            } catch (Exception e) {
++                //need to create the mdTxnCtx to be aborted by caller properly
++                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++                metadataProvider.setMetadataTxnContext(mdTxnCtx);
++                throw e;
++            }
++
++            //#. start a new transaction
++            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++            metadataProvider.setMetadataTxnContext(mdTxnCtx);
+         }
++
++        //#. finally, delete the existing dataset. 
+         MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
+     }
+ 
+@@ -831,4 +1357,20 @@
+         }
+         return format;
+     }
++
++    private void acquireWriteLatch() {
++        cacheLatch.writeLock().lock(); // takes cacheLatch's write lock (declared outside this hunk); pair with releaseWriteLatch()
++    }
++
++    private void releaseWriteLatch() {
++        cacheLatch.writeLock().unlock(); // releases the write lock taken by acquireWriteLatch(); callers invoke it from finally
++    }
++
++    private void acquireReadLatch() {
++        cacheLatch.readLock().lock(); // takes cacheLatch's read lock (declared outside this hunk); pair with releaseReadLatch()
++    }
++
++    private void releaseReadLatch() {
++        cacheLatch.readLock().unlock(); // releases the read lock taken by acquireReadLatch(); callers invoke it from finally
++    }
+ }