changes towards recovery
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization@1098 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/diff_file b/diff_file
new file mode 100644
index 0000000..8efd61f
--- /dev/null
+++ b/diff_file
@@ -0,0 +1,2098 @@
+Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DataverseTupleTranslator.java
+===================================================================
+--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DataverseTupleTranslator.java (revision 1061)
++++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DataverseTupleTranslator.java (working copy)
+@@ -25,8 +25,11 @@
+ import edu.uci.ics.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
+ import edu.uci.ics.asterix.metadata.bootstrap.MetadataRecordTypes;
+ import edu.uci.ics.asterix.metadata.entities.Dataverse;
++import edu.uci.ics.asterix.om.base.AInt32;
++import edu.uci.ics.asterix.om.base.AMutableInt32;
+ import edu.uci.ics.asterix.om.base.ARecord;
+ import edu.uci.ics.asterix.om.base.AString;
++import edu.uci.ics.asterix.om.types.BuiltinType;
+ import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+ import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+@@ -40,12 +43,18 @@
+ // Payload field containing serialized Dataverse.
+ public static final int DATAVERSE_PAYLOAD_TUPLE_FIELD_INDEX = 1;
+
++ private AMutableInt32 aInt32;
++ protected ISerializerDeserializer<AInt32> aInt32Serde;
++
+ @SuppressWarnings("unchecked")
+ private ISerializerDeserializer<ARecord> recordSerDes = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(MetadataRecordTypes.DATAVERSE_RECORDTYPE);
+
++ @SuppressWarnings("unchecked")
+ public DataverseTupleTranslator(boolean getTuple) {
+ super(getTuple, MetadataPrimaryIndexes.DATAVERSE_DATASET.getFieldCount());
++ aInt32 = new AMutableInt32(-1);
++ aInt32Serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
+ }
+
+ @Override
+@@ -57,7 +66,8 @@
+ DataInput in = new DataInputStream(stream);
+ ARecord dataverseRecord = recordSerDes.deserialize(in);
+ return new Dataverse(((AString) dataverseRecord.getValueByPos(0)).getStringValue(),
+- ((AString) dataverseRecord.getValueByPos(1)).getStringValue());
++ ((AString) dataverseRecord.getValueByPos(1)).getStringValue(),
++ ((AInt32) dataverseRecord.getValueByPos(2)).getIntegerValue());
+ }
+
+ @Override
+@@ -88,6 +98,12 @@
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ recordBuilder.addField(MetadataRecordTypes.DATAVERSE_ARECORD_TIMESTAMP_FIELD_INDEX, fieldValue);
+
++ // write field 3
++ fieldValue.reset();
++ aInt32.setValue(instance.getPendingOp());
++ aInt32Serde.serialize(aInt32, fieldValue.getDataOutput());
++ recordBuilder.addField(MetadataRecordTypes.DATAVERSE_ARECORD_PENDINGOP_FIELD_INDEX, fieldValue);
++
+ recordBuilder.write(tupleBuilder.getDataOutput(), true);
+ tupleBuilder.addFieldEndOffset();
+
+Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
+===================================================================
+--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java (revision 1061)
++++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java (working copy)
+@@ -77,9 +77,9 @@
+ protected ISerializerDeserializer<AInt32> aInt32Serde;
+
+ @SuppressWarnings("unchecked")
+- public DatasetTupleTranslator(boolean getTuple) {
++ public DatasetTupleTranslator(boolean getTuple) {
+ super(getTuple, MetadataPrimaryIndexes.DATASET_DATASET.getFieldCount());
+- aInt32 = new AMutableInt32(-1);
++ aInt32 = new AMutableInt32(-1);
+ aInt32Serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
+ }
+
+@@ -104,8 +104,10 @@
+ .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_DATATYPENAME_FIELD_INDEX)).getStringValue();
+ DatasetType datasetType = DatasetType.valueOf(((AString) datasetRecord.getValueByPos(3)).getStringValue());
+ IDatasetDetails datasetDetails = null;
+- int datasetId = ((AInt32) datasetRecord
++ int datasetId = ((AInt32) datasetRecord
+ .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_DATASETID_FIELD_INDEX)).getIntegerValue();
++ int pendingOp = ((AInt32) datasetRecord
++ .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_PENDINGOP_FIELD_INDEX)).getIntegerValue();
+ switch (datasetType) {
+ case FEED:
+ case INTERNAL: {
+@@ -197,7 +199,7 @@
+ }
+ datasetDetails = new ExternalDatasetDetails(adapter, properties);
+ }
+- return new Dataset(dataverseName, datasetName, typeName, datasetDetails, datasetType, datasetId);
++ return new Dataset(dataverseName, datasetName, typeName, datasetDetails, datasetType, datasetId, pendingOp);
+ }
+
+ @Override
+@@ -248,13 +250,19 @@
+ aString.setValue(Calendar.getInstance().getTime().toString());
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_TIMESTAMP_FIELD_INDEX, fieldValue);
+-
++
+ // write field 8
+ fieldValue.reset();
+ aInt32.setValue(dataset.getDatasetId());
+ aInt32Serde.serialize(aInt32, fieldValue.getDataOutput());
+ recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_DATASETID_FIELD_INDEX, fieldValue);
+-
++
++ // write field 9
++ fieldValue.reset();
++ aInt32.setValue(dataset.getPendingOp());
++ aInt32Serde.serialize(aInt32, fieldValue.getDataOutput());
++ recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_PENDINGOP_FIELD_INDEX, fieldValue);
++
+ // write record
+ recordBuilder.write(tupleBuilder.getDataOutput(), true);
+ tupleBuilder.addFieldEndOffset();
+@@ -290,13 +298,15 @@
+ fieldValue.reset();
+ aString.setValue(name);
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+- propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_NAME_FIELD_INDEX, fieldValue);
++ propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_NAME_FIELD_INDEX,
++ fieldValue);
+
+ // write field 1
+ fieldValue.reset();
+ aString.setValue(value);
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+- propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_VALUE_FIELD_INDEX, fieldValue);
++ propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_VALUE_FIELD_INDEX,
++ fieldValue);
+
+ propertyRecordBuilder.write(out, true);
+ }
+Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java
+===================================================================
+--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java (revision 1061)
++++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java (working copy)
+@@ -96,13 +96,15 @@
+ }
+ Boolean isPrimaryIndex = ((ABoolean) rec.getValueByPos(MetadataRecordTypes.INDEX_ARECORD_ISPRIMARY_FIELD_INDEX))
+ .getBoolean();
++ int pendingOp = ((AInt32) rec.getValueByPos(MetadataRecordTypes.INDEX_ARECORD_PENDINGOP_FIELD_INDEX))
++ .getIntegerValue();
+ // Check if there is a gram length as well.
+ int gramLength = -1;
+ int gramLenPos = rec.getType().findFieldPosition(GRAM_LENGTH_FIELD_NAME);
+ if (gramLenPos >= 0) {
+ gramLength = ((AInt32) rec.getValueByPos(gramLenPos)).getIntegerValue();
+ }
+- return new Index(dvName, dsName, indexName, indexStructure, searchKey, gramLength, isPrimaryIndex);
++ return new Index(dvName, dsName, indexName, indexStructure, searchKey, gramLength, isPrimaryIndex, pendingOp);
+ }
+
+ @Override
+@@ -174,7 +176,12 @@
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ recordBuilder.addField(MetadataRecordTypes.INDEX_ARECORD_TIMESTAMP_FIELD_INDEX, fieldValue);
+
+- // write optional field 7
++ // write field 7
++ fieldValue.reset();
++ intSerde.serialize(new AInt32(instance.getPendingOp()), fieldValue.getDataOutput());
++ recordBuilder.addField(MetadataRecordTypes.INDEX_ARECORD_PENDINGOP_FIELD_INDEX, fieldValue);
++
++ // write optional field 8
+ if (instance.getGramLength() > 0) {
+ fieldValue.reset();
+ nameValue.reset();
+Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+===================================================================
+--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java (revision 1061)
++++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java (working copy)
+@@ -129,7 +129,7 @@
+
+ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, String> {
+ private static Logger LOGGER = Logger.getLogger(AqlMetadataProvider.class.getName());
+- private final MetadataTransactionContext mdTxnCtx;
++ private MetadataTransactionContext mdTxnCtx;
+ private boolean isWriteTransaction;
+ private Map<String, String[]> stores;
+ private Map<String, String> config;
+@@ -156,8 +156,7 @@
+ return config;
+ }
+
+- public AqlMetadataProvider(MetadataTransactionContext mdTxnCtx, Dataverse defaultDataverse) {
+- this.mdTxnCtx = mdTxnCtx;
++ public AqlMetadataProvider(Dataverse defaultDataverse) {
+ this.defaultDataverse = defaultDataverse;
+ this.stores = AsterixProperties.INSTANCE.getStores();
+ }
+@@ -181,6 +180,10 @@
+ public void setWriterFactory(IAWriterFactory writerFactory) {
+ this.writerFactory = writerFactory;
+ }
++
++ public void setMetadataTxnContext(MetadataTransactionContext mdTxnCtx) {
++ this.mdTxnCtx = mdTxnCtx;
++ }
+
+ public MetadataTransactionContext getMetadataTxnContext() {
+ return mdTxnCtx;
+Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java
+===================================================================
+--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java (revision 1061)
++++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java (working copy)
+@@ -35,15 +35,18 @@
+ private final DatasetType datasetType;
+ private IDatasetDetails datasetDetails;
+ private final int datasetId;
++ // Type of pending operations with respect to atomic DDL operation
++ private final int pendingOp;
+
+ public Dataset(String dataverseName, String datasetName, String itemTypeName, IDatasetDetails datasetDetails,
+- DatasetType datasetType, int datasetId) {
++ DatasetType datasetType, int datasetId, int pendingOp) {
+ this.dataverseName = dataverseName;
+ this.datasetName = datasetName;
+ this.itemTypeName = itemTypeName;
+ this.datasetType = datasetType;
+ this.datasetDetails = datasetDetails;
+ this.datasetId = datasetId;
++ this.pendingOp = pendingOp;
+ }
+
+ public String getDataverseName() {
+@@ -73,6 +76,10 @@
+ public int getDatasetId() {
+ return datasetId;
+ }
++
++ public int getPendingOp() {
++ return pendingOp;
++ }
+
+ @Override
+ public Object addToCache(MetadataCache cache) {
+Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Index.java
+===================================================================
+--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Index.java (revision 1061)
++++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Index.java (working copy)
+@@ -45,9 +45,11 @@
+ private final boolean isPrimaryIndex;
+ // Specific to NGRAM indexes.
+ private final int gramLength;
++ // Type of pending operations with respect to atomic DDL operation
++ private final int pendingOp;
+
+ public Index(String dataverseName, String datasetName, String indexName, IndexType indexType,
+- List<String> keyFieldNames, int gramLength, boolean isPrimaryIndex) {
++ List<String> keyFieldNames, int gramLength, boolean isPrimaryIndex, int pendingOp) {
+ this.dataverseName = dataverseName;
+ this.datasetName = datasetName;
+ this.indexName = indexName;
+@@ -55,10 +57,11 @@
+ this.keyFieldNames = keyFieldNames;
+ this.gramLength = gramLength;
+ this.isPrimaryIndex = isPrimaryIndex;
++ this.pendingOp = pendingOp;
+ }
+
+ public Index(String dataverseName, String datasetName, String indexName, IndexType indexType,
+- List<String> keyFieldNames, boolean isPrimaryIndex) {
++ List<String> keyFieldNames, boolean isPrimaryIndex, int pendingOp) {
+ this.dataverseName = dataverseName;
+ this.datasetName = datasetName;
+ this.indexName = indexName;
+@@ -66,6 +69,7 @@
+ this.keyFieldNames = keyFieldNames;
+ this.gramLength = -1;
+ this.isPrimaryIndex = isPrimaryIndex;
++ this.pendingOp = pendingOp;
+ }
+
+ public String getDataverseName() {
+@@ -95,6 +99,10 @@
+ public boolean isPrimaryIndex() {
+ return isPrimaryIndex;
+ }
++
++ public int getPendingOp() {
++ return pendingOp;
++ }
+
+ public boolean isSecondaryIndex() {
+ return !isPrimaryIndex();
+Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataverse.java
+===================================================================
+--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataverse.java (revision 1061)
++++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataverse.java (working copy)
+@@ -27,10 +27,12 @@
+ // Enforced to be unique within an Asterix cluster..
+ private final String dataverseName;
+ private final String dataFormat;
++ private final int pendingOp;
+
+- public Dataverse(String dataverseName, String format) {
++ public Dataverse(String dataverseName, String format, int pendingOp) {
+ this.dataverseName = dataverseName;
+ this.dataFormat = format;
++ this.pendingOp = pendingOp;
+ }
+
+ public String getDataverseName() {
+@@ -40,6 +42,10 @@
+ public String getDataFormat() {
+ return dataFormat;
+ }
++
++ public int getPendingOp() {
++ return pendingOp;
++ }
+
+ @Override
+ public Object addToCache(MetadataCache cache) {
+Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+===================================================================
+--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java (revision 1061)
++++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java (working copy)
+@@ -25,6 +25,7 @@
+ import edu.uci.ics.asterix.common.exceptions.AsterixException;
+ import edu.uci.ics.asterix.common.functions.FunctionSignature;
+ import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
++import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
+ import edu.uci.ics.asterix.metadata.api.IMetadataIndex;
+ import edu.uci.ics.asterix.metadata.api.IMetadataNode;
+ import edu.uci.ics.asterix.metadata.api.IValueExtractor;
+@@ -160,7 +161,7 @@
+ // Add the primary index for the dataset.
+ InternalDatasetDetails id = (InternalDatasetDetails) dataset.getDatasetDetails();
+ Index primaryIndex = new Index(dataset.getDataverseName(), dataset.getDatasetName(),
+- dataset.getDatasetName(), IndexType.BTREE, id.getPrimaryKey(), true);
++ dataset.getDatasetName(), IndexType.BTREE, id.getPrimaryKey(), true, dataset.getPendingOp());
+ addIndex(jobId, primaryIndex);
+ ITupleReference nodeGroupTuple = createTuple(id.getNodeGroupName(), dataset.getDataverseName(),
+ dataset.getDatasetName());
+@@ -260,7 +261,7 @@
+ IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
+ TransactionContext txnCtx = transactionProvider.getTransactionManager().getTransactionContext(jobId);
+- transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx);
++ //transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx);
+ // TODO: fix exceptions once new BTree exception model is in hyracks.
+ indexAccessor.insert(tuple);
+ //TODO: extract the key from the tuple and get the PKHashValue from the key.
+@@ -536,7 +537,7 @@
+ // The transaction with txnId will have an S lock on the
+ // resource. Note that lock converters have a higher priority than
+ // regular waiters in the LockManager.
+- transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx);
++ //transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx);
+ indexAccessor.delete(tuple);
+ //TODO: extract the key from the tuple and get the PKHashValue from the key.
+ //check how to get the oldValue.
+@@ -803,7 +804,9 @@
+ private <ResultType> void searchIndex(JobId jobId, IMetadataIndex index, ITupleReference searchKey,
+ IValueExtractor<ResultType> valueExtractor, List<ResultType> results) throws Exception {
+ TransactionContext txnCtx = transactionProvider.getTransactionManager().getTransactionContext(jobId);
+- transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.S, txnCtx);
++ //#. currently lock is not needed to access any metadata
++ // since the non-compatible concurrent access is always protected by the latch in the MetadataManager.
++ //transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.S, txnCtx);
+ IBinaryComparatorFactory[] comparatorFactories = index.getKeyBinaryComparatorFactory();
+ long resourceID = index.getResourceID();
+ IIndex indexInstance = indexLifecycleManager.getIndex(resourceID);
+Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataEntity.java
+===================================================================
+--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataEntity.java (revision 1061)
++++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataEntity.java (working copy)
+@@ -20,6 +20,11 @@
+ import edu.uci.ics.asterix.metadata.MetadataCache;
+
+ public interface IMetadataEntity extends Serializable {
++
++ public static final int PENDING_NO_OP = 0;
++ public static final int PENDING_ADD_OP = 1;
++ public static final int PENDING_DROP_OP = 2;
++
+ Object addToCache(MetadataCache cache);
+
+ Object dropFromCache(MetadataCache cache);
+Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
+===================================================================
+--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java (revision 1061)
++++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java (working copy)
+@@ -17,6 +17,8 @@
+
+ import java.rmi.RemoteException;
+ import java.util.List;
++import java.util.concurrent.locks.ReadWriteLock;
++import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+ import edu.uci.ics.asterix.common.functions.FunctionSignature;
+ import edu.uci.ics.asterix.metadata.api.IAsterixStateProxy;
+@@ -79,11 +81,10 @@
+ public class MetadataManager implements IMetadataManager {
+ // Set in init().
+ public static MetadataManager INSTANCE;
+-
+ private final MetadataCache cache = new MetadataCache();
+ private IAsterixStateProxy proxy;
+ private IMetadataNode metadataNode;
+-
++
+ public MetadataManager(IAsterixStateProxy proxy) {
+ if (proxy == null) {
+ throw new Error("Null proxy given to MetadataManager.");
+@@ -206,11 +207,14 @@
+
+ @Override
+ public void addDataset(MetadataTransactionContext ctx, Dataset dataset) throws MetadataException {
++ // add dataset into metadataNode
+ try {
+ metadataNode.addDataset(ctx.getJobId(), dataset);
+ } catch (RemoteException e) {
+ throw new MetadataException(e);
+ }
++
++ // reflect the dataset into the cache
+ ctx.addDataset(dataset);
+ }
+
+@@ -585,4 +589,5 @@
+ }
+ return adapter;
+ }
++
+ }
+\ No newline at end of file
+Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java
+===================================================================
+--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java (revision 1061)
++++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java (working copy)
+@@ -19,6 +19,7 @@
+
+ import edu.uci.ics.asterix.common.functions.FunctionSignature;
+ import edu.uci.ics.asterix.external.dataset.adapter.AdapterIdentifier;
++import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
+ import edu.uci.ics.asterix.metadata.entities.Dataset;
+ import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
+ import edu.uci.ics.asterix.metadata.entities.Datatype;
+@@ -104,19 +105,19 @@
+ }
+
+ public void dropDataset(String dataverseName, String datasetName) {
+- Dataset dataset = new Dataset(dataverseName, datasetName, null, null, null, -1);
++ Dataset dataset = new Dataset(dataverseName, datasetName, null, null, null, -1, IMetadataEntity.PENDING_NO_OP);
+ droppedCache.addDatasetIfNotExists(dataset);
+ logAndApply(new MetadataLogicalOperation(dataset, false));
+ }
+
+ public void dropIndex(String dataverseName, String datasetName, String indexName) {
+- Index index = new Index(dataverseName, datasetName, indexName, null, null, false);
++ Index index = new Index(dataverseName, datasetName, indexName, null, null, false, IMetadataEntity.PENDING_NO_OP);
+ droppedCache.addIndexIfNotExists(index);
+ logAndApply(new MetadataLogicalOperation(index, false));
+ }
+
+ public void dropDataverse(String dataverseName) {
+- Dataverse dataverse = new Dataverse(dataverseName, null);
++ Dataverse dataverse = new Dataverse(dataverseName, null, IMetadataEntity.PENDING_NO_OP);
+ droppedCache.addDataverseIfNotExists(dataverse);
+ logAndApply(new MetadataLogicalOperation(dataverse, false));
+ }
+@@ -162,7 +163,7 @@
+ }
+ return droppedCache.getDataset(dataverseName, datasetName) != null;
+ }
+-
++
+ public boolean indexIsDropped(String dataverseName, String datasetName, String indexName) {
+ if (droppedCache.getDataverse(dataverseName) != null) {
+ return true;
+Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
+===================================================================
+--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java (revision 1061)
++++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java (working copy)
+@@ -80,10 +80,11 @@
+ public static final int DATAVERSE_ARECORD_NAME_FIELD_INDEX = 0;
+ public static final int DATAVERSE_ARECORD_FORMAT_FIELD_INDEX = 1;
+ public static final int DATAVERSE_ARECORD_TIMESTAMP_FIELD_INDEX = 2;
++ public static final int DATAVERSE_ARECORD_PENDINGOP_FIELD_INDEX = 3;
+
+ private static final ARecordType createDataverseRecordType() {
+- return new ARecordType("DataverseRecordType", new String[] { "DataverseName", "DataFormat", "Timestamp" },
+- new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING }, true);
++ return new ARecordType("DataverseRecordType", new String[] { "DataverseName", "DataFormat", "Timestamp", "PendingOp" },
++ new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.AINT32 }, true);
+ }
+
+ // Helper constants for accessing fields in an ARecord of anonymous type
+@@ -158,10 +159,11 @@
+ public static final int DATASET_ARECORD_FEEDDETAILS_FIELD_INDEX = 6;
+ public static final int DATASET_ARECORD_TIMESTAMP_FIELD_INDEX = 7;
+ public static final int DATASET_ARECORD_DATASETID_FIELD_INDEX = 8;
++ public static final int DATASET_ARECORD_PENDINGOP_FIELD_INDEX = 9;
+
+ private static final ARecordType createDatasetRecordType() {
+ String[] fieldNames = { "DataverseName", "DatasetName", "DataTypeName", "DatasetType", "InternalDetails",
+- "ExternalDetails", "FeedDetails", "Timestamp", "DatasetId" };
++ "ExternalDetails", "FeedDetails", "Timestamp", "DatasetId", "PendingOp" };
+
+ List<IAType> internalRecordUnionList = new ArrayList<IAType>();
+ internalRecordUnionList.add(BuiltinType.ANULL);
+@@ -179,7 +181,8 @@
+ AUnionType feedRecordUnion = new AUnionType(feedRecordUnionList, null);
+
+ IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
+- internalRecordUnion, externalRecordUnion, feedRecordUnion, BuiltinType.ASTRING, BuiltinType.AINT32 };
++ internalRecordUnion, externalRecordUnion, feedRecordUnion, BuiltinType.ASTRING, BuiltinType.AINT32,
++ BuiltinType.AINT32 };
+ return new ARecordType("DatasetRecordType", fieldNames, fieldTypes, true);
+ }
+
+@@ -264,13 +267,14 @@
+ public static final int INDEX_ARECORD_SEARCHKEY_FIELD_INDEX = 4;
+ public static final int INDEX_ARECORD_ISPRIMARY_FIELD_INDEX = 5;
+ public static final int INDEX_ARECORD_TIMESTAMP_FIELD_INDEX = 6;
++ public static final int INDEX_ARECORD_PENDINGOP_FIELD_INDEX = 7;
+
+ private static final ARecordType createIndexRecordType() {
+ AOrderedListType olType = new AOrderedListType(BuiltinType.ASTRING, null);
+ String[] fieldNames = { "DataverseName", "DatasetName", "IndexName", "IndexStructure", "SearchKey",
+- "IsPrimary", "Timestamp" };
++ "IsPrimary", "Timestamp", "PendingOp" };
+ IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
+- olType, BuiltinType.ABOOLEAN, BuiltinType.ASTRING };
++ olType, BuiltinType.ABOOLEAN, BuiltinType.ASTRING, BuiltinType.AINT32 };
+ return new ARecordType("IndexRecordType", fieldNames, fieldTypes, true);
+ };
+
+Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+===================================================================
+--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java (revision 1061)
++++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java (working copy)
+@@ -31,6 +31,7 @@
+ import edu.uci.ics.asterix.metadata.IDatasetDetails;
+ import edu.uci.ics.asterix.metadata.MetadataManager;
+ import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
++import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
+ import edu.uci.ics.asterix.metadata.api.IMetadataIndex;
+ import edu.uci.ics.asterix.metadata.entities.AsterixBuiltinTypeMap;
+ import edu.uci.ics.asterix.metadata.entities.Dataset;
+@@ -226,7 +227,7 @@
+ public static void insertInitialDataverses(MetadataTransactionContext mdTxnCtx) throws Exception {
+ String dataverseName = MetadataPrimaryIndexes.DATAVERSE_DATASET.getDataverseName();
+ String dataFormat = NonTaggedDataFormat.NON_TAGGED_DATA_FORMAT;
+- MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dataverseName, dataFormat));
++ MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dataverseName, dataFormat, IMetadataEntity.PENDING_NO_OP));
+ }
+
+ public static void insertInitialDatasets(MetadataTransactionContext mdTxnCtx) throws Exception {
+@@ -236,7 +237,7 @@
+ primaryIndexes[i].getNodeGroupName());
+ MetadataManager.INSTANCE.addDataset(mdTxnCtx, new Dataset(primaryIndexes[i].getDataverseName(),
+ primaryIndexes[i].getIndexedDatasetName(), primaryIndexes[i].getPayloadRecordType().getTypeName(),
+- id, DatasetType.INTERNAL, primaryIndexes[i].getDatasetId().getId()));
++ id, DatasetType.INTERNAL, primaryIndexes[i].getDatasetId().getId(), IMetadataEntity.PENDING_NO_OP));
+ }
+ }
+
+@@ -267,7 +268,7 @@
+ for (int i = 0; i < secondaryIndexes.length; i++) {
+ MetadataManager.INSTANCE.addIndex(mdTxnCtx, new Index(secondaryIndexes[i].getDataverseName(),
+ secondaryIndexes[i].getIndexedDatasetName(), secondaryIndexes[i].getIndexName(), IndexType.BTREE,
+- secondaryIndexes[i].getPartitioningExpr(), false));
++ secondaryIndexes[i].getPartitioningExpr(), false, IMetadataEntity.PENDING_NO_OP));
+ }
+ }
+
+Index: asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
+===================================================================
+--- asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java (revision 1061)
++++ asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java (working copy)
+@@ -95,11 +95,11 @@
+ File testFile = tcCtx.getTestFile(cUnit);
+
+ /*************** to avoid run failure cases ****************
+- if (!testFile.getAbsolutePath().contains("runtimets/queries/failure/")) {
++ if (!testFile.getAbsolutePath().contains("index-selection/")) {
+ continue;
+ }
+ ************************************************************/
+-
++
+ File expectedResultFile = tcCtx.getExpectedResultFile(cUnit);
+ File actualFile = new File(PATH_ACTUAL + File.separator
+ + tcCtx.getTestCase().getFilePath().replace(File.separator, "_") + "_" + cUnit.getName() + ".adm");
+Index: asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
+===================================================================
+--- asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java (revision 1061)
++++ asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java (working copy)
+@@ -90,7 +90,7 @@
+
+ private static Logger LOGGER = Logger.getLogger(DatasetOperations.class.getName());
+
+- public static JobSpecification[] createDropDatasetJobSpec(CompiledDatasetDropStatement datasetDropStmt,
++ public static JobSpecification createDropDatasetJobSpec(CompiledDatasetDropStatement datasetDropStmt,
+ AqlMetadataProvider metadataProvider) throws AlgebricksException, HyracksDataException, RemoteException,
+ ACIDException, AsterixException {
+
+@@ -111,67 +111,10 @@
+ throw new AlgebricksException("DROP DATASET: No metadata for dataset " + datasetName);
+ }
+ if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+- return new JobSpecification[0];
++ return new JobSpecification();
+ }
+-
+- List<Index> datasetIndexes = metadataProvider.getDatasetIndexes(dataset.getDataverseName(),
+- dataset.getDatasetName());
+- int numSecondaryIndexes = 0;
+- for (Index index : datasetIndexes) {
+- if (index.isSecondaryIndex()) {
+- numSecondaryIndexes++;
+- }
+- }
+- JobSpecification[] specs;
+- if (numSecondaryIndexes > 0) {
+- specs = new JobSpecification[numSecondaryIndexes + 1];
+- int i = 0;
+- // First, drop secondary indexes.
+- for (Index index : datasetIndexes) {
+- if (index.isSecondaryIndex()) {
+- specs[i] = new JobSpecification();
+- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> idxSplitsAndConstraint = metadataProvider
+- .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataset.getDataverseName(),
+- datasetName, index.getIndexName());
+- IIndexDataflowHelperFactory dfhFactory;
+- switch (index.getIndexType()) {
+- case BTREE:
+- dfhFactory = new LSMBTreeDataflowHelperFactory(
+- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER);
+- break;
+- case RTREE:
+- dfhFactory = new LSMRTreeDataflowHelperFactory(
+- new IPrimitiveValueProviderFactory[] { null }, RTreePolicyType.RTREE,
+- new IBinaryComparatorFactory[] { null },
+- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
+- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
+- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER, null);
+- break;
+- case NGRAM_INVIX:
+- case WORD_INVIX:
+- dfhFactory = new LSMInvertedIndexDataflowHelperFactory(
+- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
+- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
+- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER);
+- break;
+- default:
+- throw new AsterixException("Unknown index type provided.");
+- }
+- IndexDropOperatorDescriptor secondaryBtreeDrop = new IndexDropOperatorDescriptor(specs[i],
+- AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
+- AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, idxSplitsAndConstraint.first, dfhFactory);
+- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specs[i], secondaryBtreeDrop,
+- idxSplitsAndConstraint.second);
+- i++;
+- }
+- }
+- } else {
+- specs = new JobSpecification[1];
+- }
++
+ JobSpecification specPrimary = new JobSpecification();
+- specs[specs.length - 1] = specPrimary;
+
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider
+ .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataset.getDataverseName(), datasetName,
+@@ -187,7 +130,7 @@
+
+ specPrimary.addRoot(primaryBtreeDrop);
+
+- return specs;
++ return specPrimary;
+ }
+
+ public static JobSpecification createDatasetJobSpec(Dataverse dataverse, String datasetName,
+Index: asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+===================================================================
+--- asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java (revision 1061)
++++ asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java (working copy)
+@@ -21,6 +21,8 @@
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
++import java.util.concurrent.locks.ReadWriteLock;
++import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+ import org.json.JSONException;
+
+@@ -68,6 +70,7 @@
+ import edu.uci.ics.asterix.metadata.MetadataException;
+ import edu.uci.ics.asterix.metadata.MetadataManager;
+ import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
++import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
+ import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+ import edu.uci.ics.asterix.metadata.entities.Dataset;
+ import edu.uci.ics.asterix.metadata.entities.Datatype;
+@@ -112,6 +115,7 @@
+ private final PrintWriter out;
+ private final SessionConfig sessionConfig;
+ private final DisplayFormat pdf;
++ private final ReadWriteLock cacheLatch;
+ private Dataverse activeDefaultDataverse;
+ private List<FunctionDecl> declaredFunctions;
+
+@@ -121,6 +125,7 @@
+ this.out = out;
+ this.sessionConfig = pc;
+ this.pdf = pdf;
++ this.cacheLatch = new ReentrantReadWriteLock(true);
+ declaredFunctions = getDeclaredFunctions(aqlStatements);
+ }
+
+@@ -143,8 +148,7 @@
+
+ for (Statement stmt : aqlStatements) {
+ validateOperation(activeDefaultDataverse, stmt);
+- MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+- AqlMetadataProvider metadataProvider = new AqlMetadataProvider(mdTxnCtx, activeDefaultDataverse);
++ AqlMetadataProvider metadataProvider = new AqlMetadataProvider(activeDefaultDataverse);
+ metadataProvider.setWriterFactory(writerFactory);
+ metadataProvider.setOutputFile(outputFile);
+ metadataProvider.setConfig(config);
+@@ -253,15 +257,9 @@
+ }
+
+ }
+- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (Exception e) {
+- MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ throw new AlgebricksException(e);
+ }
+- // Following jobs are run under a separate transaction, that is committed/aborted by the JobEventListener
+- for (JobSpecification jobspec : jobsToExecute) {
+- runJob(hcc, jobspec);
+- }
+ }
+ return executionResult;
+ }
+@@ -289,398 +287,802 @@
+
+ private Dataverse handleUseDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+ List<JobSpecification> jobsToExecute) throws MetadataException, RemoteException, ACIDException {
+- DataverseDecl dvd = (DataverseDecl) stmt;
+- String dvName = dvd.getDataverseName().getValue();
+- Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
+- if (dv == null) {
+- throw new MetadataException("Unknown dataverse " + dvName);
++
++ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++ metadataProvider.setMetadataTxnContext(mdTxnCtx);
++ acquireReadLatch();
++
++ try {
++ DataverseDecl dvd = (DataverseDecl) stmt;
++ String dvName = dvd.getDataverseName().getValue();
++ Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
++ if (dv == null) {
++ throw new MetadataException("Unknown dataverse " + dvName);
++ }
++ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++ return dv;
++ } catch (Exception e) {
++ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
++ throw new MetadataException(e);
++ } finally {
++ releaseReadLatch();
+ }
+- return dv;
+ }
+
+ private void handleCreateDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+ List<JobSpecification> jobsToExecute) throws MetadataException, AlgebricksException, RemoteException,
+ ACIDException {
+- CreateDataverseStatement stmtCreateDataverse = (CreateDataverseStatement) stmt;
+- String dvName = stmtCreateDataverse.getDataverseName().getValue();
+- Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
+- if (dv != null && !stmtCreateDataverse.getIfNotExists()) {
+- throw new AlgebricksException("A dataverse with this name " + dvName + " already exists.");
++
++ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++ metadataProvider.setMetadataTxnContext(mdTxnCtx);
++ acquireWriteLatch();
++
++ try {
++ CreateDataverseStatement stmtCreateDataverse = (CreateDataverseStatement) stmt;
++ String dvName = stmtCreateDataverse.getDataverseName().getValue();
++ Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
++ if (dv != null && !stmtCreateDataverse.getIfNotExists()) {
++ throw new AlgebricksException("A dataverse with this name " + dvName + " already exists.");
++ }
++ MetadataManager.INSTANCE.addDataverse(metadataProvider.getMetadataTxnContext(), new Dataverse(dvName,
++ stmtCreateDataverse.getFormat(), IMetadataEntity.PENDING_NO_OP));
++ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++ } catch (Exception e) {
++ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
++ throw new AlgebricksException(e);
++ } finally {
++ releaseWriteLatch();
+ }
+- MetadataManager.INSTANCE.addDataverse(metadataProvider.getMetadataTxnContext(), new Dataverse(dvName,
+- stmtCreateDataverse.getFormat()));
+ }
+
+ private void handleCreateDatasetStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+ IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws AsterixException, Exception {
+- DatasetDecl dd = (DatasetDecl) stmt;
+- String dataverseName = dd.getDataverse() != null ? dd.getDataverse().getValue()
+- : activeDefaultDataverse != null ? activeDefaultDataverse.getDataverseName() : null;
+- if (dataverseName == null) {
+- throw new AlgebricksException(" dataverse not specified ");
+- }
+- String datasetName = dd.getName().getValue();
+- DatasetType dsType = dd.getDatasetType();
+- String itemTypeName = dd.getItemTypeName().getValue();
+
+- IDatasetDetails datasetDetails = null;
+- Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
+- datasetName);
+- if (ds != null) {
+- if (dd.getIfNotExists()) {
+- return;
+- } else {
+- throw new AlgebricksException("A dataset with this name " + datasetName + " already exists.");
++ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++ boolean bActiveTxn = true;
++ metadataProvider.setMetadataTxnContext(mdTxnCtx);
++ acquireWriteLatch();
++
++ try {
++ DatasetDecl dd = (DatasetDecl) stmt;
++ String dataverseName = dd.getDataverse() != null ? dd.getDataverse().getValue()
++ : activeDefaultDataverse != null ? activeDefaultDataverse.getDataverseName() : null;
++ if (dataverseName == null) {
++ throw new AlgebricksException(" dataverse not specified ");
+ }
+- }
+- Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(), dataverseName,
+- itemTypeName);
+- if (dt == null) {
+- throw new AlgebricksException(": type " + itemTypeName + " could not be found.");
+- }
+- switch (dd.getDatasetType()) {
+- case INTERNAL: {
+- IAType itemType = dt.getDatatype();
+- if (itemType.getTypeTag() != ATypeTag.RECORD) {
+- throw new AlgebricksException("Can only partition ARecord's.");
++ String datasetName = dd.getName().getValue();
++ DatasetType dsType = dd.getDatasetType();
++ String itemTypeName = dd.getItemTypeName().getValue();
++
++ IDatasetDetails datasetDetails = null;
++ Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
++ datasetName);
++ if (ds != null) {
++ if (dd.getIfNotExists()) {
++ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++ return;
++ } else {
++ throw new AlgebricksException("A dataset with this name " + datasetName + " already exists.");
+ }
+- List<String> partitioningExprs = ((InternalDetailsDecl) dd.getDatasetDetailsDecl())
+- .getPartitioningExprs();
+- String ngName = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue();
+- datasetDetails = new InternalDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
+- InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs, ngName);
+- break;
+ }
+- case EXTERNAL: {
+- String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter();
+- Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties();
+- datasetDetails = new ExternalDatasetDetails(adapter, properties);
+- break;
++ Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(), dataverseName,
++ itemTypeName);
++ if (dt == null) {
++ throw new AlgebricksException(": type " + itemTypeName + " could not be found.");
+ }
+- case FEED: {
+- IAType itemType = dt.getDatatype();
+- if (itemType.getTypeTag() != ATypeTag.RECORD) {
+- throw new AlgebricksException("Can only partition ARecord's.");
++ switch (dd.getDatasetType()) {
++ case INTERNAL: {
++ IAType itemType = dt.getDatatype();
++ if (itemType.getTypeTag() != ATypeTag.RECORD) {
++ throw new AlgebricksException("Can only partition ARecord's.");
++ }
++ List<String> partitioningExprs = ((InternalDetailsDecl) dd.getDatasetDetailsDecl())
++ .getPartitioningExprs();
++ String ngName = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue();
++ datasetDetails = new InternalDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
++ InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs,
++ ngName);
++ break;
+ }
+- List<String> partitioningExprs = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getPartitioningExprs();
+- String ngName = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue();
+- String adapter = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getAdapterFactoryClassname();
+- Map<String, String> configuration = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getConfiguration();
+- FunctionSignature signature = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getFunctionSignature();
+- datasetDetails = new FeedDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
+- InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs, ngName,
+- adapter, configuration, signature, FeedDatasetDetails.FeedState.INACTIVE.toString());
+- break;
++ case EXTERNAL: {
++ String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter();
++ Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties();
++ datasetDetails = new ExternalDatasetDetails(adapter, properties);
++ break;
++ }
++ case FEED: {
++ IAType itemType = dt.getDatatype();
++ if (itemType.getTypeTag() != ATypeTag.RECORD) {
++ throw new AlgebricksException("Can only partition ARecord's.");
++ }
++ List<String> partitioningExprs = ((FeedDetailsDecl) dd.getDatasetDetailsDecl())
++ .getPartitioningExprs();
++ String ngName = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue();
++ String adapter = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getAdapterFactoryClassname();
++ Map<String, String> configuration = ((FeedDetailsDecl) dd.getDatasetDetailsDecl())
++ .getConfiguration();
++ FunctionSignature signature = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getFunctionSignature();
++ datasetDetails = new FeedDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
++ InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs,
++ ngName, adapter, configuration, signature, FeedDatasetDetails.FeedState.INACTIVE.toString());
++ break;
++ }
+ }
++
++ //#. add a new dataset with PendingAddOp
++ Dataset dataset = new Dataset(dataverseName, datasetName, itemTypeName, datasetDetails, dsType,
++ DatasetIdFactory.generateDatasetId(), IMetadataEntity.PENDING_ADD_OP);
++ MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset);
++
++ if (dd.getDatasetType() == DatasetType.INTERNAL || dd.getDatasetType() == DatasetType.FEED) {
++ Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
++ dataverseName);
++ JobSpecification jobSpec = DatasetOperations.createDatasetJobSpec(dataverse, datasetName,
++ metadataProvider);
++
++ //#. make metadataTxn commit before calling runJob.
++ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++ bActiveTxn = false;
++
++ //#. runJob
++ runJob(hcc, jobSpec);
++
++ //#. begin new metadataTxn
++ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++ bActiveTxn = true;
++ metadataProvider.setMetadataTxnContext(mdTxnCtx);
++ }
++
++ //#. add a new dataset with PendingNoOp after deleting the dataset with PendingAddOp
++ MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName);
++ MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), new Dataset(dataverseName,
++ datasetName, itemTypeName, datasetDetails, dsType, dataset.getDatasetId(),
++ IMetadataEntity.PENDING_NO_OP));
++ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++ } catch (Exception e) {
++ if (bActiveTxn) {
++ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
++ }
++ throw new AlgebricksException(e);
++ } finally {
++ releaseWriteLatch();
+ }
+- MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), new Dataset(dataverseName,
+- datasetName, itemTypeName, datasetDetails, dsType, DatasetIdFactory.generateDatasetId()));
+- if (dd.getDatasetType() == DatasetType.INTERNAL || dd.getDatasetType() == DatasetType.FEED) {
+- Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
+- dataverseName);
+- runJob(hcc, DatasetOperations.createDatasetJobSpec(dataverse, datasetName, metadataProvider));
+- }
+ }
+
+ private void handleCreateIndexStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+ IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
+- CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
+- String dataverseName = stmtCreateIndex.getDataverseName() == null ? activeDefaultDataverse == null ? null
+- : activeDefaultDataverse.getDataverseName() : stmtCreateIndex.getDataverseName().getValue();
+- if (dataverseName == null) {
+- throw new AlgebricksException(" dataverse not specified ");
+- }
+- String datasetName = stmtCreateIndex.getDatasetName().getValue();
+- Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
+- datasetName);
+- if (ds == null) {
+- throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
+- + dataverseName);
+- }
+- String indexName = stmtCreateIndex.getIndexName().getValue();
+- Index idx = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
+- datasetName, indexName);
+- if (idx != null) {
+- if (!stmtCreateIndex.getIfNotExists()) {
+- throw new AlgebricksException("An index with this name " + indexName + " already exists.");
+- } else {
+- stmtCreateIndex.setNeedToCreate(false);
++
++ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++ boolean bActiveTxn = true;
++ metadataProvider.setMetadataTxnContext(mdTxnCtx);
++ acquireWriteLatch();
++
++ try {
++ CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
++ String dataverseName = stmtCreateIndex.getDataverseName() == null ? activeDefaultDataverse == null ? null
++ : activeDefaultDataverse.getDataverseName() : stmtCreateIndex.getDataverseName().getValue();
++ if (dataverseName == null) {
++ throw new AlgebricksException(" dataverse not specified ");
+ }
+- } else {
++ String datasetName = stmtCreateIndex.getDatasetName().getValue();
++
++ Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
++ datasetName);
++ if (ds == null) {
++ throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
++ + dataverseName);
++ }
++
++ String indexName = stmtCreateIndex.getIndexName().getValue();
++ Index idx = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
++ datasetName, indexName);
++
++ if (idx != null) {
++ if (!stmtCreateIndex.getIfNotExists()) {
++ throw new AlgebricksException("An index with this name " + indexName + " already exists.");
++ } else {
++ stmtCreateIndex.setNeedToCreate(false);
++ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++ return;
++ }
++ }
++
++ //#. add a new index with PendingAddOp
+ Index index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(),
+- stmtCreateIndex.getFieldExprs(), stmtCreateIndex.getGramLength(), false);
++ stmtCreateIndex.getFieldExprs(), stmtCreateIndex.getGramLength(), false,
++ IMetadataEntity.PENDING_ADD_OP);
+ MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
+- runCreateIndexJob(hcc, stmtCreateIndex, metadataProvider);
+
++ //#. create the index artifact in NC.
+ CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName,
+ index.getDatasetName(), index.getKeyFieldNames(), index.getGramLength(), index.getIndexType());
+- JobSpecification loadIndexJobSpec = IndexOperations
+- .buildSecondaryIndexLoadingJobSpec(cis, metadataProvider);
+- runJob(hcc, loadIndexJobSpec);
++ JobSpecification spec = IndexOperations.buildSecondaryIndexCreationJobSpec(cis, metadataProvider);
++ if (spec == null) {
++ throw new AsterixException("Failed to create job spec for creating index '"
++ + stmtCreateIndex.getDatasetName() + "." + stmtCreateIndex.getIndexName() + "'");
++ }
++ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++ bActiveTxn = false;
++
++ runJob(hcc, spec);
++
++ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++ bActiveTxn = true;
++ metadataProvider.setMetadataTxnContext(mdTxnCtx);
++
++ //#. load data into the index in NC.
++ cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName, index.getDatasetName(),
++ index.getKeyFieldNames(), index.getGramLength(), index.getIndexType());
++ spec = IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, metadataProvider);
++ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++ bActiveTxn = false;
++
++ runJob(hcc, spec);
++
++ //#. begin new metadataTxn
++ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++ bActiveTxn = true;
++ metadataProvider.setMetadataTxnContext(mdTxnCtx);
++
++ //#. add another new index with PendingNoOp after deleting the index with PendingAddOp
++ MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
++ indexName);
++ index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(),
++ stmtCreateIndex.getFieldExprs(), stmtCreateIndex.getGramLength(), false,
++ IMetadataEntity.PENDING_NO_OP);
++ MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
++ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++
++ } catch (Exception e) {
++ if (bActiveTxn) {
++ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
++ }
++ throw new AlgebricksException(e);
++ } finally {
++ releaseWriteLatch();
+ }
+ }
+
+ private void handleCreateTypeStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+ List<JobSpecification> jobsToExecute) throws AlgebricksException, RemoteException, ACIDException,
+ MetadataException {
+- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
+- TypeDecl stmtCreateType = (TypeDecl) stmt;
+- String dataverseName = stmtCreateType.getDataverseName() == null ? activeDefaultDataverse == null ? null
+- : activeDefaultDataverse.getDataverseName() : stmtCreateType.getDataverseName().getValue();
+- if (dataverseName == null) {
+- throw new AlgebricksException(" dataverse not specified ");
+- }
+- String typeName = stmtCreateType.getIdent().getValue();
+- Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
+- if (dv == null) {
+- throw new AlgebricksException("Unknonw dataverse " + dataverseName);
+- }
+- Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
+- if (dt != null) {
+- if (!stmtCreateType.getIfNotExists())
+- throw new AlgebricksException("A datatype with this name " + typeName + " already exists.");
+- } else {
+- if (builtinTypeMap.get(typeName) != null) {
+- throw new AlgebricksException("Cannot redefine builtin type " + typeName + ".");
++
++ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++ metadataProvider.setMetadataTxnContext(mdTxnCtx);
++ acquireWriteLatch();
++
++ try {
++ TypeDecl stmtCreateType = (TypeDecl) stmt;
++ String dataverseName = stmtCreateType.getDataverseName() == null ? activeDefaultDataverse == null ? null
++ : activeDefaultDataverse.getDataverseName() : stmtCreateType.getDataverseName().getValue();
++ if (dataverseName == null) {
++ throw new AlgebricksException(" dataverse not specified ");
++ }
++ String typeName = stmtCreateType.getIdent().getValue();
++ Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
++ if (dv == null) {
++ throw new AlgebricksException("Unknonw dataverse " + dataverseName);
++ }
++ Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
++ if (dt != null) {
++ if (!stmtCreateType.getIfNotExists()) {
++ throw new AlgebricksException("A datatype with this name " + typeName + " already exists.");
++ }
+ } else {
+- Map<TypeSignature, IAType> typeMap = TypeTranslator.computeTypes(mdTxnCtx, (TypeDecl) stmt,
+- dataverseName);
+- TypeSignature typeSignature = new TypeSignature(dataverseName, typeName);
+- IAType type = typeMap.get(typeSignature);
+- MetadataManager.INSTANCE.addDatatype(mdTxnCtx, new Datatype(dataverseName, typeName, type, false));
++ if (builtinTypeMap.get(typeName) != null) {
++ throw new AlgebricksException("Cannot redefine builtin type " + typeName + ".");
++ } else {
++ Map<TypeSignature, IAType> typeMap = TypeTranslator.computeTypes(mdTxnCtx, (TypeDecl) stmt,
++ dataverseName);
++ TypeSignature typeSignature = new TypeSignature(dataverseName, typeName);
++ IAType type = typeMap.get(typeSignature);
++ MetadataManager.INSTANCE.addDatatype(mdTxnCtx, new Datatype(dataverseName, typeName, type, false));
++ }
+ }
++ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++ } catch (Exception e) {
++ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
++ throw new AlgebricksException(e);
++ } finally {
++ releaseWriteLatch();
+ }
+ }
+
+ private void handleDataverseDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+ IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
+- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
+- DataverseDropStatement stmtDelete = (DataverseDropStatement) stmt;
+- String dvName = stmtDelete.getDataverseName().getValue();
+
+- Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dvName);
+- if (dv == null) {
+- if (!stmtDelete.getIfExists()) {
+- throw new AlgebricksException("There is no dataverse with this name " + dvName + ".");
++ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++ boolean bActiveTxn = true;
++ metadataProvider.setMetadataTxnContext(mdTxnCtx);
++ acquireWriteLatch();
++
++ try {
++ DataverseDropStatement stmtDelete = (DataverseDropStatement) stmt;
++ String dvName = stmtDelete.getDataverseName().getValue();
++
++ Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dvName);
++ if (dv == null) {
++ if (!stmtDelete.getIfExists()) {
++ throw new AlgebricksException("There is no dataverse with this name " + dvName + ".");
++ }
++ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++ return;
+ }
+- } else {
++
++ //#. prepare jobs which will drop corresponding datasets with indexes.
+ List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dvName);
+ for (int j = 0; j < datasets.size(); j++) {
+ String datasetName = datasets.get(j).getDatasetName();
+ DatasetType dsType = datasets.get(j).getDatasetType();
+ if (dsType == DatasetType.INTERNAL || dsType == DatasetType.FEED) {
++
+ List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dvName, datasetName);
+ for (int k = 0; k < indexes.size(); k++) {
+ if (indexes.get(k).isSecondaryIndex()) {
+- compileIndexDropStatement(hcc, dvName, datasetName, indexes.get(k).getIndexName(),
+- metadataProvider);
++ CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dvName, datasetName,
++ indexes.get(k).getIndexName());
++ jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
+ }
+ }
++
++ CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dvName, datasetName);
++ jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider));
+ }
+- compileDatasetDropStatement(hcc, dvName, datasetName, metadataProvider);
+ }
+
++ //#. mark PendingDropOp on the dataverse record by
++ // first, deleting the dataverse record from the DATAVERSE_DATASET
++ // second, inserting the dataverse record with the PendingDropOp value into the DATAVERSE_DATASET
+ MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dvName);
++ MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dvName, dv.getDataFormat(),
++ IMetadataEntity.PENDING_DROP_OP));
++
++ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++ bActiveTxn = false;
++
++ for (JobSpecification jobSpec : jobsToExecute) {
++ runJob(hcc, jobSpec);
++ }
++
++ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++ bActiveTxn = true;
++ metadataProvider.setMetadataTxnContext(mdTxnCtx);
++
++ //#. finally, delete the dataverse.
++ MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dvName);
+ if (activeDefaultDataverse != null && activeDefaultDataverse.getDataverseName() == dvName) {
+ activeDefaultDataverse = null;
+ }
++
++ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++ } catch (Exception e) {
++ if (bActiveTxn) {
++ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
++ }
++ throw new AlgebricksException(e);
++ } finally {
++ releaseWriteLatch();
+ }
+ }
+
+ private void handleDatasetDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+ IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
+- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
+- DropStatement stmtDelete = (DropStatement) stmt;
+- String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
+- : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue();
+- if (dataverseName == null) {
+- throw new AlgebricksException(" dataverse not specified ");
+- }
+- String datasetName = stmtDelete.getDatasetName().getValue();
+- Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+- if (ds == null) {
+- if (!stmtDelete.getIfExists())
+- throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
+- + dataverseName + ".");
+- } else {
++
++ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++ boolean bActiveTxn = true;
++ metadataProvider.setMetadataTxnContext(mdTxnCtx);
++ acquireWriteLatch();
++
++ try {
++ DropStatement stmtDelete = (DropStatement) stmt;
++ String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
++ : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue();
++ if (dataverseName == null) {
++ throw new AlgebricksException(" dataverse not specified ");
++ }
++ String datasetName = stmtDelete.getDatasetName().getValue();
++
++ Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
++ if (ds == null) {
++ if (!stmtDelete.getIfExists()) {
++ throw new AlgebricksException("There is no dataset with this name " + datasetName
++ + " in dataverse " + dataverseName + ".");
++ }
++ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++ return;
++ }
++
+ if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
++
++ //#. prepare jobs to drop the datatset and the indexes in NC
+ List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
+ for (int j = 0; j < indexes.size(); j++) {
+- if (indexes.get(j).isPrimaryIndex()) {
+- compileIndexDropStatement(hcc, dataverseName, datasetName, indexes.get(j).getIndexName(),
+- metadataProvider);
++ if (indexes.get(j).isSecondaryIndex()) {
++ CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
++ indexes.get(j).getIndexName());
++ jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
+ }
+ }
++ CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
++ jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider));
++
++ //#. mark the existing dataset as PendingDropOp
++ MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
++ MetadataManager.INSTANCE.addDataset(
++ mdTxnCtx,
++ new Dataset(dataverseName, datasetName, ds.getItemTypeName(), ds.getDatasetDetails(), ds
++ .getDatasetType(), ds.getDatasetId(), IMetadataEntity.PENDING_DROP_OP));
++
++ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++ bActiveTxn = false;
++
++ //#. run the jobs
++ for (JobSpecification jobSpec : jobsToExecute) {
++ runJob(hcc, jobSpec);
++ }
++
++ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++ bActiveTxn = true;
++ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ }
+- compileDatasetDropStatement(hcc, dataverseName, datasetName, metadataProvider);
++
++ //#. finally, delete the dataset.
++ MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
++
++ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++ } catch (Exception e) {
++ if (bActiveTxn) {
++ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
++ }
++ throw new AlgebricksException(e);
++ } finally {
++ releaseWriteLatch();
+ }
+ }
+
+ private void handleIndexDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+ IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
+- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
+- IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt;
+- String datasetName = stmtIndexDrop.getDatasetName().getValue();
+- String dataverseName = stmtIndexDrop.getDataverseName() == null ? activeDefaultDataverse == null ? null
+- : activeDefaultDataverse.getDataverseName() : stmtIndexDrop.getDataverseName().getValue();
+- if (dataverseName == null) {
+- throw new AlgebricksException(" dataverse not specified ");
++
++ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++ boolean bActiveTxn = true;
++ metadataProvider.setMetadataTxnContext(mdTxnCtx);
++ acquireWriteLatch();
++
++ try {
++ IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt;
++ String datasetName = stmtIndexDrop.getDatasetName().getValue();
++ String dataverseName = stmtIndexDrop.getDataverseName() == null ? activeDefaultDataverse == null ? null
++ : activeDefaultDataverse.getDataverseName() : stmtIndexDrop.getDataverseName().getValue();
++ if (dataverseName == null) {
++ throw new AlgebricksException(" dataverse not specified ");
++ }
++
++ Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
++ if (ds == null) {
++ throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
++ + dataverseName);
++ }
++
++ if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
++ String indexName = stmtIndexDrop.getIndexName().getValue();
++ Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
++ if (index == null) {
++ if (!stmtIndexDrop.getIfExists()) {
++ throw new AlgebricksException("There is no index with this name " + indexName + ".");
++ }
++ } else {
++ //#. prepare a job to drop the index in NC.
++ CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
++ indexName);
++ jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
++
++ //#. mark PendingDropOp on the existing index
++ MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
++ MetadataManager.INSTANCE.addIndex(
++ mdTxnCtx,
++ new Index(dataverseName, datasetName, indexName, index.getIndexType(), index
++ .getKeyFieldNames(), index.isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
++
++ //#. commit the existing transaction before calling runJob.
++ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++ bActiveTxn = false;
++
++ for (JobSpecification jobSpec : jobsToExecute) {
++ runJob(hcc, jobSpec);
++ }
++
++ //#. begin a new transaction
++ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++ bActiveTxn = true;
++ metadataProvider.setMetadataTxnContext(mdTxnCtx);
++
++ //#. finally, delete the existing index
++ MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
++ }
++ } else {
++ throw new AlgebricksException(datasetName
++ + " is an external dataset. Indexes are not maintained for external datasets.");
++ }
++ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++
++ } catch (Exception e) {
++ if (bActiveTxn) {
++ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
++ }
++ throw new AlgebricksException(e);
++
++ } finally {
++ releaseWriteLatch();
+ }
+- Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+- if (ds == null)
+- throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
+- + dataverseName);
+- if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
+- String indexName = stmtIndexDrop.getIndexName().getValue();
+- Index idx = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
+- if (idx == null) {
+- if (!stmtIndexDrop.getIfExists())
+- throw new AlgebricksException("There is no index with this name " + indexName + ".");
+- } else
+- compileIndexDropStatement(hcc, dataverseName, datasetName, indexName, metadataProvider);
+- } else {
+- throw new AlgebricksException(datasetName
+- + " is an external dataset. Indexes are not maintained for external datasets.");
+- }
+ }
+
+ private void handleTypeDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+ List<JobSpecification> jobsToExecute) throws AlgebricksException, MetadataException, RemoteException,
+ ACIDException {
+- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
+- TypeDropStatement stmtTypeDrop = (TypeDropStatement) stmt;
+- String dataverseName = stmtTypeDrop.getDataverseName() == null ? (activeDefaultDataverse == null ? null
+- : activeDefaultDataverse.getDataverseName()) : stmtTypeDrop.getDataverseName().getValue();
+- if (dataverseName == null) {
+- throw new AlgebricksException(" dataverse not specified ");
++
++ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++ metadataProvider.setMetadataTxnContext(mdTxnCtx);
++ acquireWriteLatch();
++
++ try {
++ TypeDropStatement stmtTypeDrop = (TypeDropStatement) stmt;
++ String dataverseName = stmtTypeDrop.getDataverseName() == null ? (activeDefaultDataverse == null ? null
++ : activeDefaultDataverse.getDataverseName()) : stmtTypeDrop.getDataverseName().getValue();
++ if (dataverseName == null) {
++ throw new AlgebricksException(" dataverse not specified ");
++ }
++ String typeName = stmtTypeDrop.getTypeName().getValue();
++ Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
++ if (dt == null) {
++ if (!stmtTypeDrop.getIfExists())
++ throw new AlgebricksException("There is no datatype with this name " + typeName + ".");
++ } else {
++ MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, dataverseName, typeName);
++ }
++ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++ } catch (Exception e) {
++ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
++ throw new AlgebricksException(e);
++ } finally {
++ releaseWriteLatch();
+ }
+- String typeName = stmtTypeDrop.getTypeName().getValue();
+- Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
+- if (dt == null) {
+- if (!stmtTypeDrop.getIfExists())
+- throw new AlgebricksException("There is no datatype with this name " + typeName + ".");
+- } else {
+- MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, dataverseName, typeName);
+- }
+ }
+
+ private void handleNodegroupDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+ List<JobSpecification> jobsToExecute) throws MetadataException, AlgebricksException, RemoteException,
+ ACIDException {
+- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
+- NodeGroupDropStatement stmtDelete = (NodeGroupDropStatement) stmt;
+- String nodegroupName = stmtDelete.getNodeGroupName().getValue();
+- NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodegroupName);
+- if (ng == null) {
+- if (!stmtDelete.getIfExists())
+- throw new AlgebricksException("There is no nodegroup with this name " + nodegroupName + ".");
+- } else {
+- MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, nodegroupName);
++
++ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++ metadataProvider.setMetadataTxnContext(mdTxnCtx);
++ acquireWriteLatch();
++
++ try {
++ NodeGroupDropStatement stmtDelete = (NodeGroupDropStatement) stmt;
++ String nodegroupName = stmtDelete.getNodeGroupName().getValue();
++ NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodegroupName);
++ if (ng == null) {
++ if (!stmtDelete.getIfExists())
++ throw new AlgebricksException("There is no nodegroup with this name " + nodegroupName + ".");
++ } else {
++ MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, nodegroupName);
++ }
++
++ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++ } catch (Exception e) {
++ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
++ throw new AlgebricksException(e);
++ } finally {
++ releaseWriteLatch();
+ }
+ }
+
+ private void handleCreateFunctionStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+ List<JobSpecification> jobsToExecute) throws AlgebricksException, MetadataException, RemoteException,
+ ACIDException {
+- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
+- CreateFunctionStatement cfs = (CreateFunctionStatement) stmt;
+- String dataverse = cfs.getSignature().getNamespace() == null ? activeDefaultDataverse == null ? null
+- : activeDefaultDataverse.getDataverseName() : cfs.getSignature().getNamespace();
+- if (dataverse == null) {
+- throw new AlgebricksException(" dataverse not specified ");
++ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++ metadataProvider.setMetadataTxnContext(mdTxnCtx);
++ acquireWriteLatch();
++
++ try {
++ CreateFunctionStatement cfs = (CreateFunctionStatement) stmt;
++ String dataverse = cfs.getSignature().getNamespace() == null ? activeDefaultDataverse == null ? null
++ : activeDefaultDataverse.getDataverseName() : cfs.getSignature().getNamespace();
++ if (dataverse == null) {
++ throw new AlgebricksException(" dataverse not specified ");
++ }
++ Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverse);
++ if (dv == null) {
++ throw new AlgebricksException("There is no dataverse with this name " + dataverse + ".");
++ }
++ Function function = new Function(dataverse, cfs.getaAterixFunction().getName(), cfs.getaAterixFunction()
++ .getArity(), cfs.getParamList(), Function.RETURNTYPE_VOID, cfs.getFunctionBody(),
++ Function.LANGUAGE_AQL, FunctionKind.SCALAR.toString());
++ MetadataManager.INSTANCE.addFunction(mdTxnCtx, function);
++
++ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++ } catch (Exception e) {
++ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
++ throw new AlgebricksException(e);
++ } finally {
++ releaseWriteLatch();
+ }
+- Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverse);
+- if (dv == null) {
+- throw new AlgebricksException("There is no dataverse with this name " + dataverse + ".");
+- }
+- Function function = new Function(dataverse, cfs.getaAterixFunction().getName(), cfs.getaAterixFunction()
+- .getArity(), cfs.getParamList(), Function.RETURNTYPE_VOID, cfs.getFunctionBody(),
+- Function.LANGUAGE_AQL, FunctionKind.SCALAR.toString());
+- MetadataManager.INSTANCE.addFunction(mdTxnCtx, function);
+ }
+
+ private void handleFunctionDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+ List<JobSpecification> jobsToExecute) throws MetadataException, RemoteException, ACIDException,
+ AlgebricksException {
+- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
+- FunctionDropStatement stmtDropFunction = (FunctionDropStatement) stmt;
+- FunctionSignature signature = stmtDropFunction.getFunctionSignature();
+- Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, signature);
+- if (function == null) {
+- if (!stmtDropFunction.getIfExists())
+- throw new AlgebricksException("Unknonw function " + signature);
+- } else {
+- MetadataManager.INSTANCE.dropFunction(mdTxnCtx, signature);
++ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++ metadataProvider.setMetadataTxnContext(mdTxnCtx);
++ acquireWriteLatch();
++
++ try {
++ FunctionDropStatement stmtDropFunction = (FunctionDropStatement) stmt;
++ FunctionSignature signature = stmtDropFunction.getFunctionSignature();
++ Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, signature);
++ if (function == null) {
++ if (!stmtDropFunction.getIfExists())
++ throw new AlgebricksException("Unknonw function " + signature);
++ } else {
++ MetadataManager.INSTANCE.dropFunction(mdTxnCtx, signature);
++ }
++ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++ } catch (Exception e) {
++ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
++ throw new AlgebricksException(e);
++ } finally {
++ releaseWriteLatch();
+ }
+ }
+
+ private void handleLoadFromFileStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+ IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
+- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
+- LoadFromFileStatement loadStmt = (LoadFromFileStatement) stmt;
+- String dataverseName = loadStmt.getDataverseName() == null ? activeDefaultDataverse == null ? null
+- : activeDefaultDataverse.getDataverseName() : loadStmt.getDataverseName().getValue();
+- CompiledLoadFromFileStatement cls = new CompiledLoadFromFileStatement(dataverseName, loadStmt.getDatasetName()
+- .getValue(), loadStmt.getAdapter(), loadStmt.getProperties(), loadStmt.dataIsAlreadySorted());
+
+- IDataFormat format = getDataFormat(metadataProvider.getMetadataTxnContext(), dataverseName);
+- Job job = DatasetOperations.createLoadDatasetJobSpec(metadataProvider, cls, format);
+- jobsToExecute.add(job.getJobSpec());
+- // Also load the dataset's secondary indexes.
+- List<Index> datasetIndexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, loadStmt
+- .getDatasetName().getValue());
+- for (Index index : datasetIndexes) {
+- if (!index.isSecondaryIndex()) {
+- continue;
++ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++ boolean bActiveTxn = true;
++ metadataProvider.setMetadataTxnContext(mdTxnCtx);
++ acquireReadLatch();
++
++ try {
++ LoadFromFileStatement loadStmt = (LoadFromFileStatement) stmt;
++ String dataverseName = loadStmt.getDataverseName() == null ? activeDefaultDataverse == null ? null
++ : activeDefaultDataverse.getDataverseName() : loadStmt.getDataverseName().getValue();
++ CompiledLoadFromFileStatement cls = new CompiledLoadFromFileStatement(dataverseName, loadStmt
++ .getDatasetName().getValue(), loadStmt.getAdapter(), loadStmt.getProperties(),
++ loadStmt.dataIsAlreadySorted());
++
++ IDataFormat format = getDataFormat(metadataProvider.getMetadataTxnContext(), dataverseName);
++ Job job = DatasetOperations.createLoadDatasetJobSpec(metadataProvider, cls, format);
++ jobsToExecute.add(job.getJobSpec());
++ // Also load the dataset's secondary indexes.
++ List<Index> datasetIndexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, loadStmt
++ .getDatasetName().getValue());
++ for (Index index : datasetIndexes) {
++ if (!index.isSecondaryIndex()) {
++ continue;
++ }
++ // Create CompiledCreateIndexStatement from metadata entity 'index'.
++ CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(),
++ dataverseName, index.getDatasetName(), index.getKeyFieldNames(), index.getGramLength(),
++ index.getIndexType());
++ jobsToExecute.add(IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, metadataProvider));
+ }
+- // Create CompiledCreateIndexStatement from metadata entity 'index'.
+- CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName,
+- index.getDatasetName(), index.getKeyFieldNames(), index.getGramLength(), index.getIndexType());
+- jobsToExecute.add(IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, metadataProvider));
++ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++ bActiveTxn = false;
++
++ for (JobSpecification jobspec : jobsToExecute) {
++ runJob(hcc, jobspec);
++ }
++ } catch (Exception e) {
++ if (bActiveTxn) {
++ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
++ }
++ throw new AlgebricksException(e);
++ } finally {
++ releaseReadLatch();
+ }
+ }
+
+ private void handleWriteFromQueryResultStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+ IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
+- metadataProvider.setWriteTransaction(true);
+- WriteFromQueryResultStatement st1 = (WriteFromQueryResultStatement) stmt;
+- String dataverseName = st1.getDataverseName() == null ? activeDefaultDataverse == null ? null
+- : activeDefaultDataverse.getDataverseName() : st1.getDataverseName().getValue();
+- CompiledWriteFromQueryResultStatement clfrqs = new CompiledWriteFromQueryResultStatement(dataverseName, st1
+- .getDatasetName().getValue(), st1.getQuery(), st1.getVarCounter());
+
+- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
+- if (compiled.first != null) {
+- jobsToExecute.add(compiled.first);
++ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++ boolean bActiveTxn = true;
++ metadataProvider.setMetadataTxnContext(mdTxnCtx);
++ acquireReadLatch();
++
++ try {
++ metadataProvider.setWriteTransaction(true);
++ WriteFromQueryResultStatement st1 = (WriteFromQueryResultStatement) stmt;
++ String dataverseName = st1.getDataverseName() == null ? activeDefaultDataverse == null ? null
++ : activeDefaultDataverse.getDataverseName() : st1.getDataverseName().getValue();
++ CompiledWriteFromQueryResultStatement clfrqs = new CompiledWriteFromQueryResultStatement(dataverseName, st1
++ .getDatasetName().getValue(), st1.getQuery(), st1.getVarCounter());
++
++ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(),
++ clfrqs);
++ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++ bActiveTxn = false;
++ if (compiled.first != null) {
++ runJob(hcc, compiled.first);
++ }
++ } catch (Exception e) {
++ if (bActiveTxn) {
++ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
++ }
++ throw new AlgebricksException(e);
++ } finally {
++ releaseReadLatch();
+ }
+ }
+
+ private void handleInsertStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+ IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
+- metadataProvider.setWriteTransaction(true);
+- InsertStatement stmtInsert = (InsertStatement) stmt;
+- String dataverseName = stmtInsert.getDataverseName() == null ? activeDefaultDataverse == null ? null
+- : activeDefaultDataverse.getDataverseName() : stmtInsert.getDataverseName().getValue();
+- CompiledInsertStatement clfrqs = new CompiledInsertStatement(dataverseName, stmtInsert.getDatasetName()
+- .getValue(), stmtInsert.getQuery(), stmtInsert.getVarCounter());
+- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
+- if (compiled.first != null) {
+- jobsToExecute.add(compiled.first);
++
++ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++ boolean bActiveTxn = true;
++ metadataProvider.setMetadataTxnContext(mdTxnCtx);
++ acquireReadLatch();
++
++ try {
++ metadataProvider.setWriteTransaction(true);
++ InsertStatement stmtInsert = (InsertStatement) stmt;
++ String dataverseName = stmtInsert.getDataverseName() == null ? activeDefaultDataverse == null ? null
++ : activeDefaultDataverse.getDataverseName() : stmtInsert.getDataverseName().getValue();
++ CompiledInsertStatement clfrqs = new CompiledInsertStatement(dataverseName, stmtInsert.getDatasetName()
++ .getValue(), stmtInsert.getQuery(), stmtInsert.getVarCounter());
++ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(),
++ clfrqs);
++
++ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++ bActiveTxn = false;
++
++ if (compiled.first != null) {
++ runJob(hcc, compiled.first);
++ }
++
++ } catch (Exception e) {
++ if (bActiveTxn) {
++ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
++ }
++ throw new AlgebricksException(e);
++ } finally {
++ releaseReadLatch();
+ }
+ }
+
+ private void handleDeleteStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+ IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
+- metadataProvider.setWriteTransaction(true);
+- DeleteStatement stmtDelete = (DeleteStatement) stmt;
+- String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
+- : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue();
+- CompiledDeleteStatement clfrqs = new CompiledDeleteStatement(stmtDelete.getVariableExpr(), dataverseName,
+- stmtDelete.getDatasetName().getValue(), stmtDelete.getCondition(), stmtDelete.getDieClause(),
+- stmtDelete.getVarCounter(), metadataProvider);
+- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
+- if (compiled.first != null) {
+- jobsToExecute.add(compiled.first);
++
++ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++ boolean bActiveTxn = true;
++ metadataProvider.setMetadataTxnContext(mdTxnCtx);
++ acquireReadLatch();
++
++ try {
++ metadataProvider.setWriteTransaction(true);
++ DeleteStatement stmtDelete = (DeleteStatement) stmt;
++ String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
++ : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue();
++ CompiledDeleteStatement clfrqs = new CompiledDeleteStatement(stmtDelete.getVariableExpr(), dataverseName,
++ stmtDelete.getDatasetName().getValue(), stmtDelete.getCondition(), stmtDelete.getDieClause(),
++ stmtDelete.getVarCounter(), metadataProvider);
++ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(),
++ clfrqs);
++
++ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++ bActiveTxn = false;
++
++ if (compiled.first != null) {
++ runJob(hcc, compiled.first);
++ }
++
++ } catch (Exception e) {
++ if (bActiveTxn) {
++ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
++ }
++ throw new AlgebricksException(e);
++ } finally {
++ releaseReadLatch();
+ }
+ }
+
+@@ -704,46 +1106,109 @@
+
+ private void handleBeginFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+ IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
+- BeginFeedStatement bfs = (BeginFeedStatement) stmt;
+- String dataverseName = bfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
+- : activeDefaultDataverse.getDataverseName() : bfs.getDatasetName().getValue();
+
+- CompiledBeginFeedStatement cbfs = new CompiledBeginFeedStatement(dataverseName,
+- bfs.getDatasetName().getValue(), bfs.getQuery(), bfs.getVarCounter());
++ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++ boolean bActiveTxn = true;
++ metadataProvider.setMetadataTxnContext(mdTxnCtx);
++ acquireReadLatch();
+
+- Dataset dataset;
+- dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName, bfs
+- .getDatasetName().getValue());
+- IDatasetDetails datasetDetails = dataset.getDatasetDetails();
+- if (datasetDetails.getDatasetType() != DatasetType.FEED) {
+- throw new IllegalArgumentException("Dataset " + bfs.getDatasetName().getValue() + " is not a feed dataset");
++ try {
++ BeginFeedStatement bfs = (BeginFeedStatement) stmt;
++ String dataverseName = bfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
++ : activeDefaultDataverse.getDataverseName() : bfs.getDatasetName().getValue();
++
++ CompiledBeginFeedStatement cbfs = new CompiledBeginFeedStatement(dataverseName, bfs.getDatasetName()
++ .getValue(), bfs.getQuery(), bfs.getVarCounter());
++
++ Dataset dataset;
++ dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName, bfs
++ .getDatasetName().getValue());
++ IDatasetDetails datasetDetails = dataset.getDatasetDetails();
++ if (datasetDetails.getDatasetType() != DatasetType.FEED) {
++ throw new IllegalArgumentException("Dataset " + bfs.getDatasetName().getValue()
++ + " is not a feed dataset");
++ }
++ bfs.initialize(metadataProvider.getMetadataTxnContext(), dataset);
++ cbfs.setQuery(bfs.getQuery());
++ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, bfs.getQuery(), cbfs);
++
++ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++ bActiveTxn = false;
++
++ if (compiled.first != null) {
++ runJob(hcc, compiled.first);
++ }
++
++ } catch (Exception e) {
++ if (bActiveTxn) {
++ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
++ }
++ throw new AlgebricksException(e);
++ } finally {
++ releaseReadLatch();
+ }
+- bfs.initialize(metadataProvider.getMetadataTxnContext(), dataset);
+- cbfs.setQuery(bfs.getQuery());
+- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, bfs.getQuery(), cbfs);
+- if (compiled.first != null) {
+- jobsToExecute.add(compiled.first);
+- }
+ }
+
+ private void handleControlFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+ IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
+- ControlFeedStatement cfs = (ControlFeedStatement) stmt;
+- String dataverseName = cfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
+- : activeDefaultDataverse.getDataverseName() : cfs.getDatasetName().getValue();
+- CompiledControlFeedStatement clcfs = new CompiledControlFeedStatement(cfs.getOperationType(), dataverseName,
+- cfs.getDatasetName().getValue(), cfs.getAlterAdapterConfParams());
+- jobsToExecute.add(FeedOperations.buildControlFeedJobSpec(clcfs, metadataProvider));
++
++ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++ boolean bActiveTxn = true;
++ metadataProvider.setMetadataTxnContext(mdTxnCtx);
++ acquireReadLatch();
++
++ try {
++ ControlFeedStatement cfs = (ControlFeedStatement) stmt;
++ String dataverseName = cfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
++ : activeDefaultDataverse.getDataverseName() : cfs.getDatasetName().getValue();
++ CompiledControlFeedStatement clcfs = new CompiledControlFeedStatement(cfs.getOperationType(),
++ dataverseName, cfs.getDatasetName().getValue(), cfs.getAlterAdapterConfParams());
++ JobSpecification jobSpec = FeedOperations.buildControlFeedJobSpec(clcfs, metadataProvider);
++
++ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++ bActiveTxn = false;
++
++ runJob(hcc, jobSpec);
++
++ } catch (Exception e) {
++ if (bActiveTxn) {
++ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
++ }
++ throw new AlgebricksException(e);
++ } finally {
++ releaseReadLatch();
++ }
+ }
+
+ private QueryResult handleQuery(AqlMetadataProvider metadataProvider, Query query, IHyracksClientConnection hcc,
+ List<JobSpecification> jobsToExecute) throws Exception {
+- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, query, null);
+- if (compiled.first != null) {
+- GlobalConfig.ASTERIX_LOGGER.info(compiled.first.toJSON().toString(1));
+- jobsToExecute.add(compiled.first);
++
++ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++ boolean bActiveTxn = true;
++ metadataProvider.setMetadataTxnContext(mdTxnCtx);
++ acquireReadLatch();
++
++ try {
++ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, query, null);
++
++ QueryResult queryResult = new QueryResult(query, compiled.second.getLocalFile().getFile().getAbsolutePath());
++ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++ bActiveTxn = false;
++
++ if (compiled.first != null) {
++ GlobalConfig.ASTERIX_LOGGER.info(compiled.first.toJSON().toString(1));
++ runJob(hcc, compiled.first);
++ }
++
++ return queryResult;
++ } catch (Exception e) {
++ if (bActiveTxn) {
++ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
++ }
++ throw new AlgebricksException(e);
++ } finally {
++ releaseReadLatch();
+ }
+- return new QueryResult(query, compiled.second.getLocalFile().getFile().getAbsolutePath());
+ }
+
+ private void runCreateIndexJob(IHyracksClientConnection hcc, CreateIndexStatement stmtCreateIndex,
+@@ -768,20 +1233,32 @@
+ private void handleCreateNodeGroupStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+ List<JobSpecification> jobsToExecute) throws MetadataException, AlgebricksException, RemoteException,
+ ACIDException {
+- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
+- NodegroupDecl stmtCreateNodegroup = (NodegroupDecl) stmt;
+- String ngName = stmtCreateNodegroup.getNodegroupName().getValue();
+- NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, ngName);
+- if (ng != null) {
+- if (!stmtCreateNodegroup.getIfNotExists())
+- throw new AlgebricksException("A nodegroup with this name " + ngName + " already exists.");
+- } else {
+- List<Identifier> ncIdentifiers = stmtCreateNodegroup.getNodeControllerNames();
+- List<String> ncNames = new ArrayList<String>(ncIdentifiers.size());
+- for (Identifier id : ncIdentifiers) {
+- ncNames.add(id.getValue());
++
++ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++ metadataProvider.setMetadataTxnContext(mdTxnCtx);
++ acquireWriteLatch();
++
++ try {
++ NodegroupDecl stmtCreateNodegroup = (NodegroupDecl) stmt;
++ String ngName = stmtCreateNodegroup.getNodegroupName().getValue();
++ NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, ngName);
++ if (ng != null) {
++ if (!stmtCreateNodegroup.getIfNotExists())
++ throw new AlgebricksException("A nodegroup with this name " + ngName + " already exists.");
++ } else {
++ List<Identifier> ncIdentifiers = stmtCreateNodegroup.getNodeControllerNames();
++ List<String> ncNames = new ArrayList<String>(ncIdentifiers.size());
++ for (Identifier id : ncIdentifiers) {
++ ncNames.add(id.getValue());
++ }
++ MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(ngName, ncNames));
+ }
+- MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(ngName, ncNames));
++ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++ } catch (Exception e) {
++ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
++ throw new AlgebricksException(e);
++ } finally {
++ releaseWriteLatch();
+ }
+ }
+
+@@ -791,10 +1268,37 @@
+
+ private void compileIndexDropStatement(IHyracksClientConnection hcc, String dataverseName, String datasetName,
+ String indexName, AqlMetadataProvider metadataProvider) throws Exception {
++ MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
++ Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
++
++ //#. mark PendingDropOp on the existing index
++ MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
++ MetadataManager.INSTANCE.addIndex(
++ mdTxnCtx,
++ new Index(dataverseName, datasetName, indexName, index.getIndexType(), index.getKeyFieldNames(), index
++ .isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
+ CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName);
+- runJob(hcc, IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
+- MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
+- indexName);
++ JobSpecification jobSpec = IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider);
++
++ //#. commit the existing transaction before calling runJob.
++ // the caller should begin the transaction before calling this function.
++ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++
++ try {
++ runJob(hcc, jobSpec);
++ } catch (Exception e) {
++ //need to create the mdTxnCtx to be aborted by caller properly
++ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++ metadataProvider.setMetadataTxnContext(mdTxnCtx);
++ throw e;
++ }
++
++ //#. begin a new transaction
++ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++ metadataProvider.setMetadataTxnContext(mdTxnCtx);
++
++ //#. finally, delete the existing index
++ MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
+ }
+
+ private void compileDatasetDropStatement(IHyracksClientConnection hcc, String dataverseName, String datasetName,
+@@ -803,10 +1307,32 @@
+ CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
+ Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+ if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
+- JobSpecification[] jobSpecs = DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider);
+- for (JobSpecification spec : jobSpecs)
+- runJob(hcc, spec);
++ JobSpecification jobSpec = DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider);
++
++ //#. mark PendingDropOp on the existing dataset
++ MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
++ MetadataManager.INSTANCE.addDataset(mdTxnCtx, new Dataset(dataverseName, datasetName, ds.getItemTypeName(),
++ ds.getDatasetDetails(), ds.getDatasetType(), ds.getDatasetId(), IMetadataEntity.PENDING_DROP_OP));
++
++ //#. commit the transaction
++ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++
++ //#. run the job
++ try {
++ runJob(hcc, jobSpec);
++ } catch (Exception e) {
++ //need to create the mdTxnCtx to be aborted by caller properly
++ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++ metadataProvider.setMetadataTxnContext(mdTxnCtx);
++ throw e;
++ }
++
++ //#. start a new transaction
++ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
++ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ }
++
++ //#. finally, delete the existing dataset.
+ MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
+ }
+
+@@ -831,4 +1357,20 @@
+ }
+ return format;
+ }
++
++ private void acquireWriteLatch() {
++ cacheLatch.writeLock().lock();
++ }
++
++ private void releaseWriteLatch() {
++ cacheLatch.writeLock().unlock();
++ }
++
++ private void acquireReadLatch() {
++ cacheLatch.readLock().lock();
++ }
++
++ private void releaseReadLatch() {
++ cacheLatch.readLock().unlock();
++ }
+ }