added GroupCommit
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization@1218 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/diff_file b/diff_file
index 8efd61f..e0e6714 100644
--- a/diff_file
+++ b/diff_file
@@ -1,2098 +1,4486 @@
-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DataverseTupleTranslator.java
+Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
===================================================================
---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DataverseTupleTranslator.java (revision 1061)
-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DataverseTupleTranslator.java (working copy)
-@@ -25,8 +25,11 @@
- import edu.uci.ics.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
- import edu.uci.ics.asterix.metadata.bootstrap.MetadataRecordTypes;
- import edu.uci.ics.asterix.metadata.entities.Dataverse;
-+import edu.uci.ics.asterix.om.base.AInt32;
-+import edu.uci.ics.asterix.om.base.AMutableInt32;
- import edu.uci.ics.asterix.om.base.ARecord;
- import edu.uci.ics.asterix.om.base.AString;
-+import edu.uci.ics.asterix.om.types.BuiltinType;
- import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
- import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-
-@@ -40,12 +43,18 @@
- // Payload field containing serialized Dataverse.
- public static final int DATAVERSE_PAYLOAD_TUPLE_FIELD_INDEX = 1;
-
-+ private AMutableInt32 aInt32;
-+ protected ISerializerDeserializer<AInt32> aInt32Serde;
-+
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<ARecord> recordSerDes = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(MetadataRecordTypes.DATAVERSE_RECORDTYPE);
-
-+ @SuppressWarnings("unchecked")
- public DataverseTupleTranslator(boolean getTuple) {
- super(getTuple, MetadataPrimaryIndexes.DATAVERSE_DATASET.getFieldCount());
-+ aInt32 = new AMutableInt32(-1);
-+ aInt32Serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
- }
-
- @Override
-@@ -57,7 +66,8 @@
- DataInput in = new DataInputStream(stream);
- ARecord dataverseRecord = recordSerDes.deserialize(in);
- return new Dataverse(((AString) dataverseRecord.getValueByPos(0)).getStringValue(),
-- ((AString) dataverseRecord.getValueByPos(1)).getStringValue());
-+ ((AString) dataverseRecord.getValueByPos(1)).getStringValue(),
-+ ((AInt32) dataverseRecord.getValueByPos(2)).getIntegerValue());
- }
-
- @Override
-@@ -88,6 +98,12 @@
- stringSerde.serialize(aString, fieldValue.getDataOutput());
- recordBuilder.addField(MetadataRecordTypes.DATAVERSE_ARECORD_TIMESTAMP_FIELD_INDEX, fieldValue);
-
-+ // write field 3
-+ fieldValue.reset();
-+ aInt32.setValue(instance.getPendingOp());
-+ aInt32Serde.serialize(aInt32, fieldValue.getDataOutput());
-+ recordBuilder.addField(MetadataRecordTypes.DATAVERSE_ARECORD_PENDINGOP_FIELD_INDEX, fieldValue);
-+
- recordBuilder.write(tupleBuilder.getDataOutput(), true);
- tupleBuilder.addFieldEndOffset();
-
-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
-===================================================================
---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java (revision 1061)
-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java (working copy)
-@@ -77,9 +77,9 @@
- protected ISerializerDeserializer<AInt32> aInt32Serde;
-
- @SuppressWarnings("unchecked")
-- public DatasetTupleTranslator(boolean getTuple) {
-+ public DatasetTupleTranslator(boolean getTuple) {
- super(getTuple, MetadataPrimaryIndexes.DATASET_DATASET.getFieldCount());
-- aInt32 = new AMutableInt32(-1);
-+ aInt32 = new AMutableInt32(-1);
- aInt32Serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
- }
-
-@@ -104,8 +104,10 @@
- .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_DATATYPENAME_FIELD_INDEX)).getStringValue();
- DatasetType datasetType = DatasetType.valueOf(((AString) datasetRecord.getValueByPos(3)).getStringValue());
- IDatasetDetails datasetDetails = null;
-- int datasetId = ((AInt32) datasetRecord
-+ int datasetId = ((AInt32) datasetRecord
- .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_DATASETID_FIELD_INDEX)).getIntegerValue();
-+ int pendingOp = ((AInt32) datasetRecord
-+ .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_PENDINGOP_FIELD_INDEX)).getIntegerValue();
- switch (datasetType) {
- case FEED:
- case INTERNAL: {
-@@ -197,7 +199,7 @@
+--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java (revision 1194)
++++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java (working copy)
+@@ -103,12 +103,14 @@
+ //for entity-level commit
+ if (PKHashVal != -1) {
+ transactionProvider.getLockManager().unlock(datasetId, PKHashVal, txnContext, true);
++ /*****************************
+ try {
+ //decrease the transaction reference count on index
+ txnContext.decreaseActiveTransactionCountOnIndexes();
+ } catch (HyracksDataException e) {
+ throw new ACIDException("failed to complete index operation", e);
}
- datasetDetails = new ExternalDatasetDetails(adapter, properties);
- }
-- return new Dataset(dataverseName, datasetName, typeName, datasetDetails, datasetType, datasetId);
-+ return new Dataset(dataverseName, datasetName, typeName, datasetDetails, datasetType, datasetId, pendingOp);
- }
++ *****************************/
+ return;
+ }
- @Override
-@@ -248,13 +250,19 @@
- aString.setValue(Calendar.getInstance().getTime().toString());
- stringSerde.serialize(aString, fieldValue.getDataOutput());
- recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_TIMESTAMP_FIELD_INDEX, fieldValue);
--
-+
- // write field 8
- fieldValue.reset();
- aInt32.setValue(dataset.getDatasetId());
- aInt32Serde.serialize(aInt32, fieldValue.getDataOutput());
- recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_DATASETID_FIELD_INDEX, fieldValue);
--
-+
-+ // write field 9
-+ fieldValue.reset();
-+ aInt32.setValue(dataset.getPendingOp());
-+ aInt32Serde.serialize(aInt32, fieldValue.getDataOutput());
-+ recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_PENDINGOP_FIELD_INDEX, fieldValue);
-+
- // write record
- recordBuilder.write(tupleBuilder.getDataOutput(), true);
- tupleBuilder.addFieldEndOffset();
-@@ -290,13 +298,15 @@
- fieldValue.reset();
- aString.setValue(name);
- stringSerde.serialize(aString, fieldValue.getDataOutput());
-- propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_NAME_FIELD_INDEX, fieldValue);
-+ propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_NAME_FIELD_INDEX,
-+ fieldValue);
-
- // write field 1
- fieldValue.reset();
- aString.setValue(value);
- stringSerde.serialize(aString, fieldValue.getDataOutput());
-- propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_VALUE_FIELD_INDEX, fieldValue);
-+ propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_VALUE_FIELD_INDEX,
-+ fieldValue);
-
- propertyRecordBuilder.write(out, true);
- }
-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java
+Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
===================================================================
---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java (revision 1061)
-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java (working copy)
-@@ -96,13 +96,15 @@
- }
- Boolean isPrimaryIndex = ((ABoolean) rec.getValueByPos(MetadataRecordTypes.INDEX_ARECORD_ISPRIMARY_FIELD_INDEX))
- .getBoolean();
-+ int pendingOp = ((AInt32) rec.getValueByPos(MetadataRecordTypes.INDEX_ARECORD_PENDINGOP_FIELD_INDEX))
-+ .getIntegerValue();
- // Check if there is a gram length as well.
- int gramLength = -1;
- int gramLenPos = rec.getType().findFieldPosition(GRAM_LENGTH_FIELD_NAME);
- if (gramLenPos >= 0) {
- gramLength = ((AInt32) rec.getValueByPos(gramLenPos)).getIntegerValue();
- }
-- return new Index(dvName, dsName, indexName, indexStructure, searchKey, gramLength, isPrimaryIndex);
-+ return new Index(dvName, dsName, indexName, indexStructure, searchKey, gramLength, isPrimaryIndex, pendingOp);
- }
-
- @Override
-@@ -174,7 +176,12 @@
- stringSerde.serialize(aString, fieldValue.getDataOutput());
- recordBuilder.addField(MetadataRecordTypes.INDEX_ARECORD_TIMESTAMP_FIELD_INDEX, fieldValue);
-
-- // write optional field 7
-+ // write field 7
-+ fieldValue.reset();
-+ intSerde.serialize(new AInt32(instance.getPendingOp()), fieldValue.getDataOutput());
-+ recordBuilder.addField(MetadataRecordTypes.INDEX_ARECORD_PENDINGOP_FIELD_INDEX, fieldValue);
-+
-+ // write optional field 8
- if (instance.getGramLength() > 0) {
- fieldValue.reset();
- nameValue.reset();
-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
-===================================================================
---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java (revision 1061)
-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java (working copy)
-@@ -129,7 +129,7 @@
-
- public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, String> {
- private static Logger LOGGER = Logger.getLogger(AqlMetadataProvider.class.getName());
-- private final MetadataTransactionContext mdTxnCtx;
-+ private MetadataTransactionContext mdTxnCtx;
- private boolean isWriteTransaction;
- private Map<String, String[]> stores;
- private Map<String, String> config;
-@@ -156,8 +156,7 @@
- return config;
- }
-
-- public AqlMetadataProvider(MetadataTransactionContext mdTxnCtx, Dataverse defaultDataverse) {
-- this.mdTxnCtx = mdTxnCtx;
-+ public AqlMetadataProvider(Dataverse defaultDataverse) {
- this.defaultDataverse = defaultDataverse;
- this.stores = AsterixProperties.INSTANCE.getStores();
- }
-@@ -181,6 +180,10 @@
- public void setWriterFactory(IAWriterFactory writerFactory) {
- this.writerFactory = writerFactory;
- }
-+
-+ public void setMetadataTxnContext(MetadataTransactionContext mdTxnCtx) {
-+ this.mdTxnCtx = mdTxnCtx;
-+ }
-
- public MetadataTransactionContext getMetadataTxnContext() {
- return mdTxnCtx;
-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java
-===================================================================
---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java (revision 1061)
-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java (working copy)
-@@ -35,15 +35,18 @@
- private final DatasetType datasetType;
- private IDatasetDetails datasetDetails;
- private final int datasetId;
-+ // Type of pending operations with respect to atomic DDL operation
-+ private final int pendingOp;
-
- public Dataset(String dataverseName, String datasetName, String itemTypeName, IDatasetDetails datasetDetails,
-- DatasetType datasetType, int datasetId) {
-+ DatasetType datasetType, int datasetId, int pendingOp) {
- this.dataverseName = dataverseName;
- this.datasetName = datasetName;
- this.itemTypeName = itemTypeName;
- this.datasetType = datasetType;
- this.datasetDetails = datasetDetails;
- this.datasetId = datasetId;
-+ this.pendingOp = pendingOp;
- }
-
- public String getDataverseName() {
-@@ -73,6 +76,10 @@
- public int getDatasetId() {
- return datasetId;
- }
-+
-+ public int getPendingOp() {
-+ return pendingOp;
-+ }
-
- @Override
- public Object addToCache(MetadataCache cache) {
-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Index.java
-===================================================================
---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Index.java (revision 1061)
-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Index.java (working copy)
-@@ -45,9 +45,11 @@
- private final boolean isPrimaryIndex;
- // Specific to NGRAM indexes.
- private final int gramLength;
-+ // Type of pending operations with respect to atomic DDL operation
-+ private final int pendingOp;
-
- public Index(String dataverseName, String datasetName, String indexName, IndexType indexType,
-- List<String> keyFieldNames, int gramLength, boolean isPrimaryIndex) {
-+ List<String> keyFieldNames, int gramLength, boolean isPrimaryIndex, int pendingOp) {
- this.dataverseName = dataverseName;
- this.datasetName = datasetName;
- this.indexName = indexName;
-@@ -55,10 +57,11 @@
- this.keyFieldNames = keyFieldNames;
- this.gramLength = gramLength;
- this.isPrimaryIndex = isPrimaryIndex;
-+ this.pendingOp = pendingOp;
- }
-
- public Index(String dataverseName, String datasetName, String indexName, IndexType indexType,
-- List<String> keyFieldNames, boolean isPrimaryIndex) {
-+ List<String> keyFieldNames, boolean isPrimaryIndex, int pendingOp) {
- this.dataverseName = dataverseName;
- this.datasetName = datasetName;
- this.indexName = indexName;
-@@ -66,6 +69,7 @@
- this.keyFieldNames = keyFieldNames;
- this.gramLength = -1;
- this.isPrimaryIndex = isPrimaryIndex;
-+ this.pendingOp = pendingOp;
- }
-
- public String getDataverseName() {
-@@ -95,6 +99,10 @@
- public boolean isPrimaryIndex() {
- return isPrimaryIndex;
- }
-+
-+ public int getPendingOp() {
-+ return pendingOp;
-+ }
-
- public boolean isSecondaryIndex() {
- return !isPrimaryIndex();
-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataverse.java
-===================================================================
---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataverse.java (revision 1061)
-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataverse.java (working copy)
-@@ -27,10 +27,12 @@
- // Enforced to be unique within an Asterix cluster..
- private final String dataverseName;
- private final String dataFormat;
-+ private final int pendingOp;
-
-- public Dataverse(String dataverseName, String format) {
-+ public Dataverse(String dataverseName, String format, int pendingOp) {
- this.dataverseName = dataverseName;
- this.dataFormat = format;
-+ this.pendingOp = pendingOp;
- }
-
- public String getDataverseName() {
-@@ -40,6 +42,10 @@
- public String getDataFormat() {
- return dataFormat;
- }
-+
-+ public int getPendingOp() {
-+ return pendingOp;
-+ }
-
- @Override
- public Object addToCache(MetadataCache cache) {
-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
-===================================================================
---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java (revision 1061)
-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java (working copy)
-@@ -25,6 +25,7 @@
- import edu.uci.ics.asterix.common.exceptions.AsterixException;
- import edu.uci.ics.asterix.common.functions.FunctionSignature;
- import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-+import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
- import edu.uci.ics.asterix.metadata.api.IMetadataIndex;
- import edu.uci.ics.asterix.metadata.api.IMetadataNode;
- import edu.uci.ics.asterix.metadata.api.IValueExtractor;
-@@ -160,7 +161,7 @@
- // Add the primary index for the dataset.
- InternalDatasetDetails id = (InternalDatasetDetails) dataset.getDatasetDetails();
- Index primaryIndex = new Index(dataset.getDataverseName(), dataset.getDatasetName(),
-- dataset.getDatasetName(), IndexType.BTREE, id.getPrimaryKey(), true);
-+ dataset.getDatasetName(), IndexType.BTREE, id.getPrimaryKey(), true, dataset.getPendingOp());
- addIndex(jobId, primaryIndex);
- ITupleReference nodeGroupTuple = createTuple(id.getNodeGroupName(), dataset.getDataverseName(),
- dataset.getDatasetName());
-@@ -260,7 +261,7 @@
- IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
- NoOpOperationCallback.INSTANCE);
- TransactionContext txnCtx = transactionProvider.getTransactionManager().getTransactionContext(jobId);
-- transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx);
-+ //transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx);
- // TODO: fix exceptions once new BTree exception model is in hyracks.
- indexAccessor.insert(tuple);
- //TODO: extract the key from the tuple and get the PKHashValue from the key.
-@@ -536,7 +537,7 @@
- // The transaction with txnId will have an S lock on the
- // resource. Note that lock converters have a higher priority than
- // regular waiters in the LockManager.
-- transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx);
-+ //transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx);
- indexAccessor.delete(tuple);
- //TODO: extract the key from the tuple and get the PKHashValue from the key.
- //check how to get the oldValue.
-@@ -803,7 +804,9 @@
- private <ResultType> void searchIndex(JobId jobId, IMetadataIndex index, ITupleReference searchKey,
- IValueExtractor<ResultType> valueExtractor, List<ResultType> results) throws Exception {
- TransactionContext txnCtx = transactionProvider.getTransactionManager().getTransactionContext(jobId);
-- transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.S, txnCtx);
-+ //#. currently lock is not needed to access any metadata
-+ // since the non-compatible concurrent access is always protected by the latch in the MetadataManager.
-+ //transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.S, txnCtx);
- IBinaryComparatorFactory[] comparatorFactories = index.getKeyBinaryComparatorFactory();
- long resourceID = index.getResourceID();
- IIndex indexInstance = indexLifecycleManager.getIndex(resourceID);
-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataEntity.java
-===================================================================
---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataEntity.java (revision 1061)
-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataEntity.java (working copy)
-@@ -20,6 +20,11 @@
- import edu.uci.ics.asterix.metadata.MetadataCache;
-
- public interface IMetadataEntity extends Serializable {
-+
-+ public static final int PENDING_NO_OP = 0;
-+ public static final int PENDING_ADD_OP = 1;
-+ public static final int PENDING_DROP_OP = 2;
-+
- Object addToCache(MetadataCache cache);
-
- Object dropFromCache(MetadataCache cache);
-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
-===================================================================
---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java (revision 1061)
-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java (working copy)
-@@ -17,6 +17,8 @@
-
- import java.rmi.RemoteException;
- import java.util.List;
-+import java.util.concurrent.locks.ReadWriteLock;
-+import java.util.concurrent.locks.ReentrantReadWriteLock;
-
- import edu.uci.ics.asterix.common.functions.FunctionSignature;
- import edu.uci.ics.asterix.metadata.api.IAsterixStateProxy;
-@@ -79,11 +81,10 @@
- public class MetadataManager implements IMetadataManager {
- // Set in init().
- public static MetadataManager INSTANCE;
--
- private final MetadataCache cache = new MetadataCache();
- private IAsterixStateProxy proxy;
- private IMetadataNode metadataNode;
--
-+
- public MetadataManager(IAsterixStateProxy proxy) {
- if (proxy == null) {
- throw new Error("Null proxy given to MetadataManager.");
-@@ -206,11 +207,14 @@
-
- @Override
- public void addDataset(MetadataTransactionContext ctx, Dataset dataset) throws MetadataException {
-+ // add dataset into metadataNode
- try {
- metadataNode.addDataset(ctx.getJobId(), dataset);
- } catch (RemoteException e) {
- throw new MetadataException(e);
- }
-+
-+ // reflect the dataset into the cache
- ctx.addDataset(dataset);
- }
-
-@@ -585,4 +589,5 @@
- }
- return adapter;
- }
-+
- }
-\ No newline at end of file
-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java
-===================================================================
---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java (revision 1061)
-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java (working copy)
+--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java (revision 1194)
++++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java (working copy)
@@ -19,6 +19,7 @@
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Set;
++import java.util.concurrent.atomic.AtomicInteger;
- import edu.uci.ics.asterix.common.functions.FunctionSignature;
- import edu.uci.ics.asterix.external.dataset.adapter.AdapterIdentifier;
-+import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
- import edu.uci.ics.asterix.metadata.entities.Dataset;
- import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
- import edu.uci.ics.asterix.metadata.entities.Datatype;
-@@ -104,19 +105,19 @@
- }
-
- public void dropDataset(String dataverseName, String datasetName) {
-- Dataset dataset = new Dataset(dataverseName, datasetName, null, null, null, -1);
-+ Dataset dataset = new Dataset(dataverseName, datasetName, null, null, null, -1, IMetadataEntity.PENDING_NO_OP);
- droppedCache.addDatasetIfNotExists(dataset);
- logAndApply(new MetadataLogicalOperation(dataset, false));
- }
-
- public void dropIndex(String dataverseName, String datasetName, String indexName) {
-- Index index = new Index(dataverseName, datasetName, indexName, null, null, false);
-+ Index index = new Index(dataverseName, datasetName, indexName, null, null, false, IMetadataEntity.PENDING_NO_OP);
- droppedCache.addIndexIfNotExists(index);
- logAndApply(new MetadataLogicalOperation(index, false));
- }
-
- public void dropDataverse(String dataverseName) {
-- Dataverse dataverse = new Dataverse(dataverseName, null);
-+ Dataverse dataverse = new Dataverse(dataverseName, null, IMetadataEntity.PENDING_NO_OP);
- droppedCache.addDataverseIfNotExists(dataverse);
- logAndApply(new MetadataLogicalOperation(dataverse, false));
- }
-@@ -162,7 +163,7 @@
+ import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
+ import edu.uci.ics.asterix.transaction.management.opcallbacks.AbstractOperationCallback;
+@@ -169,5 +170,14 @@
+ closeable.close(this);
}
- return droppedCache.getDataset(dataverseName, datasetName) != null;
}
--
++
++ @Override
++ public int hashCode() {
++ return jobId.getId();
++ }
+
++ @Override
++ public boolean equals(Object o) {
++ return (o == this);
++ }
+ }
+Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
+===================================================================
+--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java (revision 1194)
++++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java (working copy)
+@@ -567,7 +567,7 @@
+ if (commitFlag) {
+ if (txnContext.getTransactionType().equals(TransactionContext.TransactionType.READ_WRITE)) {
+ try {
+- txnSubsystem.getLogManager().log(LogType.COMMIT, txnContext, datasetId.getId(),
++ txnSubsystem.getLogManager().log(LogType.ENTITY_COMMIT, txnContext, datasetId.getId(),
+ entityHashValue, -1, (byte) 0, 0, null, null, logicalLogLocator);
+ } catch (ACIDException e) {
+ try {
+Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java
+===================================================================
+--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java (revision 1194)
++++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java (working copy)
+@@ -75,6 +75,7 @@
+ buffer.position(0);
+ buffer.limit(size);
+ fileChannel.write(buffer);
++ fileChannel.force(false);
+ erase();
+ }
+
+Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+===================================================================
+--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java (revision 1194)
++++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java (working copy)
+@@ -1,5 +1,5 @@
+ /*
+- * Copyright 2009-2010 by The Regents of the University of California
++ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+@@ -21,7 +21,12 @@
+ import java.io.RandomAccessFile;
+ import java.nio.ByteBuffer;
+ import java.nio.channels.FileChannel;
++import java.util.ArrayList;
++import java.util.HashMap;
++import java.util.List;
++import java.util.Map;
+ import java.util.Properties;
++import java.util.Set;
+ import java.util.concurrent.LinkedBlockingQueue;
+ import java.util.concurrent.atomic.AtomicInteger;
+ import java.util.concurrent.atomic.AtomicLong;
+@@ -30,22 +35,25 @@
+
+ import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
+ import edu.uci.ics.asterix.transaction.management.service.logging.IndexLogger.ReusableLogContentObject;
++import edu.uci.ics.asterix.transaction.management.service.logging.LogManager.PageOwnershipStatus;
++import edu.uci.ics.asterix.transaction.management.service.logging.LogManager.PageState;
+ import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
+ import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants;
+ import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
++import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+ public class LogManager implements ILogManager {
+
+ public static final boolean IS_DEBUG_MODE = false;//true
+ private static final Logger LOGGER = Logger.getLogger(LogManager.class.getName());
+- private TransactionSubsystem provider;
++ private final TransactionSubsystem provider;
+ private LogManagerProperties logManagerProperties;
++ private LogPageFlushThread logPageFlusher;
+
+ /*
+ * the array of log pages. The number of log pages is configurable. Pages
+ * taken together form an in-memory log buffer.
+ */
+-
+ private IFileBasedBuffer[] logPages;
+
+ private ILogRecordHelper logRecordHelper;
+@@ -54,6 +62,7 @@
+ * Number of log pages that constitute the in-memory log buffer.
+ */
+ private int numLogPages;
+
- public boolean indexIsDropped(String dataverseName, String datasetName, String indexName) {
- if (droppedCache.getDataverse(dataverseName) != null) {
- return true;
-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
-===================================================================
---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java (revision 1061)
-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java (working copy)
-@@ -80,10 +80,11 @@
- public static final int DATAVERSE_ARECORD_NAME_FIELD_INDEX = 0;
- public static final int DATAVERSE_ARECORD_FORMAT_FIELD_INDEX = 1;
- public static final int DATAVERSE_ARECORD_TIMESTAMP_FIELD_INDEX = 2;
-+ public static final int DATAVERSE_ARECORD_PENDINGOP_FIELD_INDEX = 3;
+ /*
+ * Initially all pages have an owner count of 1 that is the LogManager. When
+ * a transaction requests to write in a log page, the owner count is
+@@ -62,12 +71,11 @@
+ * (covering the whole log record). When the content has been put, the log
+ * manager computes the checksum and puts it after the content. At this
+ * point, the ownership count is decremented as the transaction is done with
+- * using the page. When a page is full, the log manager decrements the count
+- * by one indicating that it has released its ownership of the log page.
+- * There could be other transaction(s) still owning the page (that is they
+- * could still be mid-way putting the log content). When the ownership count
+- * eventually reaches zero, the thread responsible for flushing the log page
+- * is notified and the page is flushed to disk.
++ * using the page. When a page is requested to be flushed, logPageFlusher
++ * set the count to 0(LOG_FLUSHER: meaning that the page is being flushed)
++ * only if the count is 1(LOG_WRITER: meaning that there is no other
++ * transactions who own the page to write logs.) After flushing the page,
++ * logPageFlusher set this count to 1.
+ */
+ private AtomicInteger[] logPageOwnerCount;
- private static final ARecordType createDataverseRecordType() {
-- return new ARecordType("DataverseRecordType", new String[] { "DataverseName", "DataFormat", "Timestamp" },
-- new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING }, true);
-+ return new ARecordType("DataverseRecordType", new String[] { "DataverseName", "DataFormat", "Timestamp", "PendingOp" },
-+ new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.AINT32 }, true);
+@@ -78,18 +86,16 @@
+
+ /*
+ * LogPageStatus: A page is either ACTIVE or INACTIVE. The status for each
+- * page is maintained in a map called logPageStatus. A page is ACTIVE when
+- * the LogManager can allocate space in the page for writing a log record.
+- * Initially all pages are ACTIVE. As transactions fill up space by writing
+- * log records, a page may not have sufficient space left for serving a
+- * request by a transaction. When this happens, the page is marked INACTIVE.
+- * An INACTIVE page with no owners ( logPageOwnerCount.get(<pageIndex>) ==
+- * 0) indicates that the page must be flushed to disk before any other log
+- * record is written on the page.F
++ * page is maintained in logPageStatus. A page is ACTIVE when the LogManager
++ * can allocate space in the page for writing a log record. Initially all
++ * pages are ACTIVE. As transactions fill up space by writing log records,
++ * a page may not have sufficient space left for serving a request by a
++ * transaction. When this happens, the page is flushed to disk by calling
++ * logPageFlusher.requestFlush(). In the requestFlush(), after groupCommitWaitTime,
++ * the page status is set to INACTIVE. Then, there is no more writer on the
++ * page(meaning the corresponding logPageOwnerCount is 1), the page is flushed
++ * by the logPageFlusher and the status is reset to ACTIVE by the logPageFlusher.
+ */
+-
+- // private Map<Integer, Integer> logPageStatus = new
+- // ConcurrentHashMap<Integer, Integer>();
+ private AtomicInteger[] logPageStatus;
+
+ static class PageState {
+@@ -98,41 +104,8 @@
}
- // Helper constants for accessing fields in an ARecord of anonymous type
-@@ -158,10 +159,11 @@
- public static final int DATASET_ARECORD_FEEDDETAILS_FIELD_INDEX = 6;
- public static final int DATASET_ARECORD_TIMESTAMP_FIELD_INDEX = 7;
- public static final int DATASET_ARECORD_DATASETID_FIELD_INDEX = 8;
-+ public static final int DATASET_ARECORD_PENDINGOP_FIELD_INDEX = 9;
+ private AtomicLong lastFlushedLsn = new AtomicLong(-1);
+- private AtomicInteger lastFlushedPage = new AtomicInteger(-1);
- private static final ARecordType createDatasetRecordType() {
- String[] fieldNames = { "DataverseName", "DatasetName", "DataTypeName", "DatasetType", "InternalDetails",
-- "ExternalDetails", "FeedDetails", "Timestamp", "DatasetId" };
-+ "ExternalDetails", "FeedDetails", "Timestamp", "DatasetId", "PendingOp" };
+ /*
+- * pendingFlushRequests is a map with key as Integer denoting the page
+- * index. When a (transaction) thread discovers the need to flush a page, it
+- * puts its Thread object into the corresponding value that is a
+- * LinkedBlockingQueue. The LogManager has a LogFlusher thread that scans
+- * this map in order of page index (and circling around). The flusher thread
+- * needs to flush pages in order and waits for a thread to deposit an object
+- * in the blocking queue corresponding to the next page in order. A request
+- * to flush a page is conveyed to the flush thread by simply depositing an
+- * object in to corresponding blocking queue. It is blocking in the sense
+- * that the flusher thread will continue to wait for an object to arrive in
+- * the queue. The object itself is ignored by the fliusher and just acts as
+- * a signal/event that a page needs to be flushed.
+- */
+-
+- private LinkedBlockingQueue[] pendingFlushRequests;
+-
+- /*
+- * ICommitResolver is an interface that provides an API that can answer a
+- * simple boolean - Given the commit requests so far, should a page be
+- * flushed. The implementation of the interface contains the logic (or you
+- * can say the policy) for commit. It could be group commit in which case
+- * the commit resolver may not return a true indicating that it wishes to
+- * delay flushing of the page.
+- */
+- private ICommitResolver commitResolver;
+-
+- /*
+- * An object that keeps track of the submitted commit requests.
+- */
+- private CommitRequestStatistics commitRequestStatistics;
+-
+- /*
+ * When the transaction eco-system comes to life, the log manager positions
+ * itself to the end of the last written log. the startingLsn represent the
+ * lsn value of the next log record to be written after a system (re)start.
+@@ -146,16 +119,10 @@
+ */
+ private AtomicLong lsn = new AtomicLong(0);
- List<IAType> internalRecordUnionList = new ArrayList<IAType>();
- internalRecordUnionList.add(BuiltinType.ANULL);
-@@ -179,7 +181,8 @@
- AUnionType feedRecordUnion = new AUnionType(feedRecordUnionList, null);
+- /*
+- * A map that tracks the flush requests submitted for each page. The
+- * requests for a page are cleared when the page is flushed.
+- */
+- public LinkedBlockingQueue<Thread> getPendingFlushRequests(int pageIndex) {
+- return pendingFlushRequests[pageIndex];
+- }
++ private List<HashMap<TransactionContext, Integer>> activeTxnCountMaps;
- IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
-- internalRecordUnion, externalRecordUnion, feedRecordUnion, BuiltinType.ASTRING, BuiltinType.AINT32 };
-+ internalRecordUnion, externalRecordUnion, feedRecordUnion, BuiltinType.ASTRING, BuiltinType.AINT32,
-+ BuiltinType.AINT32 };
- return new ARecordType("DatasetRecordType", fieldNames, fieldTypes, true);
+- public void addFlushRequest(int pageIndex) {
+- pendingFlushRequests[pageIndex].add(pendingFlushRequests);
++ public void addFlushRequest(int pageIndex, long lsn, boolean isSynchronous) {
++ logPageFlusher.requestFlush(pageIndex, lsn, isSynchronous);
}
-@@ -264,13 +267,14 @@
- public static final int INDEX_ARECORD_SEARCHKEY_FIELD_INDEX = 4;
- public static final int INDEX_ARECORD_ISPRIMARY_FIELD_INDEX = 5;
- public static final int INDEX_ARECORD_TIMESTAMP_FIELD_INDEX = 6;
-+ public static final int INDEX_ARECORD_PENDINGOP_FIELD_INDEX = 7;
-
- private static final ARecordType createIndexRecordType() {
- AOrderedListType olType = new AOrderedListType(BuiltinType.ASTRING, null);
- String[] fieldNames = { "DataverseName", "DatasetName", "IndexName", "IndexStructure", "SearchKey",
-- "IsPrimary", "Timestamp" };
-+ "IsPrimary", "Timestamp", "PendingOp" };
- IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
-- olType, BuiltinType.ABOOLEAN, BuiltinType.ASTRING };
-+ olType, BuiltinType.ABOOLEAN, BuiltinType.ASTRING, BuiltinType.AINT32 };
- return new ARecordType("IndexRecordType", fieldNames, fieldTypes, true);
- };
-
-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
-===================================================================
---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java (revision 1061)
-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java (working copy)
-@@ -31,6 +31,7 @@
- import edu.uci.ics.asterix.metadata.IDatasetDetails;
- import edu.uci.ics.asterix.metadata.MetadataManager;
- import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
-+import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
- import edu.uci.ics.asterix.metadata.api.IMetadataIndex;
- import edu.uci.ics.asterix.metadata.entities.AsterixBuiltinTypeMap;
- import edu.uci.ics.asterix.metadata.entities.Dataset;
-@@ -226,7 +227,7 @@
- public static void insertInitialDataverses(MetadataTransactionContext mdTxnCtx) throws Exception {
- String dataverseName = MetadataPrimaryIndexes.DATAVERSE_DATASET.getDataverseName();
- String dataFormat = NonTaggedDataFormat.NON_TAGGED_DATA_FORMAT;
-- MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dataverseName, dataFormat));
-+ MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dataverseName, dataFormat, IMetadataEntity.PENDING_NO_OP));
- }
-
- public static void insertInitialDatasets(MetadataTransactionContext mdTxnCtx) throws Exception {
-@@ -236,7 +237,7 @@
- primaryIndexes[i].getNodeGroupName());
- MetadataManager.INSTANCE.addDataset(mdTxnCtx, new Dataset(primaryIndexes[i].getDataverseName(),
- primaryIndexes[i].getIndexedDatasetName(), primaryIndexes[i].getPayloadRecordType().getTypeName(),
-- id, DatasetType.INTERNAL, primaryIndexes[i].getDatasetId().getId()));
-+ id, DatasetType.INTERNAL, primaryIndexes[i].getDatasetId().getId(), IMetadataEntity.PENDING_NO_OP));
+ public AtomicLong getLastFlushedLsn() {
+@@ -233,19 +200,12 @@
+ numLogPages = logManagerProperties.getNumLogPages();
+ logPageOwnerCount = new AtomicInteger[numLogPages];
+ logPageStatus = new AtomicInteger[numLogPages];
+- pendingFlushRequests = new LinkedBlockingQueue[numLogPages];
+- if (logManagerProperties.getGroupCommitWaitPeriod() > 0) { // configure
+- // the
+- // Commit
+- // Resolver
+- commitResolver = new GroupCommitResolver(); // Group Commit is
+- // enabled
+- commitRequestStatistics = new CommitRequestStatistics(numLogPages);
+- } else {
+- commitResolver = new BasicCommitResolver(); // the basic commit
+- // resolver
++
++ activeTxnCountMaps = new ArrayList<HashMap<TransactionContext, Integer>>(numLogPages);
++ for (int i = 0; i < numLogPages; i++) {
++ activeTxnCountMaps.add(new HashMap<TransactionContext, Integer>());
}
- }
+- this.commitResolver.init(this); // initialize the commit resolver
++
+ logPages = new FileBasedBuffer[numLogPages];
-@@ -267,7 +268,7 @@
- for (int i = 0; i < secondaryIndexes.length; i++) {
- MetadataManager.INSTANCE.addIndex(mdTxnCtx, new Index(secondaryIndexes[i].getDataverseName(),
- secondaryIndexes[i].getIndexedDatasetName(), secondaryIndexes[i].getIndexName(), IndexType.BTREE,
-- secondaryIndexes[i].getPartitioningExpr(), false));
-+ secondaryIndexes[i].getPartitioningExpr(), false, IMetadataEntity.PENDING_NO_OP));
+ /*
+@@ -264,7 +224,6 @@
+ for (int i = 0; i < numLogPages; i++) {
+ logPageOwnerCount[i] = new AtomicInteger(PageOwnershipStatus.LOG_WRITER);
+ logPageStatus[i] = new AtomicInteger(PageState.ACTIVE);
+- pendingFlushRequests[i] = new LinkedBlockingQueue<Thread>();
}
+
+ /*
+@@ -278,9 +237,9 @@
+ * daemon thread so that it does not stop the JVM from exiting when all
+ * other threads are done with their work.
+ */
+- LogPageFlushThread logFlusher = new LogPageFlushThread(this);
+- logFlusher.setDaemon(true);
+- logFlusher.start();
++ logPageFlusher = new LogPageFlushThread(this);
++ logPageFlusher.setDaemon(true);
++ logPageFlusher.start();
}
-Index: asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
-===================================================================
---- asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java (revision 1061)
-+++ asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java (working copy)
-@@ -95,11 +95,11 @@
- File testFile = tcCtx.getTestFile(cUnit);
+ public int getLogPageIndex(long lsnValue) {
+@@ -312,7 +271,7 @@
+ */
+ private void waitUntillPageIsAvailableForWritingLog(int pageIndex) throws ACIDException {
+ if (logPageStatus[pageIndex].get() == PageState.ACTIVE
+- && getLogPageOwnershipCount(pageIndex).get() >= PageOwnershipStatus.LOG_WRITER) {
++ && logPageOwnerCount[pageIndex].get() >= PageOwnershipStatus.LOG_WRITER) {
+ return;
+ }
+ try {
+@@ -338,47 +297,40 @@
+ */
+ private long getLsn(int entrySize, byte logType) throws ACIDException {
+ long pageSize = logManagerProperties.getLogPageSize();
+- boolean requiresFlushing = logType == LogType.COMMIT;
++
+ while (true) {
+ boolean forwardPage = false;
+- boolean shouldFlushPage = false;
+ long old = lsn.get();
+- int pageIndex = getLogPageIndex(old); // get the log page
+- // corresponding to the
+- // current lsn value
++
++ //get the log page corresponding to the current lsn value
++ int pageIndex = getLogPageIndex(old);
+ long retVal = old;
+- long next = old + entrySize; // the lsn value for the next request,
+- // if the current request is served.
++
++ // the lsn value for the next request if the current request is served.
++ long next = old + entrySize;
+ int prevPage = -1;
+- if ((next - 1) / pageSize != old / pageSize // check if the log
+- // record will cross
+- // page boundaries, a
+- // case that is not
+- // allowed.
+- || (next % pageSize == 0)) {
++
++ // check if the log record will cross page boundaries, a case that is not allowed.
++ if ((next - 1) / pageSize != old / pageSize || (next % pageSize == 0)) {
++
+ if ((old != 0 && old % pageSize == 0)) {
+- retVal = old; // On second thought, this shall never be the
+- // case as it means that the lsn is
+- // currently at the beginning of a page and
+- // we still need to forward the page which
+- // means that the entrySize exceeds a log
+- // page size. If this is the case, an
+- // exception is thrown before calling this
+- // API.
+- // would remove this case.
++ // On second thought, this shall never be the case as it means that the lsn is
++ // currently at the beginning of a page and we still need to forward the page which
++ // means that the entrySize exceeds a log page size. If this is the case, an
++ // exception is thrown before calling this API. would remove this case.
++ retVal = old;
- /*************** to avoid run failure cases ****************
-- if (!testFile.getAbsolutePath().contains("runtimets/queries/failure/")) {
-+ if (!testFile.getAbsolutePath().contains("index-selection/")) {
+ } else {
+- retVal = ((old / pageSize) + 1) * pageSize; // set the lsn
+- // to point to
+- // the beginning
+- // of the next
+- // page.
++ // set the lsn to point to the beginning of the next page.
++ retVal = ((old / pageSize) + 1) * pageSize;
+ }
++
+ next = retVal;
+- forwardPage = true; // as the log record shall cross log page
+- // boundary, we must re-assign the lsn (so
+- // that the log record begins on a different
+- // location.
++
++ // as the log record shall cross log page boundary, we must re-assign the lsn so
++ // that the log record begins on a different location.
++ forwardPage = true;
++
+ prevPage = pageIndex;
+ pageIndex = getNextPageInSequence(pageIndex);
+ }
+@@ -397,109 +349,51 @@
+ */
+ waitUntillPageIsAvailableForWritingLog(pageIndex);
+
+- if (!forwardPage && requiresFlushing) {
+- shouldFlushPage = commitResolver.shouldCommitPage(pageIndex, this, commitRequestStatistics);
+- if (shouldFlushPage) {
+- next = ((next / pageSize) + 1) * pageSize; /*
+- * next
+- * represents the
+- * next value of
+- * lsn after this
+- * log record has
+- * been written.
+- * If the page
+- * needs to be
+- * flushed, then
+- * we do not give
+- * any more LSNs
+- * from this
+- * page.
+- */
+- }
+- }
+- if (!lsn.compareAndSet(old, next)) { // Atomic call -> returns true
+- // only when the value
+- // represented by lsn is same as
+- // "old". The value is updated
+- // to "next".
++ if (!lsn.compareAndSet(old, next)) {
++ // Atomic call -> returns true only when the value represented by lsn is same as
++ // "old". The value is updated to "next".
continue;
}
- ************************************************************/
+
+ if (forwardPage) {
+- //TODO
+- //this is not safe since the incoming thread may reach the same page slot with this page
+- //(differ by the log buffer size)
+- logPageStatus[prevPage].set(PageState.INACTIVE); // mark
+- // previous
+- // page
+- // inactive
++ addFlushRequest(prevPage, old, false);
+
+- /*
+- * decrement on the behalf of the log manager. if there are no
+- * more owners (count == 0) the page must be marked as a
+- * candidate to be flushed.
+- */
+- int pageDirtyCount = getLogPageOwnershipCount(prevPage).decrementAndGet();
+- if (pageDirtyCount == 0) {
+- addFlushRequest(prevPage);
+- }
-
+- /*
+- * The transaction thread that discovers the need to forward a
+- * page is made to re-acquire a lsn.
+- */
++ // The transaction thread that discovers the need to forward a
++ // page is made to re-acquire a lsn.
+ continue;
++
+ } else {
+- /*
+- * the transaction thread has been given a space in a log page,
+- * but is made to wait until the page is available.
+- */
++ // the transaction thread has been given a space in a log page,
++ // but is made to wait until the page is available.
++ // (Is this needed? when does this wait happen?)
+ waitUntillPageIsAvailableForWritingLog(pageIndex);
+- /*
+- * increment the counter as the transaction thread now holds a
+- * space in the log page and hence is an owner.
+- */
++
++ // increment the counter as the transaction thread now holds a
++ // space in the log page and hence is an owner.
+ logPageOwnerCount[pageIndex].incrementAndGet();
+- }
+- if (requiresFlushing) {
+- if (!shouldFlushPage) {
+- /*
+- * the log record requires the page to be flushed but under
+- * the commit policy, the flush task has been deferred. The
+- * transaction thread submits its request to flush the page.
+- */
+- commitRequestStatistics.registerCommitRequest(pageIndex);
+- } else {
+- /*
+- * the flush request was approved by the commit resolver.
+- * Thus the page is marked INACTIVE as no more logs will be
+- * written on this page. The log manager needs to release
+- * its ownership. Note that transaction threads may still
+- * continue to be owners of the log page till they fill up
+- * the space allocated to them.
+- */
+- logPageStatus[pageIndex].set(PageState.INACTIVE);
+- logPageOwnerCount[pageIndex].decrementAndGet(); // on
+- // the
+- // behalf
+- // of
+- // log
+- // manager
++
++ // Before the count is incremented, if the flusher flushed the allocated page,
++ // then retry to get new LSN. Otherwise, the log with allocated lsn will be lost.
++ if (lastFlushedLsn.get() >= retVal) {
++ logPageOwnerCount[pageIndex].decrementAndGet();
++ continue;
+ }
+ }
++
+ return retVal;
+ }
+ }
+
+ @Override
+- public void log(byte logType, TransactionContext context, int datasetId, int PKHashValue, long resourceId,
++ public void log(byte logType, TransactionContext txnCtx, int datasetId, int PKHashValue, long resourceId,
+ byte resourceMgrId, int logContentSize, ReusableLogContentObject reusableLogContentObject, ILogger logger,
+ LogicalLogLocator logicalLogLocator) throws ACIDException {
+- /*
+- * logLocator is a re-usable object that is appropriately set in each
+- * invocation. If the reference is null, the log manager must throw an
+- * exception
+- */
++
++ HashMap<TransactionContext, Integer> map = null;
++ int activeTxnCount;
++
++ // logLocator is a re-usable object that is appropriately set in each invocation.
++ // If the reference is null, the log manager must throw an exception.
+ if (logicalLogLocator == null) {
+ throw new ACIDException(
+ " you need to pass in a non-null logLocator, if you dont have it, then pass in a dummy so that the +"
+@@ -519,20 +413,19 @@
+
+ // all constraints checked and we are good to go and acquire a lsn.
+ long previousLSN = -1;
+- long currentLSN; // the will be set to the location (a long value)
+- // where the log record needs to be placed.
+
+- /*
+- * The logs written by a transaction need to be linked to each other for
+- * a successful rollback/recovery. However there could be multiple
+- * threads operating concurrently that are part of a common transaction.
+- * These threads need to synchronize and record the lsn corresponding to
+- * the last log record written by (any thread of) the transaction.
+- */
+- synchronized (context) {
+- previousLSN = context.getLastLogLocator().getLsn();
++ // the will be set to the location (a long value) where the log record needs to be placed.
++ long currentLSN;
++
++ // The logs written by a transaction need to be linked to each other for
++ // a successful rollback/recovery. However there could be multiple
++ // threads operating concurrently that are part of a common transaction.
++ // These threads need to synchronize and record the lsn corresponding to
++ // the last log record written by (any thread of) the transaction.
++ synchronized (txnCtx) {
++ previousLSN = txnCtx.getLastLogLocator().getLsn();
+ currentLSN = getLsn(totalLogSize, logType);
+- context.setLastLSN(currentLSN);
++ txnCtx.setLastLSN(currentLSN);
+ if (IS_DEBUG_MODE) {
+ System.out.println("--------------> LSN(" + currentLSN + ") is allocated");
+ }
+@@ -547,48 +440,37 @@
+ * performed correctly that is ownership is released.
+ */
+
+- boolean decremented = false; // indicates if the transaction thread
+- // has release ownership of the
+- // page.
+- boolean addedFlushRequest = false; // indicates if the transaction
+- // thread has submitted a flush
+- // request.
++ // indicates if the transaction thread has release ownership of the page.
++ boolean decremented = false;
+
+ int pageIndex = (int) getLogPageIndex(currentLSN);
+
+- /*
+- * the lsn has been obtained for the log record. need to set the
+- * LogLocator instance accordingly.
+- */
+-
++ // the lsn has been obtained for the log record. need to set the
++ // LogLocator instance accordingly.
+ try {
+-
+ logicalLogLocator.setBuffer(logPages[pageIndex]);
+ int pageOffset = getLogPageOffset(currentLSN);
+ logicalLogLocator.setMemoryOffset(pageOffset);
+
+- /*
+- * write the log header.
+- */
+- logRecordHelper.writeLogHeader(logicalLogLocator, logType, context, datasetId, PKHashValue, previousLSN,
++ // write the log header.
++ logRecordHelper.writeLogHeader(logicalLogLocator, logType, txnCtx, datasetId, PKHashValue, previousLSN,
+ resourceId, resourceMgrId, logContentSize);
+
+ // increment the offset so that the transaction can fill up the
+ // content in the correct region of the allocated space.
+ logicalLogLocator.increaseMemoryOffset(logRecordHelper.getLogHeaderSize(logType));
+
+- // a COMMIT log record does not have any content
+- // and hence the logger (responsible for putting the log content) is
+- // not invoked.
++ // a COMMIT log record does not have any content and hence
++ // the logger (responsible for putting the log content) is not invoked.
+ if (logContentSize != 0) {
+- logger.preLog(context, reusableLogContentObject);
++ logger.preLog(txnCtx, reusableLogContentObject);
+ }
+
+ if (logContentSize != 0) {
+ // call the logger implementation and ask to fill in the log
+ // record content at the allocated space.
+- logger.log(context, logicalLogLocator, logContentSize, reusableLogContentObject);
+- logger.postLog(context, reusableLogContentObject);
++ logger.log(txnCtx, logicalLogLocator, logContentSize, reusableLogContentObject);
++ logger.postLog(txnCtx, reusableLogContentObject);
+ if (IS_DEBUG_MODE) {
+ logicalLogLocator.setMemoryOffset(logicalLogLocator.getMemoryOffset()
+ - logRecordHelper.getLogHeaderSize(logType));
+@@ -597,10 +479,8 @@
+ }
+ }
+
+- /*
+- * The log record has been written. For integrity checks, compute
+- * the checksum and put it at the end of the log record.
+- */
++ // The log record has been written. For integrity checks, compute
++ // the checksum and put it at the end of the log record.
+ int startPosChecksum = logicalLogLocator.getMemoryOffset() - logRecordHelper.getLogHeaderSize(logType);
+ int length = totalLogSize - logRecordHelper.getLogChecksumSize();
+ long checksum = DataUtil.getChecksum(logPages[pageIndex], startPosChecksum, length);
+@@ -611,46 +491,31 @@
+ System.out.println("--------------> LSN(" + currentLSN + ") is written");
+ }
+
+- /*
+- * release the ownership as the log record has been placed in
+- * created space.
+- */
+- int pageDirtyCount = logPageOwnerCount[pageIndex].decrementAndGet();
++ // release the ownership as the log record has been placed in created space.
++ logPageOwnerCount[pageIndex].decrementAndGet();
+
+ // indicating that the transaction thread has released ownership
+ decremented = true;
+
+- /*
+- * If the transaction thread happens to be the last owner of the log
+- * page the page must by marked as a candidate to be flushed.
+- */
+- if (pageDirtyCount == 0 && logPageStatus[pageIndex].get() == PageState.INACTIVE) {
+- addFlushRequest(pageIndex);
+- addedFlushRequest = true;
+- }
+-
+- /*
+- * If the log type is commit, a flush request is registered, if the
+- * log record has not reached the disk. It may be possible that this
+- * thread does not get CPU cycles and in-between the log record has
+- * been flushed to disk because the containing log page filled up.
+- */
+- if (logType == LogType.COMMIT) {
+- synchronized (logPages[pageIndex]) {
+- while (getLastFlushedLsn().get() < currentLSN) {
+- logPages[pageIndex].wait();
+- }
++ if (logType == LogType.ENTITY_COMMIT) {
++ map = activeTxnCountMaps.get(pageIndex);
++ if (map.containsKey(txnCtx)) {
++ activeTxnCount = (Integer) map.get(txnCtx);
++ activeTxnCount++;
++ map.put(txnCtx, activeTxnCount);
++ } else {
++ map.put(txnCtx, 1);
+ }
++ addFlushRequest(pageIndex, currentLSN, false);
++ } else if (logType == LogType.COMMIT) {
++ addFlushRequest(pageIndex, currentLSN, true);
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+- throw new ACIDException(context, "Thread: " + Thread.currentThread().getName()
++ throw new ACIDException(txnCtx, "Thread: " + Thread.currentThread().getName()
+ + " logger encountered exception", e);
+ } finally {
+- /*
+- * If an exception was encountered and we did not release ownership
+- */
+ if (!decremented) {
+ logPageOwnerCount[pageIndex].decrementAndGet();
+ }
+@@ -667,9 +532,6 @@
+
+ logPages[pageIndex].reset(filePath, LogUtil.getFileOffset(this, nextWritePosition),
+ logManagerProperties.getLogPageSize());
+-
+- //TODO Check if this is necessary
+- //Arrays.fill(logPages[pageIndex].getArray(), (byte) 0);
+ }
+
+ @Override
+@@ -747,16 +609,13 @@
+ //minimize memory allocation overhead. current code allocates the log page size per reading a log record.
+
+ byte[] pageContent = new byte[logManagerProperties.getLogPageSize()];
+- // take a lock on the log page so that the page is not flushed to
+- // disk interim
++
++ // take a lock on the log page so that the page is not flushed to disk interim
+ synchronized (logPages[pageIndex]) {
+- if (lsnValue > getLastFlushedLsn().get()) { // need to check
+- // again
+- // (this
+- // thread may have got
+- // de-scheduled and must
+- // refresh!)
+
++ // need to check again (this thread may have got de-scheduled and must refresh!)
++ if (lsnValue > getLastFlushedLsn().get()) {
++
+ // get the log record length
+ logPages[pageIndex].getBytes(pageContent, 0, pageContent.length);
+ byte logType = pageContent[pageOffset + 4];
+@@ -765,9 +624,7 @@
+ int logRecordSize = logHeaderSize + logBodySize + logRecordHelper.getLogChecksumSize();
+ logRecord = new byte[logRecordSize];
+
+- /*
+- * copy the log record content
+- */
++ // copy the log record content
+ System.arraycopy(pageContent, pageOffset, logRecord, 0, logRecordSize);
+ MemBasedBuffer memBuffer = new MemBasedBuffer(logRecord);
+ if (logicalLogLocator == null) {
+@@ -790,9 +647,7 @@
+ }
+ }
+
+- /*
+- * the log record is residing on the disk, read it from there.
+- */
++ // the log record is residing on the disk, read it from there.
+ readDiskLog(lsnValue, logicalLogLocator);
+ }
+
+@@ -860,30 +715,40 @@
+ return logPageOwnerCount[pageIndex];
+ }
+
+- public ICommitResolver getCommitResolver() {
+- return commitResolver;
+- }
+-
+- public CommitRequestStatistics getCommitRequestStatistics() {
+- return commitRequestStatistics;
+- }
+-
+ public IFileBasedBuffer[] getLogPages() {
+ return logPages;
+ }
+
+- public int getLastFlushedPage() {
+- return lastFlushedPage.get();
+- }
+-
+- public void setLastFlushedPage(int lastFlushedPage) {
+- this.lastFlushedPage.set(lastFlushedPage);
+- }
+-
+ @Override
+ public TransactionSubsystem getTransactionSubsystem() {
+ return provider;
+ }
++
++ public void decrementActiveTxnCountOnIndexes(int pageIndex) throws HyracksDataException {
++ TransactionContext ctx = null;
++ int count = 0;
++ int i = 0;
++
++ HashMap<TransactionContext, Integer> map = activeTxnCountMaps.get(pageIndex);
++ Set<Map.Entry<TransactionContext, Integer>> entrySet = map.entrySet();
++ if (entrySet != null) {
++ for (Map.Entry<TransactionContext, Integer> entry : entrySet) {
++ if (entry != null) {
++ if (entry.getValue() != null) {
++ count = entry.getValue();
++ }
++ if (count > 0) {
++ ctx = entry.getKey();
++ for (i = 0; i < count; i++) {
++ ctx.decreaseActiveTransactionCountOnIndexes();
++ }
++ }
++ }
++ }
++ }
++
++ map.clear();
++ }
+ }
+
+ /*
+@@ -895,36 +760,82 @@
+ class LogPageFlushThread extends Thread {
+
+ private LogManager logManager;
++ /*
++ * pendingFlushRequests is a map with key as Integer denoting the page
++ * index. When a (transaction) thread discovers the need to flush a page, it
++ * puts its Thread object into the corresponding value that is a
++ * LinkedBlockingQueue. The LogManager has a LogFlusher thread that scans
++ * this map in order of page index (and circling around). The flusher thread
++ * needs to flush pages in order and waits for a thread to deposit an object
++ * in the blocking queue corresponding to the next page in order. A request
++ * to flush a page is conveyed to the flush thread by simply depositing an
++ * object in to corresponding blocking queue. It is blocking in the sense
++ * that the flusher thread will continue to wait for an object to arrive in
++ * the queue. The object itself is ignored by the fliusher and just acts as
++ * a signal/event that a page needs to be flushed.
++ */
++ private final LinkedBlockingQueue<Object>[] flushRequestQueue;
++ private final Object[] flushRequests;
++ private int lastFlushedPageIndex;
++ private final long groupCommitWaitPeriod;
+
+ public LogPageFlushThread(LogManager logManager) {
+ this.logManager = logManager;
+ setName("Flusher");
++ int numLogPages = logManager.getLogManagerProperties().getNumLogPages();
++ this.flushRequestQueue = new LinkedBlockingQueue[numLogPages];
++ this.flushRequests = new Object[numLogPages];
++ for (int i = 0; i < numLogPages; i++) {
++ flushRequestQueue[i] = new LinkedBlockingQueue<Object>(1);
++ flushRequests[i] = new Object();
++ }
++ this.lastFlushedPageIndex = -1;
++ groupCommitWaitPeriod = logManager.getLogManagerProperties().getGroupCommitWaitPeriod();
+ }
+
++ public void requestFlush(int pageIndex, long lsn, boolean isSynchronous) {
++ synchronized (logManager.getLogPage(pageIndex)) {
++ //return if flushedLSN >= lsn
++ if (logManager.getLastFlushedLsn().get() >= lsn) {
++ return;
++ }
++
++ //put a new request to the queue only if the request on the page is not in the queue.
++ flushRequestQueue[pageIndex].offer(flushRequests[pageIndex]);
++
++ //return if the request is asynchronous
++ if (!isSynchronous) {
++ return;
++ }
++
++ //wait until there is flush.
++ boolean isNotified = false;
++ while (!isNotified) {
++ try {
++ logManager.getLogPage(pageIndex).wait();
++ isNotified = true;
++ } catch (InterruptedException e) {
++ e.printStackTrace();
++ }
++ }
++ }
++ }
++
+ @Override
+ public void run() {
+ while (true) {
+ try {
+- int pageToFlush = logManager.getNextPageInSequence(logManager.getLastFlushedPage());
++ int pageToFlush = logManager.getNextPageInSequence(lastFlushedPageIndex);
+
+- /*
+- * A wait call on the linkedBLockingQueue. The flusher thread is
+- * notified when an object is added to the queue. Please note
+- * that each page has an associated blocking queue.
+- */
+- logManager.getPendingFlushRequests(pageToFlush).take();
++ // A wait call on the linkedBLockingQueue. The flusher thread is
++ // notified when an object is added to the queue. Please note
++ // that each page has an associated blocking queue.
++ flushRequestQueue[pageToFlush].take();
+
+- /*
+- * The LogFlusher was waiting for a page to be marked as a
+- * candidate for flushing. Now that has happened. The thread
+- * shall proceed to take a lock on the log page
+- */
+- synchronized (logManager.getLogPages()[pageToFlush]) {
++ synchronized (logManager.getLogPage(pageToFlush)) {
+
+- /*
+- * lock the internal state of the log manager and create a
+- * log file if necessary.
+- */
++ // lock the internal state of the log manager and create a
++ // log file if necessary.
+ int prevLogFileId = logManager.getLogFileId(logManager.getLastFlushedLsn().get());
+ int nextLogFileId = logManager.getLogFileId(logManager.getLastFlushedLsn().get()
+ + logManager.getLogManagerProperties().getLogPageSize());
+@@ -936,198 +847,60 @@
+ logManager.getLogManagerProperties().getLogPageSize());
+ }
+
+- logManager.getLogPage(pageToFlush).flush(); // put the
+- // content to
+- // disk, the
+- // thread still
+- // has a lock on
+- // the log page
++ //#. sleep during the groupCommitWaitTime
++ sleep(groupCommitWaitPeriod);
+
+- /*
+- * acquire lock on the log manager as we need to update the
+- * internal bookkeeping data.
+- */
++ //#. set the logPageStatus to INACTIVE in order to prevent other txns from writing on this page.
++ logManager.getLogPageStatus(pageToFlush).set(PageState.INACTIVE);
+
+- // increment the last flushed lsn.
+- long lastFlushedLsn = logManager.incrementLastFlushedLsn(logManager.getLogManagerProperties()
+- .getLogPageSize());
++ //#. need to wait until the logPageOwnerCount reaches 1 (LOG_WRITER)
++ // meaning every one has finished writing logs on this page.
++ while (logManager.getLogPageOwnershipCount(pageToFlush).get() != PageOwnershipStatus.LOG_WRITER) {
++ sleep(0);
++ }
+
+- /*
+- * the log manager gains back ownership of the page. this is
+- * reflected by incrementing the owner count of the page.
+- * recall that when the page is begin flushed the owner
+- * count is actually 0 Value of zero implicitly indicates
+- * that the page is operated upon by the log flusher thread.
+- */
+- logManager.getLogPageOwnershipCount(pageToFlush).incrementAndGet();
++ //#. set the logPageOwnerCount to 0 (LOG_FLUSHER)
++ // meaning it is flushing.
++ logManager.getLogPageOwnershipCount(pageToFlush).set(PageOwnershipStatus.LOG_FLUSHER);
+
+- /*
+- * get the number of log buffers that have been written so
+- * far. A log buffer = number of log pages * size of a log
+- * page
+- */
+- int numCycles = (int) lastFlushedLsn / logManager.getLogManagerProperties().getLogBufferSize();
+- if (lastFlushedLsn % logManager.getLogManagerProperties().getLogBufferSize() == 0) {
+- numCycles--;
+- }
++ // put the content to disk (the thread still has a lock on the log page)
++ logManager.getLogPage(pageToFlush).flush();
+
+- /*
+- * Map the log page to a new region in the log file.
+- */
++ // increment the last flushed lsn and lastFlushedPage
++ logManager.incrementLastFlushedLsn(logManager.getLogManagerProperties().getLogPageSize());
++ lastFlushedPageIndex = pageToFlush;
+
++ // decrement activeTxnCountOnIndexes
++ logManager.decrementActiveTxnCountOnIndexes(pageToFlush);
++
++ // reset the count to 1
++ logManager.getLogPageOwnershipCount(pageToFlush).set(PageOwnershipStatus.LOG_WRITER);
++
++ // Map the log page to a new region in the log file.
+ long nextWritePosition = logManager.getLogPages()[pageToFlush].getNextWritePosition()
+ + logManager.getLogManagerProperties().getLogBufferSize();
+
+- /*
+- * long nextPos = (numCycles + 1)
+- * logManager.getLogManagerProperties() .getLogBufferSize()
+- * + pageToFlush logManager.getLogManagerProperties()
+- * .getLogPageSize();
+- */
+ logManager.resetLogPage(nextWritePosition, pageToFlush);
+
+ // mark the page as ACTIVE
+ logManager.getLogPageStatus(pageToFlush).set(LogManager.PageState.ACTIVE);
+
+- // notify all waiting (transaction) threads.
+- // Transaction thread may be waiting for the page to be
+- // available or may have a commit log record on the page
+- // that got flushed.
+- logManager.getLogPages()[pageToFlush].notifyAll();
+- logManager.setLastFlushedPage(pageToFlush);
++ //#. checks the queue whether there is another flush request on the same log buffer
++ // If there is another request, then simply remove it.
++ if (flushRequestQueue[pageToFlush].peek() != null) {
++ flushRequestQueue[pageToFlush].take();
++ }
+
++ // notify all waiting (transaction) threads.
++ logManager.getLogPage(pageToFlush).notifyAll();
+ }
+ } catch (IOException ioe) {
+ ioe.printStackTrace();
+ throw new Error(" exception in flushing log page", ioe);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+- break; // must break from the loop as the exception indicates
+- // some thing horrendous has happened elsewhere
++ break;
+ }
+ }
+ }
+-}
+-
+-/*
+- * TODO: By default the commit policy is to commit at each request and not have
+- * a group commit. The following code needs to change to support group commit.
+- * The code for group commit has not been tested thoroughly and is under
+- * development.
+- */
+-class BasicCommitResolver implements ICommitResolver {
+-
+- public boolean shouldCommitPage(int pageIndex, LogManager logManager,
+- CommitRequestStatistics commitRequestStatistics) {
+- return true;
+- }
+-
+- public void init(LogManager logManager) {
+- }
+-}
+-
+-class GroupCommitResolver implements ICommitResolver {
+-
+- public boolean shouldCommitPage(int pageIndex, LogManager logManager,
+- CommitRequestStatistics commitRequestStatistics) {
+- long maxCommitWait = logManager.getLogManagerProperties().getGroupCommitWaitPeriod();
+- long timestamp = commitRequestStatistics.getPageLevelLastCommitRequestTimestamp(pageIndex);
+- if (timestamp == -1) {
+- if (maxCommitWait == 0) {
+- return true;
+- } else {
+- timestamp = System.currentTimeMillis();
+- }
+- }
+- long currenTime = System.currentTimeMillis();
+- if (currenTime - timestamp > maxCommitWait) {
+- return true;
+- }
+- return false;
+- }
+-
+- public void init(LogManager logManager) {
+- GroupCommitHandlerThread groupCommitHandler = new GroupCommitHandlerThread(logManager);
+- groupCommitHandler.setDaemon(true);
+- groupCommitHandler.start();
+- }
+-
+- class GroupCommitHandlerThread extends Thread {
+-
+- private LogManager logManager;
+-
+- public GroupCommitHandlerThread(LogManager logManager) {
+- this.logManager = logManager;
+- setName("Group Commit Handler");
+- }
+-
+- @Override
+- public void run() {
+- int pageIndex = -1;
+- while (true) {
+- pageIndex = logManager.getNextPageInSequence(pageIndex);
+- long lastCommitRequeestTimestamp = logManager.getCommitRequestStatistics()
+- .getPageLevelLastCommitRequestTimestamp(pageIndex);
+- if (lastCommitRequeestTimestamp != -1
+- && System.currentTimeMillis() - lastCommitRequeestTimestamp > logManager
+- .getLogManagerProperties().getGroupCommitWaitPeriod()) {
+- int dirtyCount = logManager.getLogPageOwnershipCount(pageIndex).decrementAndGet();
+- if (dirtyCount == 0) {
+- try {
+- logManager.getLogPageStatus(pageIndex).set(LogManager.PageState.INACTIVE);
+- logManager.getPendingFlushRequests(pageIndex).put(Thread.currentThread());
+- } catch (InterruptedException e) {
+- e.printStackTrace();
+- break;
+- }
+- logManager.getCommitRequestStatistics().committedPage(pageIndex);
+- }
+- }
+- }
+- }
+- }
+-
+-}
+-
+-interface ICommitResolver {
+- public boolean shouldCommitPage(int pageIndex, LogManager logManager,
+- CommitRequestStatistics commitRequestStatistics);
+-
+- public void init(LogManager logManager);
+-}
+-
+-/**
+- * Represents a collection of all commit requests by transactions for each log
+- * page. The requests are accumulated until the commit policy triggers a flush
+- * of the corresponding log page. Upon a flush of a page, all commit requests
+- * for the page are cleared.
+- */
+-class CommitRequestStatistics {
+-
+- AtomicInteger[] pageLevelCommitRequestCount;
+- AtomicLong[] pageLevelLastCommitRequestTimestamp;
+-
+- public CommitRequestStatistics(int numPages) {
+- pageLevelCommitRequestCount = new AtomicInteger[numPages];
+- pageLevelLastCommitRequestTimestamp = new AtomicLong[numPages];
+- for (int i = 0; i < numPages; i++) {
+- pageLevelCommitRequestCount[i] = new AtomicInteger(0);
+- pageLevelLastCommitRequestTimestamp[i] = new AtomicLong(-1L);
+- }
+- }
+-
+- public void registerCommitRequest(int pageIndex) {
+- pageLevelCommitRequestCount[pageIndex].incrementAndGet();
+- pageLevelLastCommitRequestTimestamp[pageIndex].set(System.currentTimeMillis());
+- }
+-
+- public long getPageLevelLastCommitRequestTimestamp(int pageIndex) {
+- return pageLevelLastCommitRequestTimestamp[pageIndex].get();
+- }
+-
+- public void committedPage(int pageIndex) {
+- pageLevelCommitRequestCount[pageIndex].set(0);
+- pageLevelLastCommitRequestTimestamp[pageIndex].set(-1L);
+- }
+-
+-}
++}
+\ No newline at end of file
+Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java
+===================================================================
+--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java (revision 1194)
++++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java (working copy)
+@@ -152,6 +152,9 @@
+ case LogType.UPDATE:
+ logTypeDisplay = "UPDATE";
+ break;
++ case LogType.ENTITY_COMMIT:
++ logTypeDisplay = "ENTITY_COMMIT";
++ break;
+ }
+ builder.append(" LSN : ").append(logicalLogLocator.getLsn());
+ builder.append(" Log Type : ").append(logTypeDisplay);
+Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java
+===================================================================
+--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java (revision 1194)
++++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java (working copy)
+@@ -18,5 +18,6 @@
+
+ public static final byte UPDATE = 0;
+ public static final byte COMMIT = 1;
++ public static final byte ENTITY_COMMIT = 2;
+
+ }
+Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java
+===================================================================
+--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java (revision 1194)
++++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java (working copy)
+@@ -1,5 +1,5 @@
+ /*
+- * Copyright 2009-2010 by The Regents of the University of California
++ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+@@ -41,7 +41,7 @@
+ private int logPageSize = 128 * 1024; // 128 KB
+ private int numLogPages = 8; // number of log pages in the log buffer.
+
+- private long groupCommitWaitPeriod = 0; // time in milliseconds for which a
++ private long groupCommitWaitPeriod = 1; // time in milliseconds for which a
+ // commit record will wait before
+ // the housing page is marked for
+ // flushing.
+Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+===================================================================
+--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java (revision 1194)
++++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java (working copy)
+@@ -184,6 +184,7 @@
+ break;
+
+ case LogType.COMMIT:
++ case LogType.ENTITY_COMMIT:
+ tempKeyTxnId.setTxnId(logRecordHelper.getJobId(currentLogLocator),
+ logRecordHelper.getDatasetId(currentLogLocator),
+ logRecordHelper.getPKHashValue(currentLogLocator));
+@@ -218,6 +219,7 @@
+ IIndex index = null;
+ LocalResource localResource = null;
+ ILocalResourceMetadata localResourceMetadata = null;
++ List<Long> resourceIdList = new ArrayList<Long>();
+
+ //#. get indexLifeCycleManager
+ IAsterixAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider();
+@@ -272,6 +274,8 @@
+ index = localResourceMetadata.createIndexInstance(appRuntimeContext,
+ localResource.getResourceName(), localResource.getPartition());
+ indexLifecycleManager.register(resourceId, index);
++ indexLifecycleManager.open(resourceId);
++ resourceIdList.add(resourceId);
+ }
+
+ /***************************************************/
+@@ -300,6 +304,7 @@
+ break;
+
+ case LogType.COMMIT:
++ case LogType.ENTITY_COMMIT:
+ //do nothing
+ break;
+
+@@ -308,6 +313,11 @@
+ }
+ }
+
++ //close all indexes
++ for (long r : resourceIdList) {
++ indexLifecycleManager.close(r);
++ }
++
+ JobIdFactory.initJobId(maxJobId);
+ }
+
+@@ -539,6 +549,7 @@
+ break;
+
+ case LogType.COMMIT:
++ case LogType.ENTITY_COMMIT:
+ undoLSNSet = loserTxnTable.get(tempKeyTxnId);
+ if (undoLSNSet != null) {
+ loserTxnTable.remove(tempKeyTxnId);
+Index: asterix-app/src/test/java/edu/uci/ics/asterix/test/metadata/MetadataTest.java
+===================================================================
+--- asterix-app/src/test/java/edu/uci/ics/asterix/test/metadata/MetadataTest.java (revision 1194)
++++ asterix-app/src/test/java/edu/uci/ics/asterix/test/metadata/MetadataTest.java (working copy)
+@@ -42,6 +42,16 @@
+ List<CompilationUnit> cUnits = tcCtx.getTestCase().getCompilationUnit();
+ for (CompilationUnit cUnit : cUnits) {
+ File testFile = tcCtx.getTestFile(cUnit);
++
++ /*****************
++ if (!testFile.getAbsolutePath().contains("meta09.aql")) {
++ System.out.println(testFile.getAbsolutePath());
++ continue;
++ }
++ System.out.println(testFile.getAbsolutePath());
++ *****************/
++
+
File expectedResultFile = tcCtx.getExpectedResultFile(cUnit);
File actualFile = new File(PATH_ACTUAL + File.separator
+ tcCtx.getTestCase().getFilePath().replace(File.separator, "_") + "_" + cUnit.getName() + ".adm");
-Index: asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
+Index: asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
===================================================================
---- asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java (revision 1061)
-+++ asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java (working copy)
-@@ -90,7 +90,7 @@
+--- asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java (revision 1194)
++++ asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java (working copy)
+@@ -95,9 +95,10 @@
+ File testFile = tcCtx.getTestFile(cUnit);
- private static Logger LOGGER = Logger.getLogger(DatasetOperations.class.getName());
-
-- public static JobSpecification[] createDropDatasetJobSpec(CompiledDatasetDropStatement datasetDropStmt,
-+ public static JobSpecification createDropDatasetJobSpec(CompiledDatasetDropStatement datasetDropStmt,
- AqlMetadataProvider metadataProvider) throws AlgebricksException, HyracksDataException, RemoteException,
- ACIDException, AsterixException {
-
-@@ -111,67 +111,10 @@
- throw new AlgebricksException("DROP DATASET: No metadata for dataset " + datasetName);
- }
- if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
-- return new JobSpecification[0];
-+ return new JobSpecification();
- }
--
-- List<Index> datasetIndexes = metadataProvider.getDatasetIndexes(dataset.getDataverseName(),
-- dataset.getDatasetName());
-- int numSecondaryIndexes = 0;
-- for (Index index : datasetIndexes) {
-- if (index.isSecondaryIndex()) {
-- numSecondaryIndexes++;
-- }
-- }
-- JobSpecification[] specs;
-- if (numSecondaryIndexes > 0) {
-- specs = new JobSpecification[numSecondaryIndexes + 1];
-- int i = 0;
-- // First, drop secondary indexes.
-- for (Index index : datasetIndexes) {
-- if (index.isSecondaryIndex()) {
-- specs[i] = new JobSpecification();
-- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> idxSplitsAndConstraint = metadataProvider
-- .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataset.getDataverseName(),
-- datasetName, index.getIndexName());
-- IIndexDataflowHelperFactory dfhFactory;
-- switch (index.getIndexType()) {
-- case BTREE:
-- dfhFactory = new LSMBTreeDataflowHelperFactory(
-- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
-- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
-- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER);
-- break;
-- case RTREE:
-- dfhFactory = new LSMRTreeDataflowHelperFactory(
-- new IPrimitiveValueProviderFactory[] { null }, RTreePolicyType.RTREE,
-- new IBinaryComparatorFactory[] { null },
-- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
-- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
-- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER, null);
-- break;
-- case NGRAM_INVIX:
-- case WORD_INVIX:
-- dfhFactory = new LSMInvertedIndexDataflowHelperFactory(
-- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
-- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
-- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER);
-- break;
-- default:
-- throw new AsterixException("Unknown index type provided.");
-- }
-- IndexDropOperatorDescriptor secondaryBtreeDrop = new IndexDropOperatorDescriptor(specs[i],
-- AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
-- AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, idxSplitsAndConstraint.first, dfhFactory);
-- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specs[i], secondaryBtreeDrop,
-- idxSplitsAndConstraint.second);
-- i++;
-- }
-- }
-- } else {
-- specs = new JobSpecification[1];
-- }
-+
- JobSpecification specPrimary = new JobSpecification();
-- specs[specs.length - 1] = specPrimary;
-
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider
- .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataset.getDataverseName(), datasetName,
-@@ -187,7 +130,7 @@
-
- specPrimary.addRoot(primaryBtreeDrop);
-
-- return specs;
-+ return specPrimary;
- }
-
- public static JobSpecification createDatasetJobSpec(Dataverse dataverse, String datasetName,
-Index: asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+ /*************** to avoid run failure cases ****************
+- if (!testFile.getAbsolutePath().contains("index-selection/")) {
++ if (!testFile.getAbsolutePath().contains("query-issue205.aql")) {
+ continue;
+ }
++ System.out.println(testFile.getAbsolutePath());
+ ************************************************************/
+
+ File expectedResultFile = tcCtx.getExpectedResultFile(cUnit);
+Index: diff_file
===================================================================
---- asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java (revision 1061)
-+++ asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java (working copy)
-@@ -21,6 +21,8 @@
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
-+import java.util.concurrent.locks.ReadWriteLock;
-+import java.util.concurrent.locks.ReentrantReadWriteLock;
-
- import org.json.JSONException;
-
-@@ -68,6 +70,7 @@
- import edu.uci.ics.asterix.metadata.MetadataException;
- import edu.uci.ics.asterix.metadata.MetadataManager;
- import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
-+import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
- import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
- import edu.uci.ics.asterix.metadata.entities.Dataset;
- import edu.uci.ics.asterix.metadata.entities.Datatype;
-@@ -112,6 +115,7 @@
- private final PrintWriter out;
- private final SessionConfig sessionConfig;
- private final DisplayFormat pdf;
-+ private final ReadWriteLock cacheLatch;
- private Dataverse activeDefaultDataverse;
- private List<FunctionDecl> declaredFunctions;
-
-@@ -121,6 +125,7 @@
- this.out = out;
- this.sessionConfig = pc;
- this.pdf = pdf;
-+ this.cacheLatch = new ReentrantReadWriteLock(true);
- declaredFunctions = getDeclaredFunctions(aqlStatements);
- }
-
-@@ -143,8 +148,7 @@
-
- for (Statement stmt : aqlStatements) {
- validateOperation(activeDefaultDataverse, stmt);
-- MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-- AqlMetadataProvider metadataProvider = new AqlMetadataProvider(mdTxnCtx, activeDefaultDataverse);
-+ AqlMetadataProvider metadataProvider = new AqlMetadataProvider(activeDefaultDataverse);
- metadataProvider.setWriterFactory(writerFactory);
- metadataProvider.setOutputFile(outputFile);
- metadataProvider.setConfig(config);
-@@ -253,15 +257,9 @@
- }
-
- }
-- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- } catch (Exception e) {
-- MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
- throw new AlgebricksException(e);
- }
-- // Following jobs are run under a separate transaction, that is committed/aborted by the JobEventListener
-- for (JobSpecification jobspec : jobsToExecute) {
-- runJob(hcc, jobspec);
-- }
- }
- return executionResult;
- }
-@@ -289,398 +287,802 @@
-
- private Dataverse handleUseDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- List<JobSpecification> jobsToExecute) throws MetadataException, RemoteException, ACIDException {
-- DataverseDecl dvd = (DataverseDecl) stmt;
-- String dvName = dvd.getDataverseName().getValue();
-- Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
-- if (dv == null) {
-- throw new MetadataException("Unknown dataverse " + dvName);
-+
-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+ acquireReadLatch();
-+
-+ try {
-+ DataverseDecl dvd = (DataverseDecl) stmt;
-+ String dvName = dvd.getDataverseName().getValue();
-+ Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
-+ if (dv == null) {
-+ throw new MetadataException("Unknown dataverse " + dvName);
-+ }
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ return dv;
-+ } catch (Exception e) {
-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-+ throw new MetadataException(e);
-+ } finally {
-+ releaseReadLatch();
- }
-- return dv;
- }
-
- private void handleCreateDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- List<JobSpecification> jobsToExecute) throws MetadataException, AlgebricksException, RemoteException,
- ACIDException {
-- CreateDataverseStatement stmtCreateDataverse = (CreateDataverseStatement) stmt;
-- String dvName = stmtCreateDataverse.getDataverseName().getValue();
-- Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
-- if (dv != null && !stmtCreateDataverse.getIfNotExists()) {
-- throw new AlgebricksException("A dataverse with this name " + dvName + " already exists.");
-+
-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+ acquireWriteLatch();
-+
-+ try {
-+ CreateDataverseStatement stmtCreateDataverse = (CreateDataverseStatement) stmt;
-+ String dvName = stmtCreateDataverse.getDataverseName().getValue();
-+ Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
-+ if (dv != null && !stmtCreateDataverse.getIfNotExists()) {
-+ throw new AlgebricksException("A dataverse with this name " + dvName + " already exists.");
-+ }
-+ MetadataManager.INSTANCE.addDataverse(metadataProvider.getMetadataTxnContext(), new Dataverse(dvName,
-+ stmtCreateDataverse.getFormat(), IMetadataEntity.PENDING_NO_OP));
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ } catch (Exception e) {
-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-+ throw new AlgebricksException(e);
-+ } finally {
-+ releaseWriteLatch();
- }
-- MetadataManager.INSTANCE.addDataverse(metadataProvider.getMetadataTxnContext(), new Dataverse(dvName,
-- stmtCreateDataverse.getFormat()));
- }
-
- private void handleCreateDatasetStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws AsterixException, Exception {
-- DatasetDecl dd = (DatasetDecl) stmt;
-- String dataverseName = dd.getDataverse() != null ? dd.getDataverse().getValue()
-- : activeDefaultDataverse != null ? activeDefaultDataverse.getDataverseName() : null;
-- if (dataverseName == null) {
-- throw new AlgebricksException(" dataverse not specified ");
-- }
-- String datasetName = dd.getName().getValue();
-- DatasetType dsType = dd.getDatasetType();
-- String itemTypeName = dd.getItemTypeName().getValue();
-
-- IDatasetDetails datasetDetails = null;
-- Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
-- datasetName);
-- if (ds != null) {
-- if (dd.getIfNotExists()) {
-- return;
-- } else {
-- throw new AlgebricksException("A dataset with this name " + datasetName + " already exists.");
-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ boolean bActiveTxn = true;
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+ acquireWriteLatch();
-+
-+ try {
-+ DatasetDecl dd = (DatasetDecl) stmt;
-+ String dataverseName = dd.getDataverse() != null ? dd.getDataverse().getValue()
-+ : activeDefaultDataverse != null ? activeDefaultDataverse.getDataverseName() : null;
-+ if (dataverseName == null) {
-+ throw new AlgebricksException(" dataverse not specified ");
- }
-- }
-- Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(), dataverseName,
-- itemTypeName);
-- if (dt == null) {
-- throw new AlgebricksException(": type " + itemTypeName + " could not be found.");
-- }
-- switch (dd.getDatasetType()) {
-- case INTERNAL: {
-- IAType itemType = dt.getDatatype();
-- if (itemType.getTypeTag() != ATypeTag.RECORD) {
-- throw new AlgebricksException("Can only partition ARecord's.");
-+ String datasetName = dd.getName().getValue();
-+ DatasetType dsType = dd.getDatasetType();
-+ String itemTypeName = dd.getItemTypeName().getValue();
-+
-+ IDatasetDetails datasetDetails = null;
-+ Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
-+ datasetName);
-+ if (ds != null) {
-+ if (dd.getIfNotExists()) {
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ return;
-+ } else {
-+ throw new AlgebricksException("A dataset with this name " + datasetName + " already exists.");
- }
-- List<String> partitioningExprs = ((InternalDetailsDecl) dd.getDatasetDetailsDecl())
-- .getPartitioningExprs();
-- String ngName = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue();
-- datasetDetails = new InternalDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
-- InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs, ngName);
-- break;
- }
-- case EXTERNAL: {
-- String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter();
-- Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties();
-- datasetDetails = new ExternalDatasetDetails(adapter, properties);
-- break;
-+ Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(), dataverseName,
-+ itemTypeName);
-+ if (dt == null) {
-+ throw new AlgebricksException(": type " + itemTypeName + " could not be found.");
- }
-- case FEED: {
-- IAType itemType = dt.getDatatype();
-- if (itemType.getTypeTag() != ATypeTag.RECORD) {
-- throw new AlgebricksException("Can only partition ARecord's.");
-+ switch (dd.getDatasetType()) {
-+ case INTERNAL: {
-+ IAType itemType = dt.getDatatype();
-+ if (itemType.getTypeTag() != ATypeTag.RECORD) {
-+ throw new AlgebricksException("Can only partition ARecord's.");
-+ }
-+ List<String> partitioningExprs = ((InternalDetailsDecl) dd.getDatasetDetailsDecl())
-+ .getPartitioningExprs();
-+ String ngName = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue();
-+ datasetDetails = new InternalDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
-+ InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs,
-+ ngName);
-+ break;
- }
-- List<String> partitioningExprs = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getPartitioningExprs();
-- String ngName = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue();
-- String adapter = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getAdapterFactoryClassname();
-- Map<String, String> configuration = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getConfiguration();
-- FunctionSignature signature = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getFunctionSignature();
-- datasetDetails = new FeedDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
-- InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs, ngName,
-- adapter, configuration, signature, FeedDatasetDetails.FeedState.INACTIVE.toString());
-- break;
-+ case EXTERNAL: {
-+ String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter();
-+ Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties();
-+ datasetDetails = new ExternalDatasetDetails(adapter, properties);
-+ break;
-+ }
-+ case FEED: {
-+ IAType itemType = dt.getDatatype();
-+ if (itemType.getTypeTag() != ATypeTag.RECORD) {
-+ throw new AlgebricksException("Can only partition ARecord's.");
-+ }
-+ List<String> partitioningExprs = ((FeedDetailsDecl) dd.getDatasetDetailsDecl())
-+ .getPartitioningExprs();
-+ String ngName = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue();
-+ String adapter = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getAdapterFactoryClassname();
-+ Map<String, String> configuration = ((FeedDetailsDecl) dd.getDatasetDetailsDecl())
-+ .getConfiguration();
-+ FunctionSignature signature = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getFunctionSignature();
-+ datasetDetails = new FeedDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
-+ InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs,
-+ ngName, adapter, configuration, signature, FeedDatasetDetails.FeedState.INACTIVE.toString());
-+ break;
-+ }
- }
-+
-+ //#. add a new dataset with PendingAddOp
-+ Dataset dataset = new Dataset(dataverseName, datasetName, itemTypeName, datasetDetails, dsType,
-+ DatasetIdFactory.generateDatasetId(), IMetadataEntity.PENDING_ADD_OP);
-+ MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset);
-+
-+ if (dd.getDatasetType() == DatasetType.INTERNAL || dd.getDatasetType() == DatasetType.FEED) {
-+ Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
-+ dataverseName);
-+ JobSpecification jobSpec = DatasetOperations.createDatasetJobSpec(dataverse, datasetName,
-+ metadataProvider);
-+
-+ //#. make metadataTxn commit before calling runJob.
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ bActiveTxn = false;
-+
-+ //#. runJob
-+ runJob(hcc, jobSpec);
-+
-+ //#. begin new metadataTxn
-+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ bActiveTxn = true;
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+ }
-+
-+ //#. add a new dataset with PendingNoOp after deleting the dataset with PendingAddOp
-+ MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName);
-+ MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), new Dataset(dataverseName,
-+ datasetName, itemTypeName, datasetDetails, dsType, dataset.getDatasetId(),
-+ IMetadataEntity.PENDING_NO_OP));
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ } catch (Exception e) {
-+ if (bActiveTxn) {
-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-+ }
-+ throw new AlgebricksException(e);
-+ } finally {
-+ releaseWriteLatch();
- }
-- MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), new Dataset(dataverseName,
-- datasetName, itemTypeName, datasetDetails, dsType, DatasetIdFactory.generateDatasetId()));
-- if (dd.getDatasetType() == DatasetType.INTERNAL || dd.getDatasetType() == DatasetType.FEED) {
-- Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
-- dataverseName);
-- runJob(hcc, DatasetOperations.createDatasetJobSpec(dataverse, datasetName, metadataProvider));
-- }
- }
-
- private void handleCreateIndexStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
-- CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
-- String dataverseName = stmtCreateIndex.getDataverseName() == null ? activeDefaultDataverse == null ? null
-- : activeDefaultDataverse.getDataverseName() : stmtCreateIndex.getDataverseName().getValue();
-- if (dataverseName == null) {
-- throw new AlgebricksException(" dataverse not specified ");
-- }
-- String datasetName = stmtCreateIndex.getDatasetName().getValue();
-- Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
-- datasetName);
-- if (ds == null) {
-- throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
-- + dataverseName);
-- }
-- String indexName = stmtCreateIndex.getIndexName().getValue();
-- Index idx = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
-- datasetName, indexName);
-- if (idx != null) {
-- if (!stmtCreateIndex.getIfNotExists()) {
-- throw new AlgebricksException("An index with this name " + indexName + " already exists.");
-- } else {
-- stmtCreateIndex.setNeedToCreate(false);
-+
-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ boolean bActiveTxn = true;
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+ acquireWriteLatch();
-+
-+ try {
-+ CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
-+ String dataverseName = stmtCreateIndex.getDataverseName() == null ? activeDefaultDataverse == null ? null
-+ : activeDefaultDataverse.getDataverseName() : stmtCreateIndex.getDataverseName().getValue();
-+ if (dataverseName == null) {
-+ throw new AlgebricksException(" dataverse not specified ");
- }
-- } else {
-+ String datasetName = stmtCreateIndex.getDatasetName().getValue();
-+
-+ Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
-+ datasetName);
-+ if (ds == null) {
-+ throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
-+ + dataverseName);
-+ }
-+
-+ String indexName = stmtCreateIndex.getIndexName().getValue();
-+ Index idx = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
-+ datasetName, indexName);
-+
-+ if (idx != null) {
-+ if (!stmtCreateIndex.getIfNotExists()) {
-+ throw new AlgebricksException("An index with this name " + indexName + " already exists.");
-+ } else {
-+ stmtCreateIndex.setNeedToCreate(false);
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ return;
-+ }
-+ }
-+
-+ //#. add a new index with PendingAddOp
- Index index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(),
-- stmtCreateIndex.getFieldExprs(), stmtCreateIndex.getGramLength(), false);
-+ stmtCreateIndex.getFieldExprs(), stmtCreateIndex.getGramLength(), false,
-+ IMetadataEntity.PENDING_ADD_OP);
- MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
-- runCreateIndexJob(hcc, stmtCreateIndex, metadataProvider);
-
-+ //#. create the index artifact in NC.
- CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName,
- index.getDatasetName(), index.getKeyFieldNames(), index.getGramLength(), index.getIndexType());
-- JobSpecification loadIndexJobSpec = IndexOperations
-- .buildSecondaryIndexLoadingJobSpec(cis, metadataProvider);
-- runJob(hcc, loadIndexJobSpec);
-+ JobSpecification spec = IndexOperations.buildSecondaryIndexCreationJobSpec(cis, metadataProvider);
-+ if (spec == null) {
-+ throw new AsterixException("Failed to create job spec for creating index '"
-+ + stmtCreateIndex.getDatasetName() + "." + stmtCreateIndex.getIndexName() + "'");
-+ }
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ bActiveTxn = false;
-+
-+ runJob(hcc, spec);
-+
-+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ bActiveTxn = true;
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+
-+ //#. load data into the index in NC.
-+ cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName, index.getDatasetName(),
-+ index.getKeyFieldNames(), index.getGramLength(), index.getIndexType());
-+ spec = IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, metadataProvider);
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ bActiveTxn = false;
-+
-+ runJob(hcc, spec);
-+
-+ //#. begin new metadataTxn
-+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ bActiveTxn = true;
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+
-+ //#. add another new index with PendingNoOp after deleting the index with PendingAddOp
-+ MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
-+ indexName);
-+ index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(),
-+ stmtCreateIndex.getFieldExprs(), stmtCreateIndex.getGramLength(), false,
-+ IMetadataEntity.PENDING_NO_OP);
-+ MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+
-+ } catch (Exception e) {
-+ if (bActiveTxn) {
-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-+ }
-+ throw new AlgebricksException(e);
-+ } finally {
-+ releaseWriteLatch();
- }
- }
-
- private void handleCreateTypeStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- List<JobSpecification> jobsToExecute) throws AlgebricksException, RemoteException, ACIDException,
- MetadataException {
-- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
-- TypeDecl stmtCreateType = (TypeDecl) stmt;
-- String dataverseName = stmtCreateType.getDataverseName() == null ? activeDefaultDataverse == null ? null
-- : activeDefaultDataverse.getDataverseName() : stmtCreateType.getDataverseName().getValue();
-- if (dataverseName == null) {
-- throw new AlgebricksException(" dataverse not specified ");
-- }
-- String typeName = stmtCreateType.getIdent().getValue();
-- Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
-- if (dv == null) {
-- throw new AlgebricksException("Unknonw dataverse " + dataverseName);
-- }
-- Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
-- if (dt != null) {
-- if (!stmtCreateType.getIfNotExists())
-- throw new AlgebricksException("A datatype with this name " + typeName + " already exists.");
-- } else {
-- if (builtinTypeMap.get(typeName) != null) {
-- throw new AlgebricksException("Cannot redefine builtin type " + typeName + ".");
-+
-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+ acquireWriteLatch();
-+
-+ try {
-+ TypeDecl stmtCreateType = (TypeDecl) stmt;
-+ String dataverseName = stmtCreateType.getDataverseName() == null ? activeDefaultDataverse == null ? null
-+ : activeDefaultDataverse.getDataverseName() : stmtCreateType.getDataverseName().getValue();
-+ if (dataverseName == null) {
-+ throw new AlgebricksException(" dataverse not specified ");
-+ }
-+ String typeName = stmtCreateType.getIdent().getValue();
-+ Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
-+ if (dv == null) {
-+ throw new AlgebricksException("Unknonw dataverse " + dataverseName);
-+ }
-+ Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
-+ if (dt != null) {
-+ if (!stmtCreateType.getIfNotExists()) {
-+ throw new AlgebricksException("A datatype with this name " + typeName + " already exists.");
-+ }
- } else {
-- Map<TypeSignature, IAType> typeMap = TypeTranslator.computeTypes(mdTxnCtx, (TypeDecl) stmt,
-- dataverseName);
-- TypeSignature typeSignature = new TypeSignature(dataverseName, typeName);
-- IAType type = typeMap.get(typeSignature);
-- MetadataManager.INSTANCE.addDatatype(mdTxnCtx, new Datatype(dataverseName, typeName, type, false));
-+ if (builtinTypeMap.get(typeName) != null) {
-+ throw new AlgebricksException("Cannot redefine builtin type " + typeName + ".");
-+ } else {
-+ Map<TypeSignature, IAType> typeMap = TypeTranslator.computeTypes(mdTxnCtx, (TypeDecl) stmt,
-+ dataverseName);
-+ TypeSignature typeSignature = new TypeSignature(dataverseName, typeName);
-+ IAType type = typeMap.get(typeSignature);
-+ MetadataManager.INSTANCE.addDatatype(mdTxnCtx, new Datatype(dataverseName, typeName, type, false));
-+ }
- }
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ } catch (Exception e) {
-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-+ throw new AlgebricksException(e);
-+ } finally {
-+ releaseWriteLatch();
- }
- }
-
- private void handleDataverseDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
-- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
-- DataverseDropStatement stmtDelete = (DataverseDropStatement) stmt;
-- String dvName = stmtDelete.getDataverseName().getValue();
-
-- Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dvName);
-- if (dv == null) {
-- if (!stmtDelete.getIfExists()) {
-- throw new AlgebricksException("There is no dataverse with this name " + dvName + ".");
-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ boolean bActiveTxn = true;
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+ acquireWriteLatch();
-+
-+ try {
-+ DataverseDropStatement stmtDelete = (DataverseDropStatement) stmt;
-+ String dvName = stmtDelete.getDataverseName().getValue();
-+
-+ Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dvName);
-+ if (dv == null) {
-+ if (!stmtDelete.getIfExists()) {
-+ throw new AlgebricksException("There is no dataverse with this name " + dvName + ".");
-+ }
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ return;
- }
-- } else {
-+
-+ //#. prepare jobs which will drop corresponding datasets with indexes.
- List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dvName);
- for (int j = 0; j < datasets.size(); j++) {
- String datasetName = datasets.get(j).getDatasetName();
- DatasetType dsType = datasets.get(j).getDatasetType();
- if (dsType == DatasetType.INTERNAL || dsType == DatasetType.FEED) {
-+
- List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dvName, datasetName);
- for (int k = 0; k < indexes.size(); k++) {
- if (indexes.get(k).isSecondaryIndex()) {
-- compileIndexDropStatement(hcc, dvName, datasetName, indexes.get(k).getIndexName(),
-- metadataProvider);
-+ CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dvName, datasetName,
-+ indexes.get(k).getIndexName());
-+ jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
- }
- }
-+
-+ CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dvName, datasetName);
-+ jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider));
- }
-- compileDatasetDropStatement(hcc, dvName, datasetName, metadataProvider);
- }
-
-+ //#. mark PendingDropOp on the dataverse record by
-+ // first, deleting the dataverse record from the DATAVERSE_DATASET
-+ // second, inserting the dataverse record with the PendingDropOp value into the DATAVERSE_DATASET
- MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dvName);
-+ MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dvName, dv.getDataFormat(),
-+ IMetadataEntity.PENDING_DROP_OP));
-+
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ bActiveTxn = false;
-+
-+ for (JobSpecification jobSpec : jobsToExecute) {
-+ runJob(hcc, jobSpec);
-+ }
-+
-+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ bActiveTxn = true;
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+
-+ //#. finally, delete the dataverse.
-+ MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dvName);
- if (activeDefaultDataverse != null && activeDefaultDataverse.getDataverseName() == dvName) {
- activeDefaultDataverse = null;
- }
-+
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ } catch (Exception e) {
-+ if (bActiveTxn) {
-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-+ }
-+ throw new AlgebricksException(e);
-+ } finally {
-+ releaseWriteLatch();
- }
- }
-
- private void handleDatasetDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
-- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
-- DropStatement stmtDelete = (DropStatement) stmt;
-- String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
-- : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue();
-- if (dataverseName == null) {
-- throw new AlgebricksException(" dataverse not specified ");
-- }
-- String datasetName = stmtDelete.getDatasetName().getValue();
-- Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
-- if (ds == null) {
-- if (!stmtDelete.getIfExists())
-- throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
-- + dataverseName + ".");
-- } else {
-+
-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ boolean bActiveTxn = true;
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+ acquireWriteLatch();
-+
-+ try {
-+ DropStatement stmtDelete = (DropStatement) stmt;
-+ String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
-+ : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue();
-+ if (dataverseName == null) {
-+ throw new AlgebricksException(" dataverse not specified ");
-+ }
-+ String datasetName = stmtDelete.getDatasetName().getValue();
-+
-+ Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
-+ if (ds == null) {
-+ if (!stmtDelete.getIfExists()) {
-+ throw new AlgebricksException("There is no dataset with this name " + datasetName
-+ + " in dataverse " + dataverseName + ".");
-+ }
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ return;
-+ }
-+
- if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
-+
-+ //#. prepare jobs to drop the datatset and the indexes in NC
- List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
- for (int j = 0; j < indexes.size(); j++) {
-- if (indexes.get(j).isPrimaryIndex()) {
-- compileIndexDropStatement(hcc, dataverseName, datasetName, indexes.get(j).getIndexName(),
-- metadataProvider);
-+ if (indexes.get(j).isSecondaryIndex()) {
-+ CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
-+ indexes.get(j).getIndexName());
-+ jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
- }
- }
-+ CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
-+ jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider));
-+
-+ //#. mark the existing dataset as PendingDropOp
-+ MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
-+ MetadataManager.INSTANCE.addDataset(
-+ mdTxnCtx,
-+ new Dataset(dataverseName, datasetName, ds.getItemTypeName(), ds.getDatasetDetails(), ds
-+ .getDatasetType(), ds.getDatasetId(), IMetadataEntity.PENDING_DROP_OP));
-+
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ bActiveTxn = false;
-+
-+ //#. run the jobs
-+ for (JobSpecification jobSpec : jobsToExecute) {
-+ runJob(hcc, jobSpec);
-+ }
-+
-+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ bActiveTxn = true;
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
- }
-- compileDatasetDropStatement(hcc, dataverseName, datasetName, metadataProvider);
-+
-+ //#. finally, delete the dataset.
-+ MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
-+
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ } catch (Exception e) {
-+ if (bActiveTxn) {
-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-+ }
-+ throw new AlgebricksException(e);
-+ } finally {
-+ releaseWriteLatch();
- }
- }
-
- private void handleIndexDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
-- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
-- IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt;
-- String datasetName = stmtIndexDrop.getDatasetName().getValue();
-- String dataverseName = stmtIndexDrop.getDataverseName() == null ? activeDefaultDataverse == null ? null
-- : activeDefaultDataverse.getDataverseName() : stmtIndexDrop.getDataverseName().getValue();
-- if (dataverseName == null) {
-- throw new AlgebricksException(" dataverse not specified ");
-+
-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ boolean bActiveTxn = true;
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+ acquireWriteLatch();
-+
-+ try {
-+ IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt;
-+ String datasetName = stmtIndexDrop.getDatasetName().getValue();
-+ String dataverseName = stmtIndexDrop.getDataverseName() == null ? activeDefaultDataverse == null ? null
-+ : activeDefaultDataverse.getDataverseName() : stmtIndexDrop.getDataverseName().getValue();
-+ if (dataverseName == null) {
-+ throw new AlgebricksException(" dataverse not specified ");
-+ }
-+
-+ Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
-+ if (ds == null) {
-+ throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
-+ + dataverseName);
-+ }
-+
-+ if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
-+ String indexName = stmtIndexDrop.getIndexName().getValue();
-+ Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
-+ if (index == null) {
-+ if (!stmtIndexDrop.getIfExists()) {
-+ throw new AlgebricksException("There is no index with this name " + indexName + ".");
-+ }
-+ } else {
-+ //#. prepare a job to drop the index in NC.
-+ CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
-+ indexName);
-+ jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
-+
-+ //#. mark PendingDropOp on the existing index
-+ MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
-+ MetadataManager.INSTANCE.addIndex(
-+ mdTxnCtx,
-+ new Index(dataverseName, datasetName, indexName, index.getIndexType(), index
-+ .getKeyFieldNames(), index.isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
-+
-+ //#. commit the existing transaction before calling runJob.
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ bActiveTxn = false;
-+
-+ for (JobSpecification jobSpec : jobsToExecute) {
-+ runJob(hcc, jobSpec);
-+ }
-+
-+ //#. begin a new transaction
-+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ bActiveTxn = true;
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+
-+ //#. finally, delete the existing index
-+ MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
-+ }
-+ } else {
-+ throw new AlgebricksException(datasetName
-+ + " is an external dataset. Indexes are not maintained for external datasets.");
-+ }
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+
-+ } catch (Exception e) {
-+ if (bActiveTxn) {
-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-+ }
-+ throw new AlgebricksException(e);
-+
-+ } finally {
-+ releaseWriteLatch();
- }
-- Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
-- if (ds == null)
-- throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
-- + dataverseName);
-- if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
-- String indexName = stmtIndexDrop.getIndexName().getValue();
-- Index idx = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
-- if (idx == null) {
-- if (!stmtIndexDrop.getIfExists())
-- throw new AlgebricksException("There is no index with this name " + indexName + ".");
-- } else
-- compileIndexDropStatement(hcc, dataverseName, datasetName, indexName, metadataProvider);
-- } else {
-- throw new AlgebricksException(datasetName
-- + " is an external dataset. Indexes are not maintained for external datasets.");
-- }
- }
-
- private void handleTypeDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- List<JobSpecification> jobsToExecute) throws AlgebricksException, MetadataException, RemoteException,
- ACIDException {
-- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
-- TypeDropStatement stmtTypeDrop = (TypeDropStatement) stmt;
-- String dataverseName = stmtTypeDrop.getDataverseName() == null ? (activeDefaultDataverse == null ? null
-- : activeDefaultDataverse.getDataverseName()) : stmtTypeDrop.getDataverseName().getValue();
-- if (dataverseName == null) {
-- throw new AlgebricksException(" dataverse not specified ");
-+
-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+ acquireWriteLatch();
-+
-+ try {
-+ TypeDropStatement stmtTypeDrop = (TypeDropStatement) stmt;
-+ String dataverseName = stmtTypeDrop.getDataverseName() == null ? (activeDefaultDataverse == null ? null
-+ : activeDefaultDataverse.getDataverseName()) : stmtTypeDrop.getDataverseName().getValue();
-+ if (dataverseName == null) {
-+ throw new AlgebricksException(" dataverse not specified ");
-+ }
-+ String typeName = stmtTypeDrop.getTypeName().getValue();
-+ Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
-+ if (dt == null) {
-+ if (!stmtTypeDrop.getIfExists())
-+ throw new AlgebricksException("There is no datatype with this name " + typeName + ".");
-+ } else {
-+ MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, dataverseName, typeName);
-+ }
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ } catch (Exception e) {
-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-+ throw new AlgebricksException(e);
-+ } finally {
-+ releaseWriteLatch();
- }
-- String typeName = stmtTypeDrop.getTypeName().getValue();
-- Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
-- if (dt == null) {
-- if (!stmtTypeDrop.getIfExists())
-- throw new AlgebricksException("There is no datatype with this name " + typeName + ".");
-- } else {
-- MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, dataverseName, typeName);
-- }
- }
-
- private void handleNodegroupDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- List<JobSpecification> jobsToExecute) throws MetadataException, AlgebricksException, RemoteException,
- ACIDException {
-- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
-- NodeGroupDropStatement stmtDelete = (NodeGroupDropStatement) stmt;
-- String nodegroupName = stmtDelete.getNodeGroupName().getValue();
-- NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodegroupName);
-- if (ng == null) {
-- if (!stmtDelete.getIfExists())
-- throw new AlgebricksException("There is no nodegroup with this name " + nodegroupName + ".");
-- } else {
-- MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, nodegroupName);
-+
-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+ acquireWriteLatch();
-+
-+ try {
-+ NodeGroupDropStatement stmtDelete = (NodeGroupDropStatement) stmt;
-+ String nodegroupName = stmtDelete.getNodeGroupName().getValue();
-+ NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodegroupName);
-+ if (ng == null) {
-+ if (!stmtDelete.getIfExists())
-+ throw new AlgebricksException("There is no nodegroup with this name " + nodegroupName + ".");
-+ } else {
-+ MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, nodegroupName);
-+ }
-+
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ } catch (Exception e) {
-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-+ throw new AlgebricksException(e);
-+ } finally {
-+ releaseWriteLatch();
- }
- }
-
- private void handleCreateFunctionStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- List<JobSpecification> jobsToExecute) throws AlgebricksException, MetadataException, RemoteException,
- ACIDException {
-- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
-- CreateFunctionStatement cfs = (CreateFunctionStatement) stmt;
-- String dataverse = cfs.getSignature().getNamespace() == null ? activeDefaultDataverse == null ? null
-- : activeDefaultDataverse.getDataverseName() : cfs.getSignature().getNamespace();
-- if (dataverse == null) {
-- throw new AlgebricksException(" dataverse not specified ");
-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+ acquireWriteLatch();
-+
-+ try {
-+ CreateFunctionStatement cfs = (CreateFunctionStatement) stmt;
-+ String dataverse = cfs.getSignature().getNamespace() == null ? activeDefaultDataverse == null ? null
-+ : activeDefaultDataverse.getDataverseName() : cfs.getSignature().getNamespace();
-+ if (dataverse == null) {
-+ throw new AlgebricksException(" dataverse not specified ");
-+ }
-+ Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverse);
-+ if (dv == null) {
-+ throw new AlgebricksException("There is no dataverse with this name " + dataverse + ".");
-+ }
-+ Function function = new Function(dataverse, cfs.getaAterixFunction().getName(), cfs.getaAterixFunction()
-+ .getArity(), cfs.getParamList(), Function.RETURNTYPE_VOID, cfs.getFunctionBody(),
-+ Function.LANGUAGE_AQL, FunctionKind.SCALAR.toString());
-+ MetadataManager.INSTANCE.addFunction(mdTxnCtx, function);
-+
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ } catch (Exception e) {
-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-+ throw new AlgebricksException(e);
-+ } finally {
-+ releaseWriteLatch();
- }
-- Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverse);
-- if (dv == null) {
-- throw new AlgebricksException("There is no dataverse with this name " + dataverse + ".");
-- }
-- Function function = new Function(dataverse, cfs.getaAterixFunction().getName(), cfs.getaAterixFunction()
-- .getArity(), cfs.getParamList(), Function.RETURNTYPE_VOID, cfs.getFunctionBody(),
-- Function.LANGUAGE_AQL, FunctionKind.SCALAR.toString());
-- MetadataManager.INSTANCE.addFunction(mdTxnCtx, function);
- }
-
- private void handleFunctionDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- List<JobSpecification> jobsToExecute) throws MetadataException, RemoteException, ACIDException,
- AlgebricksException {
-- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
-- FunctionDropStatement stmtDropFunction = (FunctionDropStatement) stmt;
-- FunctionSignature signature = stmtDropFunction.getFunctionSignature();
-- Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, signature);
-- if (function == null) {
-- if (!stmtDropFunction.getIfExists())
-- throw new AlgebricksException("Unknonw function " + signature);
-- } else {
-- MetadataManager.INSTANCE.dropFunction(mdTxnCtx, signature);
-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+ acquireWriteLatch();
-+
-+ try {
-+ FunctionDropStatement stmtDropFunction = (FunctionDropStatement) stmt;
-+ FunctionSignature signature = stmtDropFunction.getFunctionSignature();
-+ Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, signature);
-+ if (function == null) {
-+ if (!stmtDropFunction.getIfExists())
-+ throw new AlgebricksException("Unknonw function " + signature);
-+ } else {
-+ MetadataManager.INSTANCE.dropFunction(mdTxnCtx, signature);
-+ }
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ } catch (Exception e) {
-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-+ throw new AlgebricksException(e);
-+ } finally {
-+ releaseWriteLatch();
- }
- }
-
- private void handleLoadFromFileStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
-- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
-- LoadFromFileStatement loadStmt = (LoadFromFileStatement) stmt;
-- String dataverseName = loadStmt.getDataverseName() == null ? activeDefaultDataverse == null ? null
-- : activeDefaultDataverse.getDataverseName() : loadStmt.getDataverseName().getValue();
-- CompiledLoadFromFileStatement cls = new CompiledLoadFromFileStatement(dataverseName, loadStmt.getDatasetName()
-- .getValue(), loadStmt.getAdapter(), loadStmt.getProperties(), loadStmt.dataIsAlreadySorted());
-
-- IDataFormat format = getDataFormat(metadataProvider.getMetadataTxnContext(), dataverseName);
-- Job job = DatasetOperations.createLoadDatasetJobSpec(metadataProvider, cls, format);
-- jobsToExecute.add(job.getJobSpec());
-- // Also load the dataset's secondary indexes.
-- List<Index> datasetIndexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, loadStmt
-- .getDatasetName().getValue());
-- for (Index index : datasetIndexes) {
-- if (!index.isSecondaryIndex()) {
-- continue;
-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ boolean bActiveTxn = true;
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+ acquireReadLatch();
-+
-+ try {
-+ LoadFromFileStatement loadStmt = (LoadFromFileStatement) stmt;
-+ String dataverseName = loadStmt.getDataverseName() == null ? activeDefaultDataverse == null ? null
-+ : activeDefaultDataverse.getDataverseName() : loadStmt.getDataverseName().getValue();
-+ CompiledLoadFromFileStatement cls = new CompiledLoadFromFileStatement(dataverseName, loadStmt
-+ .getDatasetName().getValue(), loadStmt.getAdapter(), loadStmt.getProperties(),
-+ loadStmt.dataIsAlreadySorted());
-+
-+ IDataFormat format = getDataFormat(metadataProvider.getMetadataTxnContext(), dataverseName);
-+ Job job = DatasetOperations.createLoadDatasetJobSpec(metadataProvider, cls, format);
-+ jobsToExecute.add(job.getJobSpec());
-+ // Also load the dataset's secondary indexes.
-+ List<Index> datasetIndexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, loadStmt
-+ .getDatasetName().getValue());
-+ for (Index index : datasetIndexes) {
-+ if (!index.isSecondaryIndex()) {
-+ continue;
-+ }
-+ // Create CompiledCreateIndexStatement from metadata entity 'index'.
-+ CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(),
-+ dataverseName, index.getDatasetName(), index.getKeyFieldNames(), index.getGramLength(),
-+ index.getIndexType());
-+ jobsToExecute.add(IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, metadataProvider));
- }
-- // Create CompiledCreateIndexStatement from metadata entity 'index'.
-- CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName,
-- index.getDatasetName(), index.getKeyFieldNames(), index.getGramLength(), index.getIndexType());
-- jobsToExecute.add(IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, metadataProvider));
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ bActiveTxn = false;
-+
-+ for (JobSpecification jobspec : jobsToExecute) {
-+ runJob(hcc, jobspec);
-+ }
-+ } catch (Exception e) {
-+ if (bActiveTxn) {
-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-+ }
-+ throw new AlgebricksException(e);
-+ } finally {
-+ releaseReadLatch();
- }
- }
-
- private void handleWriteFromQueryResultStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
-- metadataProvider.setWriteTransaction(true);
-- WriteFromQueryResultStatement st1 = (WriteFromQueryResultStatement) stmt;
-- String dataverseName = st1.getDataverseName() == null ? activeDefaultDataverse == null ? null
-- : activeDefaultDataverse.getDataverseName() : st1.getDataverseName().getValue();
-- CompiledWriteFromQueryResultStatement clfrqs = new CompiledWriteFromQueryResultStatement(dataverseName, st1
-- .getDatasetName().getValue(), st1.getQuery(), st1.getVarCounter());
-
-- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
-- if (compiled.first != null) {
-- jobsToExecute.add(compiled.first);
-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ boolean bActiveTxn = true;
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+ acquireReadLatch();
-+
-+ try {
-+ metadataProvider.setWriteTransaction(true);
-+ WriteFromQueryResultStatement st1 = (WriteFromQueryResultStatement) stmt;
-+ String dataverseName = st1.getDataverseName() == null ? activeDefaultDataverse == null ? null
-+ : activeDefaultDataverse.getDataverseName() : st1.getDataverseName().getValue();
-+ CompiledWriteFromQueryResultStatement clfrqs = new CompiledWriteFromQueryResultStatement(dataverseName, st1
-+ .getDatasetName().getValue(), st1.getQuery(), st1.getVarCounter());
-+
-+ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(),
-+ clfrqs);
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ bActiveTxn = false;
-+ if (compiled.first != null) {
-+ runJob(hcc, compiled.first);
-+ }
-+ } catch (Exception e) {
-+ if (bActiveTxn) {
-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-+ }
-+ throw new AlgebricksException(e);
-+ } finally {
-+ releaseReadLatch();
- }
- }
-
- private void handleInsertStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
-- metadataProvider.setWriteTransaction(true);
-- InsertStatement stmtInsert = (InsertStatement) stmt;
-- String dataverseName = stmtInsert.getDataverseName() == null ? activeDefaultDataverse == null ? null
-- : activeDefaultDataverse.getDataverseName() : stmtInsert.getDataverseName().getValue();
-- CompiledInsertStatement clfrqs = new CompiledInsertStatement(dataverseName, stmtInsert.getDatasetName()
-- .getValue(), stmtInsert.getQuery(), stmtInsert.getVarCounter());
-- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
-- if (compiled.first != null) {
-- jobsToExecute.add(compiled.first);
-+
-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ boolean bActiveTxn = true;
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+ acquireReadLatch();
-+
-+ try {
-+ metadataProvider.setWriteTransaction(true);
-+ InsertStatement stmtInsert = (InsertStatement) stmt;
-+ String dataverseName = stmtInsert.getDataverseName() == null ? activeDefaultDataverse == null ? null
-+ : activeDefaultDataverse.getDataverseName() : stmtInsert.getDataverseName().getValue();
-+ CompiledInsertStatement clfrqs = new CompiledInsertStatement(dataverseName, stmtInsert.getDatasetName()
-+ .getValue(), stmtInsert.getQuery(), stmtInsert.getVarCounter());
-+ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(),
-+ clfrqs);
-+
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ bActiveTxn = false;
-+
-+ if (compiled.first != null) {
-+ runJob(hcc, compiled.first);
-+ }
-+
-+ } catch (Exception e) {
-+ if (bActiveTxn) {
-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-+ }
-+ throw new AlgebricksException(e);
-+ } finally {
-+ releaseReadLatch();
- }
- }
-
- private void handleDeleteStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
-- metadataProvider.setWriteTransaction(true);
-- DeleteStatement stmtDelete = (DeleteStatement) stmt;
-- String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
-- : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue();
-- CompiledDeleteStatement clfrqs = new CompiledDeleteStatement(stmtDelete.getVariableExpr(), dataverseName,
-- stmtDelete.getDatasetName().getValue(), stmtDelete.getCondition(), stmtDelete.getDieClause(),
-- stmtDelete.getVarCounter(), metadataProvider);
-- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
-- if (compiled.first != null) {
-- jobsToExecute.add(compiled.first);
-+
-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ boolean bActiveTxn = true;
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+ acquireReadLatch();
-+
-+ try {
-+ metadataProvider.setWriteTransaction(true);
-+ DeleteStatement stmtDelete = (DeleteStatement) stmt;
-+ String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
-+ : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue();
-+ CompiledDeleteStatement clfrqs = new CompiledDeleteStatement(stmtDelete.getVariableExpr(), dataverseName,
-+ stmtDelete.getDatasetName().getValue(), stmtDelete.getCondition(), stmtDelete.getDieClause(),
-+ stmtDelete.getVarCounter(), metadataProvider);
-+ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(),
-+ clfrqs);
-+
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ bActiveTxn = false;
-+
-+ if (compiled.first != null) {
-+ runJob(hcc, compiled.first);
-+ }
-+
-+ } catch (Exception e) {
-+ if (bActiveTxn) {
-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-+ }
-+ throw new AlgebricksException(e);
-+ } finally {
-+ releaseReadLatch();
- }
- }
-
-@@ -704,46 +1106,109 @@
-
- private void handleBeginFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
-- BeginFeedStatement bfs = (BeginFeedStatement) stmt;
-- String dataverseName = bfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
-- : activeDefaultDataverse.getDataverseName() : bfs.getDatasetName().getValue();
-
-- CompiledBeginFeedStatement cbfs = new CompiledBeginFeedStatement(dataverseName,
-- bfs.getDatasetName().getValue(), bfs.getQuery(), bfs.getVarCounter());
-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ boolean bActiveTxn = true;
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+ acquireReadLatch();
-
-- Dataset dataset;
-- dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName, bfs
-- .getDatasetName().getValue());
-- IDatasetDetails datasetDetails = dataset.getDatasetDetails();
-- if (datasetDetails.getDatasetType() != DatasetType.FEED) {
-- throw new IllegalArgumentException("Dataset " + bfs.getDatasetName().getValue() + " is not a feed dataset");
-+ try {
-+ BeginFeedStatement bfs = (BeginFeedStatement) stmt;
-+ String dataverseName = bfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
-+ : activeDefaultDataverse.getDataverseName() : bfs.getDatasetName().getValue();
-+
-+ CompiledBeginFeedStatement cbfs = new CompiledBeginFeedStatement(dataverseName, bfs.getDatasetName()
-+ .getValue(), bfs.getQuery(), bfs.getVarCounter());
-+
-+ Dataset dataset;
-+ dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName, bfs
-+ .getDatasetName().getValue());
-+ IDatasetDetails datasetDetails = dataset.getDatasetDetails();
-+ if (datasetDetails.getDatasetType() != DatasetType.FEED) {
-+ throw new IllegalArgumentException("Dataset " + bfs.getDatasetName().getValue()
-+ + " is not a feed dataset");
-+ }
-+ bfs.initialize(metadataProvider.getMetadataTxnContext(), dataset);
-+ cbfs.setQuery(bfs.getQuery());
-+ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, bfs.getQuery(), cbfs);
-+
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ bActiveTxn = false;
-+
-+ if (compiled.first != null) {
-+ runJob(hcc, compiled.first);
-+ }
-+
-+ } catch (Exception e) {
-+ if (bActiveTxn) {
-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-+ }
-+ throw new AlgebricksException(e);
-+ } finally {
-+ releaseReadLatch();
- }
-- bfs.initialize(metadataProvider.getMetadataTxnContext(), dataset);
-- cbfs.setQuery(bfs.getQuery());
-- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, bfs.getQuery(), cbfs);
-- if (compiled.first != null) {
-- jobsToExecute.add(compiled.first);
-- }
- }
-
- private void handleControlFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
-- ControlFeedStatement cfs = (ControlFeedStatement) stmt;
-- String dataverseName = cfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
-- : activeDefaultDataverse.getDataverseName() : cfs.getDatasetName().getValue();
-- CompiledControlFeedStatement clcfs = new CompiledControlFeedStatement(cfs.getOperationType(), dataverseName,
-- cfs.getDatasetName().getValue(), cfs.getAlterAdapterConfParams());
-- jobsToExecute.add(FeedOperations.buildControlFeedJobSpec(clcfs, metadataProvider));
-+
-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ boolean bActiveTxn = true;
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+ acquireReadLatch();
-+
-+ try {
-+ ControlFeedStatement cfs = (ControlFeedStatement) stmt;
-+ String dataverseName = cfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
-+ : activeDefaultDataverse.getDataverseName() : cfs.getDatasetName().getValue();
-+ CompiledControlFeedStatement clcfs = new CompiledControlFeedStatement(cfs.getOperationType(),
-+ dataverseName, cfs.getDatasetName().getValue(), cfs.getAlterAdapterConfParams());
-+ JobSpecification jobSpec = FeedOperations.buildControlFeedJobSpec(clcfs, metadataProvider);
-+
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ bActiveTxn = false;
-+
-+ runJob(hcc, jobSpec);
-+
-+ } catch (Exception e) {
-+ if (bActiveTxn) {
-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-+ }
-+ throw new AlgebricksException(e);
-+ } finally {
-+ releaseReadLatch();
-+ }
- }
-
- private QueryResult handleQuery(AqlMetadataProvider metadataProvider, Query query, IHyracksClientConnection hcc,
- List<JobSpecification> jobsToExecute) throws Exception {
-- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, query, null);
-- if (compiled.first != null) {
-- GlobalConfig.ASTERIX_LOGGER.info(compiled.first.toJSON().toString(1));
-- jobsToExecute.add(compiled.first);
-+
-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ boolean bActiveTxn = true;
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+ acquireReadLatch();
-+
-+ try {
-+ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, query, null);
-+
-+ QueryResult queryResult = new QueryResult(query, compiled.second.getLocalFile().getFile().getAbsolutePath());
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ bActiveTxn = false;
-+
-+ if (compiled.first != null) {
-+ GlobalConfig.ASTERIX_LOGGER.info(compiled.first.toJSON().toString(1));
-+ runJob(hcc, compiled.first);
-+ }
-+
-+ return queryResult;
-+ } catch (Exception e) {
-+ if (bActiveTxn) {
-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-+ }
-+ throw new AlgebricksException(e);
-+ } finally {
-+ releaseReadLatch();
- }
-- return new QueryResult(query, compiled.second.getLocalFile().getFile().getAbsolutePath());
- }
-
- private void runCreateIndexJob(IHyracksClientConnection hcc, CreateIndexStatement stmtCreateIndex,
-@@ -768,20 +1233,32 @@
- private void handleCreateNodeGroupStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- List<JobSpecification> jobsToExecute) throws MetadataException, AlgebricksException, RemoteException,
- ACIDException {
-- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
-- NodegroupDecl stmtCreateNodegroup = (NodegroupDecl) stmt;
-- String ngName = stmtCreateNodegroup.getNodegroupName().getValue();
-- NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, ngName);
-- if (ng != null) {
-- if (!stmtCreateNodegroup.getIfNotExists())
-- throw new AlgebricksException("A nodegroup with this name " + ngName + " already exists.");
-- } else {
-- List<Identifier> ncIdentifiers = stmtCreateNodegroup.getNodeControllerNames();
-- List<String> ncNames = new ArrayList<String>(ncIdentifiers.size());
-- for (Identifier id : ncIdentifiers) {
-- ncNames.add(id.getValue());
-+
-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+ acquireWriteLatch();
-+
-+ try {
-+ NodegroupDecl stmtCreateNodegroup = (NodegroupDecl) stmt;
-+ String ngName = stmtCreateNodegroup.getNodegroupName().getValue();
-+ NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, ngName);
-+ if (ng != null) {
-+ if (!stmtCreateNodegroup.getIfNotExists())
-+ throw new AlgebricksException("A nodegroup with this name " + ngName + " already exists.");
-+ } else {
-+ List<Identifier> ncIdentifiers = stmtCreateNodegroup.getNodeControllerNames();
-+ List<String> ncNames = new ArrayList<String>(ncIdentifiers.size());
-+ for (Identifier id : ncIdentifiers) {
-+ ncNames.add(id.getValue());
-+ }
-+ MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(ngName, ncNames));
- }
-- MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(ngName, ncNames));
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ } catch (Exception e) {
-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-+ throw new AlgebricksException(e);
-+ } finally {
-+ releaseWriteLatch();
- }
- }
-
-@@ -791,10 +1268,37 @@
-
- private void compileIndexDropStatement(IHyracksClientConnection hcc, String dataverseName, String datasetName,
- String indexName, AqlMetadataProvider metadataProvider) throws Exception {
-+ MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
-+ Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
-+
-+ //#. mark PendingDropOp on the existing index
-+ MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
-+ MetadataManager.INSTANCE.addIndex(
-+ mdTxnCtx,
-+ new Index(dataverseName, datasetName, indexName, index.getIndexType(), index.getKeyFieldNames(), index
-+ .isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
- CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName);
-- runJob(hcc, IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
-- MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
-- indexName);
-+ JobSpecification jobSpec = IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider);
-+
-+ //#. commit the existing transaction before calling runJob.
-+ // the caller should begin the transaction before calling this function.
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+
-+ try {
-+ runJob(hcc, jobSpec);
-+ } catch (Exception e) {
-+ //need to create the mdTxnCtx to be aborted by caller properly
-+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+ throw e;
-+ }
-+
-+ //#. begin a new transaction
-+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+
-+ //#. finally, delete the existing index
-+ MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
- }
-
- private void compileDatasetDropStatement(IHyracksClientConnection hcc, String dataverseName, String datasetName,
-@@ -803,10 +1307,32 @@
- CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
- Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
- if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
-- JobSpecification[] jobSpecs = DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider);
-- for (JobSpecification spec : jobSpecs)
-- runJob(hcc, spec);
-+ JobSpecification jobSpec = DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider);
-+
-+ //#. mark PendingDropOp on the existing dataset
-+ MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
-+ MetadataManager.INSTANCE.addDataset(mdTxnCtx, new Dataset(dataverseName, datasetName, ds.getItemTypeName(),
-+ ds.getDatasetDetails(), ds.getDatasetType(), ds.getDatasetId(), IMetadataEntity.PENDING_DROP_OP));
-+
-+ //#. commit the transaction
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+
-+ //#. run the job
-+ try {
-+ runJob(hcc, jobSpec);
-+ } catch (Exception e) {
-+ //need to create the mdTxnCtx to be aborted by caller properly
-+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+ throw e;
-+ }
-+
-+ //#. start a new transaction
-+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
- }
-+
-+ //#. finally, delete the existing dataset.
- MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
- }
-
-@@ -831,4 +1357,20 @@
- }
- return format;
- }
-+
-+ private void acquireWriteLatch() {
-+ cacheLatch.writeLock().lock();
-+ }
-+
-+ private void releaseWriteLatch() {
-+ cacheLatch.writeLock().unlock();
-+ }
-+
-+ private void acquireReadLatch() {
-+ cacheLatch.readLock().lock();
-+ }
-+
-+ private void releaseReadLatch() {
-+ cacheLatch.readLock().unlock();
-+ }
- }
+--- diff_file (revision 1194)
++++ diff_file (working copy)
+@@ -1,2098 +1,1252 @@
+-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DataverseTupleTranslator.java
++Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
+ ===================================================================
+---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DataverseTupleTranslator.java (revision 1061)
+-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DataverseTupleTranslator.java (working copy)
+-@@ -25,8 +25,11 @@
+- import edu.uci.ics.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
+- import edu.uci.ics.asterix.metadata.bootstrap.MetadataRecordTypes;
+- import edu.uci.ics.asterix.metadata.entities.Dataverse;
+-+import edu.uci.ics.asterix.om.base.AInt32;
+-+import edu.uci.ics.asterix.om.base.AMutableInt32;
+- import edu.uci.ics.asterix.om.base.ARecord;
+- import edu.uci.ics.asterix.om.base.AString;
+-+import edu.uci.ics.asterix.om.types.BuiltinType;
+- import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+- import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
++--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java (revision 1194)
+++++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java (working copy)
++@@ -103,12 +103,14 @@
++ //for entity-level commit
++ if (PKHashVal != -1) {
++ transactionProvider.getLockManager().unlock(datasetId, PKHashVal, txnContext, true);
+++ /*****************************
++ try {
++ //decrease the transaction reference count on index
++ txnContext.decreaseActiveTransactionCountOnIndexes();
++ } catch (HyracksDataException e) {
++ throw new ACIDException("failed to complete index operation", e);
++ }
+++ *****************************/
++ return;
++ }
+
+-@@ -40,12 +43,18 @@
+- // Payload field containing serialized Dataverse.
+- public static final int DATAVERSE_PAYLOAD_TUPLE_FIELD_INDEX = 1;
+-
+-+ private AMutableInt32 aInt32;
+-+ protected ISerializerDeserializer<AInt32> aInt32Serde;
+-+
+- @SuppressWarnings("unchecked")
+- private ISerializerDeserializer<ARecord> recordSerDes = AqlSerializerDeserializerProvider.INSTANCE
+- .getSerializerDeserializer(MetadataRecordTypes.DATAVERSE_RECORDTYPE);
+-
+-+ @SuppressWarnings("unchecked")
+- public DataverseTupleTranslator(boolean getTuple) {
+- super(getTuple, MetadataPrimaryIndexes.DATAVERSE_DATASET.getFieldCount());
+-+ aInt32 = new AMutableInt32(-1);
+-+ aInt32Serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
+- }
+-
+- @Override
+-@@ -57,7 +66,8 @@
+- DataInput in = new DataInputStream(stream);
+- ARecord dataverseRecord = recordSerDes.deserialize(in);
+- return new Dataverse(((AString) dataverseRecord.getValueByPos(0)).getStringValue(),
+-- ((AString) dataverseRecord.getValueByPos(1)).getStringValue());
+-+ ((AString) dataverseRecord.getValueByPos(1)).getStringValue(),
+-+ ((AInt32) dataverseRecord.getValueByPos(2)).getIntegerValue());
+- }
+-
+- @Override
+-@@ -88,6 +98,12 @@
+- stringSerde.serialize(aString, fieldValue.getDataOutput());
+- recordBuilder.addField(MetadataRecordTypes.DATAVERSE_ARECORD_TIMESTAMP_FIELD_INDEX, fieldValue);
+-
+-+ // write field 3
+-+ fieldValue.reset();
+-+ aInt32.setValue(instance.getPendingOp());
+-+ aInt32Serde.serialize(aInt32, fieldValue.getDataOutput());
+-+ recordBuilder.addField(MetadataRecordTypes.DATAVERSE_ARECORD_PENDINGOP_FIELD_INDEX, fieldValue);
+-+
+- recordBuilder.write(tupleBuilder.getDataOutput(), true);
+- tupleBuilder.addFieldEndOffset();
+-
+-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
++Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
+ ===================================================================
+---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java (revision 1061)
+-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java (working copy)
+-@@ -77,9 +77,9 @@
+- protected ISerializerDeserializer<AInt32> aInt32Serde;
++--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java (revision 1194)
+++++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java (working copy)
++@@ -19,6 +19,7 @@
++ import java.util.HashSet;
++ import java.util.List;
++ import java.util.Set;
+++import java.util.concurrent.atomic.AtomicInteger;
+
+- @SuppressWarnings("unchecked")
+-- public DatasetTupleTranslator(boolean getTuple) {
+-+ public DatasetTupleTranslator(boolean getTuple) {
+- super(getTuple, MetadataPrimaryIndexes.DATASET_DATASET.getFieldCount());
+-- aInt32 = new AMutableInt32(-1);
+-+ aInt32 = new AMutableInt32(-1);
+- aInt32Serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
+- }
+-
+-@@ -104,8 +104,10 @@
+- .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_DATATYPENAME_FIELD_INDEX)).getStringValue();
+- DatasetType datasetType = DatasetType.valueOf(((AString) datasetRecord.getValueByPos(3)).getStringValue());
+- IDatasetDetails datasetDetails = null;
+-- int datasetId = ((AInt32) datasetRecord
+-+ int datasetId = ((AInt32) datasetRecord
+- .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_DATASETID_FIELD_INDEX)).getIntegerValue();
+-+ int pendingOp = ((AInt32) datasetRecord
+-+ .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_PENDINGOP_FIELD_INDEX)).getIntegerValue();
+- switch (datasetType) {
+- case FEED:
+- case INTERNAL: {
+-@@ -197,7 +199,7 @@
+- }
+- datasetDetails = new ExternalDatasetDetails(adapter, properties);
++ import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
++ import edu.uci.ics.asterix.transaction.management.opcallbacks.AbstractOperationCallback;
++@@ -169,5 +170,14 @@
++ closeable.close(this);
+ }
+-- return new Dataset(dataverseName, datasetName, typeName, datasetDetails, datasetType, datasetId);
+-+ return new Dataset(dataverseName, datasetName, typeName, datasetDetails, datasetType, datasetId, pendingOp);
+ }
+++
+++ @Override
+++ public int hashCode() {
+++ return jobId.getId();
+++ }
+
+- @Override
+-@@ -248,13 +250,19 @@
+- aString.setValue(Calendar.getInstance().getTime().toString());
+- stringSerde.serialize(aString, fieldValue.getDataOutput());
+- recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_TIMESTAMP_FIELD_INDEX, fieldValue);
+--
+-+
+- // write field 8
+- fieldValue.reset();
+- aInt32.setValue(dataset.getDatasetId());
+- aInt32Serde.serialize(aInt32, fieldValue.getDataOutput());
+- recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_DATASETID_FIELD_INDEX, fieldValue);
+--
+-+
+-+ // write field 9
+-+ fieldValue.reset();
+-+ aInt32.setValue(dataset.getPendingOp());
+-+ aInt32Serde.serialize(aInt32, fieldValue.getDataOutput());
+-+ recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_PENDINGOP_FIELD_INDEX, fieldValue);
+-+
+- // write record
+- recordBuilder.write(tupleBuilder.getDataOutput(), true);
+- tupleBuilder.addFieldEndOffset();
+-@@ -290,13 +298,15 @@
+- fieldValue.reset();
+- aString.setValue(name);
+- stringSerde.serialize(aString, fieldValue.getDataOutput());
+-- propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_NAME_FIELD_INDEX, fieldValue);
+-+ propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_NAME_FIELD_INDEX,
+-+ fieldValue);
+-
+- // write field 1
+- fieldValue.reset();
+- aString.setValue(value);
+- stringSerde.serialize(aString, fieldValue.getDataOutput());
+-- propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_VALUE_FIELD_INDEX, fieldValue);
+-+ propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_VALUE_FIELD_INDEX,
+-+ fieldValue);
+-
+- propertyRecordBuilder.write(out, true);
+- }
+-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java
+++ @Override
+++ public boolean equals(Object o) {
+++ return (o == this);
+++ }
++ }
++Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
+ ===================================================================
+---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java (revision 1061)
+-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java (working copy)
+-@@ -96,13 +96,15 @@
+- }
+- Boolean isPrimaryIndex = ((ABoolean) rec.getValueByPos(MetadataRecordTypes.INDEX_ARECORD_ISPRIMARY_FIELD_INDEX))
+- .getBoolean();
+-+ int pendingOp = ((AInt32) rec.getValueByPos(MetadataRecordTypes.INDEX_ARECORD_PENDINGOP_FIELD_INDEX))
+-+ .getIntegerValue();
+- // Check if there is a gram length as well.
+- int gramLength = -1;
+- int gramLenPos = rec.getType().findFieldPosition(GRAM_LENGTH_FIELD_NAME);
+- if (gramLenPos >= 0) {
+- gramLength = ((AInt32) rec.getValueByPos(gramLenPos)).getIntegerValue();
+- }
+-- return new Index(dvName, dsName, indexName, indexStructure, searchKey, gramLength, isPrimaryIndex);
+-+ return new Index(dvName, dsName, indexName, indexStructure, searchKey, gramLength, isPrimaryIndex, pendingOp);
+- }
+-
+- @Override
+-@@ -174,7 +176,12 @@
+- stringSerde.serialize(aString, fieldValue.getDataOutput());
+- recordBuilder.addField(MetadataRecordTypes.INDEX_ARECORD_TIMESTAMP_FIELD_INDEX, fieldValue);
+-
+-- // write optional field 7
+-+ // write field 7
+-+ fieldValue.reset();
+-+ intSerde.serialize(new AInt32(instance.getPendingOp()), fieldValue.getDataOutput());
+-+ recordBuilder.addField(MetadataRecordTypes.INDEX_ARECORD_PENDINGOP_FIELD_INDEX, fieldValue);
+-+
+-+ // write optional field 8
+- if (instance.getGramLength() > 0) {
+- fieldValue.reset();
+- nameValue.reset();
+-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
++--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java (revision 1194)
+++++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java (working copy)
++@@ -567,7 +567,7 @@
++ if (commitFlag) {
++ if (txnContext.getTransactionType().equals(TransactionContext.TransactionType.READ_WRITE)) {
++ try {
++- txnSubsystem.getLogManager().log(LogType.COMMIT, txnContext, datasetId.getId(),
+++ txnSubsystem.getLogManager().log(LogType.ENTITY_COMMIT, txnContext, datasetId.getId(),
++ entityHashValue, -1, (byte) 0, 0, null, null, logicalLogLocator);
++ } catch (ACIDException e) {
++ try {
++Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java
+ ===================================================================
+---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java (revision 1061)
+-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java (working copy)
+-@@ -129,7 +129,7 @@
+-
+- public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, String> {
+- private static Logger LOGGER = Logger.getLogger(AqlMetadataProvider.class.getName());
+-- private final MetadataTransactionContext mdTxnCtx;
+-+ private MetadataTransactionContext mdTxnCtx;
+- private boolean isWriteTransaction;
+- private Map<String, String[]> stores;
+- private Map<String, String> config;
+-@@ -156,8 +156,7 @@
+- return config;
++--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java (revision 1194)
+++++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java (working copy)
++@@ -75,6 +75,7 @@
++ buffer.position(0);
++ buffer.limit(size);
++ fileChannel.write(buffer);
+++ fileChannel.force(false);
++ erase();
+ }
+
+-- public AqlMetadataProvider(MetadataTransactionContext mdTxnCtx, Dataverse defaultDataverse) {
+-- this.mdTxnCtx = mdTxnCtx;
+-+ public AqlMetadataProvider(Dataverse defaultDataverse) {
+- this.defaultDataverse = defaultDataverse;
+- this.stores = AsterixProperties.INSTANCE.getStores();
+- }
+-@@ -181,6 +180,10 @@
+- public void setWriterFactory(IAWriterFactory writerFactory) {
+- this.writerFactory = writerFactory;
+- }
+-+
+-+ public void setMetadataTxnContext(MetadataTransactionContext mdTxnCtx) {
+-+ this.mdTxnCtx = mdTxnCtx;
+-+ }
+-
+- public MetadataTransactionContext getMetadataTxnContext() {
+- return mdTxnCtx;
+-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java
++Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+ ===================================================================
+---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java (revision 1061)
+-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java (working copy)
+-@@ -35,15 +35,18 @@
+- private final DatasetType datasetType;
+- private IDatasetDetails datasetDetails;
+- private final int datasetId;
+-+ // Type of pending operations with respect to atomic DDL operation
+-+ private final int pendingOp;
++--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java (revision 1194)
+++++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java (working copy)
++@@ -1,5 +1,5 @@
++ /*
++- * Copyright 2009-2010 by The Regents of the University of California
+++ * Copyright 2009-2012 by The Regents of the University of California
++ * Licensed under the Apache License, Version 2.0 (the "License");
++ * you may not use this file except in compliance with the License.
++ * you may obtain a copy of the License from
++@@ -21,7 +21,12 @@
++ import java.io.RandomAccessFile;
++ import java.nio.ByteBuffer;
++ import java.nio.channels.FileChannel;
+++import java.util.ArrayList;
+++import java.util.HashMap;
+++import java.util.List;
+++import java.util.Map;
++ import java.util.Properties;
+++import java.util.Set;
++ import java.util.concurrent.LinkedBlockingQueue;
++ import java.util.concurrent.atomic.AtomicInteger;
++ import java.util.concurrent.atomic.AtomicLong;
++@@ -30,22 +35,25 @@
+
+- public Dataset(String dataverseName, String datasetName, String itemTypeName, IDatasetDetails datasetDetails,
+-- DatasetType datasetType, int datasetId) {
+-+ DatasetType datasetType, int datasetId, int pendingOp) {
+- this.dataverseName = dataverseName;
+- this.datasetName = datasetName;
+- this.itemTypeName = itemTypeName;
+- this.datasetType = datasetType;
+- this.datasetDetails = datasetDetails;
+- this.datasetId = datasetId;
+-+ this.pendingOp = pendingOp;
+- }
++ import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
++ import edu.uci.ics.asterix.transaction.management.service.logging.IndexLogger.ReusableLogContentObject;
+++import edu.uci.ics.asterix.transaction.management.service.logging.LogManager.PageOwnershipStatus;
+++import edu.uci.ics.asterix.transaction.management.service.logging.LogManager.PageState;
++ import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
++ import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants;
++ import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
+++import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+- public String getDataverseName() {
+-@@ -73,6 +76,10 @@
+- public int getDatasetId() {
+- return datasetId;
+- }
+-+
+-+ public int getPendingOp() {
+-+ return pendingOp;
+-+ }
++ public class LogManager implements ILogManager {
+
+- @Override
+- public Object addToCache(MetadataCache cache) {
+-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Index.java
+-===================================================================
+---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Index.java (revision 1061)
+-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Index.java (working copy)
+-@@ -45,9 +45,11 @@
+- private final boolean isPrimaryIndex;
+- // Specific to NGRAM indexes.
+- private final int gramLength;
+-+ // Type of pending operations with respect to atomic DDL operation
+-+ private final int pendingOp;
++ public static final boolean IS_DEBUG_MODE = false;//true
++ private static final Logger LOGGER = Logger.getLogger(LogManager.class.getName());
++- private TransactionSubsystem provider;
+++ private final TransactionSubsystem provider;
++ private LogManagerProperties logManagerProperties;
+++ private LogPageFlushThread logPageFlusher;
+
+- public Index(String dataverseName, String datasetName, String indexName, IndexType indexType,
+-- List<String> keyFieldNames, int gramLength, boolean isPrimaryIndex) {
+-+ List<String> keyFieldNames, int gramLength, boolean isPrimaryIndex, int pendingOp) {
+- this.dataverseName = dataverseName;
+- this.datasetName = datasetName;
+- this.indexName = indexName;
+-@@ -55,10 +57,11 @@
+- this.keyFieldNames = keyFieldNames;
+- this.gramLength = gramLength;
+- this.isPrimaryIndex = isPrimaryIndex;
+-+ this.pendingOp = pendingOp;
+- }
++ /*
++ * the array of log pages. The number of log pages is configurable. Pages
++ * taken together form an in-memory log buffer.
++ */
++-
++ private IFileBasedBuffer[] logPages;
+
+- public Index(String dataverseName, String datasetName, String indexName, IndexType indexType,
+-- List<String> keyFieldNames, boolean isPrimaryIndex) {
+-+ List<String> keyFieldNames, boolean isPrimaryIndex, int pendingOp) {
+- this.dataverseName = dataverseName;
+- this.datasetName = datasetName;
+- this.indexName = indexName;
+-@@ -66,6 +69,7 @@
+- this.keyFieldNames = keyFieldNames;
+- this.gramLength = -1;
+- this.isPrimaryIndex = isPrimaryIndex;
+-+ this.pendingOp = pendingOp;
+- }
++ private ILogRecordHelper logRecordHelper;
++@@ -54,6 +62,7 @@
++ * Number of log pages that constitute the in-memory log buffer.
++ */
++ private int numLogPages;
+++
++ /*
++ * Initially all pages have an owner count of 1 that is the LogManager. When
++ * a transaction requests to write in a log page, the owner count is
++@@ -62,12 +71,11 @@
++ * (covering the whole log record). When the content has been put, the log
++ * manager computes the checksum and puts it after the content. At this
++ * point, the ownership count is decremented as the transaction is done with
++- * using the page. When a page is full, the log manager decrements the count
++- * by one indicating that it has released its ownership of the log page.
++- * There could be other transaction(s) still owning the page (that is they
++- * could still be mid-way putting the log content). When the ownership count
++- * eventually reaches zero, the thread responsible for flushing the log page
++- * is notified and the page is flushed to disk.
+++ * using the page. When a page is requested to be flushed, logPageFlusher
+++ * set the count to 0(LOG_FLUSHER: meaning that the page is being flushed)
+++ * only if the count is 1(LOG_WRITER: meaning that there is no other
+++ * transactions who own the page to write logs.) After flushing the page,
+++ * logPageFlusher set this count to 1.
++ */
++ private AtomicInteger[] logPageOwnerCount;
+
+- public String getDataverseName() {
+-@@ -95,6 +99,10 @@
+- public boolean isPrimaryIndex() {
+- return isPrimaryIndex;
+- }
+-+
+-+ public int getPendingOp() {
+-+ return pendingOp;
+-+ }
++@@ -78,18 +86,16 @@
+
+- public boolean isSecondaryIndex() {
+- return !isPrimaryIndex();
+-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataverse.java
+-===================================================================
+---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataverse.java (revision 1061)
+-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataverse.java (working copy)
+-@@ -27,10 +27,12 @@
+- // Enforced to be unique within an Asterix cluster..
+- private final String dataverseName;
+- private final String dataFormat;
+-+ private final int pendingOp;
++ /*
++ * LogPageStatus: A page is either ACTIVE or INACTIVE. The status for each
++- * page is maintained in a map called logPageStatus. A page is ACTIVE when
++- * the LogManager can allocate space in the page for writing a log record.
++- * Initially all pages are ACTIVE. As transactions fill up space by writing
++- * log records, a page may not have sufficient space left for serving a
++- * request by a transaction. When this happens, the page is marked INACTIVE.
++- * An INACTIVE page with no owners ( logPageOwnerCount.get(<pageIndex>) ==
++- * 0) indicates that the page must be flushed to disk before any other log
++- * record is written on the page.F
+++ * page is maintained in logPageStatus. A page is ACTIVE when the LogManager
+++ * can allocate space in the page for writing a log record. Initially all
+++ * pages are ACTIVE. As transactions fill up space by writing log records,
+++ * a page may not have sufficient space left for serving a request by a
+++ * transaction. When this happens, the page is flushed to disk by calling
+++ * logPageFlusher.requestFlush(). In the requestFlush(), after groupCommitWaitTime,
+++ * the page status is set to INACTIVE. Then, there is no more writer on the
+++ * page(meaning the corresponding logPageOwnerCount is 1), the page is flushed
+++ * by the logPageFlusher and the status is reset to ACTIVE by the logPageFlusher.
++ */
++-
++- // private Map<Integer, Integer> logPageStatus = new
++- // ConcurrentHashMap<Integer, Integer>();
++ private AtomicInteger[] logPageStatus;
+
+-- public Dataverse(String dataverseName, String format) {
+-+ public Dataverse(String dataverseName, String format, int pendingOp) {
+- this.dataverseName = dataverseName;
+- this.dataFormat = format;
+-+ this.pendingOp = pendingOp;
++ static class PageState {
++@@ -98,41 +104,8 @@
+ }
+
+- public String getDataverseName() {
+-@@ -40,6 +42,10 @@
+- public String getDataFormat() {
+- return dataFormat;
+- }
+-+
+-+ public int getPendingOp() {
+-+ return pendingOp;
+-+ }
++ private AtomicLong lastFlushedLsn = new AtomicLong(-1);
++- private AtomicInteger lastFlushedPage = new AtomicInteger(-1);
+
+- @Override
+- public Object addToCache(MetadataCache cache) {
+-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+-===================================================================
+---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java (revision 1061)
+-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java (working copy)
+-@@ -25,6 +25,7 @@
+- import edu.uci.ics.asterix.common.exceptions.AsterixException;
+- import edu.uci.ics.asterix.common.functions.FunctionSignature;
+- import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+-+import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
+- import edu.uci.ics.asterix.metadata.api.IMetadataIndex;
+- import edu.uci.ics.asterix.metadata.api.IMetadataNode;
+- import edu.uci.ics.asterix.metadata.api.IValueExtractor;
+-@@ -160,7 +161,7 @@
+- // Add the primary index for the dataset.
+- InternalDatasetDetails id = (InternalDatasetDetails) dataset.getDatasetDetails();
+- Index primaryIndex = new Index(dataset.getDataverseName(), dataset.getDatasetName(),
+-- dataset.getDatasetName(), IndexType.BTREE, id.getPrimaryKey(), true);
+-+ dataset.getDatasetName(), IndexType.BTREE, id.getPrimaryKey(), true, dataset.getPendingOp());
+- addIndex(jobId, primaryIndex);
+- ITupleReference nodeGroupTuple = createTuple(id.getNodeGroupName(), dataset.getDataverseName(),
+- dataset.getDatasetName());
+-@@ -260,7 +261,7 @@
+- IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
+- NoOpOperationCallback.INSTANCE);
+- TransactionContext txnCtx = transactionProvider.getTransactionManager().getTransactionContext(jobId);
+-- transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx);
+-+ //transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx);
+- // TODO: fix exceptions once new BTree exception model is in hyracks.
+- indexAccessor.insert(tuple);
+- //TODO: extract the key from the tuple and get the PKHashValue from the key.
+-@@ -536,7 +537,7 @@
+- // The transaction with txnId will have an S lock on the
+- // resource. Note that lock converters have a higher priority than
+- // regular waiters in the LockManager.
+-- transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx);
+-+ //transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx);
+- indexAccessor.delete(tuple);
+- //TODO: extract the key from the tuple and get the PKHashValue from the key.
+- //check how to get the oldValue.
+-@@ -803,7 +804,9 @@
+- private <ResultType> void searchIndex(JobId jobId, IMetadataIndex index, ITupleReference searchKey,
+- IValueExtractor<ResultType> valueExtractor, List<ResultType> results) throws Exception {
+- TransactionContext txnCtx = transactionProvider.getTransactionManager().getTransactionContext(jobId);
+-- transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.S, txnCtx);
+-+ //#. currently lock is not needed to access any metadata
+-+ // since the non-compatible concurrent access is always protected by the latch in the MetadataManager.
+-+ //transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.S, txnCtx);
+- IBinaryComparatorFactory[] comparatorFactories = index.getKeyBinaryComparatorFactory();
+- long resourceID = index.getResourceID();
+- IIndex indexInstance = indexLifecycleManager.getIndex(resourceID);
+-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataEntity.java
+-===================================================================
+---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataEntity.java (revision 1061)
+-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataEntity.java (working copy)
+-@@ -20,6 +20,11 @@
+- import edu.uci.ics.asterix.metadata.MetadataCache;
+-
+- public interface IMetadataEntity extends Serializable {
+-+
+-+ public static final int PENDING_NO_OP = 0;
+-+ public static final int PENDING_ADD_OP = 1;
+-+ public static final int PENDING_DROP_OP = 2;
+-+
+- Object addToCache(MetadataCache cache);
+-
+- Object dropFromCache(MetadataCache cache);
+-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
+-===================================================================
+---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java (revision 1061)
+-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java (working copy)
+-@@ -17,6 +17,8 @@
+-
+- import java.rmi.RemoteException;
+- import java.util.List;
+-+import java.util.concurrent.locks.ReadWriteLock;
+-+import java.util.concurrent.locks.ReentrantReadWriteLock;
+-
+- import edu.uci.ics.asterix.common.functions.FunctionSignature;
+- import edu.uci.ics.asterix.metadata.api.IAsterixStateProxy;
+-@@ -79,11 +81,10 @@
+- public class MetadataManager implements IMetadataManager {
+- // Set in init().
+- public static MetadataManager INSTANCE;
++ /*
++- * pendingFlushRequests is a map with key as Integer denoting the page
++- * index. When a (transaction) thread discovers the need to flush a page, it
++- * puts its Thread object into the corresponding value that is a
++- * LinkedBlockingQueue. The LogManager has a LogFlusher thread that scans
++- * this map in order of page index (and circling around). The flusher thread
++- * needs to flush pages in order and waits for a thread to deposit an object
++- * in the blocking queue corresponding to the next page in order. A request
++- * to flush a page is conveyed to the flush thread by simply depositing an
++- * object in to corresponding blocking queue. It is blocking in the sense
++- * that the flusher thread will continue to wait for an object to arrive in
++- * the queue. The object itself is ignored by the fliusher and just acts as
++- * a signal/event that a page needs to be flushed.
++- */
+ -
+- private final MetadataCache cache = new MetadataCache();
+- private IAsterixStateProxy proxy;
+- private IMetadataNode metadataNode;
++- private LinkedBlockingQueue[] pendingFlushRequests;
+ -
+-+
+- public MetadataManager(IAsterixStateProxy proxy) {
+- if (proxy == null) {
+- throw new Error("Null proxy given to MetadataManager.");
+-@@ -206,11 +207,14 @@
++- /*
++- * ICommitResolver is an interface that provides an API that can answer a
++- * simple boolean - Given the commit requests so far, should a page be
++- * flushed. The implementation of the interface contains the logic (or you
++- * can say the policy) for commit. It could be group commit in which case
++- * the commit resolver may not return a true indicating that it wishes to
++- * delay flushing of the page.
++- */
++- private ICommitResolver commitResolver;
++-
++- /*
++- * An object that keeps track of the submitted commit requests.
++- */
++- private CommitRequestStatistics commitRequestStatistics;
++-
++- /*
++ * When the transaction eco-system comes to life, the log manager positions
++ * itself to the end of the last written log. the startingLsn represent the
++ * lsn value of the next log record to be written after a system (re)start.
++@@ -146,16 +119,10 @@
++ */
++ private AtomicLong lsn = new AtomicLong(0);
+
+- @Override
+- public void addDataset(MetadataTransactionContext ctx, Dataset dataset) throws MetadataException {
+-+ // add dataset into metadataNode
+- try {
+- metadataNode.addDataset(ctx.getJobId(), dataset);
+- } catch (RemoteException e) {
+- throw new MetadataException(e);
+- }
+-+
+-+ // reflect the dataset into the cache
+- ctx.addDataset(dataset);
++- /*
++- * A map that tracks the flush requests submitted for each page. The
++- * requests for a page are cleared when the page is flushed.
++- */
++- public LinkedBlockingQueue<Thread> getPendingFlushRequests(int pageIndex) {
++- return pendingFlushRequests[pageIndex];
++- }
+++ private List<HashMap<TransactionContext, Integer>> activeTxnCountMaps;
++
++- public void addFlushRequest(int pageIndex) {
++- pendingFlushRequests[pageIndex].add(pendingFlushRequests);
+++ public void addFlushRequest(int pageIndex, long lsn, boolean isSynchronous) {
+++ logPageFlusher.requestFlush(pageIndex, lsn, isSynchronous);
+ }
+
+-@@ -585,4 +589,5 @@
++ public AtomicLong getLastFlushedLsn() {
++@@ -233,19 +200,12 @@
++ numLogPages = logManagerProperties.getNumLogPages();
++ logPageOwnerCount = new AtomicInteger[numLogPages];
++ logPageStatus = new AtomicInteger[numLogPages];
++- pendingFlushRequests = new LinkedBlockingQueue[numLogPages];
++- if (logManagerProperties.getGroupCommitWaitPeriod() > 0) { // configure
++- // the
++- // Commit
++- // Resolver
++- commitResolver = new GroupCommitResolver(); // Group Commit is
++- // enabled
++- commitRequestStatistics = new CommitRequestStatistics(numLogPages);
++- } else {
++- commitResolver = new BasicCommitResolver(); // the basic commit
++- // resolver
+++
+++ activeTxnCountMaps = new ArrayList<HashMap<TransactionContext, Integer>>(numLogPages);
+++ for (int i = 0; i < numLogPages; i++) {
+++ activeTxnCountMaps.add(new HashMap<TransactionContext, Integer>());
+ }
+- return adapter;
+- }
++- this.commitResolver.init(this); // initialize the commit resolver
+ +
+- }
+-\ No newline at end of file
+-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java
+-===================================================================
+---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java (revision 1061)
+-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java (working copy)
+-@@ -19,6 +19,7 @@
++ logPages = new FileBasedBuffer[numLogPages];
+
+- import edu.uci.ics.asterix.common.functions.FunctionSignature;
+- import edu.uci.ics.asterix.external.dataset.adapter.AdapterIdentifier;
+-+import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
+- import edu.uci.ics.asterix.metadata.entities.Dataset;
+- import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
+- import edu.uci.ics.asterix.metadata.entities.Datatype;
+-@@ -104,19 +105,19 @@
+- }
++ /*
++@@ -264,7 +224,6 @@
++ for (int i = 0; i < numLogPages; i++) {
++ logPageOwnerCount[i] = new AtomicInteger(PageOwnershipStatus.LOG_WRITER);
++ logPageStatus[i] = new AtomicInteger(PageState.ACTIVE);
++- pendingFlushRequests[i] = new LinkedBlockingQueue<Thread>();
++ }
+
+- public void dropDataset(String dataverseName, String datasetName) {
+-- Dataset dataset = new Dataset(dataverseName, datasetName, null, null, null, -1);
+-+ Dataset dataset = new Dataset(dataverseName, datasetName, null, null, null, -1, IMetadataEntity.PENDING_NO_OP);
+- droppedCache.addDatasetIfNotExists(dataset);
+- logAndApply(new MetadataLogicalOperation(dataset, false));
++ /*
++@@ -278,9 +237,9 @@
++ * daemon thread so that it does not stop the JVM from exiting when all
++ * other threads are done with their work.
++ */
++- LogPageFlushThread logFlusher = new LogPageFlushThread(this);
++- logFlusher.setDaemon(true);
++- logFlusher.start();
+++ logPageFlusher = new LogPageFlushThread(this);
+++ logPageFlusher.setDaemon(true);
+++ logPageFlusher.start();
+ }
+
+- public void dropIndex(String dataverseName, String datasetName, String indexName) {
+-- Index index = new Index(dataverseName, datasetName, indexName, null, null, false);
+-+ Index index = new Index(dataverseName, datasetName, indexName, null, null, false, IMetadataEntity.PENDING_NO_OP);
+- droppedCache.addIndexIfNotExists(index);
+- logAndApply(new MetadataLogicalOperation(index, false));
+- }
+-
+- public void dropDataverse(String dataverseName) {
+-- Dataverse dataverse = new Dataverse(dataverseName, null);
+-+ Dataverse dataverse = new Dataverse(dataverseName, null, IMetadataEntity.PENDING_NO_OP);
+- droppedCache.addDataverseIfNotExists(dataverse);
+- logAndApply(new MetadataLogicalOperation(dataverse, false));
+- }
+-@@ -162,7 +163,7 @@
++ public int getLogPageIndex(long lsnValue) {
++@@ -312,7 +271,7 @@
++ */
++ private void waitUntillPageIsAvailableForWritingLog(int pageIndex) throws ACIDException {
++ if (logPageStatus[pageIndex].get() == PageState.ACTIVE
++- && getLogPageOwnershipCount(pageIndex).get() >= PageOwnershipStatus.LOG_WRITER) {
+++ && logPageOwnerCount[pageIndex].get() >= PageOwnershipStatus.LOG_WRITER) {
++ return;
+ }
+- return droppedCache.getDataset(dataverseName, datasetName) != null;
+- }
+--
++ try {
++@@ -338,47 +297,40 @@
++ */
++ private long getLsn(int entrySize, byte logType) throws ACIDException {
++ long pageSize = logManagerProperties.getLogPageSize();
++- boolean requiresFlushing = logType == LogType.COMMIT;
+ +
+- public boolean indexIsDropped(String dataverseName, String datasetName, String indexName) {
+- if (droppedCache.getDataverse(dataverseName) != null) {
+- return true;
+-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
+-===================================================================
+---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java (revision 1061)
+-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java (working copy)
+-@@ -80,10 +80,11 @@
+- public static final int DATAVERSE_ARECORD_NAME_FIELD_INDEX = 0;
+- public static final int DATAVERSE_ARECORD_FORMAT_FIELD_INDEX = 1;
+- public static final int DATAVERSE_ARECORD_TIMESTAMP_FIELD_INDEX = 2;
+-+ public static final int DATAVERSE_ARECORD_PENDINGOP_FIELD_INDEX = 3;
++ while (true) {
++ boolean forwardPage = false;
++- boolean shouldFlushPage = false;
++ long old = lsn.get();
++- int pageIndex = getLogPageIndex(old); // get the log page
++- // corresponding to the
++- // current lsn value
+++
+++ //get the log page corresponding to the current lsn value
+++ int pageIndex = getLogPageIndex(old);
++ long retVal = old;
++- long next = old + entrySize; // the lsn value for the next request,
++- // if the current request is served.
+++
+++ // the lsn value for the next request if the current request is served.
+++ long next = old + entrySize;
++ int prevPage = -1;
++- if ((next - 1) / pageSize != old / pageSize // check if the log
++- // record will cross
++- // page boundaries, a
++- // case that is not
++- // allowed.
++- || (next % pageSize == 0)) {
+++
+++ // check if the log record will cross page boundaries, a case that is not allowed.
+++ if ((next - 1) / pageSize != old / pageSize || (next % pageSize == 0)) {
+++
++ if ((old != 0 && old % pageSize == 0)) {
++- retVal = old; // On second thought, this shall never be the
++- // case as it means that the lsn is
++- // currently at the beginning of a page and
++- // we still need to forward the page which
++- // means that the entrySize exceeds a log
++- // page size. If this is the case, an
++- // exception is thrown before calling this
++- // API.
++- // would remove this case.
+++ // On second thought, this shall never be the case as it means that the lsn is
+++ // currently at the beginning of a page and we still need to forward the page which
+++ // means that the entrySize exceeds a log page size. If this is the case, an
+++ // exception is thrown before calling this API. would remove this case.
+++ retVal = old;
+
+- private static final ARecordType createDataverseRecordType() {
+-- return new ARecordType("DataverseRecordType", new String[] { "DataverseName", "DataFormat", "Timestamp" },
+-- new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING }, true);
+-+ return new ARecordType("DataverseRecordType", new String[] { "DataverseName", "DataFormat", "Timestamp", "PendingOp" },
+-+ new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.AINT32 }, true);
+- }
++ } else {
++- retVal = ((old / pageSize) + 1) * pageSize; // set the lsn
++- // to point to
++- // the beginning
++- // of the next
++- // page.
+++ // set the lsn to point to the beginning of the next page.
+++ retVal = ((old / pageSize) + 1) * pageSize;
++ }
+++
++ next = retVal;
++- forwardPage = true; // as the log record shall cross log page
++- // boundary, we must re-assign the lsn (so
++- // that the log record begins on a different
++- // location.
+++
+++ // as the log record shall cross log page boundary, we must re-assign the lsn so
+++ // that the log record begins on a different location.
+++ forwardPage = true;
+++
++ prevPage = pageIndex;
++ pageIndex = getNextPageInSequence(pageIndex);
++ }
++@@ -397,109 +349,51 @@
++ */
++ waitUntillPageIsAvailableForWritingLog(pageIndex);
+
+- // Helper constants for accessing fields in an ARecord of anonymous type
+-@@ -158,10 +159,11 @@
+- public static final int DATASET_ARECORD_FEEDDETAILS_FIELD_INDEX = 6;
+- public static final int DATASET_ARECORD_TIMESTAMP_FIELD_INDEX = 7;
+- public static final int DATASET_ARECORD_DATASETID_FIELD_INDEX = 8;
+-+ public static final int DATASET_ARECORD_PENDINGOP_FIELD_INDEX = 9;
++- if (!forwardPage && requiresFlushing) {
++- shouldFlushPage = commitResolver.shouldCommitPage(pageIndex, this, commitRequestStatistics);
++- if (shouldFlushPage) {
++- next = ((next / pageSize) + 1) * pageSize; /*
++- * next
++- * represents the
++- * next value of
++- * lsn after this
++- * log record has
++- * been written.
++- * If the page
++- * needs to be
++- * flushed, then
++- * we do not give
++- * any more LSNs
++- * from this
++- * page.
++- */
++- }
++- }
++- if (!lsn.compareAndSet(old, next)) { // Atomic call -> returns true
++- // only when the value
++- // represented by lsn is same as
++- // "old". The value is updated
++- // to "next".
+++ if (!lsn.compareAndSet(old, next)) {
+++ // Atomic call -> returns true only when the value represented by lsn is same as
+++ // "old". The value is updated to "next".
++ continue;
++ }
+
+- private static final ARecordType createDatasetRecordType() {
+- String[] fieldNames = { "DataverseName", "DatasetName", "DataTypeName", "DatasetType", "InternalDetails",
+-- "ExternalDetails", "FeedDetails", "Timestamp", "DatasetId" };
+-+ "ExternalDetails", "FeedDetails", "Timestamp", "DatasetId", "PendingOp" };
++ if (forwardPage) {
++- //TODO
++- //this is not safe since the incoming thread may reach the same page slot with this page
++- //(differ by the log buffer size)
++- logPageStatus[prevPage].set(PageState.INACTIVE); // mark
++- // previous
++- // page
++- // inactive
+++ addFlushRequest(prevPage, old, false);
+
+- List<IAType> internalRecordUnionList = new ArrayList<IAType>();
+- internalRecordUnionList.add(BuiltinType.ANULL);
+-@@ -179,7 +181,8 @@
+- AUnionType feedRecordUnion = new AUnionType(feedRecordUnionList, null);
+-
+- IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
+-- internalRecordUnion, externalRecordUnion, feedRecordUnion, BuiltinType.ASTRING, BuiltinType.AINT32 };
+-+ internalRecordUnion, externalRecordUnion, feedRecordUnion, BuiltinType.ASTRING, BuiltinType.AINT32,
+-+ BuiltinType.AINT32 };
+- return new ARecordType("DatasetRecordType", fieldNames, fieldTypes, true);
++- /*
++- * decrement on the behalf of the log manager. if there are no
++- * more owners (count == 0) the page must be marked as a
++- * candidate to be flushed.
++- */
++- int pageDirtyCount = getLogPageOwnershipCount(prevPage).decrementAndGet();
++- if (pageDirtyCount == 0) {
++- addFlushRequest(prevPage);
++- }
++-
++- /*
++- * The transaction thread that discovers the need to forward a
++- * page is made to re-acquire a lsn.
++- */
+++ // The transaction thread that discovers the need to forward a
+++ // page is made to re-acquire a lsn.
++ continue;
+++
++ } else {
++- /*
++- * the transaction thread has been given a space in a log page,
++- * but is made to wait until the page is available.
++- */
+++ // the transaction thread has been given a space in a log page,
+++ // but is made to wait until the page is available.
+++ // (Is this needed? when does this wait happen?)
++ waitUntillPageIsAvailableForWritingLog(pageIndex);
++- /*
++- * increment the counter as the transaction thread now holds a
++- * space in the log page and hence is an owner.
++- */
+++
+++ // increment the counter as the transaction thread now holds a
+++ // space in the log page and hence is an owner.
++ logPageOwnerCount[pageIndex].incrementAndGet();
++- }
++- if (requiresFlushing) {
++- if (!shouldFlushPage) {
++- /*
++- * the log record requires the page to be flushed but under
++- * the commit policy, the flush task has been deferred. The
++- * transaction thread submits its request to flush the page.
++- */
++- commitRequestStatistics.registerCommitRequest(pageIndex);
++- } else {
++- /*
++- * the flush request was approved by the commit resolver.
++- * Thus the page is marked INACTIVE as no more logs will be
++- * written on this page. The log manager needs to release
++- * its ownership. Note that transaction threads may still
++- * continue to be owners of the log page till they fill up
++- * the space allocated to them.
++- */
++- logPageStatus[pageIndex].set(PageState.INACTIVE);
++- logPageOwnerCount[pageIndex].decrementAndGet(); // on
++- // the
++- // behalf
++- // of
++- // log
++- // manager
+++
+++ // Before the count is incremented, if the flusher flushed the allocated page,
+++ // then retry to get new LSN. Otherwise, the log with allocated lsn will be lost.
+++ if (lastFlushedLsn.get() >= retVal) {
+++ logPageOwnerCount[pageIndex].decrementAndGet();
+++ continue;
++ }
++ }
+++
++ return retVal;
++ }
+ }
+
+-@@ -264,13 +267,14 @@
+- public static final int INDEX_ARECORD_SEARCHKEY_FIELD_INDEX = 4;
+- public static final int INDEX_ARECORD_ISPRIMARY_FIELD_INDEX = 5;
+- public static final int INDEX_ARECORD_TIMESTAMP_FIELD_INDEX = 6;
+-+ public static final int INDEX_ARECORD_PENDINGOP_FIELD_INDEX = 7;
++ @Override
++- public void log(byte logType, TransactionContext context, int datasetId, int PKHashValue, long resourceId,
+++ public void log(byte logType, TransactionContext txnCtx, int datasetId, int PKHashValue, long resourceId,
++ byte resourceMgrId, int logContentSize, ReusableLogContentObject reusableLogContentObject, ILogger logger,
++ LogicalLogLocator logicalLogLocator) throws ACIDException {
++- /*
++- * logLocator is a re-usable object that is appropriately set in each
++- * invocation. If the reference is null, the log manager must throw an
++- * exception
++- */
+++
+++ HashMap<TransactionContext, Integer> map = null;
+++ int activeTxnCount;
+++
+++ // logLocator is a re-usable object that is appropriately set in each invocation.
+++ // If the reference is null, the log manager must throw an exception.
++ if (logicalLogLocator == null) {
++ throw new ACIDException(
++ " you need to pass in a non-null logLocator, if you dont have it, then pass in a dummy so that the +"
++@@ -519,20 +413,19 @@
+
+- private static final ARecordType createIndexRecordType() {
+- AOrderedListType olType = new AOrderedListType(BuiltinType.ASTRING, null);
+- String[] fieldNames = { "DataverseName", "DatasetName", "IndexName", "IndexStructure", "SearchKey",
+-- "IsPrimary", "Timestamp" };
+-+ "IsPrimary", "Timestamp", "PendingOp" };
+- IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
+-- olType, BuiltinType.ABOOLEAN, BuiltinType.ASTRING };
+-+ olType, BuiltinType.ABOOLEAN, BuiltinType.ASTRING, BuiltinType.AINT32 };
+- return new ARecordType("IndexRecordType", fieldNames, fieldTypes, true);
+- };
++ // all constraints checked and we are good to go and acquire a lsn.
++ long previousLSN = -1;
++- long currentLSN; // the will be set to the location (a long value)
++- // where the log record needs to be placed.
+
+-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+-===================================================================
+---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java (revision 1061)
+-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java (working copy)
+-@@ -31,6 +31,7 @@
+- import edu.uci.ics.asterix.metadata.IDatasetDetails;
+- import edu.uci.ics.asterix.metadata.MetadataManager;
+- import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+-+import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
+- import edu.uci.ics.asterix.metadata.api.IMetadataIndex;
+- import edu.uci.ics.asterix.metadata.entities.AsterixBuiltinTypeMap;
+- import edu.uci.ics.asterix.metadata.entities.Dataset;
+-@@ -226,7 +227,7 @@
+- public static void insertInitialDataverses(MetadataTransactionContext mdTxnCtx) throws Exception {
+- String dataverseName = MetadataPrimaryIndexes.DATAVERSE_DATASET.getDataverseName();
+- String dataFormat = NonTaggedDataFormat.NON_TAGGED_DATA_FORMAT;
+-- MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dataverseName, dataFormat));
+-+ MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dataverseName, dataFormat, IMetadataEntity.PENDING_NO_OP));
+- }
++- /*
++- * The logs written by a transaction need to be linked to each other for
++- * a successful rollback/recovery. However there could be multiple
++- * threads operating concurrently that are part of a common transaction.
++- * These threads need to synchronize and record the lsn corresponding to
++- * the last log record written by (any thread of) the transaction.
++- */
++- synchronized (context) {
++- previousLSN = context.getLastLogLocator().getLsn();
+++ // the will be set to the location (a long value) where the log record needs to be placed.
+++ long currentLSN;
+++
+++ // The logs written by a transaction need to be linked to each other for
+++ // a successful rollback/recovery. However there could be multiple
+++ // threads operating concurrently that are part of a common transaction.
+++ // These threads need to synchronize and record the lsn corresponding to
+++ // the last log record written by (any thread of) the transaction.
+++ synchronized (txnCtx) {
+++ previousLSN = txnCtx.getLastLogLocator().getLsn();
++ currentLSN = getLsn(totalLogSize, logType);
++- context.setLastLSN(currentLSN);
+++ txnCtx.setLastLSN(currentLSN);
++ if (IS_DEBUG_MODE) {
++ System.out.println("--------------> LSN(" + currentLSN + ") is allocated");
++ }
++@@ -547,48 +440,37 @@
++ * performed correctly that is ownership is released.
++ */
+
+- public static void insertInitialDatasets(MetadataTransactionContext mdTxnCtx) throws Exception {
+-@@ -236,7 +237,7 @@
+- primaryIndexes[i].getNodeGroupName());
+- MetadataManager.INSTANCE.addDataset(mdTxnCtx, new Dataset(primaryIndexes[i].getDataverseName(),
+- primaryIndexes[i].getIndexedDatasetName(), primaryIndexes[i].getPayloadRecordType().getTypeName(),
+-- id, DatasetType.INTERNAL, primaryIndexes[i].getDatasetId().getId()));
+-+ id, DatasetType.INTERNAL, primaryIndexes[i].getDatasetId().getId(), IMetadataEntity.PENDING_NO_OP));
+- }
+- }
++- boolean decremented = false; // indicates if the transaction thread
++- // has release ownership of the
++- // page.
++- boolean addedFlushRequest = false; // indicates if the transaction
++- // thread has submitted a flush
++- // request.
+++ // indicates if the transaction thread has release ownership of the page.
+++ boolean decremented = false;
+
+-@@ -267,7 +268,7 @@
+- for (int i = 0; i < secondaryIndexes.length; i++) {
+- MetadataManager.INSTANCE.addIndex(mdTxnCtx, new Index(secondaryIndexes[i].getDataverseName(),
+- secondaryIndexes[i].getIndexedDatasetName(), secondaryIndexes[i].getIndexName(), IndexType.BTREE,
+-- secondaryIndexes[i].getPartitioningExpr(), false));
+-+ secondaryIndexes[i].getPartitioningExpr(), false, IMetadataEntity.PENDING_NO_OP));
+- }
+- }
++ int pageIndex = (int) getLogPageIndex(currentLSN);
+
+-Index: asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
+-===================================================================
+---- asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java (revision 1061)
+-+++ asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java (working copy)
+-@@ -95,11 +95,11 @@
+- File testFile = tcCtx.getTestFile(cUnit);
+-
+- /*************** to avoid run failure cases ****************
+-- if (!testFile.getAbsolutePath().contains("runtimets/queries/failure/")) {
+-+ if (!testFile.getAbsolutePath().contains("index-selection/")) {
+- continue;
+- }
+- ************************************************************/
++- /*
++- * the lsn has been obtained for the log record. need to set the
++- * LogLocator instance accordingly.
++- */
+ -
+-+
+- File expectedResultFile = tcCtx.getExpectedResultFile(cUnit);
+- File actualFile = new File(PATH_ACTUAL + File.separator
+- + tcCtx.getTestCase().getFilePath().replace(File.separator, "_") + "_" + cUnit.getName() + ".adm");
+-Index: asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
+-===================================================================
+---- asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java (revision 1061)
+-+++ asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java (working copy)
+-@@ -90,7 +90,7 @@
+++ // the lsn has been obtained for the log record. need to set the
+++ // LogLocator instance accordingly.
++ try {
++-
++ logicalLogLocator.setBuffer(logPages[pageIndex]);
++ int pageOffset = getLogPageOffset(currentLSN);
++ logicalLogLocator.setMemoryOffset(pageOffset);
+
+- private static Logger LOGGER = Logger.getLogger(DatasetOperations.class.getName());
++- /*
++- * write the log header.
++- */
++- logRecordHelper.writeLogHeader(logicalLogLocator, logType, context, datasetId, PKHashValue, previousLSN,
+++ // write the log header.
+++ logRecordHelper.writeLogHeader(logicalLogLocator, logType, txnCtx, datasetId, PKHashValue, previousLSN,
++ resourceId, resourceMgrId, logContentSize);
+
+-- public static JobSpecification[] createDropDatasetJobSpec(CompiledDatasetDropStatement datasetDropStmt,
+-+ public static JobSpecification createDropDatasetJobSpec(CompiledDatasetDropStatement datasetDropStmt,
+- AqlMetadataProvider metadataProvider) throws AlgebricksException, HyracksDataException, RemoteException,
+- ACIDException, AsterixException {
++ // increment the offset so that the transaction can fill up the
++ // content in the correct region of the allocated space.
++ logicalLogLocator.increaseMemoryOffset(logRecordHelper.getLogHeaderSize(logType));
+
+-@@ -111,67 +111,10 @@
+- throw new AlgebricksException("DROP DATASET: No metadata for dataset " + datasetName);
+- }
+- if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+-- return new JobSpecification[0];
+-+ return new JobSpecification();
+- }
+--
+-- List<Index> datasetIndexes = metadataProvider.getDatasetIndexes(dataset.getDataverseName(),
+-- dataset.getDatasetName());
+-- int numSecondaryIndexes = 0;
+-- for (Index index : datasetIndexes) {
+-- if (index.isSecondaryIndex()) {
+-- numSecondaryIndexes++;
+-- }
+-- }
+-- JobSpecification[] specs;
+-- if (numSecondaryIndexes > 0) {
+-- specs = new JobSpecification[numSecondaryIndexes + 1];
+-- int i = 0;
+-- // First, drop secondary indexes.
+-- for (Index index : datasetIndexes) {
+-- if (index.isSecondaryIndex()) {
+-- specs[i] = new JobSpecification();
+-- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> idxSplitsAndConstraint = metadataProvider
+-- .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataset.getDataverseName(),
+-- datasetName, index.getIndexName());
+-- IIndexDataflowHelperFactory dfhFactory;
+-- switch (index.getIndexType()) {
+-- case BTREE:
+-- dfhFactory = new LSMBTreeDataflowHelperFactory(
+-- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+-- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+-- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER);
+-- break;
+-- case RTREE:
+-- dfhFactory = new LSMRTreeDataflowHelperFactory(
+-- new IPrimitiveValueProviderFactory[] { null }, RTreePolicyType.RTREE,
+-- new IBinaryComparatorFactory[] { null },
+-- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
+-- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
+-- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER, null);
+-- break;
+-- case NGRAM_INVIX:
+-- case WORD_INVIX:
+-- dfhFactory = new LSMInvertedIndexDataflowHelperFactory(
+-- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
+-- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
+-- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER);
+-- break;
+-- default:
+-- throw new AsterixException("Unknown index type provided.");
+-- }
+-- IndexDropOperatorDescriptor secondaryBtreeDrop = new IndexDropOperatorDescriptor(specs[i],
+-- AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
+-- AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, idxSplitsAndConstraint.first, dfhFactory);
+-- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specs[i], secondaryBtreeDrop,
+-- idxSplitsAndConstraint.second);
+-- i++;
+-- }
+-- }
+-- } else {
+-- specs = new JobSpecification[1];
+-- }
+-+
+- JobSpecification specPrimary = new JobSpecification();
+-- specs[specs.length - 1] = specPrimary;
++- // a COMMIT log record does not have any content
++- // and hence the logger (responsible for putting the log content) is
++- // not invoked.
+++ // a COMMIT log record does not have any content and hence
+++ // the logger (responsible for putting the log content) is not invoked.
++ if (logContentSize != 0) {
++- logger.preLog(context, reusableLogContentObject);
+++ logger.preLog(txnCtx, reusableLogContentObject);
++ }
+
+- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider
+- .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataset.getDataverseName(), datasetName,
+-@@ -187,7 +130,7 @@
++ if (logContentSize != 0) {
++ // call the logger implementation and ask to fill in the log
++ // record content at the allocated space.
++- logger.log(context, logicalLogLocator, logContentSize, reusableLogContentObject);
++- logger.postLog(context, reusableLogContentObject);
+++ logger.log(txnCtx, logicalLogLocator, logContentSize, reusableLogContentObject);
+++ logger.postLog(txnCtx, reusableLogContentObject);
++ if (IS_DEBUG_MODE) {
++ logicalLogLocator.setMemoryOffset(logicalLogLocator.getMemoryOffset()
++ - logRecordHelper.getLogHeaderSize(logType));
++@@ -597,10 +479,8 @@
++ }
++ }
+
+- specPrimary.addRoot(primaryBtreeDrop);
++- /*
++- * The log record has been written. For integrity checks, compute
++- * the checksum and put it at the end of the log record.
++- */
+++ // The log record has been written. For integrity checks, compute
+++ // the checksum and put it at the end of the log record.
++ int startPosChecksum = logicalLogLocator.getMemoryOffset() - logRecordHelper.getLogHeaderSize(logType);
++ int length = totalLogSize - logRecordHelper.getLogChecksumSize();
++ long checksum = DataUtil.getChecksum(logPages[pageIndex], startPosChecksum, length);
++@@ -611,46 +491,31 @@
++ System.out.println("--------------> LSN(" + currentLSN + ") is written");
++ }
+
+-- return specs;
+-+ return specPrimary;
+- }
++- /*
++- * release the ownership as the log record has been placed in
++- * created space.
++- */
++- int pageDirtyCount = logPageOwnerCount[pageIndex].decrementAndGet();
+++ // release the ownership as the log record has been placed in created space.
+++ logPageOwnerCount[pageIndex].decrementAndGet();
+
+- public static JobSpecification createDatasetJobSpec(Dataverse dataverse, String datasetName,
+-Index: asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+-===================================================================
+---- asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java (revision 1061)
+-+++ asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java (working copy)
+-@@ -21,6 +21,8 @@
+- import java.util.HashMap;
+- import java.util.List;
+- import java.util.Map;
+-+import java.util.concurrent.locks.ReadWriteLock;
+-+import java.util.concurrent.locks.ReentrantReadWriteLock;
++ // indicating that the transaction thread has released ownership
++ decremented = true;
+
+- import org.json.JSONException;
++- /*
++- * If the transaction thread happens to be the last owner of the log
++- * page the page must by marked as a candidate to be flushed.
++- */
++- if (pageDirtyCount == 0 && logPageStatus[pageIndex].get() == PageState.INACTIVE) {
++- addFlushRequest(pageIndex);
++- addedFlushRequest = true;
++- }
++-
++- /*
++- * If the log type is commit, a flush request is registered, if the
++- * log record has not reached the disk. It may be possible that this
++- * thread does not get CPU cycles and in-between the log record has
++- * been flushed to disk because the containing log page filled up.
++- */
++- if (logType == LogType.COMMIT) {
++- synchronized (logPages[pageIndex]) {
++- while (getLastFlushedLsn().get() < currentLSN) {
++- logPages[pageIndex].wait();
++- }
+++ if (logType == LogType.ENTITY_COMMIT) {
+++ map = activeTxnCountMaps.get(pageIndex);
+++ if (map.containsKey(txnCtx)) {
+++ activeTxnCount = (Integer) map.get(txnCtx);
+++ activeTxnCount++;
+++ map.put(txnCtx, activeTxnCount);
+++ } else {
+++ map.put(txnCtx, 1);
++ }
+++ addFlushRequest(pageIndex, currentLSN, false);
+++ } else if (logType == LogType.COMMIT) {
+++ addFlushRequest(pageIndex, currentLSN, true);
++ }
+
+-@@ -68,6 +70,7 @@
+- import edu.uci.ics.asterix.metadata.MetadataException;
+- import edu.uci.ics.asterix.metadata.MetadataManager;
+- import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+-+import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
+- import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+- import edu.uci.ics.asterix.metadata.entities.Dataset;
+- import edu.uci.ics.asterix.metadata.entities.Datatype;
+-@@ -112,6 +115,7 @@
+- private final PrintWriter out;
+- private final SessionConfig sessionConfig;
+- private final DisplayFormat pdf;
+-+ private final ReadWriteLock cacheLatch;
+- private Dataverse activeDefaultDataverse;
+- private List<FunctionDecl> declaredFunctions;
++ } catch (Exception e) {
++ e.printStackTrace();
++- throw new ACIDException(context, "Thread: " + Thread.currentThread().getName()
+++ throw new ACIDException(txnCtx, "Thread: " + Thread.currentThread().getName()
++ + " logger encountered exception", e);
++ } finally {
++- /*
++- * If an exception was encountered and we did not release ownership
++- */
++ if (!decremented) {
++ logPageOwnerCount[pageIndex].decrementAndGet();
++ }
++@@ -667,9 +532,6 @@
+
+-@@ -121,6 +125,7 @@
+- this.out = out;
+- this.sessionConfig = pc;
+- this.pdf = pdf;
+-+ this.cacheLatch = new ReentrantReadWriteLock(true);
+- declaredFunctions = getDeclaredFunctions(aqlStatements);
++ logPages[pageIndex].reset(filePath, LogUtil.getFileOffset(this, nextWritePosition),
++ logManagerProperties.getLogPageSize());
++-
++- //TODO Check if this is necessary
++- //Arrays.fill(logPages[pageIndex].getArray(), (byte) 0);
+ }
+
+-@@ -143,8 +148,7 @@
++ @Override
++@@ -747,16 +609,13 @@
++ //minimize memory allocation overhead. current code allocates the log page size per reading a log record.
+
+- for (Statement stmt : aqlStatements) {
+- validateOperation(activeDefaultDataverse, stmt);
+-- MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+-- AqlMetadataProvider metadataProvider = new AqlMetadataProvider(mdTxnCtx, activeDefaultDataverse);
+-+ AqlMetadataProvider metadataProvider = new AqlMetadataProvider(activeDefaultDataverse);
+- metadataProvider.setWriterFactory(writerFactory);
+- metadataProvider.setOutputFile(outputFile);
+- metadataProvider.setConfig(config);
+-@@ -253,15 +257,9 @@
+- }
++ byte[] pageContent = new byte[logManagerProperties.getLogPageSize()];
++- // take a lock on the log page so that the page is not flushed to
++- // disk interim
+++
+++ // take a lock on the log page so that the page is not flushed to disk interim
++ synchronized (logPages[pageIndex]) {
++- if (lsnValue > getLastFlushedLsn().get()) { // need to check
++- // again
++- // (this
++- // thread may have got
++- // de-scheduled and must
++- // refresh!)
+
+- }
+-- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+- } catch (Exception e) {
+-- MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+- throw new AlgebricksException(e);
+++ // need to check again (this thread may have got de-scheduled and must refresh!)
+++ if (lsnValue > getLastFlushedLsn().get()) {
+++
++ // get the log record length
++ logPages[pageIndex].getBytes(pageContent, 0, pageContent.length);
++ byte logType = pageContent[pageOffset + 4];
++@@ -765,9 +624,7 @@
++ int logRecordSize = logHeaderSize + logBodySize + logRecordHelper.getLogChecksumSize();
++ logRecord = new byte[logRecordSize];
++
++- /*
++- * copy the log record content
++- */
+++ // copy the log record content
++ System.arraycopy(pageContent, pageOffset, logRecord, 0, logRecordSize);
++ MemBasedBuffer memBuffer = new MemBasedBuffer(logRecord);
++ if (logicalLogLocator == null) {
++@@ -790,9 +647,7 @@
+ }
+-- // Following jobs are run under a separate transaction, that is committed/aborted by the JobEventListener
+-- for (JobSpecification jobspec : jobsToExecute) {
+-- runJob(hcc, jobspec);
+-- }
+ }
+- return executionResult;
++
++- /*
++- * the log record is residing on the disk, read it from there.
++- */
+++ // the log record is residing on the disk, read it from there.
++ readDiskLog(lsnValue, logicalLogLocator);
+ }
+-@@ -289,398 +287,802 @@
+
+- private Dataverse handleUseDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+- List<JobSpecification> jobsToExecute) throws MetadataException, RemoteException, ACIDException {
+-- DataverseDecl dvd = (DataverseDecl) stmt;
+-- String dvName = dvd.getDataverseName().getValue();
+-- Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
+-- if (dv == null) {
+-- throw new MetadataException("Unknown dataverse " + dvName);
+-+
+-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+-+ acquireReadLatch();
+-+
+-+ try {
+-+ DataverseDecl dvd = (DataverseDecl) stmt;
+-+ String dvName = dvd.getDataverseName().getValue();
+-+ Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
+-+ if (dv == null) {
+-+ throw new MetadataException("Unknown dataverse " + dvName);
+-+ }
+-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+-+ return dv;
+-+ } catch (Exception e) {
+-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+-+ throw new MetadataException(e);
+-+ } finally {
+-+ releaseReadLatch();
+- }
+-- return dv;
++@@ -860,30 +715,40 @@
++ return logPageOwnerCount[pageIndex];
+ }
+
+- private void handleCreateDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+- List<JobSpecification> jobsToExecute) throws MetadataException, AlgebricksException, RemoteException,
+- ACIDException {
+-- CreateDataverseStatement stmtCreateDataverse = (CreateDataverseStatement) stmt;
+-- String dvName = stmtCreateDataverse.getDataverseName().getValue();
+-- Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
+-- if (dv != null && !stmtCreateDataverse.getIfNotExists()) {
+-- throw new AlgebricksException("A dataverse with this name " + dvName + " already exists.");
+-+
+-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+-+ acquireWriteLatch();
+-+
+-+ try {
+-+ CreateDataverseStatement stmtCreateDataverse = (CreateDataverseStatement) stmt;
+-+ String dvName = stmtCreateDataverse.getDataverseName().getValue();
+-+ Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
+-+ if (dv != null && !stmtCreateDataverse.getIfNotExists()) {
+-+ throw new AlgebricksException("A dataverse with this name " + dvName + " already exists.");
+-+ }
+-+ MetadataManager.INSTANCE.addDataverse(metadataProvider.getMetadataTxnContext(), new Dataverse(dvName,
+-+ stmtCreateDataverse.getFormat(), IMetadataEntity.PENDING_NO_OP));
+-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+-+ } catch (Exception e) {
+-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+-+ throw new AlgebricksException(e);
+-+ } finally {
+-+ releaseWriteLatch();
+- }
+-- MetadataManager.INSTANCE.addDataverse(metadataProvider.getMetadataTxnContext(), new Dataverse(dvName,
+-- stmtCreateDataverse.getFormat()));
++- public ICommitResolver getCommitResolver() {
++- return commitResolver;
++- }
++-
++- public CommitRequestStatistics getCommitRequestStatistics() {
++- return commitRequestStatistics;
++- }
++-
++ public IFileBasedBuffer[] getLogPages() {
++ return logPages;
+ }
+
+- private void handleCreateDatasetStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+- IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws AsterixException, Exception {
+-- DatasetDecl dd = (DatasetDecl) stmt;
+-- String dataverseName = dd.getDataverse() != null ? dd.getDataverse().getValue()
+-- : activeDefaultDataverse != null ? activeDefaultDataverse.getDataverseName() : null;
+-- if (dataverseName == null) {
+-- throw new AlgebricksException(" dataverse not specified ");
+-- }
+-- String datasetName = dd.getName().getValue();
+-- DatasetType dsType = dd.getDatasetType();
+-- String itemTypeName = dd.getItemTypeName().getValue();
+-
+-- IDatasetDetails datasetDetails = null;
+-- Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
+-- datasetName);
+-- if (ds != null) {
+-- if (dd.getIfNotExists()) {
+-- return;
+-- } else {
+-- throw new AlgebricksException("A dataset with this name " + datasetName + " already exists.");
+-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+-+ boolean bActiveTxn = true;
+-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+-+ acquireWriteLatch();
++- public int getLastFlushedPage() {
++- return lastFlushedPage.get();
++- }
++-
++- public void setLastFlushedPage(int lastFlushedPage) {
++- this.lastFlushedPage.set(lastFlushedPage);
++- }
++-
++ @Override
++ public TransactionSubsystem getTransactionSubsystem() {
++ return provider;
++ }
+ +
+-+ try {
+-+ DatasetDecl dd = (DatasetDecl) stmt;
+-+ String dataverseName = dd.getDataverse() != null ? dd.getDataverse().getValue()
+-+ : activeDefaultDataverse != null ? activeDefaultDataverse.getDataverseName() : null;
+-+ if (dataverseName == null) {
+-+ throw new AlgebricksException(" dataverse not specified ");
+- }
+-- }
+-- Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(), dataverseName,
+-- itemTypeName);
+-- if (dt == null) {
+-- throw new AlgebricksException(": type " + itemTypeName + " could not be found.");
+-- }
+-- switch (dd.getDatasetType()) {
+-- case INTERNAL: {
+-- IAType itemType = dt.getDatatype();
+-- if (itemType.getTypeTag() != ATypeTag.RECORD) {
+-- throw new AlgebricksException("Can only partition ARecord's.");
+-+ String datasetName = dd.getName().getValue();
+-+ DatasetType dsType = dd.getDatasetType();
+-+ String itemTypeName = dd.getItemTypeName().getValue();
+++ public void decrementActiveTxnCountOnIndexes(int pageIndex) throws HyracksDataException {
+++ TransactionContext ctx = null;
+++ int count = 0;
+++ int i = 0;
+ +
+-+ IDatasetDetails datasetDetails = null;
+-+ Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
+-+ datasetName);
+-+ if (ds != null) {
+-+ if (dd.getIfNotExists()) {
+-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+-+ return;
+-+ } else {
+-+ throw new AlgebricksException("A dataset with this name " + datasetName + " already exists.");
+- }
+-- List<String> partitioningExprs = ((InternalDetailsDecl) dd.getDatasetDetailsDecl())
+-- .getPartitioningExprs();
+-- String ngName = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue();
+-- datasetDetails = new InternalDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
+-- InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs, ngName);
+-- break;
+- }
+-- case EXTERNAL: {
+-- String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter();
+-- Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties();
+-- datasetDetails = new ExternalDatasetDetails(adapter, properties);
+-- break;
+-+ Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(), dataverseName,
+-+ itemTypeName);
+-+ if (dt == null) {
+-+ throw new AlgebricksException(": type " + itemTypeName + " could not be found.");
+- }
+-- case FEED: {
+-- IAType itemType = dt.getDatatype();
+-- if (itemType.getTypeTag() != ATypeTag.RECORD) {
+-- throw new AlgebricksException("Can only partition ARecord's.");
+-+ switch (dd.getDatasetType()) {
+-+ case INTERNAL: {
+-+ IAType itemType = dt.getDatatype();
+-+ if (itemType.getTypeTag() != ATypeTag.RECORD) {
+-+ throw new AlgebricksException("Can only partition ARecord's.");
+++ HashMap<TransactionContext, Integer> map = activeTxnCountMaps.get(pageIndex);
+++ Set<Map.Entry<TransactionContext, Integer>> entrySet = map.entrySet();
+++ if (entrySet != null) {
+++ for (Map.Entry<TransactionContext, Integer> entry : entrySet) {
+++ if (entry != null) {
+++ if (entry.getValue() != null) {
+++ count = entry.getValue();
+ + }
+-+ List<String> partitioningExprs = ((InternalDetailsDecl) dd.getDatasetDetailsDecl())
+-+ .getPartitioningExprs();
+-+ String ngName = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue();
+-+ datasetDetails = new InternalDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
+-+ InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs,
+-+ ngName);
+-+ break;
+- }
+-- List<String> partitioningExprs = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getPartitioningExprs();
+-- String ngName = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue();
+-- String adapter = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getAdapterFactoryClassname();
+-- Map<String, String> configuration = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getConfiguration();
+-- FunctionSignature signature = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getFunctionSignature();
+-- datasetDetails = new FeedDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
+-- InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs, ngName,
+-- adapter, configuration, signature, FeedDatasetDetails.FeedState.INACTIVE.toString());
+-- break;
+-+ case EXTERNAL: {
+-+ String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter();
+-+ Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties();
+-+ datasetDetails = new ExternalDatasetDetails(adapter, properties);
+-+ break;
+-+ }
+-+ case FEED: {
+-+ IAType itemType = dt.getDatatype();
+-+ if (itemType.getTypeTag() != ATypeTag.RECORD) {
+-+ throw new AlgebricksException("Can only partition ARecord's.");
+++ if (count > 0) {
+++ ctx = entry.getKey();
+++ for (i = 0; i < count; i++) {
+++ ctx.decreaseActiveTransactionCountOnIndexes();
+++ }
+ + }
+-+ List<String> partitioningExprs = ((FeedDetailsDecl) dd.getDatasetDetailsDecl())
+-+ .getPartitioningExprs();
+-+ String ngName = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue();
+-+ String adapter = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getAdapterFactoryClassname();
+-+ Map<String, String> configuration = ((FeedDetailsDecl) dd.getDatasetDetailsDecl())
+-+ .getConfiguration();
+-+ FunctionSignature signature = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getFunctionSignature();
+-+ datasetDetails = new FeedDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
+-+ InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs,
+-+ ngName, adapter, configuration, signature, FeedDatasetDetails.FeedState.INACTIVE.toString());
+-+ break;
+ + }
+- }
+-+
+-+ //#. add a new dataset with PendingAddOp
+-+ Dataset dataset = new Dataset(dataverseName, datasetName, itemTypeName, datasetDetails, dsType,
+-+ DatasetIdFactory.generateDatasetId(), IMetadataEntity.PENDING_ADD_OP);
+-+ MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset);
+-+
+-+ if (dd.getDatasetType() == DatasetType.INTERNAL || dd.getDatasetType() == DatasetType.FEED) {
+-+ Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
+-+ dataverseName);
+-+ JobSpecification jobSpec = DatasetOperations.createDatasetJobSpec(dataverse, datasetName,
+-+ metadataProvider);
+-+
+-+ //#. make metadataTxn commit before calling runJob.
+-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+-+ bActiveTxn = false;
+-+
+-+ //#. runJob
+-+ runJob(hcc, jobSpec);
+-+
+-+ //#. begin new metadataTxn
+-+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+-+ bActiveTxn = true;
+-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ + }
+++ }
+ +
+-+ //#. add a new dataset with PendingNoOp after deleting the dataset with PendingAddOp
+-+ MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName);
+-+ MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), new Dataset(dataverseName,
+-+ datasetName, itemTypeName, datasetDetails, dsType, dataset.getDatasetId(),
+-+ IMetadataEntity.PENDING_NO_OP));
+-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+-+ } catch (Exception e) {
+-+ if (bActiveTxn) {
+-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+-+ }
+-+ throw new AlgebricksException(e);
+-+ } finally {
+-+ releaseWriteLatch();
+- }
+-- MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), new Dataset(dataverseName,
+-- datasetName, itemTypeName, datasetDetails, dsType, DatasetIdFactory.generateDatasetId()));
+-- if (dd.getDatasetType() == DatasetType.INTERNAL || dd.getDatasetType() == DatasetType.FEED) {
+-- Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
+-- dataverseName);
+-- runJob(hcc, DatasetOperations.createDatasetJobSpec(dataverse, datasetName, metadataProvider));
+-- }
+- }
+++ map.clear();
+++ }
++ }
+
+- private void handleCreateIndexStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+- IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
+-- CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
+-- String dataverseName = stmtCreateIndex.getDataverseName() == null ? activeDefaultDataverse == null ? null
+-- : activeDefaultDataverse.getDataverseName() : stmtCreateIndex.getDataverseName().getValue();
+-- if (dataverseName == null) {
+-- throw new AlgebricksException(" dataverse not specified ");
+-- }
+-- String datasetName = stmtCreateIndex.getDatasetName().getValue();
+-- Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
+-- datasetName);
+-- if (ds == null) {
+-- throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
+-- + dataverseName);
+-- }
+-- String indexName = stmtCreateIndex.getIndexName().getValue();
+-- Index idx = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
+-- datasetName, indexName);
+-- if (idx != null) {
+-- if (!stmtCreateIndex.getIfNotExists()) {
+-- throw new AlgebricksException("An index with this name " + indexName + " already exists.");
+-- } else {
+-- stmtCreateIndex.setNeedToCreate(false);
+-+
+-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+-+ boolean bActiveTxn = true;
+-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+-+ acquireWriteLatch();
+-+
+-+ try {
+-+ CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
+-+ String dataverseName = stmtCreateIndex.getDataverseName() == null ? activeDefaultDataverse == null ? null
+-+ : activeDefaultDataverse.getDataverseName() : stmtCreateIndex.getDataverseName().getValue();
+-+ if (dataverseName == null) {
+-+ throw new AlgebricksException(" dataverse not specified ");
+- }
+-- } else {
+-+ String datasetName = stmtCreateIndex.getDatasetName().getValue();
+-+
+-+ Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
+-+ datasetName);
+-+ if (ds == null) {
+-+ throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
+-+ + dataverseName);
+-+ }
+-+
+-+ String indexName = stmtCreateIndex.getIndexName().getValue();
+-+ Index idx = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
+-+ datasetName, indexName);
+-+
+-+ if (idx != null) {
+-+ if (!stmtCreateIndex.getIfNotExists()) {
+-+ throw new AlgebricksException("An index with this name " + indexName + " already exists.");
+-+ } else {
+-+ stmtCreateIndex.setNeedToCreate(false);
+-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+-+ return;
+-+ }
+-+ }
+-+
+-+ //#. add a new index with PendingAddOp
+- Index index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(),
+-- stmtCreateIndex.getFieldExprs(), stmtCreateIndex.getGramLength(), false);
+-+ stmtCreateIndex.getFieldExprs(), stmtCreateIndex.getGramLength(), false,
+-+ IMetadataEntity.PENDING_ADD_OP);
+- MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
+-- runCreateIndexJob(hcc, stmtCreateIndex, metadataProvider);
++ /*
++@@ -895,36 +760,82 @@
++ class LogPageFlushThread extends Thread {
+
+-+ //#. create the index artifact in NC.
+- CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName,
+- index.getDatasetName(), index.getKeyFieldNames(), index.getGramLength(), index.getIndexType());
+-- JobSpecification loadIndexJobSpec = IndexOperations
+-- .buildSecondaryIndexLoadingJobSpec(cis, metadataProvider);
+-- runJob(hcc, loadIndexJobSpec);
+-+ JobSpecification spec = IndexOperations.buildSecondaryIndexCreationJobSpec(cis, metadataProvider);
+-+ if (spec == null) {
+-+ throw new AsterixException("Failed to create job spec for creating index '"
+-+ + stmtCreateIndex.getDatasetName() + "." + stmtCreateIndex.getIndexName() + "'");
+-+ }
+-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+-+ bActiveTxn = false;
+-+
+-+ runJob(hcc, spec);
+-+
+-+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+-+ bActiveTxn = true;
+-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+-+
+-+ //#. load data into the index in NC.
+-+ cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName, index.getDatasetName(),
+-+ index.getKeyFieldNames(), index.getGramLength(), index.getIndexType());
+-+ spec = IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, metadataProvider);
+-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+-+ bActiveTxn = false;
+-+
+-+ runJob(hcc, spec);
+-+
+-+ //#. begin new metadataTxn
+-+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+-+ bActiveTxn = true;
+-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+-+
+-+ //#. add another new index with PendingNoOp after deleting the index with PendingAddOp
+-+ MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
+-+ indexName);
+-+ index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(),
+-+ stmtCreateIndex.getFieldExprs(), stmtCreateIndex.getGramLength(), false,
+-+ IMetadataEntity.PENDING_NO_OP);
+-+ MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
+-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+-+
+-+ } catch (Exception e) {
+-+ if (bActiveTxn) {
+-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+-+ }
+-+ throw new AlgebricksException(e);
+-+ } finally {
+-+ releaseWriteLatch();
+- }
+- }
++ private LogManager logManager;
+++ /*
+++ * pendingFlushRequests is a map with key as Integer denoting the page
+++ * index. When a (transaction) thread discovers the need to flush a page, it
+++ * puts its Thread object into the corresponding value that is a
+++ * LinkedBlockingQueue. The LogManager has a LogFlusher thread that scans
+++ * this map in order of page index (and circling around). The flusher thread
+++ * needs to flush pages in order and waits for a thread to deposit an object
+++ * in the blocking queue corresponding to the next page in order. A request
+++ * to flush a page is conveyed to the flush thread by simply depositing an
+++ * object in to corresponding blocking queue. It is blocking in the sense
+++ * that the flusher thread will continue to wait for an object to arrive in
+++ * the queue. The object itself is ignored by the fliusher and just acts as
+++ * a signal/event that a page needs to be flushed.
+++ */
+++ private final LinkedBlockingQueue<Object>[] flushRequestQueue;
+++ private final Object[] flushRequests;
+++ private int lastFlushedPageIndex;
+++ private final long groupCommitWaitPeriod;
+
+- private void handleCreateTypeStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+- List<JobSpecification> jobsToExecute) throws AlgebricksException, RemoteException, ACIDException,
+- MetadataException {
+-- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
+-- TypeDecl stmtCreateType = (TypeDecl) stmt;
+-- String dataverseName = stmtCreateType.getDataverseName() == null ? activeDefaultDataverse == null ? null
+-- : activeDefaultDataverse.getDataverseName() : stmtCreateType.getDataverseName().getValue();
+-- if (dataverseName == null) {
+-- throw new AlgebricksException(" dataverse not specified ");
+-- }
+-- String typeName = stmtCreateType.getIdent().getValue();
+-- Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
+-- if (dv == null) {
+-- throw new AlgebricksException("Unknonw dataverse " + dataverseName);
+-- }
+-- Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
+-- if (dt != null) {
+-- if (!stmtCreateType.getIfNotExists())
+-- throw new AlgebricksException("A datatype with this name " + typeName + " already exists.");
+-- } else {
+-- if (builtinTypeMap.get(typeName) != null) {
+-- throw new AlgebricksException("Cannot redefine builtin type " + typeName + ".");
+-+
+-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+-+ acquireWriteLatch();
+-+
+-+ try {
+-+ TypeDecl stmtCreateType = (TypeDecl) stmt;
+-+ String dataverseName = stmtCreateType.getDataverseName() == null ? activeDefaultDataverse == null ? null
+-+ : activeDefaultDataverse.getDataverseName() : stmtCreateType.getDataverseName().getValue();
+-+ if (dataverseName == null) {
+-+ throw new AlgebricksException(" dataverse not specified ");
+-+ }
+-+ String typeName = stmtCreateType.getIdent().getValue();
+-+ Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
+-+ if (dv == null) {
+-+ throw new AlgebricksException("Unknonw dataverse " + dataverseName);
+-+ }
+-+ Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
+-+ if (dt != null) {
+-+ if (!stmtCreateType.getIfNotExists()) {
+-+ throw new AlgebricksException("A datatype with this name " + typeName + " already exists.");
+-+ }
+- } else {
+-- Map<TypeSignature, IAType> typeMap = TypeTranslator.computeTypes(mdTxnCtx, (TypeDecl) stmt,
+-- dataverseName);
+-- TypeSignature typeSignature = new TypeSignature(dataverseName, typeName);
+-- IAType type = typeMap.get(typeSignature);
+-- MetadataManager.INSTANCE.addDatatype(mdTxnCtx, new Datatype(dataverseName, typeName, type, false));
+-+ if (builtinTypeMap.get(typeName) != null) {
+-+ throw new AlgebricksException("Cannot redefine builtin type " + typeName + ".");
+-+ } else {
+-+ Map<TypeSignature, IAType> typeMap = TypeTranslator.computeTypes(mdTxnCtx, (TypeDecl) stmt,
+-+ dataverseName);
+-+ TypeSignature typeSignature = new TypeSignature(dataverseName, typeName);
+-+ IAType type = typeMap.get(typeSignature);
+-+ MetadataManager.INSTANCE.addDatatype(mdTxnCtx, new Datatype(dataverseName, typeName, type, false));
+-+ }
+- }
+-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+-+ } catch (Exception e) {
+-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+-+ throw new AlgebricksException(e);
+-+ } finally {
+-+ releaseWriteLatch();
+- }
++ public LogPageFlushThread(LogManager logManager) {
++ this.logManager = logManager;
++ setName("Flusher");
+++ int numLogPages = logManager.getLogManagerProperties().getNumLogPages();
+++ this.flushRequestQueue = new LinkedBlockingQueue[numLogPages];
+++ this.flushRequests = new Object[numLogPages];
+++ for (int i = 0; i < numLogPages; i++) {
+++ flushRequestQueue[i] = new LinkedBlockingQueue<Object>(1);
+++ flushRequests[i] = new Object();
+++ }
+++ this.lastFlushedPageIndex = -1;
+++ groupCommitWaitPeriod = logManager.getLogManagerProperties().getGroupCommitWaitPeriod();
+ }
+
+- private void handleDataverseDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+- IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
+-- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
+-- DataverseDropStatement stmtDelete = (DataverseDropStatement) stmt;
+-- String dvName = stmtDelete.getDataverseName().getValue();
+-
+-- Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dvName);
+-- if (dv == null) {
+-- if (!stmtDelete.getIfExists()) {
+-- throw new AlgebricksException("There is no dataverse with this name " + dvName + ".");
+-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+-+ boolean bActiveTxn = true;
+-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+-+ acquireWriteLatch();
+-+
+-+ try {
+-+ DataverseDropStatement stmtDelete = (DataverseDropStatement) stmt;
+-+ String dvName = stmtDelete.getDataverseName().getValue();
+-+
+-+ Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dvName);
+-+ if (dv == null) {
+-+ if (!stmtDelete.getIfExists()) {
+-+ throw new AlgebricksException("There is no dataverse with this name " + dvName + ".");
+-+ }
+-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+++ public void requestFlush(int pageIndex, long lsn, boolean isSynchronous) {
+++ synchronized (logManager.getLogPage(pageIndex)) {
+++ //return if flushedLSN >= lsn
+++ if (logManager.getLastFlushedLsn().get() >= lsn) {
+ + return;
+- }
+-- } else {
+-+
+-+ //#. prepare jobs which will drop corresponding datasets with indexes.
+- List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dvName);
+- for (int j = 0; j < datasets.size(); j++) {
+- String datasetName = datasets.get(j).getDatasetName();
+- DatasetType dsType = datasets.get(j).getDatasetType();
+- if (dsType == DatasetType.INTERNAL || dsType == DatasetType.FEED) {
+-+
+- List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dvName, datasetName);
+- for (int k = 0; k < indexes.size(); k++) {
+- if (indexes.get(k).isSecondaryIndex()) {
+-- compileIndexDropStatement(hcc, dvName, datasetName, indexes.get(k).getIndexName(),
+-- metadataProvider);
+-+ CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dvName, datasetName,
+-+ indexes.get(k).getIndexName());
+-+ jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
+- }
+- }
+-+
+-+ CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dvName, datasetName);
+-+ jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider));
+- }
+-- compileDatasetDropStatement(hcc, dvName, datasetName, metadataProvider);
+- }
+-
+-+ //#. mark PendingDropOp on the dataverse record by
+-+ // first, deleting the dataverse record from the DATAVERSE_DATASET
+-+ // second, inserting the dataverse record with the PendingDropOp value into the DATAVERSE_DATASET
+- MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dvName);
+-+ MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dvName, dv.getDataFormat(),
+-+ IMetadataEntity.PENDING_DROP_OP));
+-+
+-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+-+ bActiveTxn = false;
+-+
+-+ for (JobSpecification jobSpec : jobsToExecute) {
+-+ runJob(hcc, jobSpec);
+ + }
+ +
+-+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+-+ bActiveTxn = true;
+-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+++ //put a new request to the queue only if the request on the page is not in the queue.
+++ flushRequestQueue[pageIndex].offer(flushRequests[pageIndex]);
+ +
+-+ //#. finally, delete the dataverse.
+-+ MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dvName);
+- if (activeDefaultDataverse != null && activeDefaultDataverse.getDataverseName() == dvName) {
+- activeDefaultDataverse = null;
+- }
+-+
+-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+-+ } catch (Exception e) {
+-+ if (bActiveTxn) {
+-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+++ //return if the request is asynchronous
+++ if (!isSynchronous) {
+++ return;
+ + }
+-+ throw new AlgebricksException(e);
+-+ } finally {
+-+ releaseWriteLatch();
+- }
+- }
+-
+- private void handleDatasetDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+- IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
+-- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
+-- DropStatement stmtDelete = (DropStatement) stmt;
+-- String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
+-- : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue();
+-- if (dataverseName == null) {
+-- throw new AlgebricksException(" dataverse not specified ");
+-- }
+-- String datasetName = stmtDelete.getDatasetName().getValue();
+-- Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+-- if (ds == null) {
+-- if (!stmtDelete.getIfExists())
+-- throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
+-- + dataverseName + ".");
+-- } else {
+ +
+-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+-+ boolean bActiveTxn = true;
+-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+-+ acquireWriteLatch();
+-+
+-+ try {
+-+ DropStatement stmtDelete = (DropStatement) stmt;
+-+ String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
+-+ : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue();
+-+ if (dataverseName == null) {
+-+ throw new AlgebricksException(" dataverse not specified ");
+-+ }
+-+ String datasetName = stmtDelete.getDatasetName().getValue();
+-+
+-+ Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+-+ if (ds == null) {
+-+ if (!stmtDelete.getIfExists()) {
+-+ throw new AlgebricksException("There is no dataset with this name " + datasetName
+-+ + " in dataverse " + dataverseName + ".");
+++ //wait until there is flush.
+++ boolean isNotified = false;
+++ while (!isNotified) {
+++ try {
+++ logManager.getLogPage(pageIndex).wait();
+++ isNotified = true;
+++ } catch (InterruptedException e) {
+++ e.printStackTrace();
+ + }
+-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+-+ return;
+ + }
+++ }
+++ }
+ +
+- if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
+-+
+-+ //#. prepare jobs to drop the datatset and the indexes in NC
+- List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
+- for (int j = 0; j < indexes.size(); j++) {
+-- if (indexes.get(j).isPrimaryIndex()) {
+-- compileIndexDropStatement(hcc, dataverseName, datasetName, indexes.get(j).getIndexName(),
+-- metadataProvider);
+-+ if (indexes.get(j).isSecondaryIndex()) {
+-+ CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
+-+ indexes.get(j).getIndexName());
+-+ jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
++ @Override
++ public void run() {
++ while (true) {
++ try {
++- int pageToFlush = logManager.getNextPageInSequence(logManager.getLastFlushedPage());
+++ int pageToFlush = logManager.getNextPageInSequence(lastFlushedPageIndex);
++
++- /*
++- * A wait call on the linkedBLockingQueue. The flusher thread is
++- * notified when an object is added to the queue. Please note
++- * that each page has an associated blocking queue.
++- */
++- logManager.getPendingFlushRequests(pageToFlush).take();
+++ // A wait call on the linkedBLockingQueue. The flusher thread is
+++ // notified when an object is added to the queue. Please note
+++ // that each page has an associated blocking queue.
+++ flushRequestQueue[pageToFlush].take();
++
++- /*
++- * The LogFlusher was waiting for a page to be marked as a
++- * candidate for flushing. Now that has happened. The thread
++- * shall proceed to take a lock on the log page
++- */
++- synchronized (logManager.getLogPages()[pageToFlush]) {
+++ synchronized (logManager.getLogPage(pageToFlush)) {
++
++- /*
++- * lock the internal state of the log manager and create a
++- * log file if necessary.
++- */
+++ // lock the internal state of the log manager and create a
+++ // log file if necessary.
++ int prevLogFileId = logManager.getLogFileId(logManager.getLastFlushedLsn().get());
++ int nextLogFileId = logManager.getLogFileId(logManager.getLastFlushedLsn().get()
++ + logManager.getLogManagerProperties().getLogPageSize());
++@@ -936,198 +847,60 @@
++ logManager.getLogManagerProperties().getLogPageSize());
+ }
+- }
+-+ CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
+-+ jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider));
+-+
+-+ //#. mark the existing dataset as PendingDropOp
+-+ MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
+-+ MetadataManager.INSTANCE.addDataset(
+-+ mdTxnCtx,
+-+ new Dataset(dataverseName, datasetName, ds.getItemTypeName(), ds.getDatasetDetails(), ds
+-+ .getDatasetType(), ds.getDatasetId(), IMetadataEntity.PENDING_DROP_OP));
+-+
+-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+-+ bActiveTxn = false;
+-+
+-+ //#. run the jobs
+-+ for (JobSpecification jobSpec : jobsToExecute) {
+-+ runJob(hcc, jobSpec);
+-+ }
+-+
+-+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+-+ bActiveTxn = true;
+-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+- }
+-- compileDatasetDropStatement(hcc, dataverseName, datasetName, metadataProvider);
+-+
+-+ //#. finally, delete the dataset.
+-+ MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
+-+
+-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+-+ } catch (Exception e) {
+-+ if (bActiveTxn) {
+-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+-+ }
+-+ throw new AlgebricksException(e);
+-+ } finally {
+-+ releaseWriteLatch();
+- }
+- }
+
+- private void handleIndexDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+- IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
+-- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
+-- IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt;
+-- String datasetName = stmtIndexDrop.getDatasetName().getValue();
+-- String dataverseName = stmtIndexDrop.getDataverseName() == null ? activeDefaultDataverse == null ? null
+-- : activeDefaultDataverse.getDataverseName() : stmtIndexDrop.getDataverseName().getValue();
+-- if (dataverseName == null) {
+-- throw new AlgebricksException(" dataverse not specified ");
+-+
+-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+-+ boolean bActiveTxn = true;
+-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+-+ acquireWriteLatch();
+-+
+-+ try {
+-+ IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt;
+-+ String datasetName = stmtIndexDrop.getDatasetName().getValue();
+-+ String dataverseName = stmtIndexDrop.getDataverseName() == null ? activeDefaultDataverse == null ? null
+-+ : activeDefaultDataverse.getDataverseName() : stmtIndexDrop.getDataverseName().getValue();
+-+ if (dataverseName == null) {
+-+ throw new AlgebricksException(" dataverse not specified ");
+-+ }
+-+
+-+ Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+-+ if (ds == null) {
+-+ throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
+-+ + dataverseName);
+-+ }
+-+
+-+ if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
+-+ String indexName = stmtIndexDrop.getIndexName().getValue();
+-+ Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
+-+ if (index == null) {
+-+ if (!stmtIndexDrop.getIfExists()) {
+-+ throw new AlgebricksException("There is no index with this name " + indexName + ".");
+-+ }
+-+ } else {
+-+ //#. prepare a job to drop the index in NC.
+-+ CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
+-+ indexName);
+-+ jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
+-+
+-+ //#. mark PendingDropOp on the existing index
+-+ MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
+-+ MetadataManager.INSTANCE.addIndex(
+-+ mdTxnCtx,
+-+ new Index(dataverseName, datasetName, indexName, index.getIndexType(), index
+-+ .getKeyFieldNames(), index.isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
+-+
+-+ //#. commit the existing transaction before calling runJob.
+-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+-+ bActiveTxn = false;
+-+
+-+ for (JobSpecification jobSpec : jobsToExecute) {
+-+ runJob(hcc, jobSpec);
+-+ }
+-+
+-+ //#. begin a new transaction
+-+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+-+ bActiveTxn = true;
+-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+-+
+-+ //#. finally, delete the existing index
+-+ MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
+-+ }
+-+ } else {
+-+ throw new AlgebricksException(datasetName
+-+ + " is an external dataset. Indexes are not maintained for external datasets.");
+-+ }
+-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+-+
+-+ } catch (Exception e) {
+-+ if (bActiveTxn) {
+-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+-+ }
+-+ throw new AlgebricksException(e);
+-+
+-+ } finally {
+-+ releaseWriteLatch();
+- }
+-- Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+-- if (ds == null)
+-- throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
+-- + dataverseName);
+-- if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
+-- String indexName = stmtIndexDrop.getIndexName().getValue();
+-- Index idx = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
+-- if (idx == null) {
+-- if (!stmtIndexDrop.getIfExists())
+-- throw new AlgebricksException("There is no index with this name " + indexName + ".");
+-- } else
+-- compileIndexDropStatement(hcc, dataverseName, datasetName, indexName, metadataProvider);
+-- } else {
+-- throw new AlgebricksException(datasetName
+-- + " is an external dataset. Indexes are not maintained for external datasets.");
+-- }
+- }
++- logManager.getLogPage(pageToFlush).flush(); // put the
++- // content to
++- // disk, the
++- // thread still
++- // has a lock on
++- // the log page
+++ //#. sleep during the groupCommitWaitTime
+++ sleep(groupCommitWaitPeriod);
+
+- private void handleTypeDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+- List<JobSpecification> jobsToExecute) throws AlgebricksException, MetadataException, RemoteException,
+- ACIDException {
+-- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
+-- TypeDropStatement stmtTypeDrop = (TypeDropStatement) stmt;
+-- String dataverseName = stmtTypeDrop.getDataverseName() == null ? (activeDefaultDataverse == null ? null
+-- : activeDefaultDataverse.getDataverseName()) : stmtTypeDrop.getDataverseName().getValue();
+-- if (dataverseName == null) {
+-- throw new AlgebricksException(" dataverse not specified ");
+-+
+-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+-+ acquireWriteLatch();
+-+
+-+ try {
+-+ TypeDropStatement stmtTypeDrop = (TypeDropStatement) stmt;
+-+ String dataverseName = stmtTypeDrop.getDataverseName() == null ? (activeDefaultDataverse == null ? null
+-+ : activeDefaultDataverse.getDataverseName()) : stmtTypeDrop.getDataverseName().getValue();
+-+ if (dataverseName == null) {
+-+ throw new AlgebricksException(" dataverse not specified ");
+-+ }
+-+ String typeName = stmtTypeDrop.getTypeName().getValue();
+-+ Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
+-+ if (dt == null) {
+-+ if (!stmtTypeDrop.getIfExists())
+-+ throw new AlgebricksException("There is no datatype with this name " + typeName + ".");
+-+ } else {
+-+ MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, dataverseName, typeName);
+-+ }
+-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+-+ } catch (Exception e) {
+-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+-+ throw new AlgebricksException(e);
+-+ } finally {
+-+ releaseWriteLatch();
+- }
+-- String typeName = stmtTypeDrop.getTypeName().getValue();
+-- Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
+-- if (dt == null) {
+-- if (!stmtTypeDrop.getIfExists())
+-- throw new AlgebricksException("There is no datatype with this name " + typeName + ".");
+-- } else {
+-- MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, dataverseName, typeName);
+-- }
+- }
++- /*
++- * acquire lock on the log manager as we need to update the
++- * internal bookkeeping data.
++- */
+++ //#. set the logPageStatus to INACTIVE in order to prevent other txns from writing on this page.
+++ logManager.getLogPageStatus(pageToFlush).set(PageState.INACTIVE);
+
+- private void handleNodegroupDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+- List<JobSpecification> jobsToExecute) throws MetadataException, AlgebricksException, RemoteException,
+- ACIDException {
+-- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
+-- NodeGroupDropStatement stmtDelete = (NodeGroupDropStatement) stmt;
+-- String nodegroupName = stmtDelete.getNodeGroupName().getValue();
+-- NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodegroupName);
+-- if (ng == null) {
+-- if (!stmtDelete.getIfExists())
+-- throw new AlgebricksException("There is no nodegroup with this name " + nodegroupName + ".");
+-- } else {
+-- MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, nodegroupName);
+-+
+-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+-+ acquireWriteLatch();
+-+
+-+ try {
+-+ NodeGroupDropStatement stmtDelete = (NodeGroupDropStatement) stmt;
+-+ String nodegroupName = stmtDelete.getNodeGroupName().getValue();
+-+ NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodegroupName);
+-+ if (ng == null) {
+-+ if (!stmtDelete.getIfExists())
+-+ throw new AlgebricksException("There is no nodegroup with this name " + nodegroupName + ".");
+-+ } else {
+-+ MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, nodegroupName);
+-+ }
+-+
+-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+-+ } catch (Exception e) {
+-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+-+ throw new AlgebricksException(e);
+-+ } finally {
+-+ releaseWriteLatch();
+- }
+- }
++- // increment the last flushed lsn.
++- long lastFlushedLsn = logManager.incrementLastFlushedLsn(logManager.getLogManagerProperties()
++- .getLogPageSize());
+++ //#. need to wait until the logPageOwnerCount reaches 1 (LOG_WRITER)
+++ // meaning every one has finished writing logs on this page.
+++ while (logManager.getLogPageOwnershipCount(pageToFlush).get() != PageOwnershipStatus.LOG_WRITER) {
+++ sleep(0);
+++ }
+
+- private void handleCreateFunctionStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+- List<JobSpecification> jobsToExecute) throws AlgebricksException, MetadataException, RemoteException,
+- ACIDException {
+-- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
+-- CreateFunctionStatement cfs = (CreateFunctionStatement) stmt;
+-- String dataverse = cfs.getSignature().getNamespace() == null ? activeDefaultDataverse == null ? null
+-- : activeDefaultDataverse.getDataverseName() : cfs.getSignature().getNamespace();
+-- if (dataverse == null) {
+-- throw new AlgebricksException(" dataverse not specified ");
+-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+-+ acquireWriteLatch();
+-+
+-+ try {
+-+ CreateFunctionStatement cfs = (CreateFunctionStatement) stmt;
+-+ String dataverse = cfs.getSignature().getNamespace() == null ? activeDefaultDataverse == null ? null
+-+ : activeDefaultDataverse.getDataverseName() : cfs.getSignature().getNamespace();
+-+ if (dataverse == null) {
+-+ throw new AlgebricksException(" dataverse not specified ");
+-+ }
+-+ Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverse);
+-+ if (dv == null) {
+-+ throw new AlgebricksException("There is no dataverse with this name " + dataverse + ".");
+-+ }
+-+ Function function = new Function(dataverse, cfs.getaAterixFunction().getName(), cfs.getaAterixFunction()
+-+ .getArity(), cfs.getParamList(), Function.RETURNTYPE_VOID, cfs.getFunctionBody(),
+-+ Function.LANGUAGE_AQL, FunctionKind.SCALAR.toString());
+-+ MetadataManager.INSTANCE.addFunction(mdTxnCtx, function);
+-+
+-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+-+ } catch (Exception e) {
+-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+-+ throw new AlgebricksException(e);
+-+ } finally {
+-+ releaseWriteLatch();
+- }
+-- Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverse);
+-- if (dv == null) {
+-- throw new AlgebricksException("There is no dataverse with this name " + dataverse + ".");
+-- }
+-- Function function = new Function(dataverse, cfs.getaAterixFunction().getName(), cfs.getaAterixFunction()
+-- .getArity(), cfs.getParamList(), Function.RETURNTYPE_VOID, cfs.getFunctionBody(),
+-- Function.LANGUAGE_AQL, FunctionKind.SCALAR.toString());
+-- MetadataManager.INSTANCE.addFunction(mdTxnCtx, function);
+- }
++- /*
++- * the log manager gains back ownership of the page. this is
++- * reflected by incrementing the owner count of the page.
++- * recall that when the page is begin flushed the owner
++- * count is actually 0 Value of zero implicitly indicates
++- * that the page is operated upon by the log flusher thread.
++- */
++- logManager.getLogPageOwnershipCount(pageToFlush).incrementAndGet();
+++ //#. set the logPageOwnerCount to 0 (LOG_FLUSHER)
+++ // meaning it is flushing.
+++ logManager.getLogPageOwnershipCount(pageToFlush).set(PageOwnershipStatus.LOG_FLUSHER);
+
+- private void handleFunctionDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+- List<JobSpecification> jobsToExecute) throws MetadataException, RemoteException, ACIDException,
+- AlgebricksException {
+-- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
+-- FunctionDropStatement stmtDropFunction = (FunctionDropStatement) stmt;
+-- FunctionSignature signature = stmtDropFunction.getFunctionSignature();
+-- Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, signature);
+-- if (function == null) {
+-- if (!stmtDropFunction.getIfExists())
+-- throw new AlgebricksException("Unknonw function " + signature);
+-- } else {
+-- MetadataManager.INSTANCE.dropFunction(mdTxnCtx, signature);
+-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+-+ acquireWriteLatch();
+-+
+-+ try {
+-+ FunctionDropStatement stmtDropFunction = (FunctionDropStatement) stmt;
+-+ FunctionSignature signature = stmtDropFunction.getFunctionSignature();
+-+ Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, signature);
+-+ if (function == null) {
+-+ if (!stmtDropFunction.getIfExists())
+-+ throw new AlgebricksException("Unknonw function " + signature);
+-+ } else {
+-+ MetadataManager.INSTANCE.dropFunction(mdTxnCtx, signature);
+-+ }
+-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+-+ } catch (Exception e) {
+-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+-+ throw new AlgebricksException(e);
+-+ } finally {
+-+ releaseWriteLatch();
+- }
+- }
++- /*
++- * get the number of log buffers that have been written so
++- * far. A log buffer = number of log pages * size of a log
++- * page
++- */
++- int numCycles = (int) lastFlushedLsn / logManager.getLogManagerProperties().getLogBufferSize();
++- if (lastFlushedLsn % logManager.getLogManagerProperties().getLogBufferSize() == 0) {
++- numCycles--;
++- }
+++ // put the content to disk (the thread still has a lock on the log page)
+++ logManager.getLogPage(pageToFlush).flush();
+
+- private void handleLoadFromFileStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+- IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
+-- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
+-- LoadFromFileStatement loadStmt = (LoadFromFileStatement) stmt;
+-- String dataverseName = loadStmt.getDataverseName() == null ? activeDefaultDataverse == null ? null
+-- : activeDefaultDataverse.getDataverseName() : loadStmt.getDataverseName().getValue();
+-- CompiledLoadFromFileStatement cls = new CompiledLoadFromFileStatement(dataverseName, loadStmt.getDatasetName()
+-- .getValue(), loadStmt.getAdapter(), loadStmt.getProperties(), loadStmt.dataIsAlreadySorted());
++- /*
++- * Map the log page to a new region in the log file.
++- */
+++ // increment the last flushed lsn and lastFlushedPage
+++ logManager.incrementLastFlushedLsn(logManager.getLogManagerProperties().getLogPageSize());
+++ lastFlushedPageIndex = pageToFlush;
+
+-- IDataFormat format = getDataFormat(metadataProvider.getMetadataTxnContext(), dataverseName);
+-- Job job = DatasetOperations.createLoadDatasetJobSpec(metadataProvider, cls, format);
+-- jobsToExecute.add(job.getJobSpec());
+-- // Also load the dataset's secondary indexes.
+-- List<Index> datasetIndexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, loadStmt
+-- .getDatasetName().getValue());
+-- for (Index index : datasetIndexes) {
+-- if (!index.isSecondaryIndex()) {
+-- continue;
+-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+-+ boolean bActiveTxn = true;
+-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+-+ acquireReadLatch();
+++ // decrement activeTxnCountOnIndexes
+++ logManager.decrementActiveTxnCountOnIndexes(pageToFlush);
+ +
+-+ try {
+-+ LoadFromFileStatement loadStmt = (LoadFromFileStatement) stmt;
+-+ String dataverseName = loadStmt.getDataverseName() == null ? activeDefaultDataverse == null ? null
+-+ : activeDefaultDataverse.getDataverseName() : loadStmt.getDataverseName().getValue();
+-+ CompiledLoadFromFileStatement cls = new CompiledLoadFromFileStatement(dataverseName, loadStmt
+-+ .getDatasetName().getValue(), loadStmt.getAdapter(), loadStmt.getProperties(),
+-+ loadStmt.dataIsAlreadySorted());
+++ // reset the count to 1
+++ logManager.getLogPageOwnershipCount(pageToFlush).set(PageOwnershipStatus.LOG_WRITER);
+ +
+-+ IDataFormat format = getDataFormat(metadataProvider.getMetadataTxnContext(), dataverseName);
+-+ Job job = DatasetOperations.createLoadDatasetJobSpec(metadataProvider, cls, format);
+-+ jobsToExecute.add(job.getJobSpec());
+-+ // Also load the dataset's secondary indexes.
+-+ List<Index> datasetIndexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, loadStmt
+-+ .getDatasetName().getValue());
+-+ for (Index index : datasetIndexes) {
+-+ if (!index.isSecondaryIndex()) {
+-+ continue;
+-+ }
+-+ // Create CompiledCreateIndexStatement from metadata entity 'index'.
+-+ CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(),
+-+ dataverseName, index.getDatasetName(), index.getKeyFieldNames(), index.getGramLength(),
+-+ index.getIndexType());
+-+ jobsToExecute.add(IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, metadataProvider));
+- }
+-- // Create CompiledCreateIndexStatement from metadata entity 'index'.
+-- CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName,
+-- index.getDatasetName(), index.getKeyFieldNames(), index.getGramLength(), index.getIndexType());
+-- jobsToExecute.add(IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, metadataProvider));
+-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+-+ bActiveTxn = false;
+-+
+-+ for (JobSpecification jobspec : jobsToExecute) {
+-+ runJob(hcc, jobspec);
+-+ }
+-+ } catch (Exception e) {
+-+ if (bActiveTxn) {
+-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+-+ }
+-+ throw new AlgebricksException(e);
+-+ } finally {
+-+ releaseReadLatch();
+- }
+- }
+++ // Map the log page to a new region in the log file.
++ long nextWritePosition = logManager.getLogPages()[pageToFlush].getNextWritePosition()
++ + logManager.getLogManagerProperties().getLogBufferSize();
+
+- private void handleWriteFromQueryResultStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+- IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
+-- metadataProvider.setWriteTransaction(true);
+-- WriteFromQueryResultStatement st1 = (WriteFromQueryResultStatement) stmt;
+-- String dataverseName = st1.getDataverseName() == null ? activeDefaultDataverse == null ? null
+-- : activeDefaultDataverse.getDataverseName() : st1.getDataverseName().getValue();
+-- CompiledWriteFromQueryResultStatement clfrqs = new CompiledWriteFromQueryResultStatement(dataverseName, st1
+-- .getDatasetName().getValue(), st1.getQuery(), st1.getVarCounter());
++- /*
++- * long nextPos = (numCycles + 1)
++- * logManager.getLogManagerProperties() .getLogBufferSize()
++- * + pageToFlush logManager.getLogManagerProperties()
++- * .getLogPageSize();
++- */
++ logManager.resetLogPage(nextWritePosition, pageToFlush);
+
+-- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
+-- if (compiled.first != null) {
+-- jobsToExecute.add(compiled.first);
+-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+-+ boolean bActiveTxn = true;
+-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+-+ acquireReadLatch();
+-+
+-+ try {
+-+ metadataProvider.setWriteTransaction(true);
+-+ WriteFromQueryResultStatement st1 = (WriteFromQueryResultStatement) stmt;
+-+ String dataverseName = st1.getDataverseName() == null ? activeDefaultDataverse == null ? null
+-+ : activeDefaultDataverse.getDataverseName() : st1.getDataverseName().getValue();
+-+ CompiledWriteFromQueryResultStatement clfrqs = new CompiledWriteFromQueryResultStatement(dataverseName, st1
+-+ .getDatasetName().getValue(), st1.getQuery(), st1.getVarCounter());
+-+
+-+ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(),
+-+ clfrqs);
+-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+-+ bActiveTxn = false;
+-+ if (compiled.first != null) {
+-+ runJob(hcc, compiled.first);
+-+ }
+-+ } catch (Exception e) {
+-+ if (bActiveTxn) {
+-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+-+ }
+-+ throw new AlgebricksException(e);
+-+ } finally {
+-+ releaseReadLatch();
+- }
+- }
++ // mark the page as ACTIVE
++ logManager.getLogPageStatus(pageToFlush).set(LogManager.PageState.ACTIVE);
+
+- private void handleInsertStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+- IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
+-- metadataProvider.setWriteTransaction(true);
+-- InsertStatement stmtInsert = (InsertStatement) stmt;
+-- String dataverseName = stmtInsert.getDataverseName() == null ? activeDefaultDataverse == null ? null
+-- : activeDefaultDataverse.getDataverseName() : stmtInsert.getDataverseName().getValue();
+-- CompiledInsertStatement clfrqs = new CompiledInsertStatement(dataverseName, stmtInsert.getDatasetName()
+-- .getValue(), stmtInsert.getQuery(), stmtInsert.getVarCounter());
+-- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
+-- if (compiled.first != null) {
+-- jobsToExecute.add(compiled.first);
+-+
+-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+-+ boolean bActiveTxn = true;
+-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+-+ acquireReadLatch();
+-+
+-+ try {
+-+ metadataProvider.setWriteTransaction(true);
+-+ InsertStatement stmtInsert = (InsertStatement) stmt;
+-+ String dataverseName = stmtInsert.getDataverseName() == null ? activeDefaultDataverse == null ? null
+-+ : activeDefaultDataverse.getDataverseName() : stmtInsert.getDataverseName().getValue();
+-+ CompiledInsertStatement clfrqs = new CompiledInsertStatement(dataverseName, stmtInsert.getDatasetName()
+-+ .getValue(), stmtInsert.getQuery(), stmtInsert.getVarCounter());
+-+ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(),
+-+ clfrqs);
+-+
+-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+-+ bActiveTxn = false;
+-+
+-+ if (compiled.first != null) {
+-+ runJob(hcc, compiled.first);
+-+ }
+-+
+-+ } catch (Exception e) {
+-+ if (bActiveTxn) {
+-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+-+ }
+-+ throw new AlgebricksException(e);
+-+ } finally {
+-+ releaseReadLatch();
+- }
+- }
++- // notify all waiting (transaction) threads.
++- // Transaction thread may be waiting for the page to be
++- // available or may have a commit log record on the page
++- // that got flushed.
++- logManager.getLogPages()[pageToFlush].notifyAll();
++- logManager.setLastFlushedPage(pageToFlush);
+++ //#. checks the queue whether there is another flush request on the same log buffer
+++ // If there is another request, then simply remove it.
+++ if (flushRequestQueue[pageToFlush].peek() != null) {
+++ flushRequestQueue[pageToFlush].take();
+++ }
+
+- private void handleDeleteStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+- IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
+-- metadataProvider.setWriteTransaction(true);
+-- DeleteStatement stmtDelete = (DeleteStatement) stmt;
+-- String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
+-- : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue();
+-- CompiledDeleteStatement clfrqs = new CompiledDeleteStatement(stmtDelete.getVariableExpr(), dataverseName,
+-- stmtDelete.getDatasetName().getValue(), stmtDelete.getCondition(), stmtDelete.getDieClause(),
+-- stmtDelete.getVarCounter(), metadataProvider);
+-- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
+-- if (compiled.first != null) {
+-- jobsToExecute.add(compiled.first);
+-+
+-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+-+ boolean bActiveTxn = true;
+-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+-+ acquireReadLatch();
+-+
+-+ try {
+-+ metadataProvider.setWriteTransaction(true);
+-+ DeleteStatement stmtDelete = (DeleteStatement) stmt;
+-+ String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
+-+ : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue();
+-+ CompiledDeleteStatement clfrqs = new CompiledDeleteStatement(stmtDelete.getVariableExpr(), dataverseName,
+-+ stmtDelete.getDatasetName().getValue(), stmtDelete.getCondition(), stmtDelete.getDieClause(),
+-+ stmtDelete.getVarCounter(), metadataProvider);
+-+ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(),
+-+ clfrqs);
+-+
+-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+-+ bActiveTxn = false;
+-+
+-+ if (compiled.first != null) {
+-+ runJob(hcc, compiled.first);
+-+ }
+-+
+-+ } catch (Exception e) {
+-+ if (bActiveTxn) {
+-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+-+ }
+-+ throw new AlgebricksException(e);
+-+ } finally {
+-+ releaseReadLatch();
+++ // notify all waiting (transaction) threads.
+++ logManager.getLogPage(pageToFlush).notifyAll();
++ }
++ } catch (IOException ioe) {
++ ioe.printStackTrace();
++ throw new Error(" exception in flushing log page", ioe);
++ } catch (InterruptedException e) {
++ e.printStackTrace();
++- break; // must break from the loop as the exception indicates
++- // some thing horrendous has happened elsewhere
+++ break;
++ }
+ }
+ }
++-}
++-
++-/*
++- * TODO: By default the commit policy is to commit at each request and not have
++- * a group commit. The following code needs to change to support group commit.
++- * The code for group commit has not been tested thoroughly and is under
++- * development.
++- */
++-class BasicCommitResolver implements ICommitResolver {
++-
++- public boolean shouldCommitPage(int pageIndex, LogManager logManager,
++- CommitRequestStatistics commitRequestStatistics) {
++- return true;
++- }
++-
++- public void init(LogManager logManager) {
++- }
++-}
++-
++-class GroupCommitResolver implements ICommitResolver {
++-
++- public boolean shouldCommitPage(int pageIndex, LogManager logManager,
++- CommitRequestStatistics commitRequestStatistics) {
++- long maxCommitWait = logManager.getLogManagerProperties().getGroupCommitWaitPeriod();
++- long timestamp = commitRequestStatistics.getPageLevelLastCommitRequestTimestamp(pageIndex);
++- if (timestamp == -1) {
++- if (maxCommitWait == 0) {
++- return true;
++- } else {
++- timestamp = System.currentTimeMillis();
++- }
++- }
++- long currenTime = System.currentTimeMillis();
++- if (currenTime - timestamp > maxCommitWait) {
++- return true;
++- }
++- return false;
++- }
++-
++- public void init(LogManager logManager) {
++- GroupCommitHandlerThread groupCommitHandler = new GroupCommitHandlerThread(logManager);
++- groupCommitHandler.setDaemon(true);
++- groupCommitHandler.start();
++- }
++-
++- class GroupCommitHandlerThread extends Thread {
++-
++- private LogManager logManager;
++-
++- public GroupCommitHandlerThread(LogManager logManager) {
++- this.logManager = logManager;
++- setName("Group Commit Handler");
++- }
++-
++- @Override
++- public void run() {
++- int pageIndex = -1;
++- while (true) {
++- pageIndex = logManager.getNextPageInSequence(pageIndex);
++- long lastCommitRequeestTimestamp = logManager.getCommitRequestStatistics()
++- .getPageLevelLastCommitRequestTimestamp(pageIndex);
++- if (lastCommitRequeestTimestamp != -1
++- && System.currentTimeMillis() - lastCommitRequeestTimestamp > logManager
++- .getLogManagerProperties().getGroupCommitWaitPeriod()) {
++- int dirtyCount = logManager.getLogPageOwnershipCount(pageIndex).decrementAndGet();
++- if (dirtyCount == 0) {
++- try {
++- logManager.getLogPageStatus(pageIndex).set(LogManager.PageState.INACTIVE);
++- logManager.getPendingFlushRequests(pageIndex).put(Thread.currentThread());
++- } catch (InterruptedException e) {
++- e.printStackTrace();
++- break;
++- }
++- logManager.getCommitRequestStatistics().committedPage(pageIndex);
++- }
++- }
++- }
++- }
++- }
++-
++-}
++-
++-interface ICommitResolver {
++- public boolean shouldCommitPage(int pageIndex, LogManager logManager,
++- CommitRequestStatistics commitRequestStatistics);
++-
++- public void init(LogManager logManager);
++-}
++-
++-/**
++- * Represents a collection of all commit requests by transactions for each log
++- * page. The requests are accumulated until the commit policy triggers a flush
++- * of the corresponding log page. Upon a flush of a page, all commit requests
++- * for the page are cleared.
++- */
++-class CommitRequestStatistics {
++-
++- AtomicInteger[] pageLevelCommitRequestCount;
++- AtomicLong[] pageLevelLastCommitRequestTimestamp;
++-
++- public CommitRequestStatistics(int numPages) {
++- pageLevelCommitRequestCount = new AtomicInteger[numPages];
++- pageLevelLastCommitRequestTimestamp = new AtomicLong[numPages];
++- for (int i = 0; i < numPages; i++) {
++- pageLevelCommitRequestCount[i] = new AtomicInteger(0);
++- pageLevelLastCommitRequestTimestamp[i] = new AtomicLong(-1L);
++- }
++- }
++-
++- public void registerCommitRequest(int pageIndex) {
++- pageLevelCommitRequestCount[pageIndex].incrementAndGet();
++- pageLevelLastCommitRequestTimestamp[pageIndex].set(System.currentTimeMillis());
++- }
++-
++- public long getPageLevelLastCommitRequestTimestamp(int pageIndex) {
++- return pageLevelLastCommitRequestTimestamp[pageIndex].get();
++- }
++-
++- public void committedPage(int pageIndex) {
++- pageLevelCommitRequestCount[pageIndex].set(0);
++- pageLevelLastCommitRequestTimestamp[pageIndex].set(-1L);
++- }
++-
++-}
+++}
++\ No newline at end of file
++Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java
++===================================================================
++--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java (revision 1194)
+++++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java (working copy)
++@@ -152,6 +152,9 @@
++ case LogType.UPDATE:
++ logTypeDisplay = "UPDATE";
++ break;
+++ case LogType.ENTITY_COMMIT:
+++ logTypeDisplay = "ENTITY_COMMIT";
+++ break;
++ }
++ builder.append(" LSN : ").append(logicalLogLocator.getLsn());
++ builder.append(" Log Type : ").append(logTypeDisplay);
++Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java
++===================================================================
++--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java (revision 1194)
+++++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java (working copy)
++@@ -18,5 +18,6 @@
+
+-@@ -704,46 +1106,109 @@
++ public static final byte UPDATE = 0;
++ public static final byte COMMIT = 1;
+++ public static final byte ENTITY_COMMIT = 2;
+
+- private void handleBeginFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+- IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
+-- BeginFeedStatement bfs = (BeginFeedStatement) stmt;
+-- String dataverseName = bfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
+-- : activeDefaultDataverse.getDataverseName() : bfs.getDatasetName().getValue();
++ }
++Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java
++===================================================================
++--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java (revision 1194)
+++++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java (working copy)
++@@ -1,5 +1,5 @@
++ /*
++- * Copyright 2009-2010 by The Regents of the University of California
+++ * Copyright 2009-2012 by The Regents of the University of California
++ * Licensed under the Apache License, Version 2.0 (the "License");
++ * you may not use this file except in compliance with the License.
++ * you may obtain a copy of the License from
++@@ -41,7 +41,7 @@
++ private int logPageSize = 128 * 1024; // 128 KB
++ private int numLogPages = 8; // number of log pages in the log buffer.
+
+-- CompiledBeginFeedStatement cbfs = new CompiledBeginFeedStatement(dataverseName,
+-- bfs.getDatasetName().getValue(), bfs.getQuery(), bfs.getVarCounter());
+-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+-+ boolean bActiveTxn = true;
+-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+-+ acquireReadLatch();
++- private long groupCommitWaitPeriod = 0; // time in milliseconds for which a
+++ private long groupCommitWaitPeriod = 1; // time in milliseconds for which a
++ // commit record will wait before
++ // the housing page is marked for
++ // flushing.
++Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
++===================================================================
++--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java (revision 1194)
+++++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java (working copy)
++@@ -184,6 +184,7 @@
++ break;
+
+-- Dataset dataset;
+-- dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName, bfs
+-- .getDatasetName().getValue());
+-- IDatasetDetails datasetDetails = dataset.getDatasetDetails();
+-- if (datasetDetails.getDatasetType() != DatasetType.FEED) {
+-- throw new IllegalArgumentException("Dataset " + bfs.getDatasetName().getValue() + " is not a feed dataset");
+-+ try {
+-+ BeginFeedStatement bfs = (BeginFeedStatement) stmt;
+-+ String dataverseName = bfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
+-+ : activeDefaultDataverse.getDataverseName() : bfs.getDatasetName().getValue();
+-+
+-+ CompiledBeginFeedStatement cbfs = new CompiledBeginFeedStatement(dataverseName, bfs.getDatasetName()
+-+ .getValue(), bfs.getQuery(), bfs.getVarCounter());
+-+
+-+ Dataset dataset;
+-+ dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName, bfs
+-+ .getDatasetName().getValue());
+-+ IDatasetDetails datasetDetails = dataset.getDatasetDetails();
+-+ if (datasetDetails.getDatasetType() != DatasetType.FEED) {
+-+ throw new IllegalArgumentException("Dataset " + bfs.getDatasetName().getValue()
+-+ + " is not a feed dataset");
+-+ }
+-+ bfs.initialize(metadataProvider.getMetadataTxnContext(), dataset);
+-+ cbfs.setQuery(bfs.getQuery());
+-+ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, bfs.getQuery(), cbfs);
+-+
+-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+-+ bActiveTxn = false;
+-+
+-+ if (compiled.first != null) {
+-+ runJob(hcc, compiled.first);
+-+ }
+-+
+-+ } catch (Exception e) {
+-+ if (bActiveTxn) {
+-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+-+ }
+-+ throw new AlgebricksException(e);
+-+ } finally {
+-+ releaseReadLatch();
+- }
+-- bfs.initialize(metadataProvider.getMetadataTxnContext(), dataset);
+-- cbfs.setQuery(bfs.getQuery());
+-- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, bfs.getQuery(), cbfs);
+-- if (compiled.first != null) {
+-- jobsToExecute.add(compiled.first);
+-- }
+- }
++ case LogType.COMMIT:
+++ case LogType.ENTITY_COMMIT:
++ tempKeyTxnId.setTxnId(logRecordHelper.getJobId(currentLogLocator),
++ logRecordHelper.getDatasetId(currentLogLocator),
++ logRecordHelper.getPKHashValue(currentLogLocator));
++@@ -218,6 +219,7 @@
++ IIndex index = null;
++ LocalResource localResource = null;
++ ILocalResourceMetadata localResourceMetadata = null;
+++ List<Long> resourceIdList = new ArrayList<Long>();
+
+- private void handleControlFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+- IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
+-- ControlFeedStatement cfs = (ControlFeedStatement) stmt;
+-- String dataverseName = cfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
+-- : activeDefaultDataverse.getDataverseName() : cfs.getDatasetName().getValue();
+-- CompiledControlFeedStatement clcfs = new CompiledControlFeedStatement(cfs.getOperationType(), dataverseName,
+-- cfs.getDatasetName().getValue(), cfs.getAlterAdapterConfParams());
+-- jobsToExecute.add(FeedOperations.buildControlFeedJobSpec(clcfs, metadataProvider));
+-+
+-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+-+ boolean bActiveTxn = true;
+-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+-+ acquireReadLatch();
+-+
+-+ try {
+-+ ControlFeedStatement cfs = (ControlFeedStatement) stmt;
+-+ String dataverseName = cfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
+-+ : activeDefaultDataverse.getDataverseName() : cfs.getDatasetName().getValue();
+-+ CompiledControlFeedStatement clcfs = new CompiledControlFeedStatement(cfs.getOperationType(),
+-+ dataverseName, cfs.getDatasetName().getValue(), cfs.getAlterAdapterConfParams());
+-+ JobSpecification jobSpec = FeedOperations.buildControlFeedJobSpec(clcfs, metadataProvider);
+-+
+-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+-+ bActiveTxn = false;
+-+
+-+ runJob(hcc, jobSpec);
+-+
+-+ } catch (Exception e) {
+-+ if (bActiveTxn) {
+-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+-+ }
+-+ throw new AlgebricksException(e);
+-+ } finally {
+-+ releaseReadLatch();
+-+ }
+- }
++ //#. get indexLifeCycleManager
++ IAsterixAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider();
++@@ -272,6 +274,8 @@
++ index = localResourceMetadata.createIndexInstance(appRuntimeContext,
++ localResource.getResourceName(), localResource.getPartition());
++ indexLifecycleManager.register(resourceId, index);
+++ indexLifecycleManager.open(resourceId);
+++ resourceIdList.add(resourceId);
++ }
+
+- private QueryResult handleQuery(AqlMetadataProvider metadataProvider, Query query, IHyracksClientConnection hcc,
+- List<JobSpecification> jobsToExecute) throws Exception {
+-- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, query, null);
+-- if (compiled.first != null) {
+-- GlobalConfig.ASTERIX_LOGGER.info(compiled.first.toJSON().toString(1));
+-- jobsToExecute.add(compiled.first);
+-+
+-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+-+ boolean bActiveTxn = true;
+-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+-+ acquireReadLatch();
+-+
+-+ try {
+-+ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, query, null);
+-+
+-+ QueryResult queryResult = new QueryResult(query, compiled.second.getLocalFile().getFile().getAbsolutePath());
+-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+-+ bActiveTxn = false;
+-+
+-+ if (compiled.first != null) {
+-+ GlobalConfig.ASTERIX_LOGGER.info(compiled.first.toJSON().toString(1));
+-+ runJob(hcc, compiled.first);
+-+ }
+-+
+-+ return queryResult;
+-+ } catch (Exception e) {
+-+ if (bActiveTxn) {
+-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+-+ }
+-+ throw new AlgebricksException(e);
+-+ } finally {
+-+ releaseReadLatch();
+- }
+-- return new QueryResult(query, compiled.second.getLocalFile().getFile().getAbsolutePath());
+- }
++ /***************************************************/
++@@ -300,6 +304,7 @@
++ break;
+
+- private void runCreateIndexJob(IHyracksClientConnection hcc, CreateIndexStatement stmtCreateIndex,
+-@@ -768,20 +1233,32 @@
+- private void handleCreateNodeGroupStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+- List<JobSpecification> jobsToExecute) throws MetadataException, AlgebricksException, RemoteException,
+- ACIDException {
+-- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
+-- NodegroupDecl stmtCreateNodegroup = (NodegroupDecl) stmt;
+-- String ngName = stmtCreateNodegroup.getNodegroupName().getValue();
+-- NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, ngName);
+-- if (ng != null) {
+-- if (!stmtCreateNodegroup.getIfNotExists())
+-- throw new AlgebricksException("A nodegroup with this name " + ngName + " already exists.");
+-- } else {
+-- List<Identifier> ncIdentifiers = stmtCreateNodegroup.getNodeControllerNames();
+-- List<String> ncNames = new ArrayList<String>(ncIdentifiers.size());
+-- for (Identifier id : ncIdentifiers) {
+-- ncNames.add(id.getValue());
+-+
+-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+-+ acquireWriteLatch();
+-+
+-+ try {
+-+ NodegroupDecl stmtCreateNodegroup = (NodegroupDecl) stmt;
+-+ String ngName = stmtCreateNodegroup.getNodegroupName().getValue();
+-+ NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, ngName);
+-+ if (ng != null) {
+-+ if (!stmtCreateNodegroup.getIfNotExists())
+-+ throw new AlgebricksException("A nodegroup with this name " + ngName + " already exists.");
+-+ } else {
+-+ List<Identifier> ncIdentifiers = stmtCreateNodegroup.getNodeControllerNames();
+-+ List<String> ncNames = new ArrayList<String>(ncIdentifiers.size());
+-+ for (Identifier id : ncIdentifiers) {
+-+ ncNames.add(id.getValue());
+-+ }
+-+ MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(ngName, ncNames));
++ case LogType.COMMIT:
+++ case LogType.ENTITY_COMMIT:
++ //do nothing
++ break;
++
++@@ -308,6 +313,11 @@
+ }
+-- MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(ngName, ncNames));
+-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+-+ } catch (Exception e) {
+-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+-+ throw new AlgebricksException(e);
+-+ } finally {
+-+ releaseWriteLatch();
+ }
+- }
+-
+-@@ -791,10 +1268,37 @@
+-
+- private void compileIndexDropStatement(IHyracksClientConnection hcc, String dataverseName, String datasetName,
+- String indexName, AqlMetadataProvider metadataProvider) throws Exception {
+-+ MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
+-+ Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
+-+
+-+ //#. mark PendingDropOp on the existing index
+-+ MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
+-+ MetadataManager.INSTANCE.addIndex(
+-+ mdTxnCtx,
+-+ new Index(dataverseName, datasetName, indexName, index.getIndexType(), index.getKeyFieldNames(), index
+-+ .isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
+- CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName);
+-- runJob(hcc, IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
+-- MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
+-- indexName);
+-+ JobSpecification jobSpec = IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider);
+-+
+-+ //#. commit the existing transaction before calling runJob.
+-+ // the caller should begin the transaction before calling this function.
+-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+-+
+-+ try {
+-+ runJob(hcc, jobSpec);
+-+ } catch (Exception e) {
+-+ //need to create the mdTxnCtx to be aborted by caller properly
+-+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+-+ throw e;
++
+++ //close all indexes
+++ for (long r : resourceIdList) {
+++ indexLifecycleManager.close(r);
+ + }
+-+
+-+ //#. begin a new transaction
+-+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+-+
+-+ //#. finally, delete the existing index
+-+ MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
+++
++ JobIdFactory.initJobId(maxJobId);
+ }
+
+- private void compileDatasetDropStatement(IHyracksClientConnection hcc, String dataverseName, String datasetName,
+-@@ -803,10 +1307,32 @@
+- CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
+- Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+- if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
+-- JobSpecification[] jobSpecs = DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider);
+-- for (JobSpecification spec : jobSpecs)
+-- runJob(hcc, spec);
+-+ JobSpecification jobSpec = DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider);
+-+
+-+ //#. mark PendingDropOp on the existing dataset
+-+ MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
+-+ MetadataManager.INSTANCE.addDataset(mdTxnCtx, new Dataset(dataverseName, datasetName, ds.getItemTypeName(),
+-+ ds.getDatasetDetails(), ds.getDatasetType(), ds.getDatasetId(), IMetadataEntity.PENDING_DROP_OP));
+-+
+-+ //#. commit the transaction
+-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+-+
+-+ //#. run the job
+-+ try {
+-+ runJob(hcc, jobSpec);
+-+ } catch (Exception e) {
+-+ //need to create the mdTxnCtx to be aborted by caller properly
+-+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+-+ throw e;
++@@ -539,6 +549,7 @@
++ break;
++
++ case LogType.COMMIT:
+++ case LogType.ENTITY_COMMIT:
++ undoLSNSet = loserTxnTable.get(tempKeyTxnId);
++ if (undoLSNSet != null) {
++ loserTxnTable.remove(tempKeyTxnId);
++Index: asterix-app/src/test/java/edu/uci/ics/asterix/test/metadata/MetadataTest.java
++===================================================================
++--- asterix-app/src/test/java/edu/uci/ics/asterix/test/metadata/MetadataTest.java (revision 1194)
+++++ asterix-app/src/test/java/edu/uci/ics/asterix/test/metadata/MetadataTest.java (working copy)
++@@ -42,6 +42,16 @@
++ List<CompilationUnit> cUnits = tcCtx.getTestCase().getCompilationUnit();
++ for (CompilationUnit cUnit : cUnits) {
++ File testFile = tcCtx.getTestFile(cUnit);
+++
+++ /*****************
+++ if (!testFile.getAbsolutePath().contains("meta09.aql")) {
+++ System.out.println(testFile.getAbsolutePath());
+++ continue;
+ + }
+-+
+-+ //#. start a new transaction
+-+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+- }
+-+
+-+ //#. finally, delete the existing dataset.
+- MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
+- }
+++ System.out.println(testFile.getAbsolutePath());
+++ *****************/
+++
+++
++ File expectedResultFile = tcCtx.getExpectedResultFile(cUnit);
++ File actualFile = new File(PATH_ACTUAL + File.separator
++ + tcCtx.getTestCase().getFilePath().replace(File.separator, "_") + "_" + cUnit.getName() + ".adm");
++Index: asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
++===================================================================
++--- asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java (revision 1194)
+++++ asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java (working copy)
++@@ -95,9 +95,10 @@
++ File testFile = tcCtx.getTestFile(cUnit);
+
+-@@ -831,4 +1357,20 @@
+- }
+- return format;
+- }
+-+
+-+ private void acquireWriteLatch() {
+-+ cacheLatch.writeLock().lock();
+-+ }
+-+
+-+ private void releaseWriteLatch() {
+-+ cacheLatch.writeLock().unlock();
+-+ }
+-+
+-+ private void acquireReadLatch() {
+-+ cacheLatch.readLock().lock();
+-+ }
+-+
+-+ private void releaseReadLatch() {
+-+ cacheLatch.readLock().unlock();
+-+ }
+- }
++ /*************** to avoid run failure cases ****************
++- if (!testFile.getAbsolutePath().contains("index-selection/")) {
+++ if (!testFile.getAbsolutePath().contains("query-issue205.aql")) {
++ continue;
++ }
+++ System.out.println(testFile.getAbsolutePath());
++ ************************************************************/
++
++ File expectedResultFile = tcCtx.getExpectedResultFile(cUnit);