blob: 8efd61fa2bdbf4ad062ffcfe172003b998811627 [file] [log] [blame]
Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DataverseTupleTranslator.java
===================================================================
--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DataverseTupleTranslator.java (revision 1061)
+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DataverseTupleTranslator.java (working copy)
@@ -25,8 +25,11 @@
import edu.uci.ics.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
import edu.uci.ics.asterix.metadata.bootstrap.MetadataRecordTypes;
import edu.uci.ics.asterix.metadata.entities.Dataverse;
+import edu.uci.ics.asterix.om.base.AInt32;
+import edu.uci.ics.asterix.om.base.AMutableInt32;
import edu.uci.ics.asterix.om.base.ARecord;
import edu.uci.ics.asterix.om.base.AString;
+import edu.uci.ics.asterix.om.types.BuiltinType;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -40,12 +43,18 @@
// Payload field containing serialized Dataverse.
public static final int DATAVERSE_PAYLOAD_TUPLE_FIELD_INDEX = 1;
+ private AMutableInt32 aInt32;
+ protected ISerializerDeserializer<AInt32> aInt32Serde;
+
@SuppressWarnings("unchecked")
private ISerializerDeserializer<ARecord> recordSerDes = AqlSerializerDeserializerProvider.INSTANCE
.getSerializerDeserializer(MetadataRecordTypes.DATAVERSE_RECORDTYPE);
+ @SuppressWarnings("unchecked")
public DataverseTupleTranslator(boolean getTuple) {
super(getTuple, MetadataPrimaryIndexes.DATAVERSE_DATASET.getFieldCount());
+ aInt32 = new AMutableInt32(-1);
+ aInt32Serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
}
@Override
@@ -57,7 +66,8 @@
DataInput in = new DataInputStream(stream);
ARecord dataverseRecord = recordSerDes.deserialize(in);
return new Dataverse(((AString) dataverseRecord.getValueByPos(0)).getStringValue(),
- ((AString) dataverseRecord.getValueByPos(1)).getStringValue());
+ ((AString) dataverseRecord.getValueByPos(1)).getStringValue(),
+ ((AInt32) dataverseRecord.getValueByPos(2)).getIntegerValue());
}
@Override
@@ -88,6 +98,12 @@
stringSerde.serialize(aString, fieldValue.getDataOutput());
recordBuilder.addField(MetadataRecordTypes.DATAVERSE_ARECORD_TIMESTAMP_FIELD_INDEX, fieldValue);
+ // write field 3
+ fieldValue.reset();
+ aInt32.setValue(instance.getPendingOp());
+ aInt32Serde.serialize(aInt32, fieldValue.getDataOutput());
+ recordBuilder.addField(MetadataRecordTypes.DATAVERSE_ARECORD_PENDINGOP_FIELD_INDEX, fieldValue);
+
recordBuilder.write(tupleBuilder.getDataOutput(), true);
tupleBuilder.addFieldEndOffset();
Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
===================================================================
--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java (revision 1061)
+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java (working copy)
@@ -77,9 +77,9 @@
protected ISerializerDeserializer<AInt32> aInt32Serde;
@SuppressWarnings("unchecked")
- public DatasetTupleTranslator(boolean getTuple) {
+ public DatasetTupleTranslator(boolean getTuple) {
super(getTuple, MetadataPrimaryIndexes.DATASET_DATASET.getFieldCount());
- aInt32 = new AMutableInt32(-1);
+ aInt32 = new AMutableInt32(-1);
aInt32Serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
}
@@ -104,8 +104,10 @@
.getValueByPos(MetadataRecordTypes.DATASET_ARECORD_DATATYPENAME_FIELD_INDEX)).getStringValue();
DatasetType datasetType = DatasetType.valueOf(((AString) datasetRecord.getValueByPos(3)).getStringValue());
IDatasetDetails datasetDetails = null;
- int datasetId = ((AInt32) datasetRecord
+ int datasetId = ((AInt32) datasetRecord
.getValueByPos(MetadataRecordTypes.DATASET_ARECORD_DATASETID_FIELD_INDEX)).getIntegerValue();
+ int pendingOp = ((AInt32) datasetRecord
+ .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_PENDINGOP_FIELD_INDEX)).getIntegerValue();
switch (datasetType) {
case FEED:
case INTERNAL: {
@@ -197,7 +199,7 @@
}
datasetDetails = new ExternalDatasetDetails(adapter, properties);
}
- return new Dataset(dataverseName, datasetName, typeName, datasetDetails, datasetType, datasetId);
+ return new Dataset(dataverseName, datasetName, typeName, datasetDetails, datasetType, datasetId, pendingOp);
}
@Override
@@ -248,13 +250,19 @@
aString.setValue(Calendar.getInstance().getTime().toString());
stringSerde.serialize(aString, fieldValue.getDataOutput());
recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_TIMESTAMP_FIELD_INDEX, fieldValue);
-
+
// write field 8
fieldValue.reset();
aInt32.setValue(dataset.getDatasetId());
aInt32Serde.serialize(aInt32, fieldValue.getDataOutput());
recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_DATASETID_FIELD_INDEX, fieldValue);
-
+
+ // write field 9
+ fieldValue.reset();
+ aInt32.setValue(dataset.getPendingOp());
+ aInt32Serde.serialize(aInt32, fieldValue.getDataOutput());
+ recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_PENDINGOP_FIELD_INDEX, fieldValue);
+
// write record
recordBuilder.write(tupleBuilder.getDataOutput(), true);
tupleBuilder.addFieldEndOffset();
@@ -290,13 +298,15 @@
fieldValue.reset();
aString.setValue(name);
stringSerde.serialize(aString, fieldValue.getDataOutput());
- propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_NAME_FIELD_INDEX, fieldValue);
+ propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_NAME_FIELD_INDEX,
+ fieldValue);
// write field 1
fieldValue.reset();
aString.setValue(value);
stringSerde.serialize(aString, fieldValue.getDataOutput());
- propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_VALUE_FIELD_INDEX, fieldValue);
+ propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_VALUE_FIELD_INDEX,
+ fieldValue);
propertyRecordBuilder.write(out, true);
}
Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java
===================================================================
--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java (revision 1061)
+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java (working copy)
@@ -96,13 +96,15 @@
}
Boolean isPrimaryIndex = ((ABoolean) rec.getValueByPos(MetadataRecordTypes.INDEX_ARECORD_ISPRIMARY_FIELD_INDEX))
.getBoolean();
+ int pendingOp = ((AInt32) rec.getValueByPos(MetadataRecordTypes.INDEX_ARECORD_PENDINGOP_FIELD_INDEX))
+ .getIntegerValue();
// Check if there is a gram length as well.
int gramLength = -1;
int gramLenPos = rec.getType().findFieldPosition(GRAM_LENGTH_FIELD_NAME);
if (gramLenPos >= 0) {
gramLength = ((AInt32) rec.getValueByPos(gramLenPos)).getIntegerValue();
}
- return new Index(dvName, dsName, indexName, indexStructure, searchKey, gramLength, isPrimaryIndex);
+ return new Index(dvName, dsName, indexName, indexStructure, searchKey, gramLength, isPrimaryIndex, pendingOp);
}
@Override
@@ -174,7 +176,12 @@
stringSerde.serialize(aString, fieldValue.getDataOutput());
recordBuilder.addField(MetadataRecordTypes.INDEX_ARECORD_TIMESTAMP_FIELD_INDEX, fieldValue);
- // write optional field 7
+ // write field 7
+ fieldValue.reset();
+ intSerde.serialize(new AInt32(instance.getPendingOp()), fieldValue.getDataOutput());
+ recordBuilder.addField(MetadataRecordTypes.INDEX_ARECORD_PENDINGOP_FIELD_INDEX, fieldValue);
+
+ // write optional field 8
if (instance.getGramLength() > 0) {
fieldValue.reset();
nameValue.reset();
Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
===================================================================
--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java (revision 1061)
+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java (working copy)
@@ -129,7 +129,7 @@
public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, String> {
private static Logger LOGGER = Logger.getLogger(AqlMetadataProvider.class.getName());
- private final MetadataTransactionContext mdTxnCtx;
+ private MetadataTransactionContext mdTxnCtx;
private boolean isWriteTransaction;
private Map<String, String[]> stores;
private Map<String, String> config;
@@ -156,8 +156,7 @@
return config;
}
- public AqlMetadataProvider(MetadataTransactionContext mdTxnCtx, Dataverse defaultDataverse) {
- this.mdTxnCtx = mdTxnCtx;
+ public AqlMetadataProvider(Dataverse defaultDataverse) {
this.defaultDataverse = defaultDataverse;
this.stores = AsterixProperties.INSTANCE.getStores();
}
@@ -181,6 +180,10 @@
public void setWriterFactory(IAWriterFactory writerFactory) {
this.writerFactory = writerFactory;
}
+
+ public void setMetadataTxnContext(MetadataTransactionContext mdTxnCtx) {
+ this.mdTxnCtx = mdTxnCtx;
+ }
public MetadataTransactionContext getMetadataTxnContext() {
return mdTxnCtx;
Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java
===================================================================
--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java (revision 1061)
+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java (working copy)
@@ -35,15 +35,18 @@
private final DatasetType datasetType;
private IDatasetDetails datasetDetails;
private final int datasetId;
+ // Type of pending operations with respect to atomic DDL operation
+ private final int pendingOp;
public Dataset(String dataverseName, String datasetName, String itemTypeName, IDatasetDetails datasetDetails,
- DatasetType datasetType, int datasetId) {
+ DatasetType datasetType, int datasetId, int pendingOp) {
this.dataverseName = dataverseName;
this.datasetName = datasetName;
this.itemTypeName = itemTypeName;
this.datasetType = datasetType;
this.datasetDetails = datasetDetails;
this.datasetId = datasetId;
+ this.pendingOp = pendingOp;
}
public String getDataverseName() {
@@ -73,6 +76,10 @@
public int getDatasetId() {
return datasetId;
}
+
+ public int getPendingOp() {
+ return pendingOp;
+ }
@Override
public Object addToCache(MetadataCache cache) {
Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Index.java
===================================================================
--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Index.java (revision 1061)
+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Index.java (working copy)
@@ -45,9 +45,11 @@
private final boolean isPrimaryIndex;
// Specific to NGRAM indexes.
private final int gramLength;
+ // Type of pending operations with respect to atomic DDL operation
+ private final int pendingOp;
public Index(String dataverseName, String datasetName, String indexName, IndexType indexType,
- List<String> keyFieldNames, int gramLength, boolean isPrimaryIndex) {
+ List<String> keyFieldNames, int gramLength, boolean isPrimaryIndex, int pendingOp) {
this.dataverseName = dataverseName;
this.datasetName = datasetName;
this.indexName = indexName;
@@ -55,10 +57,11 @@
this.keyFieldNames = keyFieldNames;
this.gramLength = gramLength;
this.isPrimaryIndex = isPrimaryIndex;
+ this.pendingOp = pendingOp;
}
public Index(String dataverseName, String datasetName, String indexName, IndexType indexType,
- List<String> keyFieldNames, boolean isPrimaryIndex) {
+ List<String> keyFieldNames, boolean isPrimaryIndex, int pendingOp) {
this.dataverseName = dataverseName;
this.datasetName = datasetName;
this.indexName = indexName;
@@ -66,6 +69,7 @@
this.keyFieldNames = keyFieldNames;
this.gramLength = -1;
this.isPrimaryIndex = isPrimaryIndex;
+ this.pendingOp = pendingOp;
}
public String getDataverseName() {
@@ -95,6 +99,10 @@
public boolean isPrimaryIndex() {
return isPrimaryIndex;
}
+
+ public int getPendingOp() {
+ return pendingOp;
+ }
public boolean isSecondaryIndex() {
return !isPrimaryIndex();
Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataverse.java
===================================================================
--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataverse.java (revision 1061)
+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataverse.java (working copy)
@@ -27,10 +27,12 @@
// Enforced to be unique within an Asterix cluster..
private final String dataverseName;
private final String dataFormat;
+ private final int pendingOp;
- public Dataverse(String dataverseName, String format) {
+ public Dataverse(String dataverseName, String format, int pendingOp) {
this.dataverseName = dataverseName;
this.dataFormat = format;
+ this.pendingOp = pendingOp;
}
public String getDataverseName() {
@@ -40,6 +42,10 @@
public String getDataFormat() {
return dataFormat;
}
+
+ public int getPendingOp() {
+ return pendingOp;
+ }
@Override
public Object addToCache(MetadataCache cache) {
Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
===================================================================
--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java (revision 1061)
+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java (working copy)
@@ -25,6 +25,7 @@
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.common.functions.FunctionSignature;
import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
import edu.uci.ics.asterix.metadata.api.IMetadataIndex;
import edu.uci.ics.asterix.metadata.api.IMetadataNode;
import edu.uci.ics.asterix.metadata.api.IValueExtractor;
@@ -160,7 +161,7 @@
// Add the primary index for the dataset.
InternalDatasetDetails id = (InternalDatasetDetails) dataset.getDatasetDetails();
Index primaryIndex = new Index(dataset.getDataverseName(), dataset.getDatasetName(),
- dataset.getDatasetName(), IndexType.BTREE, id.getPrimaryKey(), true);
+ dataset.getDatasetName(), IndexType.BTREE, id.getPrimaryKey(), true, dataset.getPendingOp());
addIndex(jobId, primaryIndex);
ITupleReference nodeGroupTuple = createTuple(id.getNodeGroupName(), dataset.getDataverseName(),
dataset.getDatasetName());
@@ -260,7 +261,7 @@
IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
TransactionContext txnCtx = transactionProvider.getTransactionManager().getTransactionContext(jobId);
- transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx);
+ //transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx);
// TODO: fix exceptions once new BTree exception model is in hyracks.
indexAccessor.insert(tuple);
//TODO: extract the key from the tuple and get the PKHashValue from the key.
@@ -536,7 +537,7 @@
// The transaction with txnId will have an S lock on the
// resource. Note that lock converters have a higher priority than
// regular waiters in the LockManager.
- transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx);
+ //transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx);
indexAccessor.delete(tuple);
//TODO: extract the key from the tuple and get the PKHashValue from the key.
//check how to get the oldValue.
@@ -803,7 +804,9 @@
private <ResultType> void searchIndex(JobId jobId, IMetadataIndex index, ITupleReference searchKey,
IValueExtractor<ResultType> valueExtractor, List<ResultType> results) throws Exception {
TransactionContext txnCtx = transactionProvider.getTransactionManager().getTransactionContext(jobId);
- transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.S, txnCtx);
+ //#. currently lock is not needed to access any metadata
+ // since the non-compatible concurrent access is always protected by the latch in the MetadataManager.
+ //transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.S, txnCtx);
IBinaryComparatorFactory[] comparatorFactories = index.getKeyBinaryComparatorFactory();
long resourceID = index.getResourceID();
IIndex indexInstance = indexLifecycleManager.getIndex(resourceID);
Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataEntity.java
===================================================================
--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataEntity.java (revision 1061)
+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataEntity.java (working copy)
@@ -20,6 +20,11 @@
import edu.uci.ics.asterix.metadata.MetadataCache;
public interface IMetadataEntity extends Serializable {
+
+ public static final int PENDING_NO_OP = 0;
+ public static final int PENDING_ADD_OP = 1;
+ public static final int PENDING_DROP_OP = 2;
+
Object addToCache(MetadataCache cache);
Object dropFromCache(MetadataCache cache);
Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
===================================================================
--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java (revision 1061)
+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java (working copy)
@@ -17,6 +17,8 @@
import java.rmi.RemoteException;
import java.util.List;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import edu.uci.ics.asterix.common.functions.FunctionSignature;
import edu.uci.ics.asterix.metadata.api.IAsterixStateProxy;
@@ -79,11 +81,10 @@
public class MetadataManager implements IMetadataManager {
// Set in init().
public static MetadataManager INSTANCE;
-
private final MetadataCache cache = new MetadataCache();
private IAsterixStateProxy proxy;
private IMetadataNode metadataNode;
-
+
public MetadataManager(IAsterixStateProxy proxy) {
if (proxy == null) {
throw new Error("Null proxy given to MetadataManager.");
@@ -206,11 +207,14 @@
@Override
public void addDataset(MetadataTransactionContext ctx, Dataset dataset) throws MetadataException {
+ // add dataset into metadataNode
try {
metadataNode.addDataset(ctx.getJobId(), dataset);
} catch (RemoteException e) {
throw new MetadataException(e);
}
+
+ // reflect the dataset into the cache
ctx.addDataset(dataset);
}
@@ -585,4 +589,5 @@
}
return adapter;
}
+
}
\ No newline at end of file
Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java
===================================================================
--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java (revision 1061)
+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java (working copy)
@@ -19,6 +19,7 @@
import edu.uci.ics.asterix.common.functions.FunctionSignature;
import edu.uci.ics.asterix.external.dataset.adapter.AdapterIdentifier;
+import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
import edu.uci.ics.asterix.metadata.entities.Dataset;
import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
import edu.uci.ics.asterix.metadata.entities.Datatype;
@@ -104,19 +105,19 @@
}
public void dropDataset(String dataverseName, String datasetName) {
- Dataset dataset = new Dataset(dataverseName, datasetName, null, null, null, -1);
+ Dataset dataset = new Dataset(dataverseName, datasetName, null, null, null, -1, IMetadataEntity.PENDING_NO_OP);
droppedCache.addDatasetIfNotExists(dataset);
logAndApply(new MetadataLogicalOperation(dataset, false));
}
public void dropIndex(String dataverseName, String datasetName, String indexName) {
- Index index = new Index(dataverseName, datasetName, indexName, null, null, false);
+ Index index = new Index(dataverseName, datasetName, indexName, null, null, false, IMetadataEntity.PENDING_NO_OP);
droppedCache.addIndexIfNotExists(index);
logAndApply(new MetadataLogicalOperation(index, false));
}
public void dropDataverse(String dataverseName) {
- Dataverse dataverse = new Dataverse(dataverseName, null);
+ Dataverse dataverse = new Dataverse(dataverseName, null, IMetadataEntity.PENDING_NO_OP);
droppedCache.addDataverseIfNotExists(dataverse);
logAndApply(new MetadataLogicalOperation(dataverse, false));
}
@@ -162,7 +163,7 @@
}
return droppedCache.getDataset(dataverseName, datasetName) != null;
}
-
+
public boolean indexIsDropped(String dataverseName, String datasetName, String indexName) {
if (droppedCache.getDataverse(dataverseName) != null) {
return true;
Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
===================================================================
--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java (revision 1061)
+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java (working copy)
@@ -80,10 +80,11 @@
public static final int DATAVERSE_ARECORD_NAME_FIELD_INDEX = 0;
public static final int DATAVERSE_ARECORD_FORMAT_FIELD_INDEX = 1;
public static final int DATAVERSE_ARECORD_TIMESTAMP_FIELD_INDEX = 2;
+ public static final int DATAVERSE_ARECORD_PENDINGOP_FIELD_INDEX = 3;
private static final ARecordType createDataverseRecordType() {
- return new ARecordType("DataverseRecordType", new String[] { "DataverseName", "DataFormat", "Timestamp" },
- new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING }, true);
+ return new ARecordType("DataverseRecordType", new String[] { "DataverseName", "DataFormat", "Timestamp", "PendingOp" },
+ new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.AINT32 }, true);
}
// Helper constants for accessing fields in an ARecord of anonymous type
@@ -158,10 +159,11 @@
public static final int DATASET_ARECORD_FEEDDETAILS_FIELD_INDEX = 6;
public static final int DATASET_ARECORD_TIMESTAMP_FIELD_INDEX = 7;
public static final int DATASET_ARECORD_DATASETID_FIELD_INDEX = 8;
+ public static final int DATASET_ARECORD_PENDINGOP_FIELD_INDEX = 9;
private static final ARecordType createDatasetRecordType() {
String[] fieldNames = { "DataverseName", "DatasetName", "DataTypeName", "DatasetType", "InternalDetails",
- "ExternalDetails", "FeedDetails", "Timestamp", "DatasetId" };
+ "ExternalDetails", "FeedDetails", "Timestamp", "DatasetId", "PendingOp" };
List<IAType> internalRecordUnionList = new ArrayList<IAType>();
internalRecordUnionList.add(BuiltinType.ANULL);
@@ -179,7 +181,8 @@
AUnionType feedRecordUnion = new AUnionType(feedRecordUnionList, null);
IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
- internalRecordUnion, externalRecordUnion, feedRecordUnion, BuiltinType.ASTRING, BuiltinType.AINT32 };
+ internalRecordUnion, externalRecordUnion, feedRecordUnion, BuiltinType.ASTRING, BuiltinType.AINT32,
+ BuiltinType.AINT32 };
return new ARecordType("DatasetRecordType", fieldNames, fieldTypes, true);
}
@@ -264,13 +267,14 @@
public static final int INDEX_ARECORD_SEARCHKEY_FIELD_INDEX = 4;
public static final int INDEX_ARECORD_ISPRIMARY_FIELD_INDEX = 5;
public static final int INDEX_ARECORD_TIMESTAMP_FIELD_INDEX = 6;
+ public static final int INDEX_ARECORD_PENDINGOP_FIELD_INDEX = 7;
private static final ARecordType createIndexRecordType() {
AOrderedListType olType = new AOrderedListType(BuiltinType.ASTRING, null);
String[] fieldNames = { "DataverseName", "DatasetName", "IndexName", "IndexStructure", "SearchKey",
- "IsPrimary", "Timestamp" };
+ "IsPrimary", "Timestamp", "PendingOp" };
IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
- olType, BuiltinType.ABOOLEAN, BuiltinType.ASTRING };
+ olType, BuiltinType.ABOOLEAN, BuiltinType.ASTRING, BuiltinType.AINT32 };
return new ARecordType("IndexRecordType", fieldNames, fieldTypes, true);
};
Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
===================================================================
--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java (revision 1061)
+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java (working copy)
@@ -31,6 +31,7 @@
import edu.uci.ics.asterix.metadata.IDatasetDetails;
import edu.uci.ics.asterix.metadata.MetadataManager;
import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
import edu.uci.ics.asterix.metadata.api.IMetadataIndex;
import edu.uci.ics.asterix.metadata.entities.AsterixBuiltinTypeMap;
import edu.uci.ics.asterix.metadata.entities.Dataset;
@@ -226,7 +227,7 @@
public static void insertInitialDataverses(MetadataTransactionContext mdTxnCtx) throws Exception {
String dataverseName = MetadataPrimaryIndexes.DATAVERSE_DATASET.getDataverseName();
String dataFormat = NonTaggedDataFormat.NON_TAGGED_DATA_FORMAT;
- MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dataverseName, dataFormat));
+ MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dataverseName, dataFormat, IMetadataEntity.PENDING_NO_OP));
}
public static void insertInitialDatasets(MetadataTransactionContext mdTxnCtx) throws Exception {
@@ -236,7 +237,7 @@
primaryIndexes[i].getNodeGroupName());
MetadataManager.INSTANCE.addDataset(mdTxnCtx, new Dataset(primaryIndexes[i].getDataverseName(),
primaryIndexes[i].getIndexedDatasetName(), primaryIndexes[i].getPayloadRecordType().getTypeName(),
- id, DatasetType.INTERNAL, primaryIndexes[i].getDatasetId().getId()));
+ id, DatasetType.INTERNAL, primaryIndexes[i].getDatasetId().getId(), IMetadataEntity.PENDING_NO_OP));
}
}
@@ -267,7 +268,7 @@
for (int i = 0; i < secondaryIndexes.length; i++) {
MetadataManager.INSTANCE.addIndex(mdTxnCtx, new Index(secondaryIndexes[i].getDataverseName(),
secondaryIndexes[i].getIndexedDatasetName(), secondaryIndexes[i].getIndexName(), IndexType.BTREE,
- secondaryIndexes[i].getPartitioningExpr(), false));
+ secondaryIndexes[i].getPartitioningExpr(), false, IMetadataEntity.PENDING_NO_OP));
}
}
Index: asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
===================================================================
--- asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java (revision 1061)
+++ asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java (working copy)
@@ -95,11 +95,11 @@
File testFile = tcCtx.getTestFile(cUnit);
/*************** to avoid run failure cases ****************
- if (!testFile.getAbsolutePath().contains("runtimets/queries/failure/")) {
+ if (!testFile.getAbsolutePath().contains("index-selection/")) {
continue;
}
************************************************************/
-
+
File expectedResultFile = tcCtx.getExpectedResultFile(cUnit);
File actualFile = new File(PATH_ACTUAL + File.separator
+ tcCtx.getTestCase().getFilePath().replace(File.separator, "_") + "_" + cUnit.getName() + ".adm");
Index: asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
===================================================================
--- asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java (revision 1061)
+++ asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java (working copy)
@@ -90,7 +90,7 @@
private static Logger LOGGER = Logger.getLogger(DatasetOperations.class.getName());
- public static JobSpecification[] createDropDatasetJobSpec(CompiledDatasetDropStatement datasetDropStmt,
+ public static JobSpecification createDropDatasetJobSpec(CompiledDatasetDropStatement datasetDropStmt,
AqlMetadataProvider metadataProvider) throws AlgebricksException, HyracksDataException, RemoteException,
ACIDException, AsterixException {
@@ -111,67 +111,10 @@
throw new AlgebricksException("DROP DATASET: No metadata for dataset " + datasetName);
}
if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
- return new JobSpecification[0];
+ return new JobSpecification();
}
-
- List<Index> datasetIndexes = metadataProvider.getDatasetIndexes(dataset.getDataverseName(),
- dataset.getDatasetName());
- int numSecondaryIndexes = 0;
- for (Index index : datasetIndexes) {
- if (index.isSecondaryIndex()) {
- numSecondaryIndexes++;
- }
- }
- JobSpecification[] specs;
- if (numSecondaryIndexes > 0) {
- specs = new JobSpecification[numSecondaryIndexes + 1];
- int i = 0;
- // First, drop secondary indexes.
- for (Index index : datasetIndexes) {
- if (index.isSecondaryIndex()) {
- specs[i] = new JobSpecification();
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> idxSplitsAndConstraint = metadataProvider
- .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataset.getDataverseName(),
- datasetName, index.getIndexName());
- IIndexDataflowHelperFactory dfhFactory;
- switch (index.getIndexType()) {
- case BTREE:
- dfhFactory = new LSMBTreeDataflowHelperFactory(
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER);
- break;
- case RTREE:
- dfhFactory = new LSMRTreeDataflowHelperFactory(
- new IPrimitiveValueProviderFactory[] { null }, RTreePolicyType.RTREE,
- new IBinaryComparatorFactory[] { null },
- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER, null);
- break;
- case NGRAM_INVIX:
- case WORD_INVIX:
- dfhFactory = new LSMInvertedIndexDataflowHelperFactory(
- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER);
- break;
- default:
- throw new AsterixException("Unknown index type provided.");
- }
- IndexDropOperatorDescriptor secondaryBtreeDrop = new IndexDropOperatorDescriptor(specs[i],
- AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
- AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, idxSplitsAndConstraint.first, dfhFactory);
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specs[i], secondaryBtreeDrop,
- idxSplitsAndConstraint.second);
- i++;
- }
- }
- } else {
- specs = new JobSpecification[1];
- }
+
JobSpecification specPrimary = new JobSpecification();
- specs[specs.length - 1] = specPrimary;
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider
.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataset.getDataverseName(), datasetName,
@@ -187,7 +130,7 @@
specPrimary.addRoot(primaryBtreeDrop);
- return specs;
+ return specPrimary;
}
public static JobSpecification createDatasetJobSpec(Dataverse dataverse, String datasetName,
Index: asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
===================================================================
--- asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java (revision 1061)
+++ asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java (working copy)
@@ -21,6 +21,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.json.JSONException;
@@ -68,6 +70,7 @@
import edu.uci.ics.asterix.metadata.MetadataException;
import edu.uci.ics.asterix.metadata.MetadataManager;
import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
import edu.uci.ics.asterix.metadata.entities.Dataset;
import edu.uci.ics.asterix.metadata.entities.Datatype;
@@ -112,6 +115,7 @@
private final PrintWriter out;
private final SessionConfig sessionConfig;
private final DisplayFormat pdf;
+ private final ReadWriteLock cacheLatch;
private Dataverse activeDefaultDataverse;
private List<FunctionDecl> declaredFunctions;
@@ -121,6 +125,7 @@
this.out = out;
this.sessionConfig = pc;
this.pdf = pdf;
+ this.cacheLatch = new ReentrantReadWriteLock(true);
declaredFunctions = getDeclaredFunctions(aqlStatements);
}
@@ -143,8 +148,7 @@
for (Statement stmt : aqlStatements) {
validateOperation(activeDefaultDataverse, stmt);
- MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
- AqlMetadataProvider metadataProvider = new AqlMetadataProvider(mdTxnCtx, activeDefaultDataverse);
+ AqlMetadataProvider metadataProvider = new AqlMetadataProvider(activeDefaultDataverse);
metadataProvider.setWriterFactory(writerFactory);
metadataProvider.setOutputFile(outputFile);
metadataProvider.setConfig(config);
@@ -253,15 +257,9 @@
}
}
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e) {
- MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
throw new AlgebricksException(e);
}
- // Following jobs are run under a separate transaction, that is committed/aborted by the JobEventListener
- for (JobSpecification jobspec : jobsToExecute) {
- runJob(hcc, jobspec);
- }
}
return executionResult;
}
@@ -289,398 +287,802 @@
private Dataverse handleUseDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt,
List<JobSpecification> jobsToExecute) throws MetadataException, RemoteException, ACIDException {
- DataverseDecl dvd = (DataverseDecl) stmt;
- String dvName = dvd.getDataverseName().getValue();
- Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
- if (dv == null) {
- throw new MetadataException("Unknown dataverse " + dvName);
+
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ acquireReadLatch();
+
+ try {
+ DataverseDecl dvd = (DataverseDecl) stmt;
+ String dvName = dvd.getDataverseName().getValue();
+ Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
+ if (dv == null) {
+ throw new MetadataException("Unknown dataverse " + dvName);
+ }
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ return dv;
+ } catch (Exception e) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ throw new MetadataException(e);
+ } finally {
+ releaseReadLatch();
}
- return dv;
}
private void handleCreateDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt,
List<JobSpecification> jobsToExecute) throws MetadataException, AlgebricksException, RemoteException,
ACIDException {
- CreateDataverseStatement stmtCreateDataverse = (CreateDataverseStatement) stmt;
- String dvName = stmtCreateDataverse.getDataverseName().getValue();
- Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
- if (dv != null && !stmtCreateDataverse.getIfNotExists()) {
- throw new AlgebricksException("A dataverse with this name " + dvName + " already exists.");
+
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ acquireWriteLatch();
+
+ try {
+ CreateDataverseStatement stmtCreateDataverse = (CreateDataverseStatement) stmt;
+ String dvName = stmtCreateDataverse.getDataverseName().getValue();
+ Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
+ if (dv != null && !stmtCreateDataverse.getIfNotExists()) {
+ throw new AlgebricksException("A dataverse with this name " + dvName + " already exists.");
+ }
+ MetadataManager.INSTANCE.addDataverse(metadataProvider.getMetadataTxnContext(), new Dataverse(dvName,
+ stmtCreateDataverse.getFormat(), IMetadataEntity.PENDING_NO_OP));
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (Exception e) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ throw new AlgebricksException(e);
+ } finally {
+ releaseWriteLatch();
}
- MetadataManager.INSTANCE.addDataverse(metadataProvider.getMetadataTxnContext(), new Dataverse(dvName,
- stmtCreateDataverse.getFormat()));
}
private void handleCreateDatasetStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws AsterixException, Exception {
- DatasetDecl dd = (DatasetDecl) stmt;
- String dataverseName = dd.getDataverse() != null ? dd.getDataverse().getValue()
- : activeDefaultDataverse != null ? activeDefaultDataverse.getDataverseName() : null;
- if (dataverseName == null) {
- throw new AlgebricksException(" dataverse not specified ");
- }
- String datasetName = dd.getName().getValue();
- DatasetType dsType = dd.getDatasetType();
- String itemTypeName = dd.getItemTypeName().getValue();
- IDatasetDetails datasetDetails = null;
- Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
- datasetName);
- if (ds != null) {
- if (dd.getIfNotExists()) {
- return;
- } else {
- throw new AlgebricksException("A dataset with this name " + datasetName + " already exists.");
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ boolean bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ acquireWriteLatch();
+
+ try {
+ DatasetDecl dd = (DatasetDecl) stmt;
+ String dataverseName = dd.getDataverse() != null ? dd.getDataverse().getValue()
+ : activeDefaultDataverse != null ? activeDefaultDataverse.getDataverseName() : null;
+ if (dataverseName == null) {
+ throw new AlgebricksException(" dataverse not specified ");
}
- }
- Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(), dataverseName,
- itemTypeName);
- if (dt == null) {
- throw new AlgebricksException(": type " + itemTypeName + " could not be found.");
- }
- switch (dd.getDatasetType()) {
- case INTERNAL: {
- IAType itemType = dt.getDatatype();
- if (itemType.getTypeTag() != ATypeTag.RECORD) {
- throw new AlgebricksException("Can only partition ARecord's.");
+ String datasetName = dd.getName().getValue();
+ DatasetType dsType = dd.getDatasetType();
+ String itemTypeName = dd.getItemTypeName().getValue();
+
+ IDatasetDetails datasetDetails = null;
+ Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
+ datasetName);
+ if (ds != null) {
+ if (dd.getIfNotExists()) {
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ return;
+ } else {
+ throw new AlgebricksException("A dataset with this name " + datasetName + " already exists.");
}
- List<String> partitioningExprs = ((InternalDetailsDecl) dd.getDatasetDetailsDecl())
- .getPartitioningExprs();
- String ngName = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue();
- datasetDetails = new InternalDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
- InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs, ngName);
- break;
}
- case EXTERNAL: {
- String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter();
- Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties();
- datasetDetails = new ExternalDatasetDetails(adapter, properties);
- break;
+ Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(), dataverseName,
+ itemTypeName);
+ if (dt == null) {
+ throw new AlgebricksException(": type " + itemTypeName + " could not be found.");
}
- case FEED: {
- IAType itemType = dt.getDatatype();
- if (itemType.getTypeTag() != ATypeTag.RECORD) {
- throw new AlgebricksException("Can only partition ARecord's.");
+ switch (dd.getDatasetType()) {
+ case INTERNAL: {
+ IAType itemType = dt.getDatatype();
+ if (itemType.getTypeTag() != ATypeTag.RECORD) {
+ throw new AlgebricksException("Can only partition ARecord's.");
+ }
+ List<String> partitioningExprs = ((InternalDetailsDecl) dd.getDatasetDetailsDecl())
+ .getPartitioningExprs();
+ String ngName = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue();
+ datasetDetails = new InternalDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
+ InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs,
+ ngName);
+ break;
}
- List<String> partitioningExprs = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getPartitioningExprs();
- String ngName = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue();
- String adapter = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getAdapterFactoryClassname();
- Map<String, String> configuration = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getConfiguration();
- FunctionSignature signature = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getFunctionSignature();
- datasetDetails = new FeedDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
- InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs, ngName,
- adapter, configuration, signature, FeedDatasetDetails.FeedState.INACTIVE.toString());
- break;
+ case EXTERNAL: {
+ String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter();
+ Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties();
+ datasetDetails = new ExternalDatasetDetails(adapter, properties);
+ break;
+ }
+ case FEED: {
+ IAType itemType = dt.getDatatype();
+ if (itemType.getTypeTag() != ATypeTag.RECORD) {
+ throw new AlgebricksException("Can only partition ARecord's.");
+ }
+ List<String> partitioningExprs = ((FeedDetailsDecl) dd.getDatasetDetailsDecl())
+ .getPartitioningExprs();
+ String ngName = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue();
+ String adapter = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getAdapterFactoryClassname();
+ Map<String, String> configuration = ((FeedDetailsDecl) dd.getDatasetDetailsDecl())
+ .getConfiguration();
+ FunctionSignature signature = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getFunctionSignature();
+ datasetDetails = new FeedDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
+ InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs,
+ ngName, adapter, configuration, signature, FeedDatasetDetails.FeedState.INACTIVE.toString());
+ break;
+ }
}
+
+ //#. add a new dataset with PendingAddOp
+ Dataset dataset = new Dataset(dataverseName, datasetName, itemTypeName, datasetDetails, dsType,
+ DatasetIdFactory.generateDatasetId(), IMetadataEntity.PENDING_ADD_OP);
+ MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset);
+
+ if (dd.getDatasetType() == DatasetType.INTERNAL || dd.getDatasetType() == DatasetType.FEED) {
+ Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
+ dataverseName);
+ JobSpecification jobSpec = DatasetOperations.createDatasetJobSpec(dataverse, datasetName,
+ metadataProvider);
+
+ //#. make metadataTxn commit before calling runJob.
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ bActiveTxn = false;
+
+ //#. runJob
+ runJob(hcc, jobSpec);
+
+ //#. begin new metadataTxn
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ }
+
+ //#. add a new dataset with PendingNoOp after deleting the dataset with PendingAddOp
+ MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName);
+ MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), new Dataset(dataverseName,
+ datasetName, itemTypeName, datasetDetails, dsType, dataset.getDatasetId(),
+ IMetadataEntity.PENDING_NO_OP));
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (Exception e) {
+ if (bActiveTxn) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ }
+ throw new AlgebricksException(e);
+ } finally {
+ releaseWriteLatch();
}
- MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), new Dataset(dataverseName,
- datasetName, itemTypeName, datasetDetails, dsType, DatasetIdFactory.generateDatasetId()));
- if (dd.getDatasetType() == DatasetType.INTERNAL || dd.getDatasetType() == DatasetType.FEED) {
- Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
- dataverseName);
- runJob(hcc, DatasetOperations.createDatasetJobSpec(dataverse, datasetName, metadataProvider));
- }
}
private void handleCreateIndexStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
- CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
- String dataverseName = stmtCreateIndex.getDataverseName() == null ? activeDefaultDataverse == null ? null
- : activeDefaultDataverse.getDataverseName() : stmtCreateIndex.getDataverseName().getValue();
- if (dataverseName == null) {
- throw new AlgebricksException(" dataverse not specified ");
- }
- String datasetName = stmtCreateIndex.getDatasetName().getValue();
- Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
- datasetName);
- if (ds == null) {
- throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
- + dataverseName);
- }
- String indexName = stmtCreateIndex.getIndexName().getValue();
- Index idx = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
- datasetName, indexName);
- if (idx != null) {
- if (!stmtCreateIndex.getIfNotExists()) {
- throw new AlgebricksException("An index with this name " + indexName + " already exists.");
- } else {
- stmtCreateIndex.setNeedToCreate(false);
+
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ boolean bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ acquireWriteLatch();
+
+ try {
+ CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
+ String dataverseName = stmtCreateIndex.getDataverseName() == null ? activeDefaultDataverse == null ? null
+ : activeDefaultDataverse.getDataverseName() : stmtCreateIndex.getDataverseName().getValue();
+ if (dataverseName == null) {
+ throw new AlgebricksException(" dataverse not specified ");
}
- } else {
+ String datasetName = stmtCreateIndex.getDatasetName().getValue();
+
+ Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
+ datasetName);
+ if (ds == null) {
+ throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
+ + dataverseName);
+ }
+
+ String indexName = stmtCreateIndex.getIndexName().getValue();
+ Index idx = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
+ datasetName, indexName);
+
+ if (idx != null) {
+ if (!stmtCreateIndex.getIfNotExists()) {
+ throw new AlgebricksException("An index with this name " + indexName + " already exists.");
+ } else {
+ stmtCreateIndex.setNeedToCreate(false);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ return;
+ }
+ }
+
+ //#. add a new index with PendingAddOp
Index index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(),
- stmtCreateIndex.getFieldExprs(), stmtCreateIndex.getGramLength(), false);
+ stmtCreateIndex.getFieldExprs(), stmtCreateIndex.getGramLength(), false,
+ IMetadataEntity.PENDING_ADD_OP);
MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
- runCreateIndexJob(hcc, stmtCreateIndex, metadataProvider);
+ //#. create the index artifact in NC.
CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName,
index.getDatasetName(), index.getKeyFieldNames(), index.getGramLength(), index.getIndexType());
- JobSpecification loadIndexJobSpec = IndexOperations
- .buildSecondaryIndexLoadingJobSpec(cis, metadataProvider);
- runJob(hcc, loadIndexJobSpec);
+ JobSpecification spec = IndexOperations.buildSecondaryIndexCreationJobSpec(cis, metadataProvider);
+ if (spec == null) {
+ throw new AsterixException("Failed to create job spec for creating index '"
+ + stmtCreateIndex.getDatasetName() + "." + stmtCreateIndex.getIndexName() + "'");
+ }
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ bActiveTxn = false;
+
+ runJob(hcc, spec);
+
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+
+ //#. load data into the index in NC.
+ cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName, index.getDatasetName(),
+ index.getKeyFieldNames(), index.getGramLength(), index.getIndexType());
+ spec = IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, metadataProvider);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ bActiveTxn = false;
+
+ runJob(hcc, spec);
+
+ //#. begin new metadataTxn
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+
+ //#. add another new index with PendingNoOp after deleting the index with PendingAddOp
+ MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
+ indexName);
+ index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(),
+ stmtCreateIndex.getFieldExprs(), stmtCreateIndex.getGramLength(), false,
+ IMetadataEntity.PENDING_NO_OP);
+ MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+
+ } catch (Exception e) {
+ if (bActiveTxn) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ }
+ throw new AlgebricksException(e);
+ } finally {
+ releaseWriteLatch();
}
}
private void handleCreateTypeStatement(AqlMetadataProvider metadataProvider, Statement stmt,
List<JobSpecification> jobsToExecute) throws AlgebricksException, RemoteException, ACIDException,
MetadataException {
- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
- TypeDecl stmtCreateType = (TypeDecl) stmt;
- String dataverseName = stmtCreateType.getDataverseName() == null ? activeDefaultDataverse == null ? null
- : activeDefaultDataverse.getDataverseName() : stmtCreateType.getDataverseName().getValue();
- if (dataverseName == null) {
- throw new AlgebricksException(" dataverse not specified ");
- }
- String typeName = stmtCreateType.getIdent().getValue();
- Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
- if (dv == null) {
- throw new AlgebricksException("Unknonw dataverse " + dataverseName);
- }
- Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
- if (dt != null) {
- if (!stmtCreateType.getIfNotExists())
- throw new AlgebricksException("A datatype with this name " + typeName + " already exists.");
- } else {
- if (builtinTypeMap.get(typeName) != null) {
- throw new AlgebricksException("Cannot redefine builtin type " + typeName + ".");
+
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ acquireWriteLatch();
+
+ try {
+ TypeDecl stmtCreateType = (TypeDecl) stmt;
+ String dataverseName = stmtCreateType.getDataverseName() == null ? activeDefaultDataverse == null ? null
+ : activeDefaultDataverse.getDataverseName() : stmtCreateType.getDataverseName().getValue();
+ if (dataverseName == null) {
+ throw new AlgebricksException(" dataverse not specified ");
+ }
+ String typeName = stmtCreateType.getIdent().getValue();
+ Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
+ if (dv == null) {
+ throw new AlgebricksException("Unknonw dataverse " + dataverseName);
+ }
+ Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
+ if (dt != null) {
+ if (!stmtCreateType.getIfNotExists()) {
+ throw new AlgebricksException("A datatype with this name " + typeName + " already exists.");
+ }
} else {
- Map<TypeSignature, IAType> typeMap = TypeTranslator.computeTypes(mdTxnCtx, (TypeDecl) stmt,
- dataverseName);
- TypeSignature typeSignature = new TypeSignature(dataverseName, typeName);
- IAType type = typeMap.get(typeSignature);
- MetadataManager.INSTANCE.addDatatype(mdTxnCtx, new Datatype(dataverseName, typeName, type, false));
+ if (builtinTypeMap.get(typeName) != null) {
+ throw new AlgebricksException("Cannot redefine builtin type " + typeName + ".");
+ } else {
+ Map<TypeSignature, IAType> typeMap = TypeTranslator.computeTypes(mdTxnCtx, (TypeDecl) stmt,
+ dataverseName);
+ TypeSignature typeSignature = new TypeSignature(dataverseName, typeName);
+ IAType type = typeMap.get(typeSignature);
+ MetadataManager.INSTANCE.addDatatype(mdTxnCtx, new Datatype(dataverseName, typeName, type, false));
+ }
}
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (Exception e) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ throw new AlgebricksException(e);
+ } finally {
+ releaseWriteLatch();
}
}
private void handleDataverseDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
- DataverseDropStatement stmtDelete = (DataverseDropStatement) stmt;
- String dvName = stmtDelete.getDataverseName().getValue();
- Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dvName);
- if (dv == null) {
- if (!stmtDelete.getIfExists()) {
- throw new AlgebricksException("There is no dataverse with this name " + dvName + ".");
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ boolean bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ acquireWriteLatch();
+
+ try {
+ DataverseDropStatement stmtDelete = (DataverseDropStatement) stmt;
+ String dvName = stmtDelete.getDataverseName().getValue();
+
+ Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dvName);
+ if (dv == null) {
+ if (!stmtDelete.getIfExists()) {
+ throw new AlgebricksException("There is no dataverse with this name " + dvName + ".");
+ }
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ return;
}
- } else {
+
+ //#. prepare jobs which will drop corresponding datasets with indexes.
List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dvName);
for (int j = 0; j < datasets.size(); j++) {
String datasetName = datasets.get(j).getDatasetName();
DatasetType dsType = datasets.get(j).getDatasetType();
if (dsType == DatasetType.INTERNAL || dsType == DatasetType.FEED) {
+
List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dvName, datasetName);
for (int k = 0; k < indexes.size(); k++) {
if (indexes.get(k).isSecondaryIndex()) {
- compileIndexDropStatement(hcc, dvName, datasetName, indexes.get(k).getIndexName(),
- metadataProvider);
+ CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dvName, datasetName,
+ indexes.get(k).getIndexName());
+ jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
}
}
+
+ CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dvName, datasetName);
+ jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider));
}
- compileDatasetDropStatement(hcc, dvName, datasetName, metadataProvider);
}
+ //#. mark PendingDropOp on the dataverse record by
+ // first, deleting the dataverse record from the DATAVERSE_DATASET
+ // second, inserting the dataverse record with the PendingDropOp value into the DATAVERSE_DATASET
MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dvName);
+ MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dvName, dv.getDataFormat(),
+ IMetadataEntity.PENDING_DROP_OP));
+
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ bActiveTxn = false;
+
+ for (JobSpecification jobSpec : jobsToExecute) {
+ runJob(hcc, jobSpec);
+ }
+
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+
+ //#. finally, delete the dataverse.
+ MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dvName);
if (activeDefaultDataverse != null && activeDefaultDataverse.getDataverseName() == dvName) {
activeDefaultDataverse = null;
}
+
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (Exception e) {
+ if (bActiveTxn) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ }
+ throw new AlgebricksException(e);
+ } finally {
+ releaseWriteLatch();
}
}
private void handleDatasetDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
- DropStatement stmtDelete = (DropStatement) stmt;
- String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
- : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue();
- if (dataverseName == null) {
- throw new AlgebricksException(" dataverse not specified ");
- }
- String datasetName = stmtDelete.getDatasetName().getValue();
- Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
- if (ds == null) {
- if (!stmtDelete.getIfExists())
- throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
- + dataverseName + ".");
- } else {
+
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ boolean bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ acquireWriteLatch();
+
+ try {
+ DropStatement stmtDelete = (DropStatement) stmt;
+ String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
+ : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue();
+ if (dataverseName == null) {
+ throw new AlgebricksException(" dataverse not specified ");
+ }
+ String datasetName = stmtDelete.getDatasetName().getValue();
+
+ Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+ if (ds == null) {
+ if (!stmtDelete.getIfExists()) {
+ throw new AlgebricksException("There is no dataset with this name " + datasetName
+ + " in dataverse " + dataverseName + ".");
+ }
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ return;
+ }
+
if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
+
+ //#. prepare jobs to drop the datatset and the indexes in NC
List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
for (int j = 0; j < indexes.size(); j++) {
- if (indexes.get(j).isPrimaryIndex()) {
- compileIndexDropStatement(hcc, dataverseName, datasetName, indexes.get(j).getIndexName(),
- metadataProvider);
+ if (indexes.get(j).isSecondaryIndex()) {
+ CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
+ indexes.get(j).getIndexName());
+ jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
}
}
+ CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
+ jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider));
+
+ //#. mark the existing dataset as PendingDropOp
+ MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
+ MetadataManager.INSTANCE.addDataset(
+ mdTxnCtx,
+ new Dataset(dataverseName, datasetName, ds.getItemTypeName(), ds.getDatasetDetails(), ds
+ .getDatasetType(), ds.getDatasetId(), IMetadataEntity.PENDING_DROP_OP));
+
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ bActiveTxn = false;
+
+ //#. run the jobs
+ for (JobSpecification jobSpec : jobsToExecute) {
+ runJob(hcc, jobSpec);
+ }
+
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
}
- compileDatasetDropStatement(hcc, dataverseName, datasetName, metadataProvider);
+
+ //#. finally, delete the dataset.
+ MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
+
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (Exception e) {
+ if (bActiveTxn) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ }
+ throw new AlgebricksException(e);
+ } finally {
+ releaseWriteLatch();
}
}
private void handleIndexDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
- IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt;
- String datasetName = stmtIndexDrop.getDatasetName().getValue();
- String dataverseName = stmtIndexDrop.getDataverseName() == null ? activeDefaultDataverse == null ? null
- : activeDefaultDataverse.getDataverseName() : stmtIndexDrop.getDataverseName().getValue();
- if (dataverseName == null) {
- throw new AlgebricksException(" dataverse not specified ");
+
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ boolean bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ acquireWriteLatch();
+
+ try {
+ IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt;
+ String datasetName = stmtIndexDrop.getDatasetName().getValue();
+ String dataverseName = stmtIndexDrop.getDataverseName() == null ? activeDefaultDataverse == null ? null
+ : activeDefaultDataverse.getDataverseName() : stmtIndexDrop.getDataverseName().getValue();
+ if (dataverseName == null) {
+ throw new AlgebricksException(" dataverse not specified ");
+ }
+
+ Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+ if (ds == null) {
+ throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
+ + dataverseName);
+ }
+
+ if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
+ String indexName = stmtIndexDrop.getIndexName().getValue();
+ Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
+ if (index == null) {
+ if (!stmtIndexDrop.getIfExists()) {
+ throw new AlgebricksException("There is no index with this name " + indexName + ".");
+ }
+ } else {
+ //#. prepare a job to drop the index in NC.
+ CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
+ indexName);
+ jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
+
+ //#. mark PendingDropOp on the existing index
+ MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
+ MetadataManager.INSTANCE.addIndex(
+ mdTxnCtx,
+ new Index(dataverseName, datasetName, indexName, index.getIndexType(), index
+ .getKeyFieldNames(), index.isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
+
+ //#. commit the existing transaction before calling runJob.
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ bActiveTxn = false;
+
+ for (JobSpecification jobSpec : jobsToExecute) {
+ runJob(hcc, jobSpec);
+ }
+
+ //#. begin a new transaction
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+
+ //#. finally, delete the existing index
+ MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
+ }
+ } else {
+ throw new AlgebricksException(datasetName
+ + " is an external dataset. Indexes are not maintained for external datasets.");
+ }
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+
+ } catch (Exception e) {
+ if (bActiveTxn) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ }
+ throw new AlgebricksException(e);
+
+ } finally {
+ releaseWriteLatch();
}
- Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
- if (ds == null)
- throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
- + dataverseName);
- if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
- String indexName = stmtIndexDrop.getIndexName().getValue();
- Index idx = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
- if (idx == null) {
- if (!stmtIndexDrop.getIfExists())
- throw new AlgebricksException("There is no index with this name " + indexName + ".");
- } else
- compileIndexDropStatement(hcc, dataverseName, datasetName, indexName, metadataProvider);
- } else {
- throw new AlgebricksException(datasetName
- + " is an external dataset. Indexes are not maintained for external datasets.");
- }
}
private void handleTypeDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
List<JobSpecification> jobsToExecute) throws AlgebricksException, MetadataException, RemoteException,
ACIDException {
- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
- TypeDropStatement stmtTypeDrop = (TypeDropStatement) stmt;
- String dataverseName = stmtTypeDrop.getDataverseName() == null ? (activeDefaultDataverse == null ? null
- : activeDefaultDataverse.getDataverseName()) : stmtTypeDrop.getDataverseName().getValue();
- if (dataverseName == null) {
- throw new AlgebricksException(" dataverse not specified ");
+
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ acquireWriteLatch();
+
+ try {
+ TypeDropStatement stmtTypeDrop = (TypeDropStatement) stmt;
+ String dataverseName = stmtTypeDrop.getDataverseName() == null ? (activeDefaultDataverse == null ? null
+ : activeDefaultDataverse.getDataverseName()) : stmtTypeDrop.getDataverseName().getValue();
+ if (dataverseName == null) {
+ throw new AlgebricksException(" dataverse not specified ");
+ }
+ String typeName = stmtTypeDrop.getTypeName().getValue();
+ Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
+ if (dt == null) {
+ if (!stmtTypeDrop.getIfExists())
+ throw new AlgebricksException("There is no datatype with this name " + typeName + ".");
+ } else {
+ MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, dataverseName, typeName);
+ }
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (Exception e) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ throw new AlgebricksException(e);
+ } finally {
+ releaseWriteLatch();
}
- String typeName = stmtTypeDrop.getTypeName().getValue();
- Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
- if (dt == null) {
- if (!stmtTypeDrop.getIfExists())
- throw new AlgebricksException("There is no datatype with this name " + typeName + ".");
- } else {
- MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, dataverseName, typeName);
- }
}
private void handleNodegroupDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
List<JobSpecification> jobsToExecute) throws MetadataException, AlgebricksException, RemoteException,
ACIDException {
- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
- NodeGroupDropStatement stmtDelete = (NodeGroupDropStatement) stmt;
- String nodegroupName = stmtDelete.getNodeGroupName().getValue();
- NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodegroupName);
- if (ng == null) {
- if (!stmtDelete.getIfExists())
- throw new AlgebricksException("There is no nodegroup with this name " + nodegroupName + ".");
- } else {
- MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, nodegroupName);
+
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ acquireWriteLatch();
+
+ try {
+ NodeGroupDropStatement stmtDelete = (NodeGroupDropStatement) stmt;
+ String nodegroupName = stmtDelete.getNodeGroupName().getValue();
+ NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodegroupName);
+ if (ng == null) {
+ if (!stmtDelete.getIfExists())
+ throw new AlgebricksException("There is no nodegroup with this name " + nodegroupName + ".");
+ } else {
+ MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, nodegroupName);
+ }
+
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (Exception e) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ throw new AlgebricksException(e);
+ } finally {
+ releaseWriteLatch();
}
}
private void handleCreateFunctionStatement(AqlMetadataProvider metadataProvider, Statement stmt,
List<JobSpecification> jobsToExecute) throws AlgebricksException, MetadataException, RemoteException,
ACIDException {
- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
- CreateFunctionStatement cfs = (CreateFunctionStatement) stmt;
- String dataverse = cfs.getSignature().getNamespace() == null ? activeDefaultDataverse == null ? null
- : activeDefaultDataverse.getDataverseName() : cfs.getSignature().getNamespace();
- if (dataverse == null) {
- throw new AlgebricksException(" dataverse not specified ");
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ acquireWriteLatch();
+
+ try {
+ CreateFunctionStatement cfs = (CreateFunctionStatement) stmt;
+ String dataverse = cfs.getSignature().getNamespace() == null ? activeDefaultDataverse == null ? null
+ : activeDefaultDataverse.getDataverseName() : cfs.getSignature().getNamespace();
+ if (dataverse == null) {
+ throw new AlgebricksException(" dataverse not specified ");
+ }
+ Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverse);
+ if (dv == null) {
+ throw new AlgebricksException("There is no dataverse with this name " + dataverse + ".");
+ }
+ Function function = new Function(dataverse, cfs.getaAterixFunction().getName(), cfs.getaAterixFunction()
+ .getArity(), cfs.getParamList(), Function.RETURNTYPE_VOID, cfs.getFunctionBody(),
+ Function.LANGUAGE_AQL, FunctionKind.SCALAR.toString());
+ MetadataManager.INSTANCE.addFunction(mdTxnCtx, function);
+
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (Exception e) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ throw new AlgebricksException(e);
+ } finally {
+ releaseWriteLatch();
}
- Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverse);
- if (dv == null) {
- throw new AlgebricksException("There is no dataverse with this name " + dataverse + ".");
- }
- Function function = new Function(dataverse, cfs.getaAterixFunction().getName(), cfs.getaAterixFunction()
- .getArity(), cfs.getParamList(), Function.RETURNTYPE_VOID, cfs.getFunctionBody(),
- Function.LANGUAGE_AQL, FunctionKind.SCALAR.toString());
- MetadataManager.INSTANCE.addFunction(mdTxnCtx, function);
}
private void handleFunctionDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
List<JobSpecification> jobsToExecute) throws MetadataException, RemoteException, ACIDException,
AlgebricksException {
- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
- FunctionDropStatement stmtDropFunction = (FunctionDropStatement) stmt;
- FunctionSignature signature = stmtDropFunction.getFunctionSignature();
- Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, signature);
- if (function == null) {
- if (!stmtDropFunction.getIfExists())
- throw new AlgebricksException("Unknonw function " + signature);
- } else {
- MetadataManager.INSTANCE.dropFunction(mdTxnCtx, signature);
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ acquireWriteLatch();
+
+ try {
+ FunctionDropStatement stmtDropFunction = (FunctionDropStatement) stmt;
+ FunctionSignature signature = stmtDropFunction.getFunctionSignature();
+ Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, signature);
+ if (function == null) {
+ if (!stmtDropFunction.getIfExists())
+ throw new AlgebricksException("Unknonw function " + signature);
+ } else {
+ MetadataManager.INSTANCE.dropFunction(mdTxnCtx, signature);
+ }
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (Exception e) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ throw new AlgebricksException(e);
+ } finally {
+ releaseWriteLatch();
}
}
private void handleLoadFromFileStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
- LoadFromFileStatement loadStmt = (LoadFromFileStatement) stmt;
- String dataverseName = loadStmt.getDataverseName() == null ? activeDefaultDataverse == null ? null
- : activeDefaultDataverse.getDataverseName() : loadStmt.getDataverseName().getValue();
- CompiledLoadFromFileStatement cls = new CompiledLoadFromFileStatement(dataverseName, loadStmt.getDatasetName()
- .getValue(), loadStmt.getAdapter(), loadStmt.getProperties(), loadStmt.dataIsAlreadySorted());
- IDataFormat format = getDataFormat(metadataProvider.getMetadataTxnContext(), dataverseName);
- Job job = DatasetOperations.createLoadDatasetJobSpec(metadataProvider, cls, format);
- jobsToExecute.add(job.getJobSpec());
- // Also load the dataset's secondary indexes.
- List<Index> datasetIndexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, loadStmt
- .getDatasetName().getValue());
- for (Index index : datasetIndexes) {
- if (!index.isSecondaryIndex()) {
- continue;
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ boolean bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ acquireReadLatch();
+
+ try {
+ LoadFromFileStatement loadStmt = (LoadFromFileStatement) stmt;
+ String dataverseName = loadStmt.getDataverseName() == null ? activeDefaultDataverse == null ? null
+ : activeDefaultDataverse.getDataverseName() : loadStmt.getDataverseName().getValue();
+ CompiledLoadFromFileStatement cls = new CompiledLoadFromFileStatement(dataverseName, loadStmt
+ .getDatasetName().getValue(), loadStmt.getAdapter(), loadStmt.getProperties(),
+ loadStmt.dataIsAlreadySorted());
+
+ IDataFormat format = getDataFormat(metadataProvider.getMetadataTxnContext(), dataverseName);
+ Job job = DatasetOperations.createLoadDatasetJobSpec(metadataProvider, cls, format);
+ jobsToExecute.add(job.getJobSpec());
+ // Also load the dataset's secondary indexes.
+ List<Index> datasetIndexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, loadStmt
+ .getDatasetName().getValue());
+ for (Index index : datasetIndexes) {
+ if (!index.isSecondaryIndex()) {
+ continue;
+ }
+ // Create CompiledCreateIndexStatement from metadata entity 'index'.
+ CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(),
+ dataverseName, index.getDatasetName(), index.getKeyFieldNames(), index.getGramLength(),
+ index.getIndexType());
+ jobsToExecute.add(IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, metadataProvider));
}
- // Create CompiledCreateIndexStatement from metadata entity 'index'.
- CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName,
- index.getDatasetName(), index.getKeyFieldNames(), index.getGramLength(), index.getIndexType());
- jobsToExecute.add(IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, metadataProvider));
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ bActiveTxn = false;
+
+ for (JobSpecification jobspec : jobsToExecute) {
+ runJob(hcc, jobspec);
+ }
+ } catch (Exception e) {
+ if (bActiveTxn) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ }
+ throw new AlgebricksException(e);
+ } finally {
+ releaseReadLatch();
}
}
private void handleWriteFromQueryResultStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
- metadataProvider.setWriteTransaction(true);
- WriteFromQueryResultStatement st1 = (WriteFromQueryResultStatement) stmt;
- String dataverseName = st1.getDataverseName() == null ? activeDefaultDataverse == null ? null
- : activeDefaultDataverse.getDataverseName() : st1.getDataverseName().getValue();
- CompiledWriteFromQueryResultStatement clfrqs = new CompiledWriteFromQueryResultStatement(dataverseName, st1
- .getDatasetName().getValue(), st1.getQuery(), st1.getVarCounter());
- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
- if (compiled.first != null) {
- jobsToExecute.add(compiled.first);
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ boolean bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ acquireReadLatch();
+
+ try {
+ metadataProvider.setWriteTransaction(true);
+ WriteFromQueryResultStatement st1 = (WriteFromQueryResultStatement) stmt;
+ String dataverseName = st1.getDataverseName() == null ? activeDefaultDataverse == null ? null
+ : activeDefaultDataverse.getDataverseName() : st1.getDataverseName().getValue();
+ CompiledWriteFromQueryResultStatement clfrqs = new CompiledWriteFromQueryResultStatement(dataverseName, st1
+ .getDatasetName().getValue(), st1.getQuery(), st1.getVarCounter());
+
+ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(),
+ clfrqs);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ bActiveTxn = false;
+ if (compiled.first != null) {
+ runJob(hcc, compiled.first);
+ }
+ } catch (Exception e) {
+ if (bActiveTxn) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ }
+ throw new AlgebricksException(e);
+ } finally {
+ releaseReadLatch();
}
}
private void handleInsertStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
- metadataProvider.setWriteTransaction(true);
- InsertStatement stmtInsert = (InsertStatement) stmt;
- String dataverseName = stmtInsert.getDataverseName() == null ? activeDefaultDataverse == null ? null
- : activeDefaultDataverse.getDataverseName() : stmtInsert.getDataverseName().getValue();
- CompiledInsertStatement clfrqs = new CompiledInsertStatement(dataverseName, stmtInsert.getDatasetName()
- .getValue(), stmtInsert.getQuery(), stmtInsert.getVarCounter());
- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
- if (compiled.first != null) {
- jobsToExecute.add(compiled.first);
+
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ boolean bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ acquireReadLatch();
+
+ try {
+ metadataProvider.setWriteTransaction(true);
+ InsertStatement stmtInsert = (InsertStatement) stmt;
+ String dataverseName = stmtInsert.getDataverseName() == null ? activeDefaultDataverse == null ? null
+ : activeDefaultDataverse.getDataverseName() : stmtInsert.getDataverseName().getValue();
+ CompiledInsertStatement clfrqs = new CompiledInsertStatement(dataverseName, stmtInsert.getDatasetName()
+ .getValue(), stmtInsert.getQuery(), stmtInsert.getVarCounter());
+ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(),
+ clfrqs);
+
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ bActiveTxn = false;
+
+ if (compiled.first != null) {
+ runJob(hcc, compiled.first);
+ }
+
+ } catch (Exception e) {
+ if (bActiveTxn) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ }
+ throw new AlgebricksException(e);
+ } finally {
+ releaseReadLatch();
}
}
private void handleDeleteStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
- metadataProvider.setWriteTransaction(true);
- DeleteStatement stmtDelete = (DeleteStatement) stmt;
- String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
- : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue();
- CompiledDeleteStatement clfrqs = new CompiledDeleteStatement(stmtDelete.getVariableExpr(), dataverseName,
- stmtDelete.getDatasetName().getValue(), stmtDelete.getCondition(), stmtDelete.getDieClause(),
- stmtDelete.getVarCounter(), metadataProvider);
- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
- if (compiled.first != null) {
- jobsToExecute.add(compiled.first);
+
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ boolean bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ acquireReadLatch();
+
+ try {
+ metadataProvider.setWriteTransaction(true);
+ DeleteStatement stmtDelete = (DeleteStatement) stmt;
+ String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
+ : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue();
+ CompiledDeleteStatement clfrqs = new CompiledDeleteStatement(stmtDelete.getVariableExpr(), dataverseName,
+ stmtDelete.getDatasetName().getValue(), stmtDelete.getCondition(), stmtDelete.getDieClause(),
+ stmtDelete.getVarCounter(), metadataProvider);
+ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(),
+ clfrqs);
+
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ bActiveTxn = false;
+
+ if (compiled.first != null) {
+ runJob(hcc, compiled.first);
+ }
+
+ } catch (Exception e) {
+ if (bActiveTxn) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ }
+ throw new AlgebricksException(e);
+ } finally {
+ releaseReadLatch();
}
}
@@ -704,46 +1106,109 @@
private void handleBeginFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
- BeginFeedStatement bfs = (BeginFeedStatement) stmt;
- String dataverseName = bfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
- : activeDefaultDataverse.getDataverseName() : bfs.getDatasetName().getValue();
- CompiledBeginFeedStatement cbfs = new CompiledBeginFeedStatement(dataverseName,
- bfs.getDatasetName().getValue(), bfs.getQuery(), bfs.getVarCounter());
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ boolean bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ acquireReadLatch();
- Dataset dataset;
- dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName, bfs
- .getDatasetName().getValue());
- IDatasetDetails datasetDetails = dataset.getDatasetDetails();
- if (datasetDetails.getDatasetType() != DatasetType.FEED) {
- throw new IllegalArgumentException("Dataset " + bfs.getDatasetName().getValue() + " is not a feed dataset");
+ try {
+ BeginFeedStatement bfs = (BeginFeedStatement) stmt;
+ String dataverseName = bfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
+ : activeDefaultDataverse.getDataverseName() : bfs.getDatasetName().getValue();
+
+ CompiledBeginFeedStatement cbfs = new CompiledBeginFeedStatement(dataverseName, bfs.getDatasetName()
+ .getValue(), bfs.getQuery(), bfs.getVarCounter());
+
+ Dataset dataset;
+ dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName, bfs
+ .getDatasetName().getValue());
+ IDatasetDetails datasetDetails = dataset.getDatasetDetails();
+ if (datasetDetails.getDatasetType() != DatasetType.FEED) {
+ throw new IllegalArgumentException("Dataset " + bfs.getDatasetName().getValue()
+ + " is not a feed dataset");
+ }
+ bfs.initialize(metadataProvider.getMetadataTxnContext(), dataset);
+ cbfs.setQuery(bfs.getQuery());
+ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, bfs.getQuery(), cbfs);
+
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ bActiveTxn = false;
+
+ if (compiled.first != null) {
+ runJob(hcc, compiled.first);
+ }
+
+ } catch (Exception e) {
+ if (bActiveTxn) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ }
+ throw new AlgebricksException(e);
+ } finally {
+ releaseReadLatch();
}
- bfs.initialize(metadataProvider.getMetadataTxnContext(), dataset);
- cbfs.setQuery(bfs.getQuery());
- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, bfs.getQuery(), cbfs);
- if (compiled.first != null) {
- jobsToExecute.add(compiled.first);
- }
}
private void handleControlFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
- ControlFeedStatement cfs = (ControlFeedStatement) stmt;
- String dataverseName = cfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
- : activeDefaultDataverse.getDataverseName() : cfs.getDatasetName().getValue();
- CompiledControlFeedStatement clcfs = new CompiledControlFeedStatement(cfs.getOperationType(), dataverseName,
- cfs.getDatasetName().getValue(), cfs.getAlterAdapterConfParams());
- jobsToExecute.add(FeedOperations.buildControlFeedJobSpec(clcfs, metadataProvider));
+
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ boolean bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ acquireReadLatch();
+
+ try {
+ ControlFeedStatement cfs = (ControlFeedStatement) stmt;
+ String dataverseName = cfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
+ : activeDefaultDataverse.getDataverseName() : cfs.getDatasetName().getValue();
+ CompiledControlFeedStatement clcfs = new CompiledControlFeedStatement(cfs.getOperationType(),
+ dataverseName, cfs.getDatasetName().getValue(), cfs.getAlterAdapterConfParams());
+ JobSpecification jobSpec = FeedOperations.buildControlFeedJobSpec(clcfs, metadataProvider);
+
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ bActiveTxn = false;
+
+ runJob(hcc, jobSpec);
+
+ } catch (Exception e) {
+ if (bActiveTxn) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ }
+ throw new AlgebricksException(e);
+ } finally {
+ releaseReadLatch();
+ }
}
private QueryResult handleQuery(AqlMetadataProvider metadataProvider, Query query, IHyracksClientConnection hcc,
List<JobSpecification> jobsToExecute) throws Exception {
- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, query, null);
- if (compiled.first != null) {
- GlobalConfig.ASTERIX_LOGGER.info(compiled.first.toJSON().toString(1));
- jobsToExecute.add(compiled.first);
+
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ boolean bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ acquireReadLatch();
+
+ try {
+ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, query, null);
+
+ QueryResult queryResult = new QueryResult(query, compiled.second.getLocalFile().getFile().getAbsolutePath());
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ bActiveTxn = false;
+
+ if (compiled.first != null) {
+ GlobalConfig.ASTERIX_LOGGER.info(compiled.first.toJSON().toString(1));
+ runJob(hcc, compiled.first);
+ }
+
+ return queryResult;
+ } catch (Exception e) {
+ if (bActiveTxn) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ }
+ throw new AlgebricksException(e);
+ } finally {
+ releaseReadLatch();
}
- return new QueryResult(query, compiled.second.getLocalFile().getFile().getAbsolutePath());
}
private void runCreateIndexJob(IHyracksClientConnection hcc, CreateIndexStatement stmtCreateIndex,
@@ -768,20 +1233,32 @@
private void handleCreateNodeGroupStatement(AqlMetadataProvider metadataProvider, Statement stmt,
List<JobSpecification> jobsToExecute) throws MetadataException, AlgebricksException, RemoteException,
ACIDException {
- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
- NodegroupDecl stmtCreateNodegroup = (NodegroupDecl) stmt;
- String ngName = stmtCreateNodegroup.getNodegroupName().getValue();
- NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, ngName);
- if (ng != null) {
- if (!stmtCreateNodegroup.getIfNotExists())
- throw new AlgebricksException("A nodegroup with this name " + ngName + " already exists.");
- } else {
- List<Identifier> ncIdentifiers = stmtCreateNodegroup.getNodeControllerNames();
- List<String> ncNames = new ArrayList<String>(ncIdentifiers.size());
- for (Identifier id : ncIdentifiers) {
- ncNames.add(id.getValue());
+
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ acquireWriteLatch();
+
+ try {
+ NodegroupDecl stmtCreateNodegroup = (NodegroupDecl) stmt;
+ String ngName = stmtCreateNodegroup.getNodegroupName().getValue();
+ NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, ngName);
+ if (ng != null) {
+ if (!stmtCreateNodegroup.getIfNotExists())
+ throw new AlgebricksException("A nodegroup with this name " + ngName + " already exists.");
+ } else {
+ List<Identifier> ncIdentifiers = stmtCreateNodegroup.getNodeControllerNames();
+ List<String> ncNames = new ArrayList<String>(ncIdentifiers.size());
+ for (Identifier id : ncIdentifiers) {
+ ncNames.add(id.getValue());
+ }
+ MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(ngName, ncNames));
}
- MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(ngName, ncNames));
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (Exception e) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ throw new AlgebricksException(e);
+ } finally {
+ releaseWriteLatch();
}
}
@@ -791,10 +1268,37 @@
private void compileIndexDropStatement(IHyracksClientConnection hcc, String dataverseName, String datasetName,
String indexName, AqlMetadataProvider metadataProvider) throws Exception {
+ MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
+ Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
+
+ //#. mark PendingDropOp on the existing index
+ MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
+ MetadataManager.INSTANCE.addIndex(
+ mdTxnCtx,
+ new Index(dataverseName, datasetName, indexName, index.getIndexType(), index.getKeyFieldNames(), index
+ .isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName);
- runJob(hcc, IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
- MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
- indexName);
+ JobSpecification jobSpec = IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider);
+
+ //#. commit the existing transaction before calling runJob.
+ // the caller should begin the transaction before calling this function.
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+
+ try {
+ runJob(hcc, jobSpec);
+ } catch (Exception e) {
+ //need to create the mdTxnCtx to be aborted by caller properly
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ throw e;
+ }
+
+ //#. begin a new transaction
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+
+ //#. finally, delete the existing index
+ MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
}
private void compileDatasetDropStatement(IHyracksClientConnection hcc, String dataverseName, String datasetName,
@@ -803,10 +1307,32 @@
CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
- JobSpecification[] jobSpecs = DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider);
- for (JobSpecification spec : jobSpecs)
- runJob(hcc, spec);
+ JobSpecification jobSpec = DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider);
+
+ //#. mark PendingDropOp on the existing dataset
+ MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
+ MetadataManager.INSTANCE.addDataset(mdTxnCtx, new Dataset(dataverseName, datasetName, ds.getItemTypeName(),
+ ds.getDatasetDetails(), ds.getDatasetType(), ds.getDatasetId(), IMetadataEntity.PENDING_DROP_OP));
+
+ //#. commit the transaction
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+
+ //#. run the job
+ try {
+ runJob(hcc, jobSpec);
+ } catch (Exception e) {
+ //need to create the mdTxnCtx to be aborted by caller properly
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ throw e;
+ }
+
+ //#. start a new transaction
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
}
+
+ //#. finally, delete the existing dataset.
MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
}
@@ -831,4 +1357,20 @@
}
return format;
}
+
+ private void acquireWriteLatch() {
+ cacheLatch.writeLock().lock();
+ }
+
+ private void releaseWriteLatch() {
+ cacheLatch.writeLock().unlock();
+ }
+
+ private void acquireReadLatch() {
+ cacheLatch.readLock().lock();
+ }
+
+ private void releaseReadLatch() {
+ cacheLatch.readLock().unlock();
+ }
}