implemented Atomic DDL operations and passed existing tests.
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization@1070 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index a2b2163..620fb1a 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -68,6 +68,7 @@
import edu.uci.ics.asterix.metadata.MetadataException;
import edu.uci.ics.asterix.metadata.MetadataManager;
import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
import edu.uci.ics.asterix.metadata.entities.Dataset;
import edu.uci.ics.asterix.metadata.entities.Datatype;
@@ -143,8 +144,7 @@
for (Statement stmt : aqlStatements) {
validateOperation(activeDefaultDataverse, stmt);
- MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
- AqlMetadataProvider metadataProvider = new AqlMetadataProvider(mdTxnCtx, activeDefaultDataverse);
+ AqlMetadataProvider metadataProvider = new AqlMetadataProvider(activeDefaultDataverse);
metadataProvider.setWriterFactory(writerFactory);
metadataProvider.setOutputFile(outputFile);
metadataProvider.setConfig(config);
@@ -253,15 +253,9 @@
}
}
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e) {
- MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
throw new AlgebricksException(e);
}
- // Following jobs are run under a separate transaction, that is committed/aborted by the JobEventListener
- for (JobSpecification jobspec : jobsToExecute) {
- runJob(hcc, jobspec);
- }
}
return executionResult;
}
@@ -289,398 +283,802 @@
private Dataverse handleUseDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt,
List<JobSpecification> jobsToExecute) throws MetadataException, RemoteException, ACIDException {
- DataverseDecl dvd = (DataverseDecl) stmt;
- String dvName = dvd.getDataverseName().getValue();
- Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
- if (dv == null) {
- throw new MetadataException("Unknown dataverse " + dvName);
+
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ acquireReadLatch();
+
+ try {
+ DataverseDecl dvd = (DataverseDecl) stmt;
+ String dvName = dvd.getDataverseName().getValue();
+ Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
+ if (dv == null) {
+ throw new MetadataException("Unknown dataverse " + dvName);
+ }
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ return dv;
+ } catch (Exception e) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ throw new MetadataException(e);
+ } finally {
+ releaseReadLatch();
}
- return dv;
}
private void handleCreateDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt,
List<JobSpecification> jobsToExecute) throws MetadataException, AlgebricksException, RemoteException,
ACIDException {
- CreateDataverseStatement stmtCreateDataverse = (CreateDataverseStatement) stmt;
- String dvName = stmtCreateDataverse.getDataverseName().getValue();
- Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
- if (dv != null && !stmtCreateDataverse.getIfNotExists()) {
- throw new AlgebricksException("A dataverse with this name " + dvName + " already exists.");
+
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ acquireWriteLatch();
+
+ try {
+ CreateDataverseStatement stmtCreateDataverse = (CreateDataverseStatement) stmt;
+ String dvName = stmtCreateDataverse.getDataverseName().getValue();
+ Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
+ if (dv != null && !stmtCreateDataverse.getIfNotExists()) {
+ throw new AlgebricksException("A dataverse with this name " + dvName + " already exists.");
+ }
+ MetadataManager.INSTANCE.addDataverse(metadataProvider.getMetadataTxnContext(), new Dataverse(dvName,
+ stmtCreateDataverse.getFormat(), IMetadataEntity.PENDING_NO_OP));
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (Exception e) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ throw new AlgebricksException(e);
+ } finally {
+ releaseWriteLatch();
}
- MetadataManager.INSTANCE.addDataverse(metadataProvider.getMetadataTxnContext(), new Dataverse(dvName,
- stmtCreateDataverse.getFormat()));
}
private void handleCreateDatasetStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws AsterixException, Exception {
- DatasetDecl dd = (DatasetDecl) stmt;
- String dataverseName = dd.getDataverse() != null ? dd.getDataverse().getValue()
- : activeDefaultDataverse != null ? activeDefaultDataverse.getDataverseName() : null;
- if (dataverseName == null) {
- throw new AlgebricksException(" dataverse not specified ");
- }
- String datasetName = dd.getName().getValue();
- DatasetType dsType = dd.getDatasetType();
- String itemTypeName = dd.getItemTypeName().getValue();
- IDatasetDetails datasetDetails = null;
- Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
- datasetName);
- if (ds != null) {
- if (dd.getIfNotExists()) {
- return;
- } else {
- throw new AlgebricksException("A dataset with this name " + datasetName + " already exists.");
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ boolean bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ acquireWriteLatch();
+
+ try {
+ DatasetDecl dd = (DatasetDecl) stmt;
+ String dataverseName = dd.getDataverse() != null ? dd.getDataverse().getValue()
+ : activeDefaultDataverse != null ? activeDefaultDataverse.getDataverseName() : null;
+ if (dataverseName == null) {
+ throw new AlgebricksException(" dataverse not specified ");
}
- }
- Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(), dataverseName,
- itemTypeName);
- if (dt == null) {
- throw new AlgebricksException(": type " + itemTypeName + " could not be found.");
- }
- switch (dd.getDatasetType()) {
- case INTERNAL: {
- IAType itemType = dt.getDatatype();
- if (itemType.getTypeTag() != ATypeTag.RECORD) {
- throw new AlgebricksException("Can only partition ARecord's.");
+ String datasetName = dd.getName().getValue();
+ DatasetType dsType = dd.getDatasetType();
+ String itemTypeName = dd.getItemTypeName().getValue();
+
+ IDatasetDetails datasetDetails = null;
+ Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
+ datasetName);
+ if (ds != null) {
+ if (dd.getIfNotExists()) {
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ return;
+ } else {
+ throw new AlgebricksException("A dataset with this name " + datasetName + " already exists.");
}
- List<String> partitioningExprs = ((InternalDetailsDecl) dd.getDatasetDetailsDecl())
- .getPartitioningExprs();
- String ngName = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue();
- datasetDetails = new InternalDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
- InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs, ngName);
- break;
}
- case EXTERNAL: {
- String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter();
- Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties();
- datasetDetails = new ExternalDatasetDetails(adapter, properties);
- break;
+ Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(), dataverseName,
+ itemTypeName);
+ if (dt == null) {
+ throw new AlgebricksException(": type " + itemTypeName + " could not be found.");
}
- case FEED: {
- IAType itemType = dt.getDatatype();
- if (itemType.getTypeTag() != ATypeTag.RECORD) {
- throw new AlgebricksException("Can only partition ARecord's.");
+ switch (dd.getDatasetType()) {
+ case INTERNAL: {
+ IAType itemType = dt.getDatatype();
+ if (itemType.getTypeTag() != ATypeTag.RECORD) {
+ throw new AlgebricksException("Can only partition ARecord's.");
+ }
+ List<String> partitioningExprs = ((InternalDetailsDecl) dd.getDatasetDetailsDecl())
+ .getPartitioningExprs();
+ String ngName = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue();
+ datasetDetails = new InternalDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
+ InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs,
+ ngName);
+ break;
}
- List<String> partitioningExprs = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getPartitioningExprs();
- String ngName = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue();
- String adapter = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getAdapterFactoryClassname();
- Map<String, String> configuration = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getConfiguration();
- FunctionSignature signature = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getFunctionSignature();
- datasetDetails = new FeedDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
- InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs, ngName,
- adapter, configuration, signature, FeedDatasetDetails.FeedState.INACTIVE.toString());
- break;
+ case EXTERNAL: {
+ String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter();
+ Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties();
+ datasetDetails = new ExternalDatasetDetails(adapter, properties);
+ break;
+ }
+ case FEED: {
+ IAType itemType = dt.getDatatype();
+ if (itemType.getTypeTag() != ATypeTag.RECORD) {
+ throw new AlgebricksException("Can only partition ARecord's.");
+ }
+ List<String> partitioningExprs = ((FeedDetailsDecl) dd.getDatasetDetailsDecl())
+ .getPartitioningExprs();
+ String ngName = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue();
+ String adapter = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getAdapterFactoryClassname();
+ Map<String, String> configuration = ((FeedDetailsDecl) dd.getDatasetDetailsDecl())
+ .getConfiguration();
+ FunctionSignature signature = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getFunctionSignature();
+ datasetDetails = new FeedDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
+ InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs,
+ ngName, adapter, configuration, signature, FeedDatasetDetails.FeedState.INACTIVE.toString());
+ break;
+ }
}
- }
- MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), new Dataset(dataverseName,
- datasetName, itemTypeName, datasetDetails, dsType, DatasetIdFactory.generateDatasetId()));
- if (dd.getDatasetType() == DatasetType.INTERNAL || dd.getDatasetType() == DatasetType.FEED) {
- Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
- dataverseName);
- runJob(hcc, DatasetOperations.createDatasetJobSpec(dataverse, datasetName, metadataProvider));
+
+ //#. add a new dataset with PendingAddOp
+ Dataset dataset = new Dataset(dataverseName, datasetName, itemTypeName, datasetDetails, dsType,
+ DatasetIdFactory.generateDatasetId(), IMetadataEntity.PENDING_ADD_OP);
+ MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset);
+
+ if (dd.getDatasetType() == DatasetType.INTERNAL || dd.getDatasetType() == DatasetType.FEED) {
+ Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
+ dataverseName);
+ JobSpecification jobSpec = DatasetOperations.createDatasetJobSpec(dataverse, datasetName,
+ metadataProvider);
+
+ //#. make metadataTxn commit before calling runJob.
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ bActiveTxn = false;
+
+ //#. runJob
+ runJob(hcc, jobSpec);
+
+ //#. begin new metadataTxn
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ }
+
+ //#. add a new dataset with PendingNoOp after deleting the dataset with PendingAddOp
+ MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName);
+ MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), new Dataset(dataverseName,
+ datasetName, itemTypeName, datasetDetails, dsType, dataset.getDatasetId(),
+ IMetadataEntity.PENDING_NO_OP));
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (Exception e) {
+ if (bActiveTxn) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ }
+ throw new AlgebricksException(e);
+ } finally {
+ releaseWriteLatch();
}
}
private void handleCreateIndexStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
- CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
- String dataverseName = stmtCreateIndex.getDataverseName() == null ? activeDefaultDataverse == null ? null
- : activeDefaultDataverse.getDataverseName() : stmtCreateIndex.getDataverseName().getValue();
- if (dataverseName == null) {
- throw new AlgebricksException(" dataverse not specified ");
- }
- String datasetName = stmtCreateIndex.getDatasetName().getValue();
- Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
- datasetName);
- if (ds == null) {
- throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
- + dataverseName);
- }
- String indexName = stmtCreateIndex.getIndexName().getValue();
- Index idx = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
- datasetName, indexName);
- if (idx != null) {
- if (!stmtCreateIndex.getIfNotExists()) {
- throw new AlgebricksException("An index with this name " + indexName + " already exists.");
- } else {
- stmtCreateIndex.setNeedToCreate(false);
- }
- } else {
- Index index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(),
- stmtCreateIndex.getFieldExprs(), stmtCreateIndex.getGramLength(), false);
- MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
- runCreateIndexJob(hcc, stmtCreateIndex, metadataProvider);
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ boolean bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ acquireWriteLatch();
+
+ try {
+ CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
+ String dataverseName = stmtCreateIndex.getDataverseName() == null ? activeDefaultDataverse == null ? null
+ : activeDefaultDataverse.getDataverseName() : stmtCreateIndex.getDataverseName().getValue();
+ if (dataverseName == null) {
+ throw new AlgebricksException(" dataverse not specified ");
+ }
+ String datasetName = stmtCreateIndex.getDatasetName().getValue();
+
+ Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
+ datasetName);
+ if (ds == null) {
+ throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
+ + dataverseName);
+ }
+
+ String indexName = stmtCreateIndex.getIndexName().getValue();
+ Index idx = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
+ datasetName, indexName);
+
+ if (idx != null) {
+ if (!stmtCreateIndex.getIfNotExists()) {
+ throw new AlgebricksException("An index with this name " + indexName + " already exists.");
+ } else {
+ stmtCreateIndex.setNeedToCreate(false);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ return;
+ }
+ }
+
+ //#. add a new index with PendingAddOp
+ Index index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(),
+ stmtCreateIndex.getFieldExprs(), stmtCreateIndex.getGramLength(), false,
+ IMetadataEntity.PENDING_ADD_OP);
+ MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
+
+ //#. create the index artifact in NC.
CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName,
index.getDatasetName(), index.getKeyFieldNames(), index.getGramLength(), index.getIndexType());
- JobSpecification loadIndexJobSpec = IndexOperations
- .buildSecondaryIndexLoadingJobSpec(cis, metadataProvider);
- runJob(hcc, loadIndexJobSpec);
+ JobSpecification spec = IndexOperations.buildSecondaryIndexCreationJobSpec(cis, metadataProvider);
+ if (spec == null) {
+ throw new AsterixException("Failed to create job spec for creating index '"
+ + stmtCreateIndex.getDatasetName() + "." + stmtCreateIndex.getIndexName() + "'");
+ }
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ bActiveTxn = false;
+
+ runJob(hcc, spec);
+
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+
+ //#. load data into the index in NC.
+ cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName, index.getDatasetName(),
+ index.getKeyFieldNames(), index.getGramLength(), index.getIndexType());
+ spec = IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, metadataProvider);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ bActiveTxn = false;
+
+ runJob(hcc, spec);
+
+ //#. begin new metadataTxn
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+
+ //#. add another new index with PendingNoOp after deleting the index with PendingAddOp
+ MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
+ indexName);
+ index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(),
+ stmtCreateIndex.getFieldExprs(), stmtCreateIndex.getGramLength(), false,
+ IMetadataEntity.PENDING_NO_OP);
+ MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+
+ } catch (Exception e) {
+ if (bActiveTxn) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ }
+ throw new AlgebricksException(e);
+ } finally {
+ releaseWriteLatch();
}
}
private void handleCreateTypeStatement(AqlMetadataProvider metadataProvider, Statement stmt,
List<JobSpecification> jobsToExecute) throws AlgebricksException, RemoteException, ACIDException,
MetadataException {
- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
- TypeDecl stmtCreateType = (TypeDecl) stmt;
- String dataverseName = stmtCreateType.getDataverseName() == null ? activeDefaultDataverse == null ? null
- : activeDefaultDataverse.getDataverseName() : stmtCreateType.getDataverseName().getValue();
- if (dataverseName == null) {
- throw new AlgebricksException(" dataverse not specified ");
- }
- String typeName = stmtCreateType.getIdent().getValue();
- Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
- if (dv == null) {
- throw new AlgebricksException("Unknonw dataverse " + dataverseName);
- }
- Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
- if (dt != null) {
- if (!stmtCreateType.getIfNotExists())
- throw new AlgebricksException("A datatype with this name " + typeName + " already exists.");
- } else {
- if (builtinTypeMap.get(typeName) != null) {
- throw new AlgebricksException("Cannot redefine builtin type " + typeName + ".");
- } else {
- Map<TypeSignature, IAType> typeMap = TypeTranslator.computeTypes(mdTxnCtx, (TypeDecl) stmt,
- dataverseName);
- TypeSignature typeSignature = new TypeSignature(dataverseName, typeName);
- IAType type = typeMap.get(typeSignature);
- MetadataManager.INSTANCE.addDatatype(mdTxnCtx, new Datatype(dataverseName, typeName, type, false));
+
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ acquireWriteLatch();
+
+ try {
+ TypeDecl stmtCreateType = (TypeDecl) stmt;
+ String dataverseName = stmtCreateType.getDataverseName() == null ? activeDefaultDataverse == null ? null
+ : activeDefaultDataverse.getDataverseName() : stmtCreateType.getDataverseName().getValue();
+ if (dataverseName == null) {
+ throw new AlgebricksException(" dataverse not specified ");
}
+ String typeName = stmtCreateType.getIdent().getValue();
+ Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
+ if (dv == null) {
+ throw new AlgebricksException("Unknonw dataverse " + dataverseName);
+ }
+ Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
+ if (dt != null) {
+ if (!stmtCreateType.getIfNotExists()) {
+ throw new AlgebricksException("A datatype with this name " + typeName + " already exists.");
+ }
+ } else {
+ if (builtinTypeMap.get(typeName) != null) {
+ throw new AlgebricksException("Cannot redefine builtin type " + typeName + ".");
+ } else {
+ Map<TypeSignature, IAType> typeMap = TypeTranslator.computeTypes(mdTxnCtx, (TypeDecl) stmt,
+ dataverseName);
+ TypeSignature typeSignature = new TypeSignature(dataverseName, typeName);
+ IAType type = typeMap.get(typeSignature);
+ MetadataManager.INSTANCE.addDatatype(mdTxnCtx, new Datatype(dataverseName, typeName, type, false));
+ }
+ }
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (Exception e) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ throw new AlgebricksException(e);
+ } finally {
+ releaseWriteLatch();
}
}
private void handleDataverseDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
- DataverseDropStatement stmtDelete = (DataverseDropStatement) stmt;
- String dvName = stmtDelete.getDataverseName().getValue();
- Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dvName);
- if (dv == null) {
- if (!stmtDelete.getIfExists()) {
- throw new AlgebricksException("There is no dataverse with this name " + dvName + ".");
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ boolean bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ acquireWriteLatch();
+
+ try {
+ DataverseDropStatement stmtDelete = (DataverseDropStatement) stmt;
+ String dvName = stmtDelete.getDataverseName().getValue();
+
+ Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dvName);
+ if (dv == null) {
+ if (!stmtDelete.getIfExists()) {
+ throw new AlgebricksException("There is no dataverse with this name " + dvName + ".");
+ }
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ return;
}
- } else {
+
+ //#. prepare jobs which will drop corresponding datasets with indexes.
List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dvName);
for (int j = 0; j < datasets.size(); j++) {
String datasetName = datasets.get(j).getDatasetName();
DatasetType dsType = datasets.get(j).getDatasetType();
if (dsType == DatasetType.INTERNAL || dsType == DatasetType.FEED) {
+
List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dvName, datasetName);
for (int k = 0; k < indexes.size(); k++) {
if (indexes.get(k).isSecondaryIndex()) {
- compileIndexDropStatement(hcc, dvName, datasetName, indexes.get(k).getIndexName(),
- metadataProvider);
+ CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dvName, datasetName,
+ indexes.get(k).getIndexName());
+ jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
}
}
+
+ CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dvName, datasetName);
+ jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider));
}
- compileDatasetDropStatement(hcc, dvName, datasetName, metadataProvider);
}
+ //#. mark PendingDropOp on the dataverse record by
+ // first, deleting the dataverse record from the DATAVERSE_DATASET
+ // second, inserting the dataverse record with the PendingDropOp value into the DATAVERSE_DATASET
+ MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dvName);
+ MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dvName, dv.getDataFormat(),
+ IMetadataEntity.PENDING_DROP_OP));
+
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ bActiveTxn = false;
+
+ for (JobSpecification jobSpec : jobsToExecute) {
+ runJob(hcc, jobSpec);
+ }
+
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+
+ //#. finally, delete the dataverse.
MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dvName);
if (activeDefaultDataverse != null && activeDefaultDataverse.getDataverseName() == dvName) {
activeDefaultDataverse = null;
}
+
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (Exception e) {
+ if (bActiveTxn) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ }
+ throw new AlgebricksException(e);
+ } finally {
+ releaseWriteLatch();
}
}
private void handleDatasetDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
- DropStatement stmtDelete = (DropStatement) stmt;
- String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
- : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue();
- if (dataverseName == null) {
- throw new AlgebricksException(" dataverse not specified ");
- }
- String datasetName = stmtDelete.getDatasetName().getValue();
- Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
- if (ds == null) {
- if (!stmtDelete.getIfExists())
- throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
- + dataverseName + ".");
- } else {
+
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ boolean bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ acquireWriteLatch();
+
+ try {
+ DropStatement stmtDelete = (DropStatement) stmt;
+ String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
+ : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue();
+ if (dataverseName == null) {
+ throw new AlgebricksException(" dataverse not specified ");
+ }
+ String datasetName = stmtDelete.getDatasetName().getValue();
+
+ Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+ if (ds == null) {
+ if (!stmtDelete.getIfExists()) {
+ throw new AlgebricksException("There is no dataset with this name " + datasetName
+ + " in dataverse " + dataverseName + ".");
+ }
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ return;
+ }
+
if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
+
+ //#. prepare jobs to drop the datatset and the indexes in NC
List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
for (int j = 0; j < indexes.size(); j++) {
- if (indexes.get(j).isPrimaryIndex()) {
- compileIndexDropStatement(hcc, dataverseName, datasetName, indexes.get(j).getIndexName(),
- metadataProvider);
+ if (indexes.get(j).isSecondaryIndex()) {
+ CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
+ indexes.get(j).getIndexName());
+ jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
}
}
+ CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
+ jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider));
+
+ //#. mark the existing dataset as PendingDropOp
+ MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
+ MetadataManager.INSTANCE.addDataset(
+ mdTxnCtx,
+ new Dataset(dataverseName, datasetName, ds.getItemTypeName(), ds.getDatasetDetails(), ds
+ .getDatasetType(), ds.getDatasetId(), IMetadataEntity.PENDING_DROP_OP));
+
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ bActiveTxn = false;
+
+ //#. run the jobs
+ for (JobSpecification jobSpec : jobsToExecute) {
+ runJob(hcc, jobSpec);
+ }
+
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
}
- compileDatasetDropStatement(hcc, dataverseName, datasetName, metadataProvider);
+
+ //#. finally, delete the dataset.
+ MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
+
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (Exception e) {
+ if (bActiveTxn) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ }
+ throw new AlgebricksException(e);
+ } finally {
+ releaseWriteLatch();
}
}
private void handleIndexDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
- IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt;
- String datasetName = stmtIndexDrop.getDatasetName().getValue();
- String dataverseName = stmtIndexDrop.getDataverseName() == null ? activeDefaultDataverse == null ? null
- : activeDefaultDataverse.getDataverseName() : stmtIndexDrop.getDataverseName().getValue();
- if (dataverseName == null) {
- throw new AlgebricksException(" dataverse not specified ");
- }
- Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
- if (ds == null)
- throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
- + dataverseName);
- if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
- String indexName = stmtIndexDrop.getIndexName().getValue();
- Index idx = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
- if (idx == null) {
- if (!stmtIndexDrop.getIfExists())
- throw new AlgebricksException("There is no index with this name " + indexName + ".");
- } else
- compileIndexDropStatement(hcc, dataverseName, datasetName, indexName, metadataProvider);
- } else {
- throw new AlgebricksException(datasetName
- + " is an external dataset. Indexes are not maintained for external datasets.");
+
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ boolean bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ acquireWriteLatch();
+
+ try {
+ IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt;
+ String datasetName = stmtIndexDrop.getDatasetName().getValue();
+ String dataverseName = stmtIndexDrop.getDataverseName() == null ? activeDefaultDataverse == null ? null
+ : activeDefaultDataverse.getDataverseName() : stmtIndexDrop.getDataverseName().getValue();
+ if (dataverseName == null) {
+ throw new AlgebricksException(" dataverse not specified ");
+ }
+
+ Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+ if (ds == null) {
+ throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
+ + dataverseName);
+ }
+
+ if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
+ String indexName = stmtIndexDrop.getIndexName().getValue();
+ Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
+ if (index == null) {
+ if (!stmtIndexDrop.getIfExists()) {
+ throw new AlgebricksException("There is no index with this name " + indexName + ".");
+ }
+ } else {
+ //#. prepare a job to drop the index in NC.
+ CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
+ indexName);
+ jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
+
+ //#. mark PendingDropOp on the existing index
+ MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
+ MetadataManager.INSTANCE.addIndex(
+ mdTxnCtx,
+ new Index(dataverseName, datasetName, indexName, index.getIndexType(), index
+ .getKeyFieldNames(), index.isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
+
+ //#. commit the existing transaction before calling runJob.
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ bActiveTxn = false;
+
+ for (JobSpecification jobSpec : jobsToExecute) {
+ runJob(hcc, jobSpec);
+ }
+
+ //#. begin a new transaction
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+
+ //#. finally, delete the existing index
+ MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
+ }
+ } else {
+ throw new AlgebricksException(datasetName
+ + " is an external dataset. Indexes are not maintained for external datasets.");
+ }
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+
+ } catch (Exception e) {
+ if (bActiveTxn) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ }
+ throw new AlgebricksException(e);
+
+ } finally {
+ releaseWriteLatch();
}
}
private void handleTypeDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
List<JobSpecification> jobsToExecute) throws AlgebricksException, MetadataException, RemoteException,
ACIDException {
- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
- TypeDropStatement stmtTypeDrop = (TypeDropStatement) stmt;
- String dataverseName = stmtTypeDrop.getDataverseName() == null ? (activeDefaultDataverse == null ? null
- : activeDefaultDataverse.getDataverseName()) : stmtTypeDrop.getDataverseName().getValue();
- if (dataverseName == null) {
- throw new AlgebricksException(" dataverse not specified ");
- }
- String typeName = stmtTypeDrop.getTypeName().getValue();
- Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
- if (dt == null) {
- if (!stmtTypeDrop.getIfExists())
- throw new AlgebricksException("There is no datatype with this name " + typeName + ".");
- } else {
- MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, dataverseName, typeName);
+
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ acquireWriteLatch();
+
+ try {
+ TypeDropStatement stmtTypeDrop = (TypeDropStatement) stmt;
+ String dataverseName = stmtTypeDrop.getDataverseName() == null ? (activeDefaultDataverse == null ? null
+ : activeDefaultDataverse.getDataverseName()) : stmtTypeDrop.getDataverseName().getValue();
+ if (dataverseName == null) {
+ throw new AlgebricksException(" dataverse not specified ");
+ }
+ String typeName = stmtTypeDrop.getTypeName().getValue();
+ Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
+ if (dt == null) {
+ if (!stmtTypeDrop.getIfExists())
+ throw new AlgebricksException("There is no datatype with this name " + typeName + ".");
+ } else {
+ MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, dataverseName, typeName);
+ }
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (Exception e) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ throw new AlgebricksException(e);
+ } finally {
+ releaseWriteLatch();
}
}
private void handleNodegroupDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
List<JobSpecification> jobsToExecute) throws MetadataException, AlgebricksException, RemoteException,
ACIDException {
- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
- NodeGroupDropStatement stmtDelete = (NodeGroupDropStatement) stmt;
- String nodegroupName = stmtDelete.getNodeGroupName().getValue();
- NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodegroupName);
- if (ng == null) {
- if (!stmtDelete.getIfExists())
- throw new AlgebricksException("There is no nodegroup with this name " + nodegroupName + ".");
- } else {
- MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, nodegroupName);
+
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ acquireWriteLatch();
+
+ try {
+ NodeGroupDropStatement stmtDelete = (NodeGroupDropStatement) stmt;
+ String nodegroupName = stmtDelete.getNodeGroupName().getValue();
+ NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodegroupName);
+ if (ng == null) {
+ if (!stmtDelete.getIfExists())
+ throw new AlgebricksException("There is no nodegroup with this name " + nodegroupName + ".");
+ } else {
+ MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, nodegroupName);
+ }
+
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (Exception e) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ throw new AlgebricksException(e);
+ } finally {
+ releaseWriteLatch();
}
}
private void handleCreateFunctionStatement(AqlMetadataProvider metadataProvider, Statement stmt,
List<JobSpecification> jobsToExecute) throws AlgebricksException, MetadataException, RemoteException,
ACIDException {
- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
- CreateFunctionStatement cfs = (CreateFunctionStatement) stmt;
- String dataverse = cfs.getSignature().getNamespace() == null ? activeDefaultDataverse == null ? null
- : activeDefaultDataverse.getDataverseName() : cfs.getSignature().getNamespace();
- if (dataverse == null) {
- throw new AlgebricksException(" dataverse not specified ");
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ acquireWriteLatch();
+
+ try {
+ CreateFunctionStatement cfs = (CreateFunctionStatement) stmt;
+ String dataverse = cfs.getSignature().getNamespace() == null ? activeDefaultDataverse == null ? null
+ : activeDefaultDataverse.getDataverseName() : cfs.getSignature().getNamespace();
+ if (dataverse == null) {
+ throw new AlgebricksException(" dataverse not specified ");
+ }
+ Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverse);
+ if (dv == null) {
+ throw new AlgebricksException("There is no dataverse with this name " + dataverse + ".");
+ }
+ Function function = new Function(dataverse, cfs.getaAterixFunction().getName(), cfs.getaAterixFunction()
+ .getArity(), cfs.getParamList(), Function.RETURNTYPE_VOID, cfs.getFunctionBody(),
+ Function.LANGUAGE_AQL, FunctionKind.SCALAR.toString());
+ MetadataManager.INSTANCE.addFunction(mdTxnCtx, function);
+
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (Exception e) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ throw new AlgebricksException(e);
+ } finally {
+ releaseWriteLatch();
}
- Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverse);
- if (dv == null) {
- throw new AlgebricksException("There is no dataverse with this name " + dataverse + ".");
- }
- Function function = new Function(dataverse, cfs.getaAterixFunction().getName(), cfs.getaAterixFunction()
- .getArity(), cfs.getParamList(), Function.RETURNTYPE_VOID, cfs.getFunctionBody(),
- Function.LANGUAGE_AQL, FunctionKind.SCALAR.toString());
- MetadataManager.INSTANCE.addFunction(mdTxnCtx, function);
}
private void handleFunctionDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
List<JobSpecification> jobsToExecute) throws MetadataException, RemoteException, ACIDException,
AlgebricksException {
- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
- FunctionDropStatement stmtDropFunction = (FunctionDropStatement) stmt;
- FunctionSignature signature = stmtDropFunction.getFunctionSignature();
- Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, signature);
- if (function == null) {
- if (!stmtDropFunction.getIfExists())
- throw new AlgebricksException("Unknonw function " + signature);
- } else {
- MetadataManager.INSTANCE.dropFunction(mdTxnCtx, signature);
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ acquireWriteLatch();
+
+ try {
+ FunctionDropStatement stmtDropFunction = (FunctionDropStatement) stmt;
+ FunctionSignature signature = stmtDropFunction.getFunctionSignature();
+ Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, signature);
+ if (function == null) {
+ if (!stmtDropFunction.getIfExists())
+ throw new AlgebricksException("Unknonw function " + signature);
+ } else {
+ MetadataManager.INSTANCE.dropFunction(mdTxnCtx, signature);
+ }
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (Exception e) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ throw new AlgebricksException(e);
+ } finally {
+ releaseWriteLatch();
}
}
private void handleLoadFromFileStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
- LoadFromFileStatement loadStmt = (LoadFromFileStatement) stmt;
- String dataverseName = loadStmt.getDataverseName() == null ? activeDefaultDataverse == null ? null
- : activeDefaultDataverse.getDataverseName() : loadStmt.getDataverseName().getValue();
- CompiledLoadFromFileStatement cls = new CompiledLoadFromFileStatement(dataverseName, loadStmt.getDatasetName()
- .getValue(), loadStmt.getAdapter(), loadStmt.getProperties(), loadStmt.dataIsAlreadySorted());
- IDataFormat format = getDataFormat(metadataProvider.getMetadataTxnContext(), dataverseName);
- Job job = DatasetOperations.createLoadDatasetJobSpec(metadataProvider, cls, format);
- jobsToExecute.add(job.getJobSpec());
- // Also load the dataset's secondary indexes.
- List<Index> datasetIndexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, loadStmt
- .getDatasetName().getValue());
- for (Index index : datasetIndexes) {
- if (!index.isSecondaryIndex()) {
- continue;
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ boolean bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ acquireReadLatch();
+
+ try {
+ LoadFromFileStatement loadStmt = (LoadFromFileStatement) stmt;
+ String dataverseName = loadStmt.getDataverseName() == null ? activeDefaultDataverse == null ? null
+ : activeDefaultDataverse.getDataverseName() : loadStmt.getDataverseName().getValue();
+ CompiledLoadFromFileStatement cls = new CompiledLoadFromFileStatement(dataverseName, loadStmt
+ .getDatasetName().getValue(), loadStmt.getAdapter(), loadStmt.getProperties(),
+ loadStmt.dataIsAlreadySorted());
+
+ IDataFormat format = getDataFormat(metadataProvider.getMetadataTxnContext(), dataverseName);
+ Job job = DatasetOperations.createLoadDatasetJobSpec(metadataProvider, cls, format);
+ jobsToExecute.add(job.getJobSpec());
+ // Also load the dataset's secondary indexes.
+ List<Index> datasetIndexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, loadStmt
+ .getDatasetName().getValue());
+ for (Index index : datasetIndexes) {
+ if (!index.isSecondaryIndex()) {
+ continue;
+ }
+ // Create CompiledCreateIndexStatement from metadata entity 'index'.
+ CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(),
+ dataverseName, index.getDatasetName(), index.getKeyFieldNames(), index.getGramLength(),
+ index.getIndexType());
+ jobsToExecute.add(IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, metadataProvider));
}
- // Create CompiledCreateIndexStatement from metadata entity 'index'.
- CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName,
- index.getDatasetName(), index.getKeyFieldNames(), index.getGramLength(), index.getIndexType());
- jobsToExecute.add(IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, metadataProvider));
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ bActiveTxn = false;
+
+ for (JobSpecification jobspec : jobsToExecute) {
+ runJob(hcc, jobspec);
+ }
+ } catch (Exception e) {
+ if (bActiveTxn) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ }
+ throw new AlgebricksException(e);
+ } finally {
+ releaseReadLatch();
}
}
private void handleWriteFromQueryResultStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
- metadataProvider.setWriteTransaction(true);
- WriteFromQueryResultStatement st1 = (WriteFromQueryResultStatement) stmt;
- String dataverseName = st1.getDataverseName() == null ? activeDefaultDataverse == null ? null
- : activeDefaultDataverse.getDataverseName() : st1.getDataverseName().getValue();
- CompiledWriteFromQueryResultStatement clfrqs = new CompiledWriteFromQueryResultStatement(dataverseName, st1
- .getDatasetName().getValue(), st1.getQuery(), st1.getVarCounter());
- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
- if (compiled.first != null) {
- jobsToExecute.add(compiled.first);
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ boolean bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ acquireReadLatch();
+
+ try {
+ metadataProvider.setWriteTransaction(true);
+ WriteFromQueryResultStatement st1 = (WriteFromQueryResultStatement) stmt;
+ String dataverseName = st1.getDataverseName() == null ? activeDefaultDataverse == null ? null
+ : activeDefaultDataverse.getDataverseName() : st1.getDataverseName().getValue();
+ CompiledWriteFromQueryResultStatement clfrqs = new CompiledWriteFromQueryResultStatement(dataverseName, st1
+ .getDatasetName().getValue(), st1.getQuery(), st1.getVarCounter());
+
+ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(),
+ clfrqs);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ bActiveTxn = false;
+ if (compiled.first != null) {
+ runJob(hcc, compiled.first);
+ }
+ } catch (Exception e) {
+ if (bActiveTxn) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ }
+ throw new AlgebricksException(e);
+ } finally {
+ releaseReadLatch();
}
}
private void handleInsertStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
- metadataProvider.setWriteTransaction(true);
- InsertStatement stmtInsert = (InsertStatement) stmt;
- String dataverseName = stmtInsert.getDataverseName() == null ? activeDefaultDataverse == null ? null
- : activeDefaultDataverse.getDataverseName() : stmtInsert.getDataverseName().getValue();
- CompiledInsertStatement clfrqs = new CompiledInsertStatement(dataverseName, stmtInsert.getDatasetName()
- .getValue(), stmtInsert.getQuery(), stmtInsert.getVarCounter());
- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
- if (compiled.first != null) {
- jobsToExecute.add(compiled.first);
+
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ boolean bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ acquireReadLatch();
+
+ try {
+ metadataProvider.setWriteTransaction(true);
+ InsertStatement stmtInsert = (InsertStatement) stmt;
+ String dataverseName = stmtInsert.getDataverseName() == null ? activeDefaultDataverse == null ? null
+ : activeDefaultDataverse.getDataverseName() : stmtInsert.getDataverseName().getValue();
+ CompiledInsertStatement clfrqs = new CompiledInsertStatement(dataverseName, stmtInsert.getDatasetName()
+ .getValue(), stmtInsert.getQuery(), stmtInsert.getVarCounter());
+ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(),
+ clfrqs);
+
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ bActiveTxn = false;
+
+ if (compiled.first != null) {
+ runJob(hcc, compiled.first);
+ }
+
+ } catch (Exception e) {
+ if (bActiveTxn) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ }
+ throw new AlgebricksException(e);
+ } finally {
+ releaseReadLatch();
}
}
private void handleDeleteStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
- metadataProvider.setWriteTransaction(true);
- DeleteStatement stmtDelete = (DeleteStatement) stmt;
- String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
- : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue();
- CompiledDeleteStatement clfrqs = new CompiledDeleteStatement(stmtDelete.getVariableExpr(), dataverseName,
- stmtDelete.getDatasetName().getValue(), stmtDelete.getCondition(), stmtDelete.getDieClause(),
- stmtDelete.getVarCounter(), metadataProvider);
- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
- if (compiled.first != null) {
- jobsToExecute.add(compiled.first);
+
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ boolean bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ acquireReadLatch();
+
+ try {
+ metadataProvider.setWriteTransaction(true);
+ DeleteStatement stmtDelete = (DeleteStatement) stmt;
+ String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
+ : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue();
+ CompiledDeleteStatement clfrqs = new CompiledDeleteStatement(stmtDelete.getVariableExpr(), dataverseName,
+ stmtDelete.getDatasetName().getValue(), stmtDelete.getCondition(), stmtDelete.getDieClause(),
+ stmtDelete.getVarCounter(), metadataProvider);
+ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(),
+ clfrqs);
+
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ bActiveTxn = false;
+
+ if (compiled.first != null) {
+ runJob(hcc, compiled.first);
+ }
+
+ } catch (Exception e) {
+ if (bActiveTxn) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ }
+ throw new AlgebricksException(e);
+ } finally {
+ releaseReadLatch();
}
}
@@ -704,84 +1102,140 @@
private void handleBeginFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
- BeginFeedStatement bfs = (BeginFeedStatement) stmt;
- String dataverseName = bfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
- : activeDefaultDataverse.getDataverseName() : bfs.getDatasetName().getValue();
- CompiledBeginFeedStatement cbfs = new CompiledBeginFeedStatement(dataverseName,
- bfs.getDatasetName().getValue(), bfs.getQuery(), bfs.getVarCounter());
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ boolean bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ acquireReadLatch();
- Dataset dataset;
- dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName, bfs
- .getDatasetName().getValue());
- IDatasetDetails datasetDetails = dataset.getDatasetDetails();
- if (datasetDetails.getDatasetType() != DatasetType.FEED) {
- throw new IllegalArgumentException("Dataset " + bfs.getDatasetName().getValue() + " is not a feed dataset");
- }
- bfs.initialize(metadataProvider.getMetadataTxnContext(), dataset);
- cbfs.setQuery(bfs.getQuery());
- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, bfs.getQuery(), cbfs);
- if (compiled.first != null) {
- jobsToExecute.add(compiled.first);
+ try {
+ BeginFeedStatement bfs = (BeginFeedStatement) stmt;
+ String dataverseName = bfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
+ : activeDefaultDataverse.getDataverseName() : bfs.getDatasetName().getValue();
+
+ CompiledBeginFeedStatement cbfs = new CompiledBeginFeedStatement(dataverseName, bfs.getDatasetName()
+ .getValue(), bfs.getQuery(), bfs.getVarCounter());
+
+ Dataset dataset;
+ dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName, bfs
+ .getDatasetName().getValue());
+ IDatasetDetails datasetDetails = dataset.getDatasetDetails();
+ if (datasetDetails.getDatasetType() != DatasetType.FEED) {
+ throw new IllegalArgumentException("Dataset " + bfs.getDatasetName().getValue()
+ + " is not a feed dataset");
+ }
+ bfs.initialize(metadataProvider.getMetadataTxnContext(), dataset);
+ cbfs.setQuery(bfs.getQuery());
+ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, bfs.getQuery(), cbfs);
+
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ bActiveTxn = false;
+
+ if (compiled.first != null) {
+ runJob(hcc, compiled.first);
+ }
+
+ } catch (Exception e) {
+ if (bActiveTxn) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ }
+ throw new AlgebricksException(e);
+ } finally {
+ releaseReadLatch();
}
}
private void handleControlFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
- ControlFeedStatement cfs = (ControlFeedStatement) stmt;
- String dataverseName = cfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
- : activeDefaultDataverse.getDataverseName() : cfs.getDatasetName().getValue();
- CompiledControlFeedStatement clcfs = new CompiledControlFeedStatement(cfs.getOperationType(), dataverseName,
- cfs.getDatasetName().getValue(), cfs.getAlterAdapterConfParams());
- jobsToExecute.add(FeedOperations.buildControlFeedJobSpec(clcfs, metadataProvider));
+
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ boolean bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ acquireReadLatch();
+
+ try {
+ ControlFeedStatement cfs = (ControlFeedStatement) stmt;
+ String dataverseName = cfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
+ : activeDefaultDataverse.getDataverseName() : cfs.getDatasetName().getValue();
+ CompiledControlFeedStatement clcfs = new CompiledControlFeedStatement(cfs.getOperationType(),
+ dataverseName, cfs.getDatasetName().getValue(), cfs.getAlterAdapterConfParams());
+ JobSpecification jobSpec = FeedOperations.buildControlFeedJobSpec(clcfs, metadataProvider);
+
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ bActiveTxn = false;
+
+ runJob(hcc, jobSpec);
+
+ } catch (Exception e) {
+ if (bActiveTxn) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ }
+ throw new AlgebricksException(e);
+ } finally {
+ releaseReadLatch();
+ }
}
private QueryResult handleQuery(AqlMetadataProvider metadataProvider, Query query, IHyracksClientConnection hcc,
List<JobSpecification> jobsToExecute) throws Exception {
- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, query, null);
- if (compiled.first != null) {
- GlobalConfig.ASTERIX_LOGGER.info(compiled.first.toJSON().toString(1));
- jobsToExecute.add(compiled.first);
- }
- return new QueryResult(query, compiled.second.getLocalFile().getFile().getAbsolutePath());
- }
- private void runCreateIndexJob(IHyracksClientConnection hcc, CreateIndexStatement stmtCreateIndex,
- AqlMetadataProvider metadataProvider) throws Exception {
- // TODO: Eventually CreateIndexStatement and
- // CompiledCreateIndexStatement should be replaced by the corresponding
- // metadata entity.
- // For now we must still convert to a CompiledCreateIndexStatement here.
- String dataverseName = stmtCreateIndex.getDataverseName() == null ? activeDefaultDataverse == null ? null
- : activeDefaultDataverse.getDataverseName() : stmtCreateIndex.getDataverseName().getValue();
- CompiledCreateIndexStatement createIndexStmt = new CompiledCreateIndexStatement(stmtCreateIndex.getIndexName()
- .getValue(), dataverseName, stmtCreateIndex.getDatasetName().getValue(),
- stmtCreateIndex.getFieldExprs(), stmtCreateIndex.getGramLength(), stmtCreateIndex.getIndexType());
- JobSpecification spec = IndexOperations.buildSecondaryIndexCreationJobSpec(createIndexStmt, metadataProvider);
- if (spec == null) {
- throw new AsterixException("Failed to create job spec for creating index '"
- + stmtCreateIndex.getDatasetName() + "." + stmtCreateIndex.getIndexName() + "'");
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ boolean bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ acquireReadLatch();
+
+ try {
+ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, query, null);
+
+ QueryResult queryResult = new QueryResult(query, compiled.second.getLocalFile().getFile().getAbsolutePath());
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ bActiveTxn = false;
+
+ if (compiled.first != null) {
+ GlobalConfig.ASTERIX_LOGGER.info(compiled.first.toJSON().toString(1));
+ runJob(hcc, compiled.first);
+ }
+
+ return queryResult;
+ } catch (Exception e) {
+ if (bActiveTxn) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ }
+ throw new AlgebricksException(e);
+ } finally {
+ releaseReadLatch();
}
- runJob(hcc, spec);
}
private void handleCreateNodeGroupStatement(AqlMetadataProvider metadataProvider, Statement stmt,
List<JobSpecification> jobsToExecute) throws MetadataException, AlgebricksException, RemoteException,
ACIDException {
- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
- NodegroupDecl stmtCreateNodegroup = (NodegroupDecl) stmt;
- String ngName = stmtCreateNodegroup.getNodegroupName().getValue();
- NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, ngName);
- if (ng != null) {
- if (!stmtCreateNodegroup.getIfNotExists())
- throw new AlgebricksException("A nodegroup with this name " + ngName + " already exists.");
- } else {
- List<Identifier> ncIdentifiers = stmtCreateNodegroup.getNodeControllerNames();
- List<String> ncNames = new ArrayList<String>(ncIdentifiers.size());
- for (Identifier id : ncIdentifiers) {
- ncNames.add(id.getValue());
+
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ acquireWriteLatch();
+
+ try {
+ NodegroupDecl stmtCreateNodegroup = (NodegroupDecl) stmt;
+ String ngName = stmtCreateNodegroup.getNodegroupName().getValue();
+ NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, ngName);
+ if (ng != null) {
+ if (!stmtCreateNodegroup.getIfNotExists())
+ throw new AlgebricksException("A nodegroup with this name " + ngName + " already exists.");
+ } else {
+ List<Identifier> ncIdentifiers = stmtCreateNodegroup.getNodeControllerNames();
+ List<String> ncNames = new ArrayList<String>(ncIdentifiers.size());
+ for (Identifier id : ncIdentifiers) {
+ ncNames.add(id.getValue());
+ }
+ MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(ngName, ncNames));
}
- MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(ngName, ncNames));
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (Exception e) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ throw new AlgebricksException(e);
+ } finally {
+ releaseWriteLatch();
}
}
@@ -789,27 +1243,6 @@
executeJobArray(hcc, new Job[] { new Job(spec) }, out, pdf);
}
- private void compileIndexDropStatement(IHyracksClientConnection hcc, String dataverseName, String datasetName,
- String indexName, AqlMetadataProvider metadataProvider) throws Exception {
- CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName);
- runJob(hcc, IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
- MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
- indexName);
- }
-
- private void compileDatasetDropStatement(IHyracksClientConnection hcc, String dataverseName, String datasetName,
- AqlMetadataProvider metadataProvider) throws Exception {
- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
- CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
- Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
- if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
- JobSpecification[] jobSpecs = DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider);
- for (JobSpecification spec : jobSpecs)
- runJob(hcc, spec);
- }
- MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
- }
-
public void executeJobArray(IHyracksClientConnection hcc, Job[] jobs, PrintWriter out, DisplayFormat pdf)
throws Exception {
for (int i = 0; i < jobs.length; i++) {
@@ -831,4 +1264,20 @@
}
return format;
}
+
+ private void acquireWriteLatch() {
+ MetadataManager.INSTANCE.acquireWriteLatch();
+ }
+
+ private void releaseWriteLatch() {
+ MetadataManager.INSTANCE.releaseWriteLatch();
+ }
+
+ private void acquireReadLatch() {
+ MetadataManager.INSTANCE.acquireReadLatch();
+ }
+
+ private void releaseReadLatch() {
+ MetadataManager.INSTANCE.releaseReadLatch();
+ }
}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
index b32f129..9d20281 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
@@ -90,7 +90,7 @@
private static Logger LOGGER = Logger.getLogger(DatasetOperations.class.getName());
- public static JobSpecification[] createDropDatasetJobSpec(CompiledDatasetDropStatement datasetDropStmt,
+ public static JobSpecification createDropDatasetJobSpec(CompiledDatasetDropStatement datasetDropStmt,
AqlMetadataProvider metadataProvider) throws AlgebricksException, HyracksDataException, RemoteException,
ACIDException, AsterixException {
@@ -111,67 +111,10 @@
throw new AlgebricksException("DROP DATASET: No metadata for dataset " + datasetName);
}
if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
- return new JobSpecification[0];
+ return new JobSpecification();
}
-
- List<Index> datasetIndexes = metadataProvider.getDatasetIndexes(dataset.getDataverseName(),
- dataset.getDatasetName());
- int numSecondaryIndexes = 0;
- for (Index index : datasetIndexes) {
- if (index.isSecondaryIndex()) {
- numSecondaryIndexes++;
- }
- }
- JobSpecification[] specs;
- if (numSecondaryIndexes > 0) {
- specs = new JobSpecification[numSecondaryIndexes + 1];
- int i = 0;
- // First, drop secondary indexes.
- for (Index index : datasetIndexes) {
- if (index.isSecondaryIndex()) {
- specs[i] = new JobSpecification();
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> idxSplitsAndConstraint = metadataProvider
- .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataset.getDataverseName(),
- datasetName, index.getIndexName());
- IIndexDataflowHelperFactory dfhFactory;
- switch (index.getIndexType()) {
- case BTREE:
- dfhFactory = new LSMBTreeDataflowHelperFactory(
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER);
- break;
- case RTREE:
- dfhFactory = new LSMRTreeDataflowHelperFactory(
- new IPrimitiveValueProviderFactory[] { null }, RTreePolicyType.RTREE,
- new IBinaryComparatorFactory[] { null },
- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER, null);
- break;
- case NGRAM_INVIX:
- case WORD_INVIX:
- dfhFactory = new LSMInvertedIndexDataflowHelperFactory(
- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER);
- break;
- default:
- throw new AsterixException("Unknown index type provided.");
- }
- IndexDropOperatorDescriptor secondaryBtreeDrop = new IndexDropOperatorDescriptor(specs[i],
- AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
- AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, idxSplitsAndConstraint.first, dfhFactory);
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specs[i], secondaryBtreeDrop,
- idxSplitsAndConstraint.second);
- i++;
- }
- }
- } else {
- specs = new JobSpecification[1];
- }
+
JobSpecification specPrimary = new JobSpecification();
- specs[specs.length - 1] = specPrimary;
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider
.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataset.getDataverseName(), datasetName,
@@ -187,7 +130,7 @@
specPrimary.addRoot(primaryBtreeDrop);
- return specs;
+ return specPrimary;
}
public static JobSpecification createDatasetJobSpec(Dataverse dataverse, String datasetName,
diff --git a/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java b/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
index 111f76b..8bb6499 100644
--- a/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
+++ b/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
@@ -95,11 +95,11 @@
File testFile = tcCtx.getTestFile(cUnit);
/*************** to avoid run failure cases ****************
- if (!testFile.getAbsolutePath().contains("runtimets/queries/failure/")) {
+ if (!testFile.getAbsolutePath().contains("index-selection/")) {
continue;
}
************************************************************/
-
+
File expectedResultFile = tcCtx.getExpectedResultFile(cUnit);
File actualFile = new File(PATH_ACTUAL + File.separator
+ tcCtx.getTestCase().getFilePath().replace(File.separator, "_") + "_" + cUnit.getName() + ".adm");
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
index 1fdde73..3989a0f 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
@@ -17,6 +17,8 @@
import java.rmi.RemoteException;
import java.util.List;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import edu.uci.ics.asterix.common.functions.FunctionSignature;
import edu.uci.ics.asterix.metadata.api.IAsterixStateProxy;
@@ -79,10 +81,10 @@
public class MetadataManager implements IMetadataManager {
// Set in init().
public static MetadataManager INSTANCE;
-
private final MetadataCache cache = new MetadataCache();
private IAsterixStateProxy proxy;
private IMetadataNode metadataNode;
+ private final ReadWriteLock metadataLatch;
public MetadataManager(IAsterixStateProxy proxy) {
if (proxy == null) {
@@ -90,6 +92,7 @@
}
this.proxy = proxy;
this.metadataNode = null;
+ this.metadataLatch = new ReentrantReadWriteLock(true);
}
@Override
@@ -206,11 +209,14 @@
@Override
public void addDataset(MetadataTransactionContext ctx, Dataset dataset) throws MetadataException {
+ // add dataset into metadataNode
try {
metadataNode.addDataset(ctx.getJobId(), dataset);
} catch (RemoteException e) {
throw new MetadataException(e);
}
+
+ // reflect the dataset into the cache
ctx.addDataset(dataset);
}
@@ -375,7 +381,7 @@
@Override
public Index getIndex(MetadataTransactionContext ctx, String dataverseName, String datasetName, String indexName)
throws MetadataException {
-
+
// First look in the context to see if this transaction created the
// requested index itself (but the index is still uncommitted).
Index index = ctx.getIndex(dataverseName, datasetName, indexName);
@@ -384,7 +390,7 @@
// uncommitted.
return index;
}
-
+
if (ctx.indexIsDropped(dataverseName, datasetName, indexName)) {
// Index has been dropped by this transaction but could still be
// in the cache.
@@ -585,4 +591,25 @@
}
return adapter;
}
+
+ @Override
+ public void acquireWriteLatch() {
+ metadataLatch.writeLock().lock();
+ }
+
+ @Override
+ public void releaseWriteLatch() {
+ metadataLatch.writeLock().unlock();
+ }
+
+ @Override
+ public void acquireReadLatch() {
+ metadataLatch.readLock().lock();
+ }
+
+ @Override
+ public void releaseReadLatch() {
+ metadataLatch.readLock().unlock();
+ }
+
}
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
index a9406e2..86a8c6e 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
@@ -56,14 +56,18 @@
import edu.uci.ics.asterix.om.base.AString;
import edu.uci.ics.asterix.om.types.BuiltinType;
import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallback;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallback;
+import edu.uci.ics.asterix.transaction.management.resource.TransactionalResourceRepository;
import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetId;
import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetIdFactory;
+import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager.ResourceType;
import edu.uci.ics.asterix.transaction.management.service.transaction.JobId;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
-import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -75,10 +79,13 @@
import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
public class MetadataNode implements IMetadataNode {
private static final long serialVersionUID = 1L;
@@ -88,7 +95,7 @@
private static final DatasetId METADATA_DATASET_ID = new DatasetId(MetadataPrimaryIndexes.METADATA_DATASET_ID);
private IIndexLifecycleManager indexLifecycleManager;
- private TransactionSubsystem transactionProvider;
+ private TransactionSubsystem transactionSubsystem;
public static final MetadataNode INSTANCE = new MetadataNode();
@@ -97,26 +104,26 @@
}
public void initialize(AsterixAppRuntimeContext runtimeContext) {
- this.transactionProvider = runtimeContext.getTransactionSubsystem();
+ this.transactionSubsystem = runtimeContext.getTransactionSubsystem();
this.indexLifecycleManager = runtimeContext.getIndexLifecycleManager();
}
@Override
public void beginTransaction(JobId transactionId) throws ACIDException, RemoteException {
- transactionProvider.getTransactionManager().beginTransaction(transactionId);
+ transactionSubsystem.getTransactionManager().beginTransaction(transactionId);
}
@Override
public void commitTransaction(JobId jobId) throws RemoteException, ACIDException {
- TransactionContext txnCtx = transactionProvider.getTransactionManager().getTransactionContext(jobId);
- transactionProvider.getTransactionManager().commitTransaction(txnCtx, new DatasetId(-1), -1);
+ TransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
+ transactionSubsystem.getTransactionManager().commitTransaction(txnCtx, new DatasetId(-1), -1);
}
@Override
public void abortTransaction(JobId jobId) throws RemoteException, ACIDException {
try {
- TransactionContext txnCtx = transactionProvider.getTransactionManager().getTransactionContext(jobId);
- transactionProvider.getTransactionManager().abortTransaction(txnCtx, new DatasetId(-1), -1);
+ TransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
+ transactionSubsystem.getTransactionManager().abortTransaction(txnCtx, new DatasetId(-1), -1);
} catch (ACIDException e) {
e.printStackTrace();
throw e;
@@ -125,14 +132,14 @@
@Override
public void lock(JobId jobId, byte lockMode) throws ACIDException, RemoteException {
- TransactionContext txnCtx = transactionProvider.getTransactionManager().getTransactionContext(jobId);
- transactionProvider.getLockManager().lock(METADATA_DATASET_ID, -1, lockMode, txnCtx);
+ TransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
+ transactionSubsystem.getLockManager().lock(METADATA_DATASET_ID, -1, lockMode, txnCtx);
}
@Override
public void unlock(JobId jobId) throws ACIDException, RemoteException {
- TransactionContext txnCtx = transactionProvider.getTransactionManager().getTransactionContext(jobId);
- transactionProvider.getLockManager().unlock(METADATA_DATASET_ID, -1, txnCtx);
+ TransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
+ transactionSubsystem.getLockManager().unlock(METADATA_DATASET_ID, -1, txnCtx);
}
@Override
@@ -160,7 +167,7 @@
// Add the primary index for the dataset.
InternalDatasetDetails id = (InternalDatasetDetails) dataset.getDatasetDetails();
Index primaryIndex = new Index(dataset.getDataverseName(), dataset.getDatasetName(),
- dataset.getDatasetName(), IndexType.BTREE, id.getPrimaryKey(), true);
+ dataset.getDatasetName(), IndexType.BTREE, id.getPrimaryKey(), true, dataset.getPendingOp());
addIndex(jobId, primaryIndex);
ITupleReference nodeGroupTuple = createTuple(id.getNodeGroupName(), dataset.getDataverseName(),
dataset.getDatasetName());
@@ -253,22 +260,64 @@
insertTupleIntoIndex(jobId, MetadataSecondaryIndexes.DATATYPENAME_ON_DATATYPE_INDEX, tuple);
}
- private void insertTupleIntoIndex(JobId jobId, IMetadataIndex index, ITupleReference tuple) throws Exception {
- long resourceID = index.getResourceID();
- IIndex indexInstance = indexLifecycleManager.getIndex(resourceID);
+ private void insertTupleIntoIndex(JobId jobId, IMetadataIndex metadataIndex, ITupleReference tuple)
+ throws Exception {
+ long resourceID = metadataIndex.getResourceID();
+ ILSMIndex lsmIndex = (ILSMIndex) indexLifecycleManager.getIndex(resourceID);
indexLifecycleManager.open(resourceID);
- IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
- NoOpOperationCallback.INSTANCE);
- TransactionContext txnCtx = transactionProvider.getTransactionManager().getTransactionContext(jobId);
- transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx);
+
+ //prepare a Callback for logging
+ IModificationOperationCallback modCallback = createIndexModificationCallback(jobId, resourceID, metadataIndex,
+ lsmIndex, IndexOperation.INSERT);
+
+ IIndexAccessor indexAccessor = null;
+ indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
+
// TODO: fix exceptions once new BTree exception model is in hyracks.
indexAccessor.insert(tuple);
- //TODO: extract the key from the tuple and get the PKHashValue from the key.
- //index.getIndexLogger().generateLogRecord(transactionProvider, txnCtx, index.getDatasetId().getId(), null,
- // resourceID, IndexOperation.INSERT, tuple, null, null);
+
indexLifecycleManager.close(resourceID);
}
+ private IModificationOperationCallback createIndexModificationCallback(JobId jobId, long resourceId,
+ IMetadataIndex metadataIndex, ILSMIndex lsmIndex, IndexOperation indexOp) throws Exception {
+ TransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
+ TransactionalResourceRepository txnResourceRepository = transactionSubsystem
+ .getTransactionalResourceRepository();
+
+ if (txnResourceRepository.getTransactionalResource(resourceId) == null) {
+ transactionSubsystem.getTransactionalResourceRepository().registerTransactionalResource(resourceId,
+ lsmIndex);
+ }
+
+ int[] primaryKeyFields = metadataIndex.getPrimaryKeyIndexes();
+ int numKeys = primaryKeyFields.length;
+ IBinaryHashFunctionFactory[] primaryKeyHashFunctionFactories = null;
+ if (metadataIndex.isPrimaryIndex()) {
+ primaryKeyHashFunctionFactories = metadataIndex.getKeyBinaryHashFunctionFactory();
+ } else {
+ primaryKeyHashFunctionFactories = new IBinaryHashFunctionFactory[numKeys];
+ IBinaryHashFunctionFactory[] secondaryKeyHashFunctionFactories = metadataIndex
+ .getKeyBinaryHashFunctionFactory();
+ for (int i = 0; i < numKeys; i++) {
+ primaryKeyHashFunctionFactories[i] = secondaryKeyHashFunctionFactories[primaryKeyFields[i]];
+ }
+ }
+
+ IModificationOperationCallback callback = null;
+ if (metadataIndex.isPrimaryIndex()) {
+ callback = new PrimaryIndexModificationOperationCallback(metadataIndex.getDatasetId().getId(),
+ primaryKeyFields, primaryKeyHashFunctionFactories, txnCtx, transactionSubsystem.getLockManager(),
+ transactionSubsystem, resourceId, ResourceType.LSM_BTREE, indexOp);
+ } else {
+ callback = new SecondaryIndexModificationOperationCallback(metadataIndex.getDatasetId().getId(),
+ primaryKeyFields, primaryKeyHashFunctionFactories, txnCtx, transactionSubsystem.getLockManager(),
+ transactionSubsystem, resourceId, ResourceType.LSM_BTREE, indexOp);
+ }
+
+ return callback;
+ }
+
@Override
public void dropDataverse(JobId jobId, String dataverseName) throws MetadataException, RemoteException {
try {
@@ -524,24 +573,16 @@
}
}
- private void deleteTupleFromIndex(JobId jobId, IMetadataIndex index, ITupleReference tuple) throws Exception {
- long resourceID = index.getResourceID();
- IIndex indexInstance = indexLifecycleManager.getIndex(resourceID);
+ private void deleteTupleFromIndex(JobId jobId, IMetadataIndex metadataIndex, ITupleReference tuple)
+ throws Exception {
+ long resourceID = metadataIndex.getResourceID();
+ ILSMIndex lsmIndex = (ILSMIndex) indexLifecycleManager.getIndex(resourceID);
indexLifecycleManager.open(resourceID);
- IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
- NoOpOperationCallback.INSTANCE);
- TransactionContext txnCtx = transactionProvider.getTransactionManager().getTransactionContext(jobId);
- // This lock is actually an upgrade, because a deletion must be preceded
- // by a search, in order to be able to undo an aborted deletion.
- // The transaction with txnId will have an S lock on the
- // resource. Note that lock converters have a higher priority than
- // regular waiters in the LockManager.
- transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx);
+ //prepare a Callback for logging
+ IModificationOperationCallback modCallback = createIndexModificationCallback(jobId, resourceID, metadataIndex,
+ lsmIndex, IndexOperation.DELETE);
+ IIndexAccessor indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
indexAccessor.delete(tuple);
- //TODO: extract the key from the tuple and get the PKHashValue from the key.
- //check how to get the oldValue.
- //index.getIndexLogger().generateLogRecord(transactionProvider, txnCtx, index.getDatasetId().getId(), null,
- // resourceID, IndexOperation.DELETE, tuple, operation, null);
indexLifecycleManager.close(resourceID);
}
@@ -802,8 +843,10 @@
private <ResultType> void searchIndex(JobId jobId, IMetadataIndex index, ITupleReference searchKey,
IValueExtractor<ResultType> valueExtractor, List<ResultType> results) throws Exception {
- TransactionContext txnCtx = transactionProvider.getTransactionManager().getTransactionContext(jobId);
- transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.S, txnCtx);
+ TransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
+ //#. currently lock is not needed to access any metadata
+ // since the non-compatible concurrent access is always protected by the latch in the MetadataManager.
+ //transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.S, txnCtx);
IBinaryComparatorFactory[] comparatorFactories = index.getKeyBinaryComparatorFactory();
long resourceID = index.getResourceID();
IIndex indexInstance = indexLifecycleManager.getIndex(resourceID);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java
index 8052f75..5ec1d5a 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java
@@ -19,6 +19,7 @@
import edu.uci.ics.asterix.common.functions.FunctionSignature;
import edu.uci.ics.asterix.external.dataset.adapter.AdapterIdentifier;
+import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
import edu.uci.ics.asterix.metadata.entities.Dataset;
import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
import edu.uci.ics.asterix.metadata.entities.Datatype;
@@ -104,19 +105,19 @@
}
public void dropDataset(String dataverseName, String datasetName) {
- Dataset dataset = new Dataset(dataverseName, datasetName, null, null, null, -1);
+ Dataset dataset = new Dataset(dataverseName, datasetName, null, null, null, -1, IMetadataEntity.PENDING_NO_OP);
droppedCache.addDatasetIfNotExists(dataset);
logAndApply(new MetadataLogicalOperation(dataset, false));
}
public void dropIndex(String dataverseName, String datasetName, String indexName) {
- Index index = new Index(dataverseName, datasetName, indexName, null, null, false);
+ Index index = new Index(dataverseName, datasetName, indexName, null, null, false, IMetadataEntity.PENDING_NO_OP);
droppedCache.addIndexIfNotExists(index);
logAndApply(new MetadataLogicalOperation(index, false));
}
public void dropDataverse(String dataverseName) {
- Dataverse dataverse = new Dataverse(dataverseName, null);
+ Dataverse dataverse = new Dataverse(dataverseName, null, IMetadataEntity.PENDING_NO_OP);
droppedCache.addDataverseIfNotExists(dataverse);
logAndApply(new MetadataLogicalOperation(dataverse, false));
}
@@ -162,7 +163,7 @@
}
return droppedCache.getDataset(dataverseName, datasetName) != null;
}
-
+
public boolean indexIsDropped(String dataverseName, String datasetName, String indexName) {
if (droppedCache.getDataverse(dataverseName) != null) {
return true;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataEntity.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataEntity.java
index 847ec67..43f8dc4 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataEntity.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataEntity.java
@@ -20,6 +20,11 @@
import edu.uci.ics.asterix.metadata.MetadataCache;
public interface IMetadataEntity extends Serializable {
+
+ public static final int PENDING_NO_OP = 0;
+ public static final int PENDING_ADD_OP = 1;
+ public static final int PENDING_DROP_OP = 2;
+
Object addToCache(MetadataCache cache);
Object dropFromCache(MetadataCache cache);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataIndex.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataIndex.java
index 8554ddd..d9021a5 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataIndex.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataIndex.java
@@ -75,6 +75,11 @@
public long getResourceID();
public IndexLogger getIndexLogger();
-
+
public DatasetId getDatasetId();
+
+ boolean isPrimaryIndex();
+
+ int[] getPrimaryKeyIndexes();
+
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
index fe96a6f..ad08f18 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
@@ -433,4 +433,12 @@
public void initializeDatasetIdFactory(MetadataTransactionContext ctx) throws MetadataException;
+
+ public void acquireWriteLatch();
+
+ public void releaseWriteLatch();
+
+ public void acquireReadLatch();
+
+ public void releaseReadLatch();
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index 838de70..0570c82 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -31,6 +31,7 @@
import edu.uci.ics.asterix.metadata.IDatasetDetails;
import edu.uci.ics.asterix.metadata.MetadataManager;
import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
import edu.uci.ics.asterix.metadata.api.IMetadataIndex;
import edu.uci.ics.asterix.metadata.entities.AsterixBuiltinTypeMap;
import edu.uci.ics.asterix.metadata.entities.Dataset;
@@ -226,7 +227,7 @@
public static void insertInitialDataverses(MetadataTransactionContext mdTxnCtx) throws Exception {
String dataverseName = MetadataPrimaryIndexes.DATAVERSE_DATASET.getDataverseName();
String dataFormat = NonTaggedDataFormat.NON_TAGGED_DATA_FORMAT;
- MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dataverseName, dataFormat));
+ MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dataverseName, dataFormat, IMetadataEntity.PENDING_NO_OP));
}
public static void insertInitialDatasets(MetadataTransactionContext mdTxnCtx) throws Exception {
@@ -236,7 +237,7 @@
primaryIndexes[i].getNodeGroupName());
MetadataManager.INSTANCE.addDataset(mdTxnCtx, new Dataset(primaryIndexes[i].getDataverseName(),
primaryIndexes[i].getIndexedDatasetName(), primaryIndexes[i].getPayloadRecordType().getTypeName(),
- id, DatasetType.INTERNAL, primaryIndexes[i].getDatasetId().getId()));
+ id, DatasetType.INTERNAL, primaryIndexes[i].getDatasetId().getId(), IMetadataEntity.PENDING_NO_OP));
}
}
@@ -267,7 +268,7 @@
for (int i = 0; i < secondaryIndexes.length; i++) {
MetadataManager.INSTANCE.addIndex(mdTxnCtx, new Index(secondaryIndexes[i].getDataverseName(),
secondaryIndexes[i].getIndexedDatasetName(), secondaryIndexes[i].getIndexName(), IndexType.BTREE,
- secondaryIndexes[i].getPartitioningExpr(), false));
+ secondaryIndexes[i].getPartitioningExpr(), false, IMetadataEntity.PENDING_NO_OP));
}
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataIndex.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataIndex.java
index c069d0d..de0061d 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataIndex.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataIndex.java
@@ -73,9 +73,14 @@
private IndexLogger indexLogger;
// datasetId
private final DatasetId datasetId;
+ // Flag of primary index
+ protected final boolean isPrimaryIndex;
+ // PrimaryKeyField indexes used for secondary index operations
+ protected final int[] primaryKeyIndexes;
public MetadataIndex(String datasetName, String indexName, int numFields, IAType[] keyTypes, String[] keyNames,
- ARecordType payloadType, int datasetId) throws AsterixRuntimeException {
+ ARecordType payloadType, int datasetId, boolean isPrimaryIndex, int[] primaryKeyIndexes)
+ throws AsterixRuntimeException {
// Sanity checks.
if (keyTypes.length != keyNames.length) {
throw new AsterixRuntimeException("Unequal number of key types and names given.");
@@ -130,6 +135,10 @@
}
this.datasetId = new DatasetId(datasetId);
+ this.isPrimaryIndex = isPrimaryIndex;
+
+ //PrimaryKeyFieldIndexes
+ this.primaryKeyIndexes = primaryKeyIndexes;
}
@Override
@@ -249,4 +258,14 @@
public DatasetId getDatasetId() {
return datasetId;
}
+
+ @Override
+ public boolean isPrimaryIndex() {
+ return isPrimaryIndex;
+ }
+
+ @Override
+ public int[] getPrimaryKeyIndexes() {
+ return primaryKeyIndexes;
+ }
}
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java
index 4061fed..483881c 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java
@@ -61,32 +61,37 @@
}
DATAVERSE_DATASET = new MetadataIndex("Dataverse", null, 2, new IAType[] { BuiltinType.ASTRING },
- new String[] { "DataverseName" }, MetadataRecordTypes.DATAVERSE_RECORDTYPE, DATAVERSE_DATASET_ID);
+ new String[] { "DataverseName" }, MetadataRecordTypes.DATAVERSE_RECORDTYPE, DATAVERSE_DATASET_ID, true,
+ new int[] { 0 });
DATASET_DATASET = new MetadataIndex("Dataset", null, 3,
new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING }, new String[] { "DataverseName",
- "DatasetName" }, MetadataRecordTypes.DATASET_RECORDTYPE, DATASET_DATASET_ID);
+ "DatasetName" }, MetadataRecordTypes.DATASET_RECORDTYPE, DATASET_DATASET_ID, true, new int[] {
+ 0, 1 });
DATATYPE_DATASET = new MetadataIndex("Datatype", null, 3, new IAType[] { BuiltinType.ASTRING,
BuiltinType.ASTRING }, new String[] { "DataverseName", "DatatypeName" },
- MetadataRecordTypes.DATATYPE_RECORDTYPE, DATATYPE_DATASET_ID);
+ MetadataRecordTypes.DATATYPE_RECORDTYPE, DATATYPE_DATASET_ID, true, new int[] { 0, 1 });
INDEX_DATASET = new MetadataIndex("Index", null, 4, new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING,
BuiltinType.ASTRING }, new String[] { "DataverseName", "DatasetName", "IndexName" },
- MetadataRecordTypes.INDEX_RECORDTYPE, INDEX_DATASET_ID);
+ MetadataRecordTypes.INDEX_RECORDTYPE, INDEX_DATASET_ID, true, new int[] { 0, 1, 2 });
NODE_DATASET = new MetadataIndex("Node", null, 2, new IAType[] { BuiltinType.ASTRING },
- new String[] { "NodeName" }, MetadataRecordTypes.NODE_RECORDTYPE, NODE_DATASET_ID);
+ new String[] { "NodeName" }, MetadataRecordTypes.NODE_RECORDTYPE, NODE_DATASET_ID, true,
+ new int[] { 0 });
NODEGROUP_DATASET = new MetadataIndex("Nodegroup", null, 2, new IAType[] { BuiltinType.ASTRING },
- new String[] { "GroupName" }, MetadataRecordTypes.NODEGROUP_RECORDTYPE, NODEGROUP_DATASET_ID);
+ new String[] { "GroupName" }, MetadataRecordTypes.NODEGROUP_RECORDTYPE, NODEGROUP_DATASET_ID, true,
+ new int[] { 0 });
FUNCTION_DATASET = new MetadataIndex("Function", null, 4, new IAType[] { BuiltinType.ASTRING,
BuiltinType.ASTRING, BuiltinType.ASTRING }, new String[] { "DataverseName", "Name", "Arity" },
- MetadataRecordTypes.FUNCTION_RECORDTYPE, FUNCTION_DATASET_ID);
+ MetadataRecordTypes.FUNCTION_RECORDTYPE, FUNCTION_DATASET_ID, true, new int[] { 0, 1, 2 });
DATASOURCE_ADAPTER_DATASET = new MetadataIndex("DatasourceAdapter", null, 3, new IAType[] {
BuiltinType.ASTRING, BuiltinType.ASTRING }, new String[] { "DataverseName", "Name" },
- MetadataRecordTypes.DATASOURCE_ADAPTER_RECORDTYPE, DATASOURCE_ADAPTER_DATASET_ID);
+ MetadataRecordTypes.DATASOURCE_ADAPTER_RECORDTYPE, DATASOURCE_ADAPTER_DATASET_ID, true, new int[] { 0,
+ 1 });
}
}
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
index 50ea20a..17924b7 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
@@ -80,10 +80,11 @@
public static final int DATAVERSE_ARECORD_NAME_FIELD_INDEX = 0;
public static final int DATAVERSE_ARECORD_FORMAT_FIELD_INDEX = 1;
public static final int DATAVERSE_ARECORD_TIMESTAMP_FIELD_INDEX = 2;
+ public static final int DATAVERSE_ARECORD_PENDINGOP_FIELD_INDEX = 3;
private static final ARecordType createDataverseRecordType() {
- return new ARecordType("DataverseRecordType", new String[] { "DataverseName", "DataFormat", "Timestamp" },
- new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING }, true);
+ return new ARecordType("DataverseRecordType", new String[] { "DataverseName", "DataFormat", "Timestamp", "PendingOp" },
+ new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.AINT32 }, true);
}
// Helper constants for accessing fields in an ARecord of anonymous type
@@ -158,10 +159,11 @@
public static final int DATASET_ARECORD_FEEDDETAILS_FIELD_INDEX = 6;
public static final int DATASET_ARECORD_TIMESTAMP_FIELD_INDEX = 7;
public static final int DATASET_ARECORD_DATASETID_FIELD_INDEX = 8;
+ public static final int DATASET_ARECORD_PENDINGOP_FIELD_INDEX = 9;
private static final ARecordType createDatasetRecordType() {
String[] fieldNames = { "DataverseName", "DatasetName", "DataTypeName", "DatasetType", "InternalDetails",
- "ExternalDetails", "FeedDetails", "Timestamp", "DatasetId" };
+ "ExternalDetails", "FeedDetails", "Timestamp", "DatasetId", "PendingOp" };
List<IAType> internalRecordUnionList = new ArrayList<IAType>();
internalRecordUnionList.add(BuiltinType.ANULL);
@@ -179,7 +181,8 @@
AUnionType feedRecordUnion = new AUnionType(feedRecordUnionList, null);
IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
- internalRecordUnion, externalRecordUnion, feedRecordUnion, BuiltinType.ASTRING, BuiltinType.AINT32 };
+ internalRecordUnion, externalRecordUnion, feedRecordUnion, BuiltinType.ASTRING, BuiltinType.AINT32,
+ BuiltinType.AINT32 };
return new ARecordType("DatasetRecordType", fieldNames, fieldTypes, true);
}
@@ -264,13 +267,14 @@
public static final int INDEX_ARECORD_SEARCHKEY_FIELD_INDEX = 4;
public static final int INDEX_ARECORD_ISPRIMARY_FIELD_INDEX = 5;
public static final int INDEX_ARECORD_TIMESTAMP_FIELD_INDEX = 6;
+ public static final int INDEX_ARECORD_PENDINGOP_FIELD_INDEX = 7;
private static final ARecordType createIndexRecordType() {
AOrderedListType olType = new AOrderedListType(BuiltinType.ASTRING, null);
String[] fieldNames = { "DataverseName", "DatasetName", "IndexName", "IndexStructure", "SearchKey",
- "IsPrimary", "Timestamp" };
+ "IsPrimary", "Timestamp", "PendingOp" };
IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
- olType, BuiltinType.ABOOLEAN, BuiltinType.ASTRING };
+ olType, BuiltinType.ABOOLEAN, BuiltinType.ASTRING, BuiltinType.AINT32 };
return new ARecordType("IndexRecordType", fieldNames, fieldTypes, true);
};
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataSecondaryIndexes.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataSecondaryIndexes.java
index f696714..39b6a7f 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataSecondaryIndexes.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataSecondaryIndexes.java
@@ -44,14 +44,17 @@
GROUPNAME_ON_DATASET_INDEX = new MetadataIndex("Dataset", "GroupName", 3, new IAType[] { BuiltinType.ASTRING,
BuiltinType.ASTRING, BuiltinType.ASTRING },
- new String[] { "GroupName", "DataverseName", "DatasetName" }, null, MetadataPrimaryIndexes.DATASET_DATASET_ID);
+ new String[] { "GroupName", "DataverseName", "DatasetName" }, null,
+ MetadataPrimaryIndexes.DATASET_DATASET_ID, false, new int[] { 1, 2 });
DATATYPENAME_ON_DATASET_INDEX = new MetadataIndex("Dataset", "DatatypeName", 3, new IAType[] {
BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING }, new String[] { "DataverseName",
- "DatatypeName", "DatasetName" }, null, MetadataPrimaryIndexes.DATASET_DATASET_ID);
+ "DatatypeName", "DatasetName" }, null, MetadataPrimaryIndexes.DATASET_DATASET_ID, false, new int[] { 0,
+ 2 });
DATATYPENAME_ON_DATATYPE_INDEX = new MetadataIndex("Datatype", "DatatypeName", 3, new IAType[] {
BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING }, new String[] { "DataverseName",
- "NestedDatatypeName", "TopDatatypeName" }, null, MetadataPrimaryIndexes.DATATYPE_DATASET_ID);
+ "NestedDatatypeName", "TopDatatypeName" }, null, MetadataPrimaryIndexes.DATATYPE_DATASET_ID, false,
+ new int[] { 0, 2 });
}
}
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index bdd9dc5..3c96825 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -129,7 +129,7 @@
public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, String> {
private static Logger LOGGER = Logger.getLogger(AqlMetadataProvider.class.getName());
- private final MetadataTransactionContext mdTxnCtx;
+ private MetadataTransactionContext mdTxnCtx;
private boolean isWriteTransaction;
private Map<String, String[]> stores;
private Map<String, String> config;
@@ -156,8 +156,7 @@
return config;
}
- public AqlMetadataProvider(MetadataTransactionContext mdTxnCtx, Dataverse defaultDataverse) {
- this.mdTxnCtx = mdTxnCtx;
+ public AqlMetadataProvider(Dataverse defaultDataverse) {
this.defaultDataverse = defaultDataverse;
this.stores = AsterixProperties.INSTANCE.getStores();
}
@@ -181,6 +180,10 @@
public void setWriterFactory(IAWriterFactory writerFactory) {
this.writerFactory = writerFactory;
}
+
+ public void setMetadataTxnContext(MetadataTransactionContext mdTxnCtx) {
+ this.mdTxnCtx = mdTxnCtx;
+ }
public MetadataTransactionContext getMetadataTxnContext() {
return mdTxnCtx;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java
index 2adf4fc..e710c90 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java
@@ -35,15 +35,18 @@
private final DatasetType datasetType;
private IDatasetDetails datasetDetails;
private final int datasetId;
+ // Type of pending operations with respect to atomic DDL operation
+ private final int pendingOp;
public Dataset(String dataverseName, String datasetName, String itemTypeName, IDatasetDetails datasetDetails,
- DatasetType datasetType, int datasetId) {
+ DatasetType datasetType, int datasetId, int pendingOp) {
this.dataverseName = dataverseName;
this.datasetName = datasetName;
this.itemTypeName = itemTypeName;
this.datasetType = datasetType;
this.datasetDetails = datasetDetails;
this.datasetId = datasetId;
+ this.pendingOp = pendingOp;
}
public String getDataverseName() {
@@ -73,6 +76,10 @@
public int getDatasetId() {
return datasetId;
}
+
+ public int getPendingOp() {
+ return pendingOp;
+ }
@Override
public Object addToCache(MetadataCache cache) {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataverse.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataverse.java
index bacd812..3902282 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataverse.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataverse.java
@@ -27,10 +27,12 @@
// Enforced to be unique within an Asterix cluster..
private final String dataverseName;
private final String dataFormat;
+ private final int pendingOp;
- public Dataverse(String dataverseName, String format) {
+ public Dataverse(String dataverseName, String format, int pendingOp) {
this.dataverseName = dataverseName;
this.dataFormat = format;
+ this.pendingOp = pendingOp;
}
public String getDataverseName() {
@@ -40,6 +42,10 @@
public String getDataFormat() {
return dataFormat;
}
+
+ public int getPendingOp() {
+ return pendingOp;
+ }
@Override
public Object addToCache(MetadataCache cache) {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Index.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Index.java
index 39f4689..7c23c65 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Index.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Index.java
@@ -45,9 +45,11 @@
private final boolean isPrimaryIndex;
// Specific to NGRAM indexes.
private final int gramLength;
+ // Type of pending operations with respect to atomic DDL operation
+ private final int pendingOp;
public Index(String dataverseName, String datasetName, String indexName, IndexType indexType,
- List<String> keyFieldNames, int gramLength, boolean isPrimaryIndex) {
+ List<String> keyFieldNames, int gramLength, boolean isPrimaryIndex, int pendingOp) {
this.dataverseName = dataverseName;
this.datasetName = datasetName;
this.indexName = indexName;
@@ -55,10 +57,11 @@
this.keyFieldNames = keyFieldNames;
this.gramLength = gramLength;
this.isPrimaryIndex = isPrimaryIndex;
+ this.pendingOp = pendingOp;
}
public Index(String dataverseName, String datasetName, String indexName, IndexType indexType,
- List<String> keyFieldNames, boolean isPrimaryIndex) {
+ List<String> keyFieldNames, boolean isPrimaryIndex, int pendingOp) {
this.dataverseName = dataverseName;
this.datasetName = datasetName;
this.indexName = indexName;
@@ -66,6 +69,7 @@
this.keyFieldNames = keyFieldNames;
this.gramLength = -1;
this.isPrimaryIndex = isPrimaryIndex;
+ this.pendingOp = pendingOp;
}
public String getDataverseName() {
@@ -95,6 +99,10 @@
public boolean isPrimaryIndex() {
return isPrimaryIndex;
}
+
+ public int getPendingOp() {
+ return pendingOp;
+ }
public boolean isSecondaryIndex() {
return !isPrimaryIndex();
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
index 0cbd392..7af5aae 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
@@ -77,9 +77,9 @@
protected ISerializerDeserializer<AInt32> aInt32Serde;
@SuppressWarnings("unchecked")
- public DatasetTupleTranslator(boolean getTuple) {
+ public DatasetTupleTranslator(boolean getTuple) {
super(getTuple, MetadataPrimaryIndexes.DATASET_DATASET.getFieldCount());
- aInt32 = new AMutableInt32(-1);
+ aInt32 = new AMutableInt32(-1);
aInt32Serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
}
@@ -104,8 +104,10 @@
.getValueByPos(MetadataRecordTypes.DATASET_ARECORD_DATATYPENAME_FIELD_INDEX)).getStringValue();
DatasetType datasetType = DatasetType.valueOf(((AString) datasetRecord.getValueByPos(3)).getStringValue());
IDatasetDetails datasetDetails = null;
- int datasetId = ((AInt32) datasetRecord
+ int datasetId = ((AInt32) datasetRecord
.getValueByPos(MetadataRecordTypes.DATASET_ARECORD_DATASETID_FIELD_INDEX)).getIntegerValue();
+ int pendingOp = ((AInt32) datasetRecord
+ .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_PENDINGOP_FIELD_INDEX)).getIntegerValue();
switch (datasetType) {
case FEED:
case INTERNAL: {
@@ -197,7 +199,7 @@
}
datasetDetails = new ExternalDatasetDetails(adapter, properties);
}
- return new Dataset(dataverseName, datasetName, typeName, datasetDetails, datasetType, datasetId);
+ return new Dataset(dataverseName, datasetName, typeName, datasetDetails, datasetType, datasetId, pendingOp);
}
@Override
@@ -248,13 +250,19 @@
aString.setValue(Calendar.getInstance().getTime().toString());
stringSerde.serialize(aString, fieldValue.getDataOutput());
recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_TIMESTAMP_FIELD_INDEX, fieldValue);
-
+
// write field 8
fieldValue.reset();
aInt32.setValue(dataset.getDatasetId());
aInt32Serde.serialize(aInt32, fieldValue.getDataOutput());
recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_DATASETID_FIELD_INDEX, fieldValue);
-
+
+ // write field 9
+ fieldValue.reset();
+ aInt32.setValue(dataset.getPendingOp());
+ aInt32Serde.serialize(aInt32, fieldValue.getDataOutput());
+ recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_PENDINGOP_FIELD_INDEX, fieldValue);
+
// write record
recordBuilder.write(tupleBuilder.getDataOutput(), true);
tupleBuilder.addFieldEndOffset();
@@ -290,13 +298,15 @@
fieldValue.reset();
aString.setValue(name);
stringSerde.serialize(aString, fieldValue.getDataOutput());
- propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_NAME_FIELD_INDEX, fieldValue);
+ propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_NAME_FIELD_INDEX,
+ fieldValue);
// write field 1
fieldValue.reset();
aString.setValue(value);
stringSerde.serialize(aString, fieldValue.getDataOutput());
- propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_VALUE_FIELD_INDEX, fieldValue);
+ propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_VALUE_FIELD_INDEX,
+ fieldValue);
propertyRecordBuilder.write(out, true);
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DataverseTupleTranslator.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DataverseTupleTranslator.java
index 271bf90..f9eaf5a 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DataverseTupleTranslator.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DataverseTupleTranslator.java
@@ -25,8 +25,11 @@
import edu.uci.ics.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
import edu.uci.ics.asterix.metadata.bootstrap.MetadataRecordTypes;
import edu.uci.ics.asterix.metadata.entities.Dataverse;
+import edu.uci.ics.asterix.om.base.AInt32;
+import edu.uci.ics.asterix.om.base.AMutableInt32;
import edu.uci.ics.asterix.om.base.ARecord;
import edu.uci.ics.asterix.om.base.AString;
+import edu.uci.ics.asterix.om.types.BuiltinType;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -40,12 +43,18 @@
// Payload field containing serialized Dataverse.
public static final int DATAVERSE_PAYLOAD_TUPLE_FIELD_INDEX = 1;
+ private AMutableInt32 aInt32;
+ protected ISerializerDeserializer<AInt32> aInt32Serde;
+
@SuppressWarnings("unchecked")
private ISerializerDeserializer<ARecord> recordSerDes = AqlSerializerDeserializerProvider.INSTANCE
.getSerializerDeserializer(MetadataRecordTypes.DATAVERSE_RECORDTYPE);
+ @SuppressWarnings("unchecked")
public DataverseTupleTranslator(boolean getTuple) {
super(getTuple, MetadataPrimaryIndexes.DATAVERSE_DATASET.getFieldCount());
+ aInt32 = new AMutableInt32(-1);
+ aInt32Serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
}
@Override
@@ -57,7 +66,8 @@
DataInput in = new DataInputStream(stream);
ARecord dataverseRecord = recordSerDes.deserialize(in);
return new Dataverse(((AString) dataverseRecord.getValueByPos(0)).getStringValue(),
- ((AString) dataverseRecord.getValueByPos(1)).getStringValue());
+ ((AString) dataverseRecord.getValueByPos(1)).getStringValue(),
+ ((AInt32) dataverseRecord.getValueByPos(2)).getIntegerValue());
}
@Override
@@ -88,6 +98,12 @@
stringSerde.serialize(aString, fieldValue.getDataOutput());
recordBuilder.addField(MetadataRecordTypes.DATAVERSE_ARECORD_TIMESTAMP_FIELD_INDEX, fieldValue);
+ // write field 3
+ fieldValue.reset();
+ aInt32.setValue(instance.getPendingOp());
+ aInt32Serde.serialize(aInt32, fieldValue.getDataOutput());
+ recordBuilder.addField(MetadataRecordTypes.DATAVERSE_ARECORD_PENDINGOP_FIELD_INDEX, fieldValue);
+
recordBuilder.write(tupleBuilder.getDataOutput(), true);
tupleBuilder.addFieldEndOffset();
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java
index d71480f..c75b7b2 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java
@@ -96,13 +96,15 @@
}
Boolean isPrimaryIndex = ((ABoolean) rec.getValueByPos(MetadataRecordTypes.INDEX_ARECORD_ISPRIMARY_FIELD_INDEX))
.getBoolean();
+ int pendingOp = ((AInt32) rec.getValueByPos(MetadataRecordTypes.INDEX_ARECORD_PENDINGOP_FIELD_INDEX))
+ .getIntegerValue();
// Check if there is a gram length as well.
int gramLength = -1;
int gramLenPos = rec.getType().findFieldPosition(GRAM_LENGTH_FIELD_NAME);
if (gramLenPos >= 0) {
gramLength = ((AInt32) rec.getValueByPos(gramLenPos)).getIntegerValue();
}
- return new Index(dvName, dsName, indexName, indexStructure, searchKey, gramLength, isPrimaryIndex);
+ return new Index(dvName, dsName, indexName, indexStructure, searchKey, gramLength, isPrimaryIndex, pendingOp);
}
@Override
@@ -174,7 +176,12 @@
stringSerde.serialize(aString, fieldValue.getDataOutput());
recordBuilder.addField(MetadataRecordTypes.INDEX_ARECORD_TIMESTAMP_FIELD_INDEX, fieldValue);
- // write optional field 7
+ // write field 7
+ fieldValue.reset();
+ intSerde.serialize(new AInt32(instance.getPendingOp()), fieldValue.getDataOutput());
+ recordBuilder.addField(MetadataRecordTypes.INDEX_ARECORD_PENDINGOP_FIELD_INDEX, fieldValue);
+
+ // write optional field 8
if (instance.getGramLength() > 0) {
fieldValue.reset();
nameValue.reset();
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexLogger.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexLogger.java
index 1eeadff..d69a36e 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexLogger.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexLogger.java
@@ -167,7 +167,7 @@
if (tupleSize != 0) {
//old tuple
- tupleWriter.writeTuple(reusableLogContentObject.getNewValue(), logicalLogLocator.getBuffer()
+ tupleWriter.writeTuple(reusableLogContentObject.getOldValue(), logicalLogLocator.getBuffer()
.getArray(), logicalLogLocator.getMemoryOffset() + offset);
}
}