| Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DataverseTupleTranslator.java |
| =================================================================== |
| --- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DataverseTupleTranslator.java (revision 1061) |
| +++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DataverseTupleTranslator.java (working copy) |
| @@ -25,8 +25,11 @@ |
| import edu.uci.ics.asterix.metadata.bootstrap.MetadataPrimaryIndexes; |
| import edu.uci.ics.asterix.metadata.bootstrap.MetadataRecordTypes; |
| import edu.uci.ics.asterix.metadata.entities.Dataverse; |
| +import edu.uci.ics.asterix.om.base.AInt32; |
| +import edu.uci.ics.asterix.om.base.AMutableInt32; |
| import edu.uci.ics.asterix.om.base.ARecord; |
| import edu.uci.ics.asterix.om.base.AString; |
| +import edu.uci.ics.asterix.om.types.BuiltinType; |
| import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer; |
| import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference; |
| |
| @@ -40,12 +43,18 @@ |
| // Payload field containing serialized Dataverse. |
| public static final int DATAVERSE_PAYLOAD_TUPLE_FIELD_INDEX = 1; |
| |
| + private AMutableInt32 aInt32; |
| + protected ISerializerDeserializer<AInt32> aInt32Serde; |
| + |
| @SuppressWarnings("unchecked") |
| private ISerializerDeserializer<ARecord> recordSerDes = AqlSerializerDeserializerProvider.INSTANCE |
| .getSerializerDeserializer(MetadataRecordTypes.DATAVERSE_RECORDTYPE); |
| |
| + @SuppressWarnings("unchecked") |
| public DataverseTupleTranslator(boolean getTuple) { |
| super(getTuple, MetadataPrimaryIndexes.DATAVERSE_DATASET.getFieldCount()); |
| + aInt32 = new AMutableInt32(-1); |
| + aInt32Serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32); |
| } |
| |
| @Override |
| @@ -57,7 +66,8 @@ |
| DataInput in = new DataInputStream(stream); |
| ARecord dataverseRecord = recordSerDes.deserialize(in); |
| return new Dataverse(((AString) dataverseRecord.getValueByPos(0)).getStringValue(), |
| - ((AString) dataverseRecord.getValueByPos(1)).getStringValue()); |
| + ((AString) dataverseRecord.getValueByPos(1)).getStringValue(), |
| + ((AInt32) dataverseRecord.getValueByPos(2)).getIntegerValue()); |
| } |
| |
| @Override |
| @@ -88,6 +98,12 @@ |
| stringSerde.serialize(aString, fieldValue.getDataOutput()); |
| recordBuilder.addField(MetadataRecordTypes.DATAVERSE_ARECORD_TIMESTAMP_FIELD_INDEX, fieldValue); |
| |
| + // write field 3 |
| + fieldValue.reset(); |
| + aInt32.setValue(instance.getPendingOp()); |
| + aInt32Serde.serialize(aInt32, fieldValue.getDataOutput()); |
| + recordBuilder.addField(MetadataRecordTypes.DATAVERSE_ARECORD_PENDINGOP_FIELD_INDEX, fieldValue); |
| + |
| recordBuilder.write(tupleBuilder.getDataOutput(), true); |
| tupleBuilder.addFieldEndOffset(); |
| |
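| A note on the pattern above: all three tuple translators touched by this patch |
| (Dataverse, Dataset, Index) write the new PendingOp field the same way, reusing |
| one mutable AMutableInt32 plus an AINT32 serde looked up once in the constructor |
| (that lookup is the unchecked cast being suppressed). A minimal write-side |
| sketch, using the classes imported above; the field-index constant name is |
| illustrative: |
| |
|     // One-time setup in the translator constructor. |
|     AMutableInt32 aInt32 = new AMutableInt32(-1); |
|     ISerializerDeserializer<AInt32> aInt32Serde = |
|             AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32); |
| |
|     // Per record: reset the reusable field storage, set the value, serialize, add. |
|     fieldValue.reset(); |
|     aInt32.setValue(instance.getPendingOp()); |
|     aInt32Serde.serialize(aInt32, fieldValue.getDataOutput()); |
|     recordBuilder.addField(PENDINGOP_FIELD_INDEX, fieldValue); |
| |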
| Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java |
| =================================================================== |
| --- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java (revision 1061) |
| +++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java (working copy) |
| @@ -77,9 +77,9 @@ |
| protected ISerializerDeserializer<AInt32> aInt32Serde; |
| |
| @SuppressWarnings("unchecked") |
| - public DatasetTupleTranslator(boolean getTuple) { |
| + public DatasetTupleTranslator(boolean getTuple) { |
| super(getTuple, MetadataPrimaryIndexes.DATASET_DATASET.getFieldCount()); |
| - aInt32 = new AMutableInt32(-1); |
| + aInt32 = new AMutableInt32(-1); |
| aInt32Serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32); |
| } |
| |
| @@ -104,8 +104,10 @@ |
| .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_DATATYPENAME_FIELD_INDEX)).getStringValue(); |
| DatasetType datasetType = DatasetType.valueOf(((AString) datasetRecord.getValueByPos(3)).getStringValue()); |
| IDatasetDetails datasetDetails = null; |
| - int datasetId = ((AInt32) datasetRecord |
| + int datasetId = ((AInt32) datasetRecord |
| .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_DATASETID_FIELD_INDEX)).getIntegerValue(); |
| + int pendingOp = ((AInt32) datasetRecord |
| + .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_PENDINGOP_FIELD_INDEX)).getIntegerValue(); |
| switch (datasetType) { |
| case FEED: |
| case INTERNAL: { |
| @@ -197,7 +199,7 @@ |
| } |
| datasetDetails = new ExternalDatasetDetails(adapter, properties); |
| } |
| - return new Dataset(dataverseName, datasetName, typeName, datasetDetails, datasetType, datasetId); |
| + return new Dataset(dataverseName, datasetName, typeName, datasetDetails, datasetType, datasetId, pendingOp); |
| } |
| |
| @Override |
| @@ -248,13 +250,19 @@ |
| aString.setValue(Calendar.getInstance().getTime().toString()); |
| stringSerde.serialize(aString, fieldValue.getDataOutput()); |
| recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_TIMESTAMP_FIELD_INDEX, fieldValue); |
| - |
| + |
| // write field 8 |
| fieldValue.reset(); |
| aInt32.setValue(dataset.getDatasetId()); |
| aInt32Serde.serialize(aInt32, fieldValue.getDataOutput()); |
| recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_DATASETID_FIELD_INDEX, fieldValue); |
| - |
| + |
| + // write field 9 |
| + fieldValue.reset(); |
| + aInt32.setValue(dataset.getPendingOp()); |
| + aInt32Serde.serialize(aInt32, fieldValue.getDataOutput()); |
| + recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_PENDINGOP_FIELD_INDEX, fieldValue); |
| + |
| // write record |
| recordBuilder.write(tupleBuilder.getDataOutput(), true); |
| tupleBuilder.addFieldEndOffset(); |
| @@ -290,13 +298,15 @@ |
| fieldValue.reset(); |
| aString.setValue(name); |
| stringSerde.serialize(aString, fieldValue.getDataOutput()); |
| - propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_NAME_FIELD_INDEX, fieldValue); |
| + propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_NAME_FIELD_INDEX, |
| + fieldValue); |
| |
| // write field 1 |
| fieldValue.reset(); |
| aString.setValue(value); |
| stringSerde.serialize(aString, fieldValue.getDataOutput()); |
| - propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_VALUE_FIELD_INDEX, fieldValue); |
| + propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_VALUE_FIELD_INDEX, |
| + fieldValue); |
| |
| propertyRecordBuilder.write(out, true); |
| } |
| Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java |
| =================================================================== |
| --- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java (revision 1061) |
| +++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java (working copy) |
| @@ -96,13 +96,15 @@ |
| } |
| Boolean isPrimaryIndex = ((ABoolean) rec.getValueByPos(MetadataRecordTypes.INDEX_ARECORD_ISPRIMARY_FIELD_INDEX)) |
| .getBoolean(); |
| + int pendingOp = ((AInt32) rec.getValueByPos(MetadataRecordTypes.INDEX_ARECORD_PENDINGOP_FIELD_INDEX)) |
| + .getIntegerValue(); |
| // Check if there is a gram length as well. |
| int gramLength = -1; |
| int gramLenPos = rec.getType().findFieldPosition(GRAM_LENGTH_FIELD_NAME); |
| if (gramLenPos >= 0) { |
| gramLength = ((AInt32) rec.getValueByPos(gramLenPos)).getIntegerValue(); |
| } |
| - return new Index(dvName, dsName, indexName, indexStructure, searchKey, gramLength, isPrimaryIndex); |
| + return new Index(dvName, dsName, indexName, indexStructure, searchKey, gramLength, isPrimaryIndex, pendingOp); |
| } |
| |
| @Override |
| @@ -174,7 +176,12 @@ |
| stringSerde.serialize(aString, fieldValue.getDataOutput()); |
| recordBuilder.addField(MetadataRecordTypes.INDEX_ARECORD_TIMESTAMP_FIELD_INDEX, fieldValue); |
| |
| - // write optional field 7 |
| + // write field 7 |
| + fieldValue.reset(); |
| + intSerde.serialize(new AInt32(instance.getPendingOp()), fieldValue.getDataOutput()); |
| + recordBuilder.addField(MetadataRecordTypes.INDEX_ARECORD_PENDINGOP_FIELD_INDEX, fieldValue); |
| + |
| + // write optional field 8 |
| if (instance.getGramLength() > 0) { |
| fieldValue.reset(); |
| nameValue.reset(); |
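| |
| One small asymmetry: unlike the other two translators, IndexTupleTranslator |
| serializes the pending operation through a freshly allocated AInt32 per tuple |
| (intSerde.serialize(new AInt32(...), ...)). The behavior is identical; assuming |
| the class kept a reusable AMutableInt32 like its siblings, the allocation-free |
| form would be: |
| |
|     fieldValue.reset(); |
|     aInt32.setValue(instance.getPendingOp());  // AMutableInt32 extends AInt32 |
|     intSerde.serialize(aInt32, fieldValue.getDataOutput()); |
|     recordBuilder.addField(MetadataRecordTypes.INDEX_ARECORD_PENDINGOP_FIELD_INDEX, fieldValue); |
| |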
| Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java |
| =================================================================== |
| --- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java (revision 1061) |
| +++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java (working copy) |
| @@ -129,7 +129,7 @@ |
| |
| public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, String> { |
| private static Logger LOGGER = Logger.getLogger(AqlMetadataProvider.class.getName()); |
| - private final MetadataTransactionContext mdTxnCtx; |
| + private MetadataTransactionContext mdTxnCtx; |
| private boolean isWriteTransaction; |
| private Map<String, String[]> stores; |
| private Map<String, String> config; |
| @@ -156,8 +156,7 @@ |
| return config; |
| } |
| |
| - public AqlMetadataProvider(MetadataTransactionContext mdTxnCtx, Dataverse defaultDataverse) { |
| - this.mdTxnCtx = mdTxnCtx; |
| + public AqlMetadataProvider(Dataverse defaultDataverse) { |
| this.defaultDataverse = defaultDataverse; |
| this.stores = AsterixProperties.INSTANCE.getStores(); |
| } |
| @@ -181,6 +180,10 @@ |
| public void setWriterFactory(IAWriterFactory writerFactory) { |
| this.writerFactory = writerFactory; |
| } |
| + |
| + public void setMetadataTxnContext(MetadataTransactionContext mdTxnCtx) { |
| + this.mdTxnCtx = mdTxnCtx; |
| + } |
| |
| public MetadataTransactionContext getMetadataTxnContext() { |
| return mdTxnCtx; |
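| |
| The constructor change above reflects a new lifecycle: a single |
| AqlMetadataProvider now spans several short metadata transactions rather than |
| one transaction injected for the whole statement batch. Each DDL handler in |
| AqlTranslator (later in this patch) follows this calling pattern: |
| |
|     MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
|     metadataProvider.setMetadataTxnContext(mdTxnCtx); |
|     try { |
|         // ... metadata reads/writes through metadataProvider ... |
|         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
|     } catch (Exception e) { |
|         MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); |
|         throw new AlgebricksException(e); |
|     } |
| |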
| Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java |
| =================================================================== |
| --- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java (revision 1061) |
| +++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java (working copy) |
| @@ -35,15 +35,18 @@ |
| private final DatasetType datasetType; |
| private IDatasetDetails datasetDetails; |
| private final int datasetId; |
| + // Type of the pending operation with respect to atomic DDL operations |
| + private final int pendingOp; |
| |
| public Dataset(String dataverseName, String datasetName, String itemTypeName, IDatasetDetails datasetDetails, |
| - DatasetType datasetType, int datasetId) { |
| + DatasetType datasetType, int datasetId, int pendingOp) { |
| this.dataverseName = dataverseName; |
| this.datasetName = datasetName; |
| this.itemTypeName = itemTypeName; |
| this.datasetType = datasetType; |
| this.datasetDetails = datasetDetails; |
| this.datasetId = datasetId; |
| + this.pendingOp = pendingOp; |
| } |
| |
| public String getDataverseName() { |
| @@ -73,6 +76,10 @@ |
| public int getDatasetId() { |
| return datasetId; |
| } |
| + |
| + public int getPendingOp() { |
| + return pendingOp; |
| + } |
| |
| @Override |
| public Object addToCache(MetadataCache cache) { |
| Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Index.java |
| =================================================================== |
| --- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Index.java (revision 1061) |
| +++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Index.java (working copy) |
| @@ -45,9 +45,11 @@ |
| private final boolean isPrimaryIndex; |
| // Specific to NGRAM indexes. |
| private final int gramLength; |
| + // Type of the pending operation with respect to atomic DDL operations |
| + private final int pendingOp; |
| |
| public Index(String dataverseName, String datasetName, String indexName, IndexType indexType, |
| - List<String> keyFieldNames, int gramLength, boolean isPrimaryIndex) { |
| + List<String> keyFieldNames, int gramLength, boolean isPrimaryIndex, int pendingOp) { |
| this.dataverseName = dataverseName; |
| this.datasetName = datasetName; |
| this.indexName = indexName; |
| @@ -55,10 +57,11 @@ |
| this.keyFieldNames = keyFieldNames; |
| this.gramLength = gramLength; |
| this.isPrimaryIndex = isPrimaryIndex; |
| + this.pendingOp = pendingOp; |
| } |
| |
| public Index(String dataverseName, String datasetName, String indexName, IndexType indexType, |
| - List<String> keyFieldNames, boolean isPrimaryIndex) { |
| + List<String> keyFieldNames, boolean isPrimaryIndex, int pendingOp) { |
| this.dataverseName = dataverseName; |
| this.datasetName = datasetName; |
| this.indexName = indexName; |
| @@ -66,6 +69,7 @@ |
| this.keyFieldNames = keyFieldNames; |
| this.gramLength = -1; |
| this.isPrimaryIndex = isPrimaryIndex; |
| + this.pendingOp = pendingOp; |
| } |
| |
| public String getDataverseName() { |
| @@ -95,6 +99,10 @@ |
| public boolean isPrimaryIndex() { |
| return isPrimaryIndex; |
| } |
| + |
| + public int getPendingOp() { |
| + return pendingOp; |
| + } |
| |
| public boolean isSecondaryIndex() { |
| return !isPrimaryIndex(); |
| Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataverse.java |
| =================================================================== |
| --- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataverse.java (revision 1061) |
| +++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataverse.java (working copy) |
| @@ -27,10 +27,12 @@ |
| // Enforced to be unique within an Asterix cluster.. |
| private final String dataverseName; |
| private final String dataFormat; |
| + private final int pendingOp; |
| |
| - public Dataverse(String dataverseName, String format) { |
| + public Dataverse(String dataverseName, String format, int pendingOp) { |
| this.dataverseName = dataverseName; |
| this.dataFormat = format; |
| + this.pendingOp = pendingOp; |
| } |
| |
| public String getDataverseName() { |
| @@ -40,6 +42,10 @@ |
| public String getDataFormat() { |
| return dataFormat; |
| } |
| + |
| + public int getPendingOp() { |
| + return pendingOp; |
| + } |
| |
| @Override |
| public Object addToCache(MetadataCache cache) { |
| Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java |
| =================================================================== |
| --- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java (revision 1061) |
| +++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java (working copy) |
| @@ -25,6 +25,7 @@ |
| import edu.uci.ics.asterix.common.exceptions.AsterixException; |
| import edu.uci.ics.asterix.common.functions.FunctionSignature; |
| import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider; |
| +import edu.uci.ics.asterix.metadata.api.IMetadataEntity; |
| import edu.uci.ics.asterix.metadata.api.IMetadataIndex; |
| import edu.uci.ics.asterix.metadata.api.IMetadataNode; |
| import edu.uci.ics.asterix.metadata.api.IValueExtractor; |
| @@ -160,7 +161,7 @@ |
| // Add the primary index for the dataset. |
| InternalDatasetDetails id = (InternalDatasetDetails) dataset.getDatasetDetails(); |
| Index primaryIndex = new Index(dataset.getDataverseName(), dataset.getDatasetName(), |
| - dataset.getDatasetName(), IndexType.BTREE, id.getPrimaryKey(), true); |
| + dataset.getDatasetName(), IndexType.BTREE, id.getPrimaryKey(), true, dataset.getPendingOp()); |
| addIndex(jobId, primaryIndex); |
| ITupleReference nodeGroupTuple = createTuple(id.getNodeGroupName(), dataset.getDataverseName(), |
| dataset.getDatasetName()); |
| @@ -260,7 +261,7 @@ |
| IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE, |
| NoOpOperationCallback.INSTANCE); |
| TransactionContext txnCtx = transactionProvider.getTransactionManager().getTransactionContext(jobId); |
| - transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx); |
| + //transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx); |
| // TODO: fix exceptions once new BTree exception model is in hyracks. |
| indexAccessor.insert(tuple); |
| //TODO: extract the key from the tuple and get the PKHashValue from the key. |
| @@ -536,7 +537,7 @@ |
| // The transaction with txnId will have an S lock on the |
| // resource. Note that lock converters have a higher priority than |
| // regular waiters in the LockManager. |
| - transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx); |
| + //transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx); |
| indexAccessor.delete(tuple); |
| //TODO: extract the key from the tuple and get the PKHashValue from the key. |
| //check how to get the oldValue. |
| @@ -803,7 +804,9 @@ |
| private <ResultType> void searchIndex(JobId jobId, IMetadataIndex index, ITupleReference searchKey, |
| IValueExtractor<ResultType> valueExtractor, List<ResultType> results) throws Exception { |
| TransactionContext txnCtx = transactionProvider.getTransactionManager().getTransactionContext(jobId); |
| - transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.S, txnCtx); |
| + //#. Currently no lock is needed to access any metadata, |
| + // since non-compatible concurrent accesses are always protected by the latch in the MetadataManager. |
| + //transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.S, txnCtx); |
| IBinaryComparatorFactory[] comparatorFactories = index.getKeyBinaryComparatorFactory(); |
| long resourceID = index.getResourceID(); |
| IIndex indexInstance = indexLifecycleManager.getIndex(resourceID); |
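| |
| The disabled lock-manager calls above rely on the read/write latch this patch |
| adds to AqlTranslator (the cacheLatch field below): non-compatible metadata |
| accesses are serialized there instead of in the lock manager. The |
| acquire/release helpers are not shown in any hunk; presumably they are thin |
| wrappers over the ReentrantReadWriteLock, along these lines: |
| |
|     private void acquireReadLatch()  { cacheLatch.readLock().lock(); } |
|     private void releaseReadLatch()  { cacheLatch.readLock().unlock(); } |
|     private void acquireWriteLatch() { cacheLatch.writeLock().lock(); } |
|     private void releaseWriteLatch() { cacheLatch.writeLock().unlock(); } |
| |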
| Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataEntity.java |
| =================================================================== |
| --- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataEntity.java (revision 1061) |
| +++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataEntity.java (working copy) |
| @@ -20,6 +20,11 @@ |
| import edu.uci.ics.asterix.metadata.MetadataCache; |
| |
| public interface IMetadataEntity extends Serializable { |
| + |
| + public static final int PENDING_NO_OP = 0; |
| + public static final int PENDING_ADD_OP = 1; |
| + public static final int PENDING_DROP_OP = 2; |
| + |
| Object addToCache(MetadataCache cache); |
| |
| Object dropFromCache(MetadataCache cache); |
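| |
| These three constants drive the atomic-DDL protocol used throughout the patch: |
| an entity is first recorded with PENDING_ADD_OP (or PENDING_DROP_OP) in a |
| committed metadata transaction, the Hyracks job that builds (or destroys) the |
| physical artifact runs with no metadata transaction open, and a final |
| transaction replaces the entry with a PENDING_NO_OP version; presumably a |
| recovery pass can then recognize half-done DDL by its pending flag. The |
| create-dataset handler below follows exactly this sequence: |
| |
|     // txn 1: record the intent, then commit before running the job |
|     MetadataManager.INSTANCE.addDataset(mdTxnCtx, dataset);  // PENDING_ADD_OP |
|     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
|     runJob(hcc, jobSpec);  // physical creation, outside any metadata txn |
|     // txn 2: finalize by swapping in a PENDING_NO_OP entry |
|     mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
|     MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName); |
|     MetadataManager.INSTANCE.addDataset(mdTxnCtx, finalizedDataset);  // PENDING_NO_OP |
|     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| |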
| Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java |
| =================================================================== |
| --- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java (revision 1061) |
| +++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java (working copy) |
| @@ -17,6 +17,8 @@ |
| |
| import java.rmi.RemoteException; |
| import java.util.List; |
| +import java.util.concurrent.locks.ReadWriteLock; |
| +import java.util.concurrent.locks.ReentrantReadWriteLock; |
| |
| import edu.uci.ics.asterix.common.functions.FunctionSignature; |
| import edu.uci.ics.asterix.metadata.api.IAsterixStateProxy; |
| @@ -79,11 +81,10 @@ |
| public class MetadataManager implements IMetadataManager { |
| // Set in init(). |
| public static MetadataManager INSTANCE; |
| - |
| private final MetadataCache cache = new MetadataCache(); |
| private IAsterixStateProxy proxy; |
| private IMetadataNode metadataNode; |
| - |
| + |
| public MetadataManager(IAsterixStateProxy proxy) { |
| if (proxy == null) { |
| throw new Error("Null proxy given to MetadataManager."); |
| @@ -206,11 +207,14 @@ |
| |
| @Override |
| public void addDataset(MetadataTransactionContext ctx, Dataset dataset) throws MetadataException { |
| + // add the dataset to the metadata node |
| try { |
| metadataNode.addDataset(ctx.getJobId(), dataset); |
| } catch (RemoteException e) { |
| throw new MetadataException(e); |
| } |
| + |
| + // reflect the dataset in the cache |
| ctx.addDataset(dataset); |
| } |
| |
| @@ -585,4 +589,5 @@ |
| } |
| return adapter; |
| } |
| + |
| } |
| \ No newline at end of file |
| Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java |
| =================================================================== |
| --- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java (revision 1061) |
| +++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java (working copy) |
| @@ -19,6 +19,7 @@ |
| |
| import edu.uci.ics.asterix.common.functions.FunctionSignature; |
| import edu.uci.ics.asterix.external.dataset.adapter.AdapterIdentifier; |
| +import edu.uci.ics.asterix.metadata.api.IMetadataEntity; |
| import edu.uci.ics.asterix.metadata.entities.Dataset; |
| import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter; |
| import edu.uci.ics.asterix.metadata.entities.Datatype; |
| @@ -104,19 +105,19 @@ |
| } |
| |
| public void dropDataset(String dataverseName, String datasetName) { |
| - Dataset dataset = new Dataset(dataverseName, datasetName, null, null, null, -1); |
| + Dataset dataset = new Dataset(dataverseName, datasetName, null, null, null, -1, IMetadataEntity.PENDING_NO_OP); |
| droppedCache.addDatasetIfNotExists(dataset); |
| logAndApply(new MetadataLogicalOperation(dataset, false)); |
| } |
| |
| public void dropIndex(String dataverseName, String datasetName, String indexName) { |
| - Index index = new Index(dataverseName, datasetName, indexName, null, null, false); |
| + Index index = new Index(dataverseName, datasetName, indexName, null, null, false, IMetadataEntity.PENDING_NO_OP); |
| droppedCache.addIndexIfNotExists(index); |
| logAndApply(new MetadataLogicalOperation(index, false)); |
| } |
| |
| public void dropDataverse(String dataverseName) { |
| - Dataverse dataverse = new Dataverse(dataverseName, null); |
| + Dataverse dataverse = new Dataverse(dataverseName, null, IMetadataEntity.PENDING_NO_OP); |
| droppedCache.addDataverseIfNotExists(dataverse); |
| logAndApply(new MetadataLogicalOperation(dataverse, false)); |
| } |
| @@ -162,7 +163,7 @@ |
| } |
| return droppedCache.getDataset(dataverseName, datasetName) != null; |
| } |
| - |
| + |
| public boolean indexIsDropped(String dataverseName, String datasetName, String indexName) { |
| if (droppedCache.getDataverse(dataverseName) != null) { |
| return true; |
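| |
| In the drop* methods above, the entity objects exist only as keys for the |
| dropped-entity cache; every field other than the name components is a |
| placeholder (null or -1), which is why PENDING_NO_OP is the value recorded. |
| For example, logging a dataset drop needs nothing more than: |
| |
|     Dataset dataset = new Dataset(dataverseName, datasetName, null, null, null, -1, |
|             IMetadataEntity.PENDING_NO_OP); |
|     droppedCache.addDatasetIfNotExists(dataset); |
|     logAndApply(new MetadataLogicalOperation(dataset, false)); |
| |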
| Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java |
| =================================================================== |
| --- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java (revision 1061) |
| +++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java (working copy) |
| @@ -80,10 +80,11 @@ |
| public static final int DATAVERSE_ARECORD_NAME_FIELD_INDEX = 0; |
| public static final int DATAVERSE_ARECORD_FORMAT_FIELD_INDEX = 1; |
| public static final int DATAVERSE_ARECORD_TIMESTAMP_FIELD_INDEX = 2; |
| + public static final int DATAVERSE_ARECORD_PENDINGOP_FIELD_INDEX = 3; |
| |
| private static final ARecordType createDataverseRecordType() { |
| - return new ARecordType("DataverseRecordType", new String[] { "DataverseName", "DataFormat", "Timestamp" }, |
| - new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING }, true); |
| + return new ARecordType("DataverseRecordType", new String[] { "DataverseName", "DataFormat", "Timestamp", "PendingOp" }, |
| + new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.AINT32 }, true); |
| } |
| |
| // Helper constants for accessing fields in an ARecord of anonymous type |
| @@ -158,10 +159,11 @@ |
| public static final int DATASET_ARECORD_FEEDDETAILS_FIELD_INDEX = 6; |
| public static final int DATASET_ARECORD_TIMESTAMP_FIELD_INDEX = 7; |
| public static final int DATASET_ARECORD_DATASETID_FIELD_INDEX = 8; |
| + public static final int DATASET_ARECORD_PENDINGOP_FIELD_INDEX = 9; |
| |
| private static final ARecordType createDatasetRecordType() { |
| String[] fieldNames = { "DataverseName", "DatasetName", "DataTypeName", "DatasetType", "InternalDetails", |
| - "ExternalDetails", "FeedDetails", "Timestamp", "DatasetId" }; |
| + "ExternalDetails", "FeedDetails", "Timestamp", "DatasetId", "PendingOp" }; |
| |
| List<IAType> internalRecordUnionList = new ArrayList<IAType>(); |
| internalRecordUnionList.add(BuiltinType.ANULL); |
| @@ -179,7 +181,8 @@ |
| AUnionType feedRecordUnion = new AUnionType(feedRecordUnionList, null); |
| |
| IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, |
| - internalRecordUnion, externalRecordUnion, feedRecordUnion, BuiltinType.ASTRING, BuiltinType.AINT32 }; |
| + internalRecordUnion, externalRecordUnion, feedRecordUnion, BuiltinType.ASTRING, BuiltinType.AINT32, |
| + BuiltinType.AINT32 }; |
| return new ARecordType("DatasetRecordType", fieldNames, fieldTypes, true); |
| } |
| |
| @@ -264,13 +267,14 @@ |
| public static final int INDEX_ARECORD_SEARCHKEY_FIELD_INDEX = 4; |
| public static final int INDEX_ARECORD_ISPRIMARY_FIELD_INDEX = 5; |
| public static final int INDEX_ARECORD_TIMESTAMP_FIELD_INDEX = 6; |
| + public static final int INDEX_ARECORD_PENDINGOP_FIELD_INDEX = 7; |
| |
| private static final ARecordType createIndexRecordType() { |
| AOrderedListType olType = new AOrderedListType(BuiltinType.ASTRING, null); |
| String[] fieldNames = { "DataverseName", "DatasetName", "IndexName", "IndexStructure", "SearchKey", |
| - "IsPrimary", "Timestamp" }; |
| + "IsPrimary", "Timestamp", "PendingOp" }; |
| IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, |
| - olType, BuiltinType.ABOOLEAN, BuiltinType.ASTRING }; |
| + olType, BuiltinType.ABOOLEAN, BuiltinType.ASTRING, BuiltinType.AINT32 }; |
| return new ARecordType("IndexRecordType", fieldNames, fieldTypes, true); |
| }; |
| |
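| Because PendingOp is appended after every existing field in each record type, |
| the numeric field indexes of the old fields are unchanged. A record in the |
| Metadata.Dataverse dataset would now look roughly like this (values are |
| illustrative): |
| |
|     { "DataverseName": "Metadata", |
|       "DataFormat": "edu.uci.ics.asterix.runtime.formats.NonTaggedDataFormat", |
|       "Timestamp": "Sat Jan 01 00:00:00 PST 2013", "PendingOp": 0 } |
| |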
| Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java |
| =================================================================== |
| --- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java (revision 1061) |
| +++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java (working copy) |
| @@ -31,6 +31,7 @@ |
| import edu.uci.ics.asterix.metadata.IDatasetDetails; |
| import edu.uci.ics.asterix.metadata.MetadataManager; |
| import edu.uci.ics.asterix.metadata.MetadataTransactionContext; |
| +import edu.uci.ics.asterix.metadata.api.IMetadataEntity; |
| import edu.uci.ics.asterix.metadata.api.IMetadataIndex; |
| import edu.uci.ics.asterix.metadata.entities.AsterixBuiltinTypeMap; |
| import edu.uci.ics.asterix.metadata.entities.Dataset; |
| @@ -226,7 +227,7 @@ |
| public static void insertInitialDataverses(MetadataTransactionContext mdTxnCtx) throws Exception { |
| String dataverseName = MetadataPrimaryIndexes.DATAVERSE_DATASET.getDataverseName(); |
| String dataFormat = NonTaggedDataFormat.NON_TAGGED_DATA_FORMAT; |
| - MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dataverseName, dataFormat)); |
| + MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dataverseName, dataFormat, IMetadataEntity.PENDING_NO_OP)); |
| } |
| |
| public static void insertInitialDatasets(MetadataTransactionContext mdTxnCtx) throws Exception { |
| @@ -236,7 +237,7 @@ |
| primaryIndexes[i].getNodeGroupName()); |
| MetadataManager.INSTANCE.addDataset(mdTxnCtx, new Dataset(primaryIndexes[i].getDataverseName(), |
| primaryIndexes[i].getIndexedDatasetName(), primaryIndexes[i].getPayloadRecordType().getTypeName(), |
| - id, DatasetType.INTERNAL, primaryIndexes[i].getDatasetId().getId())); |
| + id, DatasetType.INTERNAL, primaryIndexes[i].getDatasetId().getId(), IMetadataEntity.PENDING_NO_OP)); |
| } |
| } |
| |
| @@ -267,7 +268,7 @@ |
| for (int i = 0; i < secondaryIndexes.length; i++) { |
| MetadataManager.INSTANCE.addIndex(mdTxnCtx, new Index(secondaryIndexes[i].getDataverseName(), |
| secondaryIndexes[i].getIndexedDatasetName(), secondaryIndexes[i].getIndexName(), IndexType.BTREE, |
| - secondaryIndexes[i].getPartitioningExpr(), false)); |
| + secondaryIndexes[i].getPartitioningExpr(), false, IMetadataEntity.PENDING_NO_OP)); |
| } |
| } |
| |
| Index: asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java |
| =================================================================== |
| --- asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java (revision 1061) |
| +++ asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java (working copy) |
| @@ -95,11 +95,11 @@ |
| File testFile = tcCtx.getTestFile(cUnit); |
| |
| /*************** to avoid run failure cases **************** |
| - if (!testFile.getAbsolutePath().contains("runtimets/queries/failure/")) { |
| + if (!testFile.getAbsolutePath().contains("index-selection/")) { |
| continue; |
| } |
| ************************************************************/ |
| - |
| + |
| File expectedResultFile = tcCtx.getExpectedResultFile(cUnit); |
| File actualFile = new File(PATH_ACTUAL + File.separator |
| + tcCtx.getTestCase().getFilePath().replace(File.separator, "_") + "_" + cUnit.getName() + ".adm"); |
| Index: asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java |
| =================================================================== |
| --- asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java (revision 1061) |
| +++ asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java (working copy) |
| @@ -90,7 +90,7 @@ |
| |
| private static Logger LOGGER = Logger.getLogger(DatasetOperations.class.getName()); |
| |
| - public static JobSpecification[] createDropDatasetJobSpec(CompiledDatasetDropStatement datasetDropStmt, |
| + public static JobSpecification createDropDatasetJobSpec(CompiledDatasetDropStatement datasetDropStmt, |
| AqlMetadataProvider metadataProvider) throws AlgebricksException, HyracksDataException, RemoteException, |
| ACIDException, AsterixException { |
| |
| @@ -111,67 +111,10 @@ |
| throw new AlgebricksException("DROP DATASET: No metadata for dataset " + datasetName); |
| } |
| if (dataset.getDatasetType() == DatasetType.EXTERNAL) { |
| - return new JobSpecification[0]; |
| + return new JobSpecification(); |
| } |
| - |
| - List<Index> datasetIndexes = metadataProvider.getDatasetIndexes(dataset.getDataverseName(), |
| - dataset.getDatasetName()); |
| - int numSecondaryIndexes = 0; |
| - for (Index index : datasetIndexes) { |
| - if (index.isSecondaryIndex()) { |
| - numSecondaryIndexes++; |
| - } |
| - } |
| - JobSpecification[] specs; |
| - if (numSecondaryIndexes > 0) { |
| - specs = new JobSpecification[numSecondaryIndexes + 1]; |
| - int i = 0; |
| - // First, drop secondary indexes. |
| - for (Index index : datasetIndexes) { |
| - if (index.isSecondaryIndex()) { |
| - specs[i] = new JobSpecification(); |
| - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> idxSplitsAndConstraint = metadataProvider |
| - .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataset.getDataverseName(), |
| - datasetName, index.getIndexName()); |
| - IIndexDataflowHelperFactory dfhFactory; |
| - switch (index.getIndexType()) { |
| - case BTREE: |
| - dfhFactory = new LSMBTreeDataflowHelperFactory( |
| - AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER, |
| - AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER, |
| - AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER); |
| - break; |
| - case RTREE: |
| - dfhFactory = new LSMRTreeDataflowHelperFactory( |
| - new IPrimitiveValueProviderFactory[] { null }, RTreePolicyType.RTREE, |
| - new IBinaryComparatorFactory[] { null }, |
| - AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER, |
| - AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER, |
| - AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER, null); |
| - break; |
| - case NGRAM_INVIX: |
| - case WORD_INVIX: |
| - dfhFactory = new LSMInvertedIndexDataflowHelperFactory( |
| - AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER, |
| - AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER, |
| - AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER); |
| - break; |
| - default: |
| - throw new AsterixException("Unknown index type provided."); |
| - } |
| - IndexDropOperatorDescriptor secondaryBtreeDrop = new IndexDropOperatorDescriptor(specs[i], |
| - AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, |
| - AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, idxSplitsAndConstraint.first, dfhFactory); |
| - AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specs[i], secondaryBtreeDrop, |
| - idxSplitsAndConstraint.second); |
| - i++; |
| - } |
| - } |
| - } else { |
| - specs = new JobSpecification[1]; |
| - } |
| + |
| JobSpecification specPrimary = new JobSpecification(); |
| - specs[specs.length - 1] = specPrimary; |
| |
| Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider |
| .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataset.getDataverseName(), datasetName, |
| @@ -187,7 +130,7 @@ |
| |
| specPrimary.addRoot(primaryBtreeDrop); |
| |
| - return specs; |
| + return specPrimary; |
| } |
| |
| public static JobSpecification createDatasetJobSpec(Dataverse dataverse, String datasetName, |
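| |
| createDropDatasetJobSpec now returns a single job that drops only the primary |
| B-tree; the secondary-index drop loop deleted above moves to the caller. As the |
| (truncated) dataverse-drop handler at the end of this patch shows, AqlTranslator |
| now compiles one CompiledIndexDropStatement per secondary index and submits each |
| resulting job itself, so every physical drop can be bracketed by its own |
| metadata transaction. A rough caller-side sketch (the IndexOperations method |
| name is assumed, not shown in this patch): |
| |
|     for (Index index : MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dvName, datasetName)) { |
|         if (index.isSecondaryIndex()) { |
|             CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dvName, datasetName, |
|                     index.getIndexName()); |
|             jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider)); |
|         } |
|     } |
| |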
| Index: asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java |
| =================================================================== |
| --- asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java (revision 1061) |
| +++ asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java (working copy) |
| @@ -21,6 +21,8 @@ |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| +import java.util.concurrent.locks.ReadWriteLock; |
| +import java.util.concurrent.locks.ReentrantReadWriteLock; |
| |
| import org.json.JSONException; |
| |
| @@ -68,6 +70,7 @@ |
| import edu.uci.ics.asterix.metadata.MetadataException; |
| import edu.uci.ics.asterix.metadata.MetadataManager; |
| import edu.uci.ics.asterix.metadata.MetadataTransactionContext; |
| +import edu.uci.ics.asterix.metadata.api.IMetadataEntity; |
| import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider; |
| import edu.uci.ics.asterix.metadata.entities.Dataset; |
| import edu.uci.ics.asterix.metadata.entities.Datatype; |
| @@ -112,6 +115,7 @@ |
| private final PrintWriter out; |
| private final SessionConfig sessionConfig; |
| private final DisplayFormat pdf; |
| + private final ReadWriteLock cacheLatch; |
| private Dataverse activeDefaultDataverse; |
| private List<FunctionDecl> declaredFunctions; |
| |
| @@ -121,6 +125,7 @@ |
| this.out = out; |
| this.sessionConfig = pc; |
| this.pdf = pdf; |
| + this.cacheLatch = new ReentrantReadWriteLock(true); |
| declaredFunctions = getDeclaredFunctions(aqlStatements); |
| } |
| |
| @@ -143,8 +148,7 @@ |
| |
| for (Statement stmt : aqlStatements) { |
| validateOperation(activeDefaultDataverse, stmt); |
| - MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| - AqlMetadataProvider metadataProvider = new AqlMetadataProvider(mdTxnCtx, activeDefaultDataverse); |
| + AqlMetadataProvider metadataProvider = new AqlMetadataProvider(activeDefaultDataverse); |
| metadataProvider.setWriterFactory(writerFactory); |
| metadataProvider.setOutputFile(outputFile); |
| metadataProvider.setConfig(config); |
| @@ -253,15 +257,9 @@ |
| } |
| |
| } |
| - MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| } catch (Exception e) { |
| - MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); |
| throw new AlgebricksException(e); |
| } |
| - // Following jobs are run under a separate transaction, that is committed/aborted by the JobEventListener |
| - for (JobSpecification jobspec : jobsToExecute) { |
| - runJob(hcc, jobspec); |
| - } |
| } |
| return executionResult; |
| } |
| @@ -289,398 +287,802 @@ |
| |
| private Dataverse handleUseDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt, |
| List<JobSpecification> jobsToExecute) throws MetadataException, RemoteException, ACIDException { |
| - DataverseDecl dvd = (DataverseDecl) stmt; |
| - String dvName = dvd.getDataverseName().getValue(); |
| - Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName); |
| - if (dv == null) { |
| - throw new MetadataException("Unknown dataverse " + dvName); |
| + |
| + MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| + metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| + acquireReadLatch(); |
| + |
| + try { |
| + DataverseDecl dvd = (DataverseDecl) stmt; |
| + String dvName = dvd.getDataverseName().getValue(); |
| + Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName); |
| + if (dv == null) { |
| + throw new MetadataException("Unknown dataverse " + dvName); |
| + } |
| + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| + return dv; |
| + } catch (Exception e) { |
| + MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); |
| + throw new MetadataException(e); |
| + } finally { |
| + releaseReadLatch(); |
| } |
| - return dv; |
| } |
| |
| private void handleCreateDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt, |
| List<JobSpecification> jobsToExecute) throws MetadataException, AlgebricksException, RemoteException, |
| ACIDException { |
| - CreateDataverseStatement stmtCreateDataverse = (CreateDataverseStatement) stmt; |
| - String dvName = stmtCreateDataverse.getDataverseName().getValue(); |
| - Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName); |
| - if (dv != null && !stmtCreateDataverse.getIfNotExists()) { |
| - throw new AlgebricksException("A dataverse with this name " + dvName + " already exists."); |
| + |
| + MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| + metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| + acquireWriteLatch(); |
| + |
| + try { |
| + CreateDataverseStatement stmtCreateDataverse = (CreateDataverseStatement) stmt; |
| + String dvName = stmtCreateDataverse.getDataverseName().getValue(); |
| + Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName); |
| + if (dv != null && !stmtCreateDataverse.getIfNotExists()) { |
| + throw new AlgebricksException("A dataverse with this name " + dvName + " already exists."); |
| + } |
| + MetadataManager.INSTANCE.addDataverse(metadataProvider.getMetadataTxnContext(), new Dataverse(dvName, |
| + stmtCreateDataverse.getFormat(), IMetadataEntity.PENDING_NO_OP)); |
| + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| + } catch (Exception e) { |
| + MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); |
| + throw new AlgebricksException(e); |
| + } finally { |
| + releaseWriteLatch(); |
| } |
| - MetadataManager.INSTANCE.addDataverse(metadataProvider.getMetadataTxnContext(), new Dataverse(dvName, |
| - stmtCreateDataverse.getFormat())); |
| } |
| |
| private void handleCreateDatasetStatement(AqlMetadataProvider metadataProvider, Statement stmt, |
| IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws AsterixException, Exception { |
| - DatasetDecl dd = (DatasetDecl) stmt; |
| - String dataverseName = dd.getDataverse() != null ? dd.getDataverse().getValue() |
| - : activeDefaultDataverse != null ? activeDefaultDataverse.getDataverseName() : null; |
| - if (dataverseName == null) { |
| - throw new AlgebricksException(" dataverse not specified "); |
| - } |
| - String datasetName = dd.getName().getValue(); |
| - DatasetType dsType = dd.getDatasetType(); |
| - String itemTypeName = dd.getItemTypeName().getValue(); |
| |
| - IDatasetDetails datasetDetails = null; |
| - Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName, |
| - datasetName); |
| - if (ds != null) { |
| - if (dd.getIfNotExists()) { |
| - return; |
| - } else { |
| - throw new AlgebricksException("A dataset with this name " + datasetName + " already exists."); |
| + MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| + boolean bActiveTxn = true; |
| + metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| + acquireWriteLatch(); |
| + |
| + try { |
| + DatasetDecl dd = (DatasetDecl) stmt; |
| + String dataverseName = dd.getDataverse() != null ? dd.getDataverse().getValue() |
| + : activeDefaultDataverse != null ? activeDefaultDataverse.getDataverseName() : null; |
| + if (dataverseName == null) { |
| + throw new AlgebricksException(" dataverse not specified "); |
| } |
| - } |
| - Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(), dataverseName, |
| - itemTypeName); |
| - if (dt == null) { |
| - throw new AlgebricksException(": type " + itemTypeName + " could not be found."); |
| - } |
| - switch (dd.getDatasetType()) { |
| - case INTERNAL: { |
| - IAType itemType = dt.getDatatype(); |
| - if (itemType.getTypeTag() != ATypeTag.RECORD) { |
| - throw new AlgebricksException("Can only partition ARecord's."); |
| + String datasetName = dd.getName().getValue(); |
| + DatasetType dsType = dd.getDatasetType(); |
| + String itemTypeName = dd.getItemTypeName().getValue(); |
| + |
| + IDatasetDetails datasetDetails = null; |
| + Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName, |
| + datasetName); |
| + if (ds != null) { |
| + if (dd.getIfNotExists()) { |
| + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| + return; |
| + } else { |
| + throw new AlgebricksException("A dataset with this name " + datasetName + " already exists."); |
| } |
| - List<String> partitioningExprs = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()) |
| - .getPartitioningExprs(); |
| - String ngName = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue(); |
| - datasetDetails = new InternalDatasetDetails(InternalDatasetDetails.FileStructure.BTREE, |
| - InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs, ngName); |
| - break; |
| } |
| - case EXTERNAL: { |
| - String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter(); |
| - Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties(); |
| - datasetDetails = new ExternalDatasetDetails(adapter, properties); |
| - break; |
| + Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(), dataverseName, |
| + itemTypeName); |
| + if (dt == null) { |
| + throw new AlgebricksException(": type " + itemTypeName + " could not be found."); |
| } |
| - case FEED: { |
| - IAType itemType = dt.getDatatype(); |
| - if (itemType.getTypeTag() != ATypeTag.RECORD) { |
| - throw new AlgebricksException("Can only partition ARecord's."); |
| + switch (dd.getDatasetType()) { |
| + case INTERNAL: { |
| + IAType itemType = dt.getDatatype(); |
| + if (itemType.getTypeTag() != ATypeTag.RECORD) { |
| + throw new AlgebricksException("Can only partition ARecord's."); |
| + } |
| + List<String> partitioningExprs = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()) |
| + .getPartitioningExprs(); |
| + String ngName = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue(); |
| + datasetDetails = new InternalDatasetDetails(InternalDatasetDetails.FileStructure.BTREE, |
| + InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs, |
| + ngName); |
| + break; |
| } |
| - List<String> partitioningExprs = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getPartitioningExprs(); |
| - String ngName = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue(); |
| - String adapter = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getAdapterFactoryClassname(); |
| - Map<String, String> configuration = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getConfiguration(); |
| - FunctionSignature signature = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getFunctionSignature(); |
| - datasetDetails = new FeedDatasetDetails(InternalDatasetDetails.FileStructure.BTREE, |
| - InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs, ngName, |
| - adapter, configuration, signature, FeedDatasetDetails.FeedState.INACTIVE.toString()); |
| - break; |
| + case EXTERNAL: { |
| + String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter(); |
| + Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties(); |
| + datasetDetails = new ExternalDatasetDetails(adapter, properties); |
| + break; |
| + } |
| + case FEED: { |
| + IAType itemType = dt.getDatatype(); |
| + if (itemType.getTypeTag() != ATypeTag.RECORD) { |
| + throw new AlgebricksException("Can only partition ARecord's."); |
| + } |
| + List<String> partitioningExprs = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()) |
| + .getPartitioningExprs(); |
| + String ngName = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue(); |
| + String adapter = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getAdapterFactoryClassname(); |
| + Map<String, String> configuration = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()) |
| + .getConfiguration(); |
| + FunctionSignature signature = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getFunctionSignature(); |
| + datasetDetails = new FeedDatasetDetails(InternalDatasetDetails.FileStructure.BTREE, |
| + InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs, |
| + ngName, adapter, configuration, signature, FeedDatasetDetails.FeedState.INACTIVE.toString()); |
| + break; |
| + } |
| } |
| + |
| + //#. add a new dataset with PendingAddOp |
| + Dataset dataset = new Dataset(dataverseName, datasetName, itemTypeName, datasetDetails, dsType, |
| + DatasetIdFactory.generateDatasetId(), IMetadataEntity.PENDING_ADD_OP); |
| + MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset); |
| + |
| + if (dd.getDatasetType() == DatasetType.INTERNAL || dd.getDatasetType() == DatasetType.FEED) { |
| + Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), |
| + dataverseName); |
| + JobSpecification jobSpec = DatasetOperations.createDatasetJobSpec(dataverse, datasetName, |
| + metadataProvider); |
| + |
| + //#. make metadataTxn commit before calling runJob. |
| + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| + bActiveTxn = false; |
| + |
| + //#. runJob |
| + runJob(hcc, jobSpec); |
| + |
| + //#. begin new metadataTxn |
| + mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| + bActiveTxn = true; |
| + metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| + } |
| + |
| + //#. add a new dataset with PendingNoOp after deleting the dataset with PendingAddOp |
| + MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName); |
| + MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), new Dataset(dataverseName, |
| + datasetName, itemTypeName, datasetDetails, dsType, dataset.getDatasetId(), |
| + IMetadataEntity.PENDING_NO_OP)); |
| + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| + } catch (Exception e) { |
| + if (bActiveTxn) { |
| + MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); |
| + } |
| + throw new AlgebricksException(e); |
| + } finally { |
| + releaseWriteLatch(); |
| } |
| - MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), new Dataset(dataverseName, |
| - datasetName, itemTypeName, datasetDetails, dsType, DatasetIdFactory.generateDatasetId())); |
| - if (dd.getDatasetType() == DatasetType.INTERNAL || dd.getDatasetType() == DatasetType.FEED) { |
| - Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), |
| - dataverseName); |
| - runJob(hcc, DatasetOperations.createDatasetJobSpec(dataverse, datasetName, metadataProvider)); |
| - } |
| } |
| |
| private void handleCreateIndexStatement(AqlMetadataProvider metadataProvider, Statement stmt, |
| IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception { |
| - CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt; |
| - String dataverseName = stmtCreateIndex.getDataverseName() == null ? activeDefaultDataverse == null ? null |
| - : activeDefaultDataverse.getDataverseName() : stmtCreateIndex.getDataverseName().getValue(); |
| - if (dataverseName == null) { |
| - throw new AlgebricksException(" dataverse not specified "); |
| - } |
| - String datasetName = stmtCreateIndex.getDatasetName().getValue(); |
| - Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName, |
| - datasetName); |
| - if (ds == null) { |
| - throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse " |
| - + dataverseName); |
| - } |
| - String indexName = stmtCreateIndex.getIndexName().getValue(); |
| - Index idx = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName, |
| - datasetName, indexName); |
| - if (idx != null) { |
| - if (!stmtCreateIndex.getIfNotExists()) { |
| - throw new AlgebricksException("An index with this name " + indexName + " already exists."); |
| - } else { |
| - stmtCreateIndex.setNeedToCreate(false); |
| + |
| + MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| + boolean bActiveTxn = true; |
| + metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| + acquireWriteLatch(); |
| + |
| + try { |
| + CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt; |
| + String dataverseName = stmtCreateIndex.getDataverseName() == null ? activeDefaultDataverse == null ? null |
| + : activeDefaultDataverse.getDataverseName() : stmtCreateIndex.getDataverseName().getValue(); |
| + if (dataverseName == null) { |
| + throw new AlgebricksException(" dataverse not specified "); |
| } |
| - } else { |
| + String datasetName = stmtCreateIndex.getDatasetName().getValue(); |
| + |
| + Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName, |
| + datasetName); |
| + if (ds == null) { |
| + throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse " |
| + + dataverseName); |
| + } |
| + |
| + String indexName = stmtCreateIndex.getIndexName().getValue(); |
| + Index idx = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName, |
| + datasetName, indexName); |
| + |
| + if (idx != null) { |
| + if (!stmtCreateIndex.getIfNotExists()) { |
| + throw new AlgebricksException("An index with this name " + indexName + " already exists."); |
| + } else { |
| + stmtCreateIndex.setNeedToCreate(false); |
| + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| + return; |
| + } |
| + } |
| + |
| + //#. add a new index with PendingAddOp |
| Index index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(), |
| - stmtCreateIndex.getFieldExprs(), stmtCreateIndex.getGramLength(), false); |
| + stmtCreateIndex.getFieldExprs(), stmtCreateIndex.getGramLength(), false, |
| + IMetadataEntity.PENDING_ADD_OP); |
| MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index); |
| - runCreateIndexJob(hcc, stmtCreateIndex, metadataProvider); |
| |
| + //#. create the index artifact in NC. |
| CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName, |
| index.getDatasetName(), index.getKeyFieldNames(), index.getGramLength(), index.getIndexType()); |
| - JobSpecification loadIndexJobSpec = IndexOperations |
| - .buildSecondaryIndexLoadingJobSpec(cis, metadataProvider); |
| - runJob(hcc, loadIndexJobSpec); |
| + JobSpecification spec = IndexOperations.buildSecondaryIndexCreationJobSpec(cis, metadataProvider); |
| + if (spec == null) { |
| + throw new AsterixException("Failed to create job spec for creating index '" |
| + + stmtCreateIndex.getDatasetName() + "." + stmtCreateIndex.getIndexName() + "'"); |
| + } |
| + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| + bActiveTxn = false; |
| + |
| + runJob(hcc, spec); |
| + |
| + mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| + bActiveTxn = true; |
| + metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| + |
| + //#. load data into the index in NC. |
| + cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName, index.getDatasetName(), |
| + index.getKeyFieldNames(), index.getGramLength(), index.getIndexType()); |
| + spec = IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, metadataProvider); |
| + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| + bActiveTxn = false; |
| + |
| + runJob(hcc, spec); |
| + |
| + //#. begin new metadataTxn |
| + mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| + bActiveTxn = true; |
| + metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| + |
| + //#. add another new index with PendingNoOp after deleting the index with PendingAddOp |
| + MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName, |
| + indexName); |
| + index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(), |
| + stmtCreateIndex.getFieldExprs(), stmtCreateIndex.getGramLength(), false, |
| + IMetadataEntity.PENDING_NO_OP); |
| + MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index); |
| + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| + |
| + } catch (Exception e) { |
| + if (bActiveTxn) { |
| + MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); |
| + } |
| + throw new AlgebricksException(e); |
| + } finally { |
| + releaseWriteLatch(); |
| } |
| } |
| |
| private void handleCreateTypeStatement(AqlMetadataProvider metadataProvider, Statement stmt, |
| List<JobSpecification> jobsToExecute) throws AlgebricksException, RemoteException, ACIDException, |
| MetadataException { |
| - MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext(); |
| - TypeDecl stmtCreateType = (TypeDecl) stmt; |
| - String dataverseName = stmtCreateType.getDataverseName() == null ? activeDefaultDataverse == null ? null |
| - : activeDefaultDataverse.getDataverseName() : stmtCreateType.getDataverseName().getValue(); |
| - if (dataverseName == null) { |
| - throw new AlgebricksException(" dataverse not specified "); |
| - } |
| - String typeName = stmtCreateType.getIdent().getValue(); |
| - Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName); |
| - if (dv == null) { |
| - throw new AlgebricksException("Unknonw dataverse " + dataverseName); |
| - } |
| - Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName); |
| - if (dt != null) { |
| - if (!stmtCreateType.getIfNotExists()) |
| - throw new AlgebricksException("A datatype with this name " + typeName + " already exists."); |
| - } else { |
| - if (builtinTypeMap.get(typeName) != null) { |
| - throw new AlgebricksException("Cannot redefine builtin type " + typeName + "."); |
| + |
| + MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| + metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| + acquireWriteLatch(); |
| + |
| + try { |
| + TypeDecl stmtCreateType = (TypeDecl) stmt; |
| + String dataverseName = stmtCreateType.getDataverseName() == null ? activeDefaultDataverse == null ? null |
| + : activeDefaultDataverse.getDataverseName() : stmtCreateType.getDataverseName().getValue(); |
| + if (dataverseName == null) { |
| + throw new AlgebricksException(" dataverse not specified "); |
| + } |
| + String typeName = stmtCreateType.getIdent().getValue(); |
| + Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName); |
| + if (dv == null) { |
| + throw new AlgebricksException("Unknonw dataverse " + dataverseName); |
| + } |
| + Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName); |
| + if (dt != null) { |
| + if (!stmtCreateType.getIfNotExists()) { |
| + throw new AlgebricksException("A datatype with this name " + typeName + " already exists."); |
| + } |
| } else { |
| - Map<TypeSignature, IAType> typeMap = TypeTranslator.computeTypes(mdTxnCtx, (TypeDecl) stmt, |
| - dataverseName); |
| - TypeSignature typeSignature = new TypeSignature(dataverseName, typeName); |
| - IAType type = typeMap.get(typeSignature); |
| - MetadataManager.INSTANCE.addDatatype(mdTxnCtx, new Datatype(dataverseName, typeName, type, false)); |
| + if (builtinTypeMap.get(typeName) != null) { |
| + throw new AlgebricksException("Cannot redefine builtin type " + typeName + "."); |
| + } else { |
| + Map<TypeSignature, IAType> typeMap = TypeTranslator.computeTypes(mdTxnCtx, (TypeDecl) stmt, |
| + dataverseName); |
| + TypeSignature typeSignature = new TypeSignature(dataverseName, typeName); |
| + IAType type = typeMap.get(typeSignature); |
| + MetadataManager.INSTANCE.addDatatype(mdTxnCtx, new Datatype(dataverseName, typeName, type, false)); |
| + } |
| } |
| + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| + } catch (Exception e) { |
| + MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); |
| + throw new AlgebricksException(e); |
| + } finally { |
| + releaseWriteLatch(); |
| } |
| } |
| |
| private void handleDataverseDropStatement(AqlMetadataProvider metadataProvider, Statement stmt, |
| IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception { |
| - MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext(); |
| - DataverseDropStatement stmtDelete = (DataverseDropStatement) stmt; |
| - String dvName = stmtDelete.getDataverseName().getValue(); |
| |
| - Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dvName); |
| - if (dv == null) { |
| - if (!stmtDelete.getIfExists()) { |
| - throw new AlgebricksException("There is no dataverse with this name " + dvName + "."); |
| + MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| + boolean bActiveTxn = true; |
| + metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| + acquireWriteLatch(); |
| + |
| + try { |
| + DataverseDropStatement stmtDelete = (DataverseDropStatement) stmt; |
| + String dvName = stmtDelete.getDataverseName().getValue(); |
| + |
| + Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dvName); |
| + if (dv == null) { |
| + if (!stmtDelete.getIfExists()) { |
| + throw new AlgebricksException("There is no dataverse with this name " + dvName + "."); |
| + } |
| + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| + return; |
| } |
| - } else { |
| + |
| + //#. prepare jobs which will drop corresponding datasets with indexes. |
| List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dvName); |
| for (int j = 0; j < datasets.size(); j++) { |
| String datasetName = datasets.get(j).getDatasetName(); |
| DatasetType dsType = datasets.get(j).getDatasetType(); |
| if (dsType == DatasetType.INTERNAL || dsType == DatasetType.FEED) { |
| + |
| List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dvName, datasetName); |
| for (int k = 0; k < indexes.size(); k++) { |
| if (indexes.get(k).isSecondaryIndex()) { |
| - compileIndexDropStatement(hcc, dvName, datasetName, indexes.get(k).getIndexName(), |
| - metadataProvider); |
| + CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dvName, datasetName, |
| + indexes.get(k).getIndexName()); |
| + jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider)); |
| } |
| } |
| + |
| + CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dvName, datasetName); |
| + jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider)); |
| } |
| - compileDatasetDropStatement(hcc, dvName, datasetName, metadataProvider); |
| } |
| |
| + //#. mark PendingDropOp on the dataverse record by |
| + // first, deleting the dataverse record from the DATAVERSE_DATASET |
| + // second, inserting the dataverse record with the PendingDropOp value into the DATAVERSE_DATASET |
| MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dvName); |
| + MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dvName, dv.getDataFormat(), |
| + IMetadataEntity.PENDING_DROP_OP)); |
| + |
| + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| + bActiveTxn = false; |
| + |
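| + //#. run the drop jobs outside of any metadata txn; if one fails, the committed PendingDropOp record is left in place rather than rolled back. |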
| + for (JobSpecification jobSpec : jobsToExecute) { |
| + runJob(hcc, jobSpec); |
| + } |
| + |
| + mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| + bActiveTxn = true; |
| + metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| + |
| + //#. finally, delete the dataverse. |
| + MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dvName); |
| if (activeDefaultDataverse != null && activeDefaultDataverse.getDataverseName() == dvName) { |
| activeDefaultDataverse = null; |
| } |
| + |
| + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| + } catch (Exception e) { |
| + if (bActiveTxn) { |
| + MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); |
| + } |
| + throw new AlgebricksException(e); |
| + } finally { |
| + releaseWriteLatch(); |
| } |
| } |
| |
| private void handleDatasetDropStatement(AqlMetadataProvider metadataProvider, Statement stmt, |
| IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception { |
| - MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext(); |
| - DropStatement stmtDelete = (DropStatement) stmt; |
| - String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null |
| - : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue(); |
| - if (dataverseName == null) { |
| - throw new AlgebricksException(" dataverse not specified "); |
| - } |
| - String datasetName = stmtDelete.getDatasetName().getValue(); |
| - Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName); |
| - if (ds == null) { |
| - if (!stmtDelete.getIfExists()) |
| - throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse " |
| - + dataverseName + "."); |
| - } else { |
| + |
| + MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| + boolean bActiveTxn = true; |
| + metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| + acquireWriteLatch(); |
| + |
| + try { |
| + DropStatement stmtDelete = (DropStatement) stmt; |
| + String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null |
| + : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue(); |
| + if (dataverseName == null) { |
| + throw new AlgebricksException(" dataverse not specified "); |
| + } |
| + String datasetName = stmtDelete.getDatasetName().getValue(); |
| + |
| + Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName); |
| + if (ds == null) { |
| + if (!stmtDelete.getIfExists()) { |
| + throw new AlgebricksException("There is no dataset with this name " + datasetName |
| + + " in dataverse " + dataverseName + "."); |
| + } |
| + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| + return; |
| + } |
| + |
| if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) { |
| + |
| + //#. prepare jobs to drop the dataset and the indexes in NC |
| List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName); |
| for (int j = 0; j < indexes.size(); j++) { |
| - if (indexes.get(j).isPrimaryIndex()) { |
| - compileIndexDropStatement(hcc, dataverseName, datasetName, indexes.get(j).getIndexName(), |
| - metadataProvider); |
| + if (indexes.get(j).isSecondaryIndex()) { |
| + CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, |
| + indexes.get(j).getIndexName()); |
| + jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider)); |
| } |
| } |
| + CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName); |
| + jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider)); |
| + |
| + //#. mark the existing dataset as PendingDropOp |
| + MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName); |
| + MetadataManager.INSTANCE.addDataset( |
| + mdTxnCtx, |
| + new Dataset(dataverseName, datasetName, ds.getItemTypeName(), ds.getDatasetDetails(), ds |
| + .getDatasetType(), ds.getDatasetId(), IMetadataEntity.PENDING_DROP_OP)); |
| + |
| + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| + bActiveTxn = false; |
| + |
| + //#. run the jobs |
| + for (JobSpecification jobSpec : jobsToExecute) { |
| + runJob(hcc, jobSpec); |
| + } |
| + |
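| + //#. begin a new metadata txn in which the PendingDropOp record is finally deleted. |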
| + mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| + bActiveTxn = true; |
| + metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| } |
| - compileDatasetDropStatement(hcc, dataverseName, datasetName, metadataProvider); |
| + |
| + //#. finally, delete the dataset. |
| + MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName); |
| + |
| + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| + } catch (Exception e) { |
| + if (bActiveTxn) { |
| + MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); |
| + } |
| + throw new AlgebricksException(e); |
| + } finally { |
| + releaseWriteLatch(); |
| } |
| } |
| |
| private void handleIndexDropStatement(AqlMetadataProvider metadataProvider, Statement stmt, |
| IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception { |
| - MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext(); |
| - IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt; |
| - String datasetName = stmtIndexDrop.getDatasetName().getValue(); |
| - String dataverseName = stmtIndexDrop.getDataverseName() == null ? activeDefaultDataverse == null ? null |
| - : activeDefaultDataverse.getDataverseName() : stmtIndexDrop.getDataverseName().getValue(); |
| - if (dataverseName == null) { |
| - throw new AlgebricksException(" dataverse not specified "); |
| + |
| + MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| + boolean bActiveTxn = true; |
| + metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| + acquireWriteLatch(); |
| + |
| + try { |
| + IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt; |
| + String datasetName = stmtIndexDrop.getDatasetName().getValue(); |
| + String dataverseName = stmtIndexDrop.getDataverseName() == null ? activeDefaultDataverse == null ? null |
| + : activeDefaultDataverse.getDataverseName() : stmtIndexDrop.getDataverseName().getValue(); |
| + if (dataverseName == null) { |
| + throw new AlgebricksException(" dataverse not specified "); |
| + } |
| + |
| + Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName); |
| + if (ds == null) { |
| + throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse " |
| + + dataverseName); |
| + } |
| + |
| + if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) { |
| + String indexName = stmtIndexDrop.getIndexName().getValue(); |
| + Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName); |
| + if (index == null) { |
| + if (!stmtIndexDrop.getIfExists()) { |
| + throw new AlgebricksException("There is no index with this name " + indexName + "."); |
| + } |
| + } else { |
| + //#. prepare a job to drop the index in NC. |
| + CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, |
| + indexName); |
| + jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider)); |
| + |
| + //#. mark PendingDropOp on the existing index |
| + MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName); |
| + MetadataManager.INSTANCE.addIndex( |
| + mdTxnCtx, |
| + new Index(dataverseName, datasetName, indexName, index.getIndexType(), index |
| + .getKeyFieldNames(), index.isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP)); |
| + |
| + //#. commit the existing transaction before calling runJob. |
| + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| + bActiveTxn = false; |
| + |
| + for (JobSpecification jobSpec : jobsToExecute) { |
| + runJob(hcc, jobSpec); |
| + } |
| + |
| + //#. begin a new transaction |
| + mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| + bActiveTxn = true; |
| + metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| + |
| + //#. finally, delete the existing index |
| + MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName); |
| + } |
| + } else { |
| + throw new AlgebricksException(datasetName |
| + + " is an external dataset. Indexes are not maintained for external datasets."); |
| + } |
| + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| + |
| + } catch (Exception e) { |
| + if (bActiveTxn) { |
| + MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); |
| + } |
| + throw new AlgebricksException(e); |
| + |
| + } finally { |
| + releaseWriteLatch(); |
| } |
| - Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName); |
| - if (ds == null) |
| - throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse " |
| - + dataverseName); |
| - if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) { |
| - String indexName = stmtIndexDrop.getIndexName().getValue(); |
| - Index idx = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName); |
| - if (idx == null) { |
| - if (!stmtIndexDrop.getIfExists()) |
| - throw new AlgebricksException("There is no index with this name " + indexName + "."); |
| - } else |
| - compileIndexDropStatement(hcc, dataverseName, datasetName, indexName, metadataProvider); |
| - } else { |
| - throw new AlgebricksException(datasetName |
| - + " is an external dataset. Indexes are not maintained for external datasets."); |
| - } |
| } |
| |
| private void handleTypeDropStatement(AqlMetadataProvider metadataProvider, Statement stmt, |
| List<JobSpecification> jobsToExecute) throws AlgebricksException, MetadataException, RemoteException, |
| ACIDException { |
| - MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext(); |
| - TypeDropStatement stmtTypeDrop = (TypeDropStatement) stmt; |
| - String dataverseName = stmtTypeDrop.getDataverseName() == null ? (activeDefaultDataverse == null ? null |
| - : activeDefaultDataverse.getDataverseName()) : stmtTypeDrop.getDataverseName().getValue(); |
| - if (dataverseName == null) { |
| - throw new AlgebricksException(" dataverse not specified "); |
| + |
| + MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| + metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| + acquireWriteLatch(); |
| + |
| + try { |
| + TypeDropStatement stmtTypeDrop = (TypeDropStatement) stmt; |
| + String dataverseName = stmtTypeDrop.getDataverseName() == null ? (activeDefaultDataverse == null ? null |
| + : activeDefaultDataverse.getDataverseName()) : stmtTypeDrop.getDataverseName().getValue(); |
| + if (dataverseName == null) { |
| + throw new AlgebricksException(" dataverse not specified "); |
| + } |
| + String typeName = stmtTypeDrop.getTypeName().getValue(); |
| + Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName); |
| + if (dt == null) { |
| + if (!stmtTypeDrop.getIfExists()) { |
| + throw new AlgebricksException("There is no datatype with this name " + typeName + "."); |
| + } |
| + } else { |
| + MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, dataverseName, typeName); |
| + } |
| + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| + } catch (Exception e) { |
| + MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); |
| + throw new AlgebricksException(e); |
| + } finally { |
| + releaseWriteLatch(); |
| } |
| - String typeName = stmtTypeDrop.getTypeName().getValue(); |
| - Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName); |
| - if (dt == null) { |
| - if (!stmtTypeDrop.getIfExists()) |
| - throw new AlgebricksException("There is no datatype with this name " + typeName + "."); |
| - } else { |
| - MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, dataverseName, typeName); |
| - } |
| } |
| |
| private void handleNodegroupDropStatement(AqlMetadataProvider metadataProvider, Statement stmt, |
| List<JobSpecification> jobsToExecute) throws MetadataException, AlgebricksException, RemoteException, |
| ACIDException { |
| - MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext(); |
| - NodeGroupDropStatement stmtDelete = (NodeGroupDropStatement) stmt; |
| - String nodegroupName = stmtDelete.getNodeGroupName().getValue(); |
| - NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodegroupName); |
| - if (ng == null) { |
| - if (!stmtDelete.getIfExists()) |
| - throw new AlgebricksException("There is no nodegroup with this name " + nodegroupName + "."); |
| - } else { |
| - MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, nodegroupName); |
| + |
| + MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| + metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| + acquireWriteLatch(); |
| + |
| + try { |
| + NodeGroupDropStatement stmtDelete = (NodeGroupDropStatement) stmt; |
| + String nodegroupName = stmtDelete.getNodeGroupName().getValue(); |
| + NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodegroupName); |
| + if (ng == null) { |
| + if (!stmtDelete.getIfExists()) { |
| + throw new AlgebricksException("There is no nodegroup with this name " + nodegroupName + "."); |
| + } |
| + } else { |
| + MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, nodegroupName); |
| + } |
| + |
| + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| + } catch (Exception e) { |
| + MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); |
| + throw new AlgebricksException(e); |
| + } finally { |
| + releaseWriteLatch(); |
| } |
| } |
| |
| private void handleCreateFunctionStatement(AqlMetadataProvider metadataProvider, Statement stmt, |
| List<JobSpecification> jobsToExecute) throws AlgebricksException, MetadataException, RemoteException, |
| ACIDException { |
| - MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext(); |
| - CreateFunctionStatement cfs = (CreateFunctionStatement) stmt; |
| - String dataverse = cfs.getSignature().getNamespace() == null ? activeDefaultDataverse == null ? null |
| - : activeDefaultDataverse.getDataverseName() : cfs.getSignature().getNamespace(); |
| - if (dataverse == null) { |
| - throw new AlgebricksException(" dataverse not specified "); |
| + MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| + metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| + acquireWriteLatch(); |
| + |
| + try { |
| + CreateFunctionStatement cfs = (CreateFunctionStatement) stmt; |
| + String dataverse = cfs.getSignature().getNamespace() == null ? activeDefaultDataverse == null ? null |
| + : activeDefaultDataverse.getDataverseName() : cfs.getSignature().getNamespace(); |
| + if (dataverse == null) { |
| + throw new AlgebricksException(" dataverse not specified "); |
| + } |
| + Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverse); |
| + if (dv == null) { |
| + throw new AlgebricksException("There is no dataverse with this name " + dataverse + "."); |
| + } |
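| + //#. functions are metadata-only entities: register the function and commit; no Hyracks job is involved. |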
| + Function function = new Function(dataverse, cfs.getaAterixFunction().getName(), cfs.getaAterixFunction() |
| + .getArity(), cfs.getParamList(), Function.RETURNTYPE_VOID, cfs.getFunctionBody(), |
| + Function.LANGUAGE_AQL, FunctionKind.SCALAR.toString()); |
| + MetadataManager.INSTANCE.addFunction(mdTxnCtx, function); |
| + |
| + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| + } catch (Exception e) { |
| + MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); |
| + throw new AlgebricksException(e); |
| + } finally { |
| + releaseWriteLatch(); |
| } |
| - Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverse); |
| - if (dv == null) { |
| - throw new AlgebricksException("There is no dataverse with this name " + dataverse + "."); |
| - } |
| - Function function = new Function(dataverse, cfs.getaAterixFunction().getName(), cfs.getaAterixFunction() |
| - .getArity(), cfs.getParamList(), Function.RETURNTYPE_VOID, cfs.getFunctionBody(), |
| - Function.LANGUAGE_AQL, FunctionKind.SCALAR.toString()); |
| - MetadataManager.INSTANCE.addFunction(mdTxnCtx, function); |
| } |
| |
| private void handleFunctionDropStatement(AqlMetadataProvider metadataProvider, Statement stmt, |
| List<JobSpecification> jobsToExecute) throws MetadataException, RemoteException, ACIDException, |
| AlgebricksException { |
| - MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext(); |
| - FunctionDropStatement stmtDropFunction = (FunctionDropStatement) stmt; |
| - FunctionSignature signature = stmtDropFunction.getFunctionSignature(); |
| - Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, signature); |
| - if (function == null) { |
| - if (!stmtDropFunction.getIfExists()) |
| - throw new AlgebricksException("Unknonw function " + signature); |
| - } else { |
| - MetadataManager.INSTANCE.dropFunction(mdTxnCtx, signature); |
| + MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| + metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| + acquireWriteLatch(); |
| + |
| + try { |
| + FunctionDropStatement stmtDropFunction = (FunctionDropStatement) stmt; |
| + FunctionSignature signature = stmtDropFunction.getFunctionSignature(); |
| + Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, signature); |
| + if (function == null) { |
| + if (!stmtDropFunction.getIfExists()) { |
| + throw new AlgebricksException("Unknown function " + signature); |
| + } |
| + } else { |
| + MetadataManager.INSTANCE.dropFunction(mdTxnCtx, signature); |
| + } |
| + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| + } catch (Exception e) { |
| + MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); |
| + throw new AlgebricksException(e); |
| + } finally { |
| + releaseWriteLatch(); |
| } |
| } |
| |
| private void handleLoadFromFileStatement(AqlMetadataProvider metadataProvider, Statement stmt, |
| IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception { |
| - MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext(); |
| - LoadFromFileStatement loadStmt = (LoadFromFileStatement) stmt; |
| - String dataverseName = loadStmt.getDataverseName() == null ? activeDefaultDataverse == null ? null |
| - : activeDefaultDataverse.getDataverseName() : loadStmt.getDataverseName().getValue(); |
| - CompiledLoadFromFileStatement cls = new CompiledLoadFromFileStatement(dataverseName, loadStmt.getDatasetName() |
| - .getValue(), loadStmt.getAdapter(), loadStmt.getProperties(), loadStmt.dataIsAlreadySorted()); |
| |
| - IDataFormat format = getDataFormat(metadataProvider.getMetadataTxnContext(), dataverseName); |
| - Job job = DatasetOperations.createLoadDatasetJobSpec(metadataProvider, cls, format); |
| - jobsToExecute.add(job.getJobSpec()); |
| - // Also load the dataset's secondary indexes. |
| - List<Index> datasetIndexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, loadStmt |
| - .getDatasetName().getValue()); |
| - for (Index index : datasetIndexes) { |
| - if (!index.isSecondaryIndex()) { |
| - continue; |
| + MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| + boolean bActiveTxn = true; |
| + metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| + acquireReadLatch(); |
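| + // load takes the read latch: it adds no metadata entities, so it may overlap other non-DDL statements but not DDL. |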
| + |
| + try { |
| + LoadFromFileStatement loadStmt = (LoadFromFileStatement) stmt; |
| + String dataverseName = loadStmt.getDataverseName() == null ? activeDefaultDataverse == null ? null |
| + : activeDefaultDataverse.getDataverseName() : loadStmt.getDataverseName().getValue(); |
| + CompiledLoadFromFileStatement cls = new CompiledLoadFromFileStatement(dataverseName, loadStmt |
| + .getDatasetName().getValue(), loadStmt.getAdapter(), loadStmt.getProperties(), |
| + loadStmt.dataIsAlreadySorted()); |
| + |
| + IDataFormat format = getDataFormat(metadataProvider.getMetadataTxnContext(), dataverseName); |
| + Job job = DatasetOperations.createLoadDatasetJobSpec(metadataProvider, cls, format); |
| + jobsToExecute.add(job.getJobSpec()); |
| + // Also load the dataset's secondary indexes. |
| + List<Index> datasetIndexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, loadStmt |
| + .getDatasetName().getValue()); |
| + for (Index index : datasetIndexes) { |
| + if (!index.isSecondaryIndex()) { |
| + continue; |
| + } |
| + // Create CompiledCreateIndexStatement from metadata entity 'index'. |
| + CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(), |
| + dataverseName, index.getDatasetName(), index.getKeyFieldNames(), index.getGramLength(), |
| + index.getIndexType()); |
| + jobsToExecute.add(IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, metadataProvider)); |
| } |
| - // Create CompiledCreateIndexStatement from metadata entity 'index'. |
| - CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName, |
| - index.getDatasetName(), index.getKeyFieldNames(), index.getGramLength(), index.getIndexType()); |
| - jobsToExecute.add(IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, metadataProvider)); |
| + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| + bActiveTxn = false; |
| + |
| + for (JobSpecification jobspec : jobsToExecute) { |
| + runJob(hcc, jobspec); |
| + } |
| + } catch (Exception e) { |
| + if (bActiveTxn) { |
| + MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); |
| + } |
| + throw new AlgebricksException(e); |
| + } finally { |
| + releaseReadLatch(); |
| } |
| } |
| |
| private void handleWriteFromQueryResultStatement(AqlMetadataProvider metadataProvider, Statement stmt, |
| IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception { |
| - metadataProvider.setWriteTransaction(true); |
| - WriteFromQueryResultStatement st1 = (WriteFromQueryResultStatement) stmt; |
| - String dataverseName = st1.getDataverseName() == null ? activeDefaultDataverse == null ? null |
| - : activeDefaultDataverse.getDataverseName() : st1.getDataverseName().getValue(); |
| - CompiledWriteFromQueryResultStatement clfrqs = new CompiledWriteFromQueryResultStatement(dataverseName, st1 |
| - .getDatasetName().getValue(), st1.getQuery(), st1.getVarCounter()); |
| |
| - Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs); |
| - if (compiled.first != null) { |
| - jobsToExecute.add(compiled.first); |
| + MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| + boolean bActiveTxn = true; |
| + metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| + acquireReadLatch(); |
| + |
| + try { |
| + metadataProvider.setWriteTransaction(true); |
| + WriteFromQueryResultStatement st1 = (WriteFromQueryResultStatement) stmt; |
| + String dataverseName = st1.getDataverseName() == null ? activeDefaultDataverse == null ? null |
| + : activeDefaultDataverse.getDataverseName() : st1.getDataverseName().getValue(); |
| + CompiledWriteFromQueryResultStatement clfrqs = new CompiledWriteFromQueryResultStatement(dataverseName, st1 |
| + .getDatasetName().getValue(), st1.getQuery(), st1.getVarCounter()); |
| + |
| + Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), |
| + clfrqs); |
| + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| + bActiveTxn = false; |
| + if (compiled.first != null) { |
| + runJob(hcc, compiled.first); |
| + } |
| + } catch (Exception e) { |
| + if (bActiveTxn) { |
| + MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); |
| + } |
| + throw new AlgebricksException(e); |
| + } finally { |
| + releaseReadLatch(); |
| } |
| } |
| |
| private void handleInsertStatement(AqlMetadataProvider metadataProvider, Statement stmt, |
| IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception { |
| - metadataProvider.setWriteTransaction(true); |
| - InsertStatement stmtInsert = (InsertStatement) stmt; |
| - String dataverseName = stmtInsert.getDataverseName() == null ? activeDefaultDataverse == null ? null |
| - : activeDefaultDataverse.getDataverseName() : stmtInsert.getDataverseName().getValue(); |
| - CompiledInsertStatement clfrqs = new CompiledInsertStatement(dataverseName, stmtInsert.getDatasetName() |
| - .getValue(), stmtInsert.getQuery(), stmtInsert.getVarCounter()); |
| - Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs); |
| - if (compiled.first != null) { |
| - jobsToExecute.add(compiled.first); |
| + |
| + MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| + boolean bActiveTxn = true; |
| + metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| + acquireReadLatch(); |
| + |
| + try { |
| + metadataProvider.setWriteTransaction(true); |
| + InsertStatement stmtInsert = (InsertStatement) stmt; |
| + String dataverseName = stmtInsert.getDataverseName() == null ? activeDefaultDataverse == null ? null |
| + : activeDefaultDataverse.getDataverseName() : stmtInsert.getDataverseName().getValue(); |
| + CompiledInsertStatement clfrqs = new CompiledInsertStatement(dataverseName, stmtInsert.getDatasetName() |
| + .getValue(), stmtInsert.getQuery(), stmtInsert.getVarCounter()); |
| + Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), |
| + clfrqs); |
| + |
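| + //#. commit the metadata txn used for compilation before executing the insert job. |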
| + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| + bActiveTxn = false; |
| + |
| + if (compiled.first != null) { |
| + runJob(hcc, compiled.first); |
| + } |
| + |
| + } catch (Exception e) { |
| + if (bActiveTxn) { |
| + MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); |
| + } |
| + throw new AlgebricksException(e); |
| + } finally { |
| + releaseReadLatch(); |
| } |
| } |
| |
| private void handleDeleteStatement(AqlMetadataProvider metadataProvider, Statement stmt, |
| IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception { |
| - metadataProvider.setWriteTransaction(true); |
| - DeleteStatement stmtDelete = (DeleteStatement) stmt; |
| - String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null |
| - : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue(); |
| - CompiledDeleteStatement clfrqs = new CompiledDeleteStatement(stmtDelete.getVariableExpr(), dataverseName, |
| - stmtDelete.getDatasetName().getValue(), stmtDelete.getCondition(), stmtDelete.getDieClause(), |
| - stmtDelete.getVarCounter(), metadataProvider); |
| - Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs); |
| - if (compiled.first != null) { |
| - jobsToExecute.add(compiled.first); |
| + |
| + MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| + boolean bActiveTxn = true; |
| + metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| + acquireReadLatch(); |
| + |
| + try { |
| + metadataProvider.setWriteTransaction(true); |
| + DeleteStatement stmtDelete = (DeleteStatement) stmt; |
| + String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null |
| + : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue(); |
| + CompiledDeleteStatement clfrqs = new CompiledDeleteStatement(stmtDelete.getVariableExpr(), dataverseName, |
| + stmtDelete.getDatasetName().getValue(), stmtDelete.getCondition(), stmtDelete.getDieClause(), |
| + stmtDelete.getVarCounter(), metadataProvider); |
| + Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), |
| + clfrqs); |
| + |
| + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| + bActiveTxn = false; |
| + |
| + if (compiled.first != null) { |
| + runJob(hcc, compiled.first); |
| + } |
| + |
| + } catch (Exception e) { |
| + if (bActiveTxn) { |
| + MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); |
| + } |
| + throw new AlgebricksException(e); |
| + } finally { |
| + releaseReadLatch(); |
| } |
| } |
| |
| @@ -704,46 +1106,109 @@ |
| |
| private void handleBeginFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt, |
| IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception { |
| - BeginFeedStatement bfs = (BeginFeedStatement) stmt; |
| - String dataverseName = bfs.getDataverseName() == null ? activeDefaultDataverse == null ? null |
| - : activeDefaultDataverse.getDataverseName() : bfs.getDatasetName().getValue(); |
| |
| - CompiledBeginFeedStatement cbfs = new CompiledBeginFeedStatement(dataverseName, |
| - bfs.getDatasetName().getValue(), bfs.getQuery(), bfs.getVarCounter()); |
| + MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| + boolean bActiveTxn = true; |
| + metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| + acquireReadLatch(); |
| |
| - Dataset dataset; |
| - dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName, bfs |
| - .getDatasetName().getValue()); |
| - IDatasetDetails datasetDetails = dataset.getDatasetDetails(); |
| - if (datasetDetails.getDatasetType() != DatasetType.FEED) { |
| - throw new IllegalArgumentException("Dataset " + bfs.getDatasetName().getValue() + " is not a feed dataset"); |
| + try { |
| + BeginFeedStatement bfs = (BeginFeedStatement) stmt; |
| + String dataverseName = bfs.getDataverseName() == null ? activeDefaultDataverse == null ? null |
| + : activeDefaultDataverse.getDataverseName() : bfs.getDataverseName().getValue(); |
| + |
| + CompiledBeginFeedStatement cbfs = new CompiledBeginFeedStatement(dataverseName, bfs.getDatasetName() |
| + .getValue(), bfs.getQuery(), bfs.getVarCounter()); |
| + |
| + Dataset dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), |
| + dataverseName, bfs.getDatasetName().getValue()); |
| + IDatasetDetails datasetDetails = dataset.getDatasetDetails(); |
| + if (datasetDetails.getDatasetType() != DatasetType.FEED) { |
| + throw new IllegalArgumentException("Dataset " + bfs.getDatasetName().getValue() |
| + + " is not a feed dataset"); |
| + } |
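| + //#. bind the feed dataset's metadata into the statement and recompile its query before running the job. |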
| + bfs.initialize(metadataProvider.getMetadataTxnContext(), dataset); |
| + cbfs.setQuery(bfs.getQuery()); |
| + Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, bfs.getQuery(), cbfs); |
| + |
| + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| + bActiveTxn = false; |
| + |
| + if (compiled.first != null) { |
| + runJob(hcc, compiled.first); |
| + } |
| + |
| + } catch (Exception e) { |
| + if (bActiveTxn) { |
| + MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); |
| + } |
| + throw new AlgebricksException(e); |
| + } finally { |
| + releaseReadLatch(); |
| } |
| - bfs.initialize(metadataProvider.getMetadataTxnContext(), dataset); |
| - cbfs.setQuery(bfs.getQuery()); |
| - Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, bfs.getQuery(), cbfs); |
| - if (compiled.first != null) { |
| - jobsToExecute.add(compiled.first); |
| - } |
| } |
| |
| private void handleControlFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt, |
| IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception { |
| - ControlFeedStatement cfs = (ControlFeedStatement) stmt; |
| - String dataverseName = cfs.getDataverseName() == null ? activeDefaultDataverse == null ? null |
| - : activeDefaultDataverse.getDataverseName() : cfs.getDatasetName().getValue(); |
| - CompiledControlFeedStatement clcfs = new CompiledControlFeedStatement(cfs.getOperationType(), dataverseName, |
| - cfs.getDatasetName().getValue(), cfs.getAlterAdapterConfParams()); |
| - jobsToExecute.add(FeedOperations.buildControlFeedJobSpec(clcfs, metadataProvider)); |
| + |
| + MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| + boolean bActiveTxn = true; |
| + metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| + acquireReadLatch(); |
| + |
| + try { |
| + ControlFeedStatement cfs = (ControlFeedStatement) stmt; |
| + String dataverseName = cfs.getDataverseName() == null ? activeDefaultDataverse == null ? null |
| + : activeDefaultDataverse.getDataverseName() : cfs.getDataverseName().getValue(); |
| + CompiledControlFeedStatement clcfs = new CompiledControlFeedStatement(cfs.getOperationType(), |
| + dataverseName, cfs.getDatasetName().getValue(), cfs.getAlterAdapterConfParams()); |
| + JobSpecification jobSpec = FeedOperations.buildControlFeedJobSpec(clcfs, metadataProvider); |
| + |
| + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| + bActiveTxn = false; |
| + |
| + runJob(hcc, jobSpec); |
| + |
| + } catch (Exception e) { |
| + if (bActiveTxn) { |
| + MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); |
| + } |
| + throw new AlgebricksException(e); |
| + } finally { |
| + releaseReadLatch(); |
| + } |
| } |
| |
| private QueryResult handleQuery(AqlMetadataProvider metadataProvider, Query query, IHyracksClientConnection hcc, |
| List<JobSpecification> jobsToExecute) throws Exception { |
| - Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, query, null); |
| - if (compiled.first != null) { |
| - GlobalConfig.ASTERIX_LOGGER.info(compiled.first.toJSON().toString(1)); |
| - jobsToExecute.add(compiled.first); |
| + |
| + MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| + boolean bActiveTxn = true; |
| + metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| + acquireReadLatch(); |
| + |
| + try { |
| + Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, query, null); |
| + |
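| + //#. capture the result file location from the compiled FileSplit before committing the metadata txn. |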
| + QueryResult queryResult = new QueryResult(query, compiled.second.getLocalFile().getFile().getAbsolutePath()); |
| + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| + bActiveTxn = false; |
| + |
| + if (compiled.first != null) { |
| + GlobalConfig.ASTERIX_LOGGER.info(compiled.first.toJSON().toString(1)); |
| + runJob(hcc, compiled.first); |
| + } |
| + |
| + return queryResult; |
| + } catch (Exception e) { |
| + if (bActiveTxn) { |
| + MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); |
| + } |
| + throw new AlgebricksException(e); |
| + } finally { |
| + releaseReadLatch(); |
| } |
| - return new QueryResult(query, compiled.second.getLocalFile().getFile().getAbsolutePath()); |
| } |
| |
| private void runCreateIndexJob(IHyracksClientConnection hcc, CreateIndexStatement stmtCreateIndex, |
| @@ -768,20 +1233,32 @@ |
| private void handleCreateNodeGroupStatement(AqlMetadataProvider metadataProvider, Statement stmt, |
| List<JobSpecification> jobsToExecute) throws MetadataException, AlgebricksException, RemoteException, |
| ACIDException { |
| - MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext(); |
| - NodegroupDecl stmtCreateNodegroup = (NodegroupDecl) stmt; |
| - String ngName = stmtCreateNodegroup.getNodegroupName().getValue(); |
| - NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, ngName); |
| - if (ng != null) { |
| - if (!stmtCreateNodegroup.getIfNotExists()) |
| - throw new AlgebricksException("A nodegroup with this name " + ngName + " already exists."); |
| - } else { |
| - List<Identifier> ncIdentifiers = stmtCreateNodegroup.getNodeControllerNames(); |
| - List<String> ncNames = new ArrayList<String>(ncIdentifiers.size()); |
| - for (Identifier id : ncIdentifiers) { |
| - ncNames.add(id.getValue()); |
| + |
| + MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| + metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| + acquireWriteLatch(); |
| + |
| + try { |
| + NodegroupDecl stmtCreateNodegroup = (NodegroupDecl) stmt; |
| + String ngName = stmtCreateNodegroup.getNodegroupName().getValue(); |
| + NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, ngName); |
| + if (ng != null) { |
| + if (!stmtCreateNodegroup.getIfNotExists()) { |
| + throw new AlgebricksException("A nodegroup with this name " + ngName + " already exists."); |
| + } |
| + } else { |
| + List<Identifier> ncIdentifiers = stmtCreateNodegroup.getNodeControllerNames(); |
| + List<String> ncNames = new ArrayList<String>(ncIdentifiers.size()); |
| + for (Identifier id : ncIdentifiers) { |
| + ncNames.add(id.getValue()); |
| + } |
| + MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(ngName, ncNames)); |
| } |
| - MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(ngName, ncNames)); |
| + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| + } catch (Exception e) { |
| + MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); |
| + throw new AlgebricksException(e); |
| + } finally { |
| + releaseWriteLatch(); |
| } |
| } |
| |
| @@ -791,10 +1268,37 @@ |
| |
| private void compileIndexDropStatement(IHyracksClientConnection hcc, String dataverseName, String datasetName, |
| String indexName, AqlMetadataProvider metadataProvider) throws Exception { |
| + MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext(); |
| + Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName); |
| + |
| + //#. mark PendingDropOp on the existing index |
| + MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName); |
| + MetadataManager.INSTANCE.addIndex( |
| + mdTxnCtx, |
| + new Index(dataverseName, datasetName, indexName, index.getIndexType(), index.getKeyFieldNames(), index |
| + .isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP)); |
| CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName); |
| - runJob(hcc, IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider)); |
| - MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName, |
| - indexName); |
| + JobSpecification jobSpec = IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider); |
| + |
| + //#. commit the existing transaction before calling runJob. |
| + // the caller should begin the transaction before calling this function. |
| + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| + |
| + try { |
| + runJob(hcc, jobSpec); |
| + } catch (Exception e) { |
| + //need to create the mdTxnCtx to be aborted by caller properly |
| + mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| + metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| + throw e; |
| + } |
| + |
| + //#. begin a new transaction |
| + mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| + metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| + |
| + //#. finally, delete the existing index |
| + MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName); |
| } |
| |
| private void compileDatasetDropStatement(IHyracksClientConnection hcc, String dataverseName, String datasetName, |
| @@ -803,10 +1307,32 @@ |
| CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName); |
| Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName); |
| if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) { |
| - JobSpecification[] jobSpecs = DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider); |
| - for (JobSpecification spec : jobSpecs) |
| - runJob(hcc, spec); |
| + JobSpecification jobSpec = DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider); |
| + |
| + //#. mark PendingDropOp on the existing dataset |
| + MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName); |
| + MetadataManager.INSTANCE.addDataset(mdTxnCtx, new Dataset(dataverseName, datasetName, ds.getItemTypeName(), |
| + ds.getDatasetDetails(), ds.getDatasetType(), ds.getDatasetId(), IMetadataEntity.PENDING_DROP_OP)); |
| + |
| + //#. commit the transaction |
| + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| + |
| + //#. run the job |
| + try { |
| + runJob(hcc, jobSpec); |
| + } catch (Exception e) { |
| + //need to create the mdTxnCtx to be aborted by caller properly |
| + mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| + metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| + throw e; |
| + } |
| + |
| + //#. start a new transaction |
| + mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| + metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| } |
| + |
| + //#. finally, delete the existing dataset. |
| MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName); |
| } |
| |
| @@ -831,4 +1357,20 @@ |
| } |
| return format; |
| } |
| + |
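| + // cacheLatch serializes access to the metadata cache: DDL handlers take the write latch, query/load/DML handlers take the read latch. |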
| + private void acquireWriteLatch() { |
| + cacheLatch.writeLock().lock(); |
| + } |
| + |
| + private void releaseWriteLatch() { |
| + cacheLatch.writeLock().unlock(); |
| + } |
| + |
| + private void acquireReadLatch() { |
| + cacheLatch.readLock().lock(); |
| + } |
| + |
| + private void releaseReadLatch() { |
| + cacheLatch.readLock().unlock(); |
| + } |
| } |