Ensure Metadata locks are acquired for SQL++ queries
Change-Id: I5f468599897a37cbcb12d8577d072f340f0d949c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1642
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
index c6ea045..5eae36b 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
@@ -117,8 +117,7 @@
MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider();
Dataset dataset;
try {
- dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(),
- jobGenParams.getDataverseName(), jobGenParams.getDatasetName());
+ dataset = metadataProvider.findDataset(jobGenParams.getDataverseName(), jobGenParams.getDatasetName());
} catch (MetadataException e) {
throw new AlgebricksException(e);
}
@@ -257,8 +256,7 @@
jobSpec, queryField, appContext.getStorageManager(), secondarySplitsAndConstraint.first,
appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
invListsTypeTraits, invListsComparatorFactories, dataflowHelperFactory, queryTokenizerFactory,
- searchModifierFactory, outputRecDesc, retainInput, retainMissing,
- context.getMissingWriterFactory(),
+ searchModifierFactory, outputRecDesc, retainInput, retainMissing, context.getMissingWriterFactory(),
dataset.getSearchCallbackFactory(metadataProvider.getStorageComponentProvider(), secondaryIndex,
((JobEventListenerFactory) jobSpec.getJobletEventListenerFactory()).getJobId(),
IndexOperation.SEARCH, null),
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
index ebd1bdb..869289a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
@@ -90,44 +90,46 @@
// Metadata transaction begins.
MetadataManager.INSTANCE.init();
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-
// Retrieves file splits of the dataset.
MetadataProvider metadataProvider = new MetadataProvider(null, new StorageComponentProvider());
- metadataProvider.setMetadataTxnContext(mdTxnCtx);
- Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
- if (dataset == null) {
- jsonResponse.put("error",
- "Dataset " + datasetName + " does not exist in " + "dataverse " + dataverseName);
+ try {
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
+ if (dataset == null) {
+ jsonResponse.put("error",
+ "Dataset " + datasetName + " does not exist in " + "dataverse " + dataverseName);
+ out.write(jsonResponse.toString());
+ out.flush();
+ return;
+ }
+ boolean temp = dataset.getDatasetDetails().isTemp();
+ FileSplit[] fileSplits =
+ metadataProvider.splitsForDataset(mdTxnCtx, dataverseName, datasetName, datasetName, temp);
+ ARecordType recordType = (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(),
+ dataset.getItemTypeName());
+ List<List<String>> primaryKeys = DatasetUtil.getPartitioningKeys(dataset);
+ StringBuilder pkStrBuf = new StringBuilder();
+ for (List<String> keys : primaryKeys) {
+ for (String key : keys) {
+ pkStrBuf.append(key).append(",");
+ }
+ }
+ pkStrBuf.delete(pkStrBuf.length() - 1, pkStrBuf.length());
+ // Constructs the returned json object.
+ formResponseObject(jsonResponse, fileSplits, recordType, pkStrBuf.toString(), temp,
+ hcc.getNodeControllerInfos());
+
+ // Flush the cached contents of the dataset to file system.
+ FlushDatasetUtil.flushDataset(hcc, metadataProvider, dataverseName, datasetName, datasetName);
+
+ // Metadata transaction commits.
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ // Writes file splits.
out.write(jsonResponse.toString());
out.flush();
- return;
+ } finally {
+ metadataProvider.getLocks().unlock();
}
- boolean temp = dataset.getDatasetDetails().isTemp();
- FileSplit[] fileSplits =
- metadataProvider.splitsForDataset(mdTxnCtx, dataverseName, datasetName, datasetName, temp);
- ARecordType recordType = (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(),
- dataset.getItemTypeName());
- List<List<String>> primaryKeys = DatasetUtil.getPartitioningKeys(dataset);
- StringBuilder pkStrBuf = new StringBuilder();
- for (List<String> keys : primaryKeys) {
- for (String key : keys) {
- pkStrBuf.append(key).append(",");
- }
- }
- pkStrBuf.delete(pkStrBuf.length() - 1, pkStrBuf.length());
-
- // Constructs the returned json object.
- formResponseObject(jsonResponse, fileSplits, recordType, pkStrBuf.toString(), temp,
- hcc.getNodeControllerInfos());
-
- // Flush the cached contents of the dataset to file system.
- FlushDatasetUtil.flushDataset(hcc, metadataProvider, dataverseName, datasetName, datasetName);
-
- // Metadata transaction commits.
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- // Writes file splits.
- out.write(jsonResponse.toString());
- out.flush();
} catch (Exception e) {
LOGGER.log(Level.WARNING, "Failure handling a request", e);
out.println(e.getMessage());
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 4ed8510..59b3e88 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -135,13 +135,13 @@
import org.apache.asterix.metadata.entities.InternalDatasetDetails;
import org.apache.asterix.metadata.entities.NodeGroup;
import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
+import org.apache.asterix.metadata.lock.ExternalDatasetsRegistry;
+import org.apache.asterix.metadata.lock.MetadataLockManager;
import org.apache.asterix.metadata.utils.DatasetUtil;
-import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
import org.apache.asterix.metadata.utils.ExternalIndexingOperations;
import org.apache.asterix.metadata.utils.IndexUtil;
import org.apache.asterix.metadata.utils.KeyFieldTypeUtil;
import org.apache.asterix.metadata.utils.MetadataConstants;
-import org.apache.asterix.metadata.utils.MetadataLockManager;
import org.apache.asterix.metadata.utils.MetadataUtil;
import org.apache.asterix.metadata.utils.TypeUtil;
import org.apache.asterix.om.types.ARecordType;
@@ -411,7 +411,7 @@
String dvName = dvd.getDataverseName().getValue();
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- MetadataLockManager.INSTANCE.acquireDataverseReadLock(dvName);
+ MetadataLockManager.INSTANCE.acquireDataverseReadLock(metadataProvider.getLocks(), dvName);
try {
Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
if (dv == null) {
@@ -423,7 +423,7 @@
abort(e, e, mdTxnCtx);
throw new MetadataException(e);
} finally {
- MetadataLockManager.INSTANCE.releaseDataverseReadLock(dvName);
+ metadataProvider.getLocks().unlock();
}
}
@@ -434,7 +434,7 @@
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- MetadataLockManager.INSTANCE.acquireDataverseReadLock(dvName);
+ MetadataLockManager.INSTANCE.acquireDataverseReadLock(metadataProvider.getLocks(), dvName);
try {
Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
if (dv != null) {
@@ -452,7 +452,7 @@
abort(e, e, mdTxnCtx);
throw e;
} finally {
- MetadataLockManager.INSTANCE.releaseDataverseReadLock(dvName);
+ metadataProvider.getLocks().unlock();
}
}
@@ -509,18 +509,15 @@
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
- MetadataLockManager.INSTANCE.createDatasetBegin(dataverseName, itemTypeDataverseName,
- itemTypeDataverseName + "." + itemTypeName, metaItemTypeDataverseName,
+ MetadataLockManager.INSTANCE.createDatasetBegin(metadataProvider.getLocks(), dataverseName,
+ itemTypeDataverseName, itemTypeDataverseName + "." + itemTypeName, metaItemTypeDataverseName,
metaItemTypeDataverseName + "." + metaItemTypeName, nodegroupName, compactionPolicy,
dataverseName + "." + datasetName, defaultCompactionPolicy);
Dataset dataset = null;
Index primaryIndex = null;
try {
-
IDatasetDetails datasetDetails = null;
- Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
- datasetName);
+ Dataset ds = metadataProvider.findDataset(dataverseName, datasetName);
if (ds != null) {
if (dd.getIfNotExists()) {
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -671,13 +668,9 @@
+ "." + datasetName + ") couldn't be removed from the metadata", e);
}
}
-
throw e;
} finally {
- MetadataLockManager.INSTANCE.createDatasetEnd(dataverseName, itemTypeDataverseName,
- itemTypeDataverseName + "." + itemTypeName, metaItemTypeDataverseName,
- metaItemTypeDataverseName + "." + metaItemTypeName, nodegroupName, compactionPolicy,
- dataverseName + "." + datasetName, defaultCompactionPolicy);
+ metadataProvider.getLocks().unlock();
}
}
@@ -770,9 +763,8 @@
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
- MetadataLockManager.INSTANCE.createIndexBegin(dataverseName, dataverseName + "." + datasetName);
-
+ MetadataLockManager.INSTANCE.createIndexBegin(metadataProvider.getLocks(), dataverseName,
+ dataverseName + "." + datasetName);
String indexName = null;
JobSpecification spec = null;
Dataset ds = null;
@@ -784,8 +776,7 @@
boolean datasetLocked = false;
Index index = null;
try {
- ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
- datasetName);
+ ds = metadataProvider.findDataset(dataverseName, datasetName);
if (ds == null) {
throw new AlgebricksException(
"There is no dataset with this name " + datasetName + " in dataverse " + dataverseName);
@@ -1105,7 +1096,7 @@
}
throw e;
} finally {
- MetadataLockManager.INSTANCE.createIndexEnd(dataverseName, dataverseName + "." + datasetName);
+ metadataProvider.getLocks().unlock();
if (datasetLocked) {
ExternalDatasetsRegistry.INSTANCE.buildIndexEnd(ds, firstExternalDatasetIndex);
}
@@ -1118,7 +1109,8 @@
String typeName = stmtCreateType.getIdent().getValue();
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- MetadataLockManager.INSTANCE.createTypeBegin(dataverseName, dataverseName + "." + typeName);
+ MetadataLockManager.INSTANCE.createTypeBegin(metadataProvider.getLocks(), dataverseName,
+ dataverseName + "." + typeName);
try {
Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
if (dv == null) {
@@ -1145,7 +1137,7 @@
abort(e, e, mdTxnCtx);
throw e;
} finally {
- MetadataLockManager.INSTANCE.createTypeEnd(dataverseName, dataverseName + "." + typeName);
+ metadataProvider.getLocks().unlock();
}
}
@@ -1162,8 +1154,8 @@
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- MetadataLockManager.INSTANCE.acquireDataverseWriteLock(dataverseName);
List<JobSpecification> jobsToExecute = new ArrayList<>();
+ MetadataLockManager.INSTANCE.acquireDataverseWriteLock(metadataProvider.getLocks(), dataverseName);
try {
Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
if (dv == null) {
@@ -1177,12 +1169,16 @@
// # disconnect all feeds from any datasets in the dataverse.
IActiveEntityEventsListener[] activeListeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners();
Identifier dvId = new Identifier(dataverseName);
+ MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getDefaultDataverse(),
+ metadataProvider.getStorageComponentProvider());
+ tempMdProvider.setConfig(metadataProvider.getConfig());
for (IActiveEntityEventsListener listener : activeListeners) {
EntityId activeEntityId = listener.getEntityId();
if (activeEntityId.getExtensionName().equals(Feed.EXTENSION_NAME)
&& activeEntityId.getDataverse().equals(dataverseName)) {
+ tempMdProvider.getLocks().reset();
stopFeedBeforeDelete(new Pair<>(dvId, new Identifier(activeEntityId.getEntityName())),
- metadataProvider);
+ tempMdProvider);
// prepare job to remove feed log storage
jobsToExecute.add(FeedOperations.buildRemoveFeedStorageJob(
MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverseName, activeEntityId.getEntityName())));
@@ -1199,8 +1195,8 @@
MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
for (int k = 0; k < indexes.size(); k++) {
if (indexes.get(k).isSecondaryIndex()) {
- jobsToExecute.add(IndexUtil.buildDropIndexJobSpec(indexes.get(k), metadataProvider,
- datasets.get(j)));
+ jobsToExecute.add(
+ IndexUtil.buildDropIndexJobSpec(indexes.get(k), metadataProvider, datasets.get(j)));
}
}
Index primaryIndex =
@@ -1215,8 +1211,8 @@
jobsToExecute.add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider,
datasets.get(j)));
} else {
- jobsToExecute.add(IndexUtil.buildDropIndexJobSpec(indexes.get(k), metadataProvider,
- datasets.get(j)));
+ jobsToExecute.add(
+ IndexUtil.buildDropIndexJobSpec(indexes.get(k), metadataProvider, datasets.get(j)));
}
}
ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(datasets.get(j));
@@ -1285,7 +1281,7 @@
throw e;
} finally {
- MetadataLockManager.INSTANCE.releaseDataverseWriteLock(dataverseName);
+ metadataProvider.getLocks().unlock();
ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
}
}
@@ -1309,7 +1305,13 @@
DropDatasetStatement stmtDelete = (DropDatasetStatement) stmt;
String dataverseName = getActiveDataverse(stmtDelete.getDataverseName());
String datasetName = stmtDelete.getDatasetName().getValue();
- doDropDataset(dataverseName, datasetName, metadataProvider, stmtDelete.getIfExists(), hcc);
+ MetadataLockManager.INSTANCE.dropDatasetBegin(metadataProvider.getLocks(), dataverseName,
+ dataverseName + "." + datasetName);
+ try {
+ doDropDataset(dataverseName, datasetName, metadataProvider, stmtDelete.getIfExists(), hcc);
+ } finally {
+ metadataProvider.getLocks().unlock();
+ }
}
public static void doDropDataset(String dataverseName, String datasetName, MetadataProvider metadataProvider,
@@ -1319,10 +1321,9 @@
new MutableObject<>(MetadataManager.INSTANCE.beginTransaction());
MutableBoolean bActiveTxn = new MutableBoolean(true);
metadataProvider.setMetadataTxnContext(mdTxnCtx.getValue());
- MetadataLockManager.INSTANCE.dropDatasetBegin(dataverseName, dataverseName + "." + datasetName);
List<JobSpecification> jobsToExecute = new ArrayList<>();
try {
- Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx.getValue(), dataverseName, datasetName);
+ Dataset ds = metadataProvider.findDataset(dataverseName, datasetName);
if (ds == null) {
if (ifExists) {
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
@@ -1367,7 +1368,6 @@
}
throw e;
} finally {
- MetadataLockManager.INSTANCE.dropDatasetEnd(dataverseName, dataverseName + "." + datasetName);
ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
}
}
@@ -1382,16 +1382,14 @@
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
- MetadataLockManager.INSTANCE.dropIndexBegin(dataverseName, dataverseName + "." + datasetName);
-
+ List<JobSpecification> jobsToExecute = new ArrayList<>();
+ MetadataLockManager.INSTANCE.dropIndexBegin(metadataProvider.getLocks(), dataverseName,
+ dataverseName + "." + datasetName);
String indexName = null;
// For external index
boolean dropFilesIndex = false;
- List<JobSpecification> jobsToExecute = new ArrayList<>();
try {
-
- Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+ Dataset ds = metadataProvider.findDataset(dataverseName, datasetName);
if (ds == null) {
throw new AlgebricksException(
"There is no dataset with this name " + datasetName + " in dataverse " + dataverseName);
@@ -1558,7 +1556,7 @@
throw e;
} finally {
- MetadataLockManager.INSTANCE.dropIndexEnd(dataverseName, dataverseName + "." + datasetName);
+ metadataProvider.getLocks().unlock();
ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
}
}
@@ -1571,8 +1569,8 @@
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- MetadataLockManager.INSTANCE.dropTypeBegin(dataverseName, dataverseName + "." + typeName);
-
+ MetadataLockManager.INSTANCE.dropTypeBegin(metadataProvider.getLocks(), dataverseName,
+ dataverseName + "." + typeName);
try {
Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
if (dt == null) {
@@ -1587,7 +1585,7 @@
abort(e, e, mdTxnCtx);
throw e;
} finally {
- MetadataLockManager.INSTANCE.dropTypeEnd(dataverseName, dataverseName + "." + typeName);
+ metadataProvider.getLocks().unlock();
}
}
@@ -1596,7 +1594,7 @@
String nodegroupName = stmtDelete.getNodeGroupName().getValue();
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- MetadataLockManager.INSTANCE.acquireNodeGroupWriteLock(nodegroupName);
+ MetadataLockManager.INSTANCE.acquireNodeGroupWriteLock(metadataProvider.getLocks(), nodegroupName);
try {
NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodegroupName);
if (ng == null) {
@@ -1612,7 +1610,7 @@
abort(e, e, mdTxnCtx);
throw e;
} finally {
- MetadataLockManager.INSTANCE.releaseNodeGroupWriteLock(nodegroupName);
+ metadataProvider.getLocks().unlock();
}
}
@@ -1624,9 +1622,9 @@
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- MetadataLockManager.INSTANCE.functionStatementBegin(dataverse, dataverse + "." + functionName);
+ MetadataLockManager.INSTANCE.functionStatementBegin(metadataProvider.getLocks(), dataverse,
+ dataverse + "." + functionName);
try {
-
Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverse);
if (dv == null) {
throw new AlgebricksException("There is no dataverse with this name " + dataverse + ".");
@@ -1641,7 +1639,7 @@
abort(e, e, mdTxnCtx);
throw e;
} finally {
- MetadataLockManager.INSTANCE.functionStatementEnd(dataverse, dataverse + "." + functionName);
+ metadataProvider.getLocks().unlock();
}
}
@@ -1651,7 +1649,7 @@
signature.setNamespace(getActiveDataverseName(signature.getNamespace()));
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- MetadataLockManager.INSTANCE.functionStatementBegin(signature.getNamespace(),
+ MetadataLockManager.INSTANCE.functionStatementBegin(metadataProvider.getLocks(), signature.getNamespace(),
signature.getNamespace() + "." + signature.getName());
try {
Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, signature);
@@ -1667,8 +1665,7 @@
abort(e, e, mdTxnCtx);
throw e;
} finally {
- MetadataLockManager.INSTANCE.functionStatementEnd(signature.getNamespace(),
- signature.getNamespace() + "." + signature.getName());
+ metadataProvider.getLocks().unlock();
}
}
@@ -1680,7 +1677,8 @@
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- MetadataLockManager.INSTANCE.modifyDatasetBegin(dataverseName, dataverseName + "." + datasetName);
+ MetadataLockManager.INSTANCE.modifyDatasetBegin(metadataProvider.getLocks(), dataverseName,
+ dataverseName + "." + datasetName);
try {
CompiledLoadFromFileStatement cls =
new CompiledLoadFromFileStatement(dataverseName, loadStmt.getDatasetName().getValue(),
@@ -1697,7 +1695,7 @@
}
throw e;
} finally {
- MetadataLockManager.INSTANCE.modifyDatasetEnd(dataverseName, dataverseName + "." + datasetName);
+ metadataProvider.getLocks().unlock();
}
}
@@ -1707,21 +1705,16 @@
throws Exception {
InsertStatement stmtInsertUpsert = (InsertStatement) stmt;
String dataverseName = getActiveDataverse(stmtInsertUpsert.getDataverseName());
- Query query = stmtInsertUpsert.getQuery();
-
final IMetadataLocker locker = new IMetadataLocker() {
@Override
public void lock() {
- MetadataLockManager.INSTANCE.insertDeleteUpsertBegin(dataverseName,
- dataverseName + "." + stmtInsertUpsert.getDatasetName(), query.getDataverses(),
- query.getDatasets());
+ MetadataLockManager.INSTANCE.insertDeleteUpsertBegin(metadataProvider.getLocks(),
+ dataverseName + "." + stmtInsertUpsert.getDatasetName());
}
@Override
public void unlock() {
- MetadataLockManager.INSTANCE.insertDeleteUpsertEnd(dataverseName,
- dataverseName + "." + stmtInsertUpsert.getDatasetName(), query.getDataverses(),
- query.getDatasets());
+ metadataProvider.getLocks().unlock();
}
};
final IStatementCompiler compiler = () -> {
@@ -1764,16 +1757,13 @@
public JobSpecification handleDeleteStatement(MetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc, boolean compileOnly) throws Exception {
-
DeleteStatement stmtDelete = (DeleteStatement) stmt;
String dataverseName = getActiveDataverse(stmtDelete.getDataverseName());
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- MetadataLockManager.INSTANCE.insertDeleteUpsertBegin(dataverseName,
- dataverseName + "." + stmtDelete.getDatasetName(), stmtDelete.getDataverses(),
- stmtDelete.getDatasets());
-
+ MetadataLockManager.INSTANCE.insertDeleteUpsertBegin(metadataProvider.getLocks(),
+ dataverseName + "." + stmtDelete.getDatasetName());
try {
metadataProvider.setWriteTransaction(true);
CompiledDeleteStatement clfrqs = new CompiledDeleteStatement(stmtDelete.getVariableExpr(), dataverseName,
@@ -1788,16 +1778,13 @@
JobUtils.runJob(hcc, jobSpec, true);
}
return jobSpec;
-
} catch (Exception e) {
if (bActiveTxn) {
abort(e, e, mdTxnCtx);
}
throw e;
} finally {
- MetadataLockManager.INSTANCE.insertDeleteUpsertEnd(dataverseName,
- dataverseName + "." + stmtDelete.getDatasetName(), stmtDelete.getDataverses(),
- stmtDelete.getDatasets());
+ metadataProvider.getLocks().unlock();
}
}
@@ -1852,7 +1839,8 @@
String feedName = cfs.getFeedName().getValue();
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- MetadataLockManager.INSTANCE.createFeedBegin(dataverseName, dataverseName + "." + feedName);
+ MetadataLockManager.INSTANCE.createFeedBegin(metadataProvider.getLocks(), dataverseName,
+ dataverseName + "." + feedName);
Feed feed = null;
try {
feed = MetadataManager.INSTANCE.getFeed(metadataProvider.getMetadataTxnContext(), dataverseName, feedName);
@@ -1873,7 +1861,7 @@
abort(e, e, mdTxnCtx);
throw e;
} finally {
- MetadataLockManager.INSTANCE.createFeedEnd(dataverseName, dataverseName + "." + feedName);
+ metadataProvider.getLocks().unlock();
}
}
@@ -1886,7 +1874,8 @@
CreateFeedPolicyStatement cfps = (CreateFeedPolicyStatement) stmt;
dataverse = getActiveDataverse(null);
policy = cfps.getPolicyName();
- MetadataLockManager.INSTANCE.createFeedPolicyBegin(dataverse, dataverse + "." + policy);
+ MetadataLockManager.INSTANCE.createFeedPolicyBegin(metadataProvider.getLocks(), dataverse,
+ dataverse + "." + policy);
try {
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -1935,7 +1924,7 @@
abort(e, e, mdTxnCtx);
throw new HyracksDataException(e);
} finally {
- MetadataLockManager.INSTANCE.createFeedPolicyEnd(dataverse, dataverse + "." + policy);
+ metadataProvider.getLocks().unlock();
}
}
@@ -1946,8 +1935,8 @@
String feedName = stmtFeedDrop.getFeedName().getValue();
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- MetadataLockManager.INSTANCE.dropFeedBegin(dataverseName, dataverseName + "." + feedName);
-
+ MetadataLockManager.INSTANCE.dropFeedBegin(metadataProvider.getLocks(), dataverseName,
+ dataverseName + "." + feedName);
try {
Feed feed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverseName, feedName);
if (feed == null) {
@@ -1980,7 +1969,7 @@
abort(e, e, mdTxnCtx);
throw e;
} finally {
- MetadataLockManager.INSTANCE.dropFeedEnd(dataverseName, dataverseName + "." + feedName);
+ metadataProvider.getLocks().unlock();
}
}
@@ -1990,8 +1979,8 @@
FeedPolicyDropStatement stmtFeedPolicyDrop = (FeedPolicyDropStatement) stmt;
String dataverseName = getActiveDataverse(stmtFeedPolicyDrop.getDataverseName());
String policyName = stmtFeedPolicyDrop.getPolicyName().getValue();
- MetadataLockManager.INSTANCE.dropFeedPolicyBegin(dataverseName, dataverseName + "." + policyName);
-
+ MetadataLockManager.INSTANCE.dropFeedPolicyBegin(metadataProvider.getLocks(), dataverseName,
+ dataverseName + "." + policyName);
try {
FeedPolicyEntity feedPolicy = MetadataManager.INSTANCE.getFeedPolicy(mdTxnCtx, dataverseName, policyName);
if (feedPolicy == null) {
@@ -2007,7 +1996,7 @@
abort(e, e, mdTxnCtx);
throw e;
} finally {
- MetadataLockManager.INSTANCE.dropFeedPolicyEnd(dataverseName, dataverseName + "." + policyName);
+ metadataProvider.getLocks().unlock();
}
}
@@ -2035,15 +2024,15 @@
throw new AlgebricksException("Feed " + feedName + " is started already.");
}
// Start
+ MetadataLockManager.INSTANCE.startFeedBegin(metadataProvider.getLocks(), dataverseName,
+ dataverseName + "." + feedName, feedConnections);
try {
- MetadataLockManager.INSTANCE.startFeedBegin(dataverseName, dataverseName + "." + feedName, feedConnections);
// Prepare policy
List<IDataset> datasets = new ArrayList<>();
for (FeedConnection connection : feedConnections) {
- datasets.add(MetadataManager.INSTANCE.getDataset(mdTxnCtx, connection.getDataverseName(),
- connection.getDatasetName()));
+ Dataset ds = metadataProvider.findDataset(connection.getDataverseName(), connection.getDatasetName());
+ datasets.add(ds);
}
-
org.apache.commons.lang3.tuple.Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> jobInfo =
FeedOperations.buildStartFeedJob(sessionConfig, metadataProvider, feed, feedConnections,
compilationProvider, storageComponentProvider, qtFactory, hcc);
@@ -2064,7 +2053,7 @@
}
throw e;
} finally {
- MetadataLockManager.INSTANCE.startFeedEnd(dataverseName, dataverseName + "." + feedName, feedConnections);
+ metadataProvider.getLocks().unlock();
}
}
@@ -2083,7 +2072,7 @@
// Transaction
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- MetadataLockManager.INSTANCE.StopFeedBegin(dataverseName, feedName);
+ MetadataLockManager.INSTANCE.stopFeedBegin(metadataProvider.getLocks(), dataverseName, feedName);
try {
// validate
FeedMetadataUtil.validateIfFeedExists(dataverseName, feedName, mdTxnCtx);
@@ -2097,7 +2086,7 @@
abort(e, e, mdTxnCtx);
throw e;
} finally {
- MetadataLockManager.INSTANCE.StopFeedEnd(dataverseName, feedName);
+ metadataProvider.getLocks().unlock();
}
}
@@ -2110,17 +2099,17 @@
String policyName = cfs.getPolicy();
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- // validation
- FeedMetadataUtil.validateIfDatasetExists(dataverseName, datasetName, mdTxnCtx);
- Feed feed = FeedMetadataUtil.validateIfFeedExists(dataverseName, feedName,
- metadataProvider.getMetadataTxnContext());
- ARecordType outputType = FeedMetadataUtil.getOutputType(feed, feed.getAdapterConfiguration(),
- ExternalDataConstants.KEY_TYPE_NAME);
- List<FunctionSignature> appliedFunctions = cfs.getAppliedFunctions();
// Transaction handling
- MetadataLockManager.INSTANCE.connectFeedBegin(dataverseName, dataverseName + "." + datasetName,
- dataverseName + "." + feedName);
+ MetadataLockManager.INSTANCE.connectFeedBegin(metadataProvider.getLocks(), dataverseName,
+ dataverseName + "." + datasetName, dataverseName + "." + feedName);
try {
+ // validation
+ FeedMetadataUtil.validateIfDatasetExists(metadataProvider, dataverseName, datasetName, mdTxnCtx);
+ Feed feed = FeedMetadataUtil.validateIfFeedExists(dataverseName, feedName,
+ metadataProvider.getMetadataTxnContext());
+ ARecordType outputType = FeedMetadataUtil.getOutputType(feed, feed.getAdapterConfiguration(),
+ ExternalDataConstants.KEY_TYPE_NAME);
+ List<FunctionSignature> appliedFunctions = cfs.getAppliedFunctions();
fc = MetadataManager.INSTANCE.getFeedConnection(metadataProvider.getMetadataTxnContext(), dataverseName,
feedName, datasetName);
if (fc != null) {
@@ -2134,8 +2123,7 @@
abort(e, e, mdTxnCtx);
throw e;
} finally {
- MetadataLockManager.INSTANCE.connectFeedEnd(dataverseName, dataverseName + "." + datasetName,
- dataverseName + "." + feedName);
+ metadataProvider.getLocks().unlock();
}
}
@@ -2146,10 +2134,11 @@
String feedName = cfs.getFeedName().getValue();
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- MetadataLockManager.INSTANCE.disconnectFeedBegin(dataverseName, dataverseName + "." + datasetName,
- dataverseName + "." + cfs.getFeedName());
+ MetadataLockManager.INSTANCE.disconnectFeedBegin(metadataProvider.getLocks(), dataverseName,
+ dataverseName + "." + datasetName, dataverseName + "." + cfs.getFeedName());
try {
- FeedMetadataUtil.validateIfDatasetExists(dataverseName, cfs.getDatasetName().getValue(), mdTxnCtx);
+ FeedMetadataUtil.validateIfDatasetExists(metadataProvider, dataverseName, cfs.getDatasetName().getValue(),
+ mdTxnCtx);
FeedMetadataUtil.validateIfFeedExists(dataverseName, cfs.getFeedName().getValue(), mdTxnCtx);
FeedConnection fc = MetadataManager.INSTANCE.getFeedConnection(metadataProvider.getMetadataTxnContext(),
dataverseName, feedName, datasetName);
@@ -2162,8 +2151,7 @@
abort(e, e, mdTxnCtx);
throw e;
} finally {
- MetadataLockManager.INSTANCE.disconnectFeedEnd(dataverseName, dataverseName + "." + datasetName,
- dataverseName + "." + cfs.getFeedName());
+ metadataProvider.getLocks().unlock();
}
}
@@ -2175,10 +2163,11 @@
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- MetadataLockManager.INSTANCE.compactBegin(dataverseName, dataverseName + "." + datasetName);
List<JobSpecification> jobsToExecute = new ArrayList<>();
+ MetadataLockManager.INSTANCE.compactBegin(metadataProvider.getLocks(), dataverseName,
+ dataverseName + "." + datasetName);
try {
- Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+ Dataset ds = metadataProvider.findDataset(dataverseName, datasetName);
if (ds == null) {
throw new AlgebricksException(
"There is no dataset with this name " + datasetName + " in dataverse " + dataverseName + ".");
@@ -2230,7 +2219,7 @@
}
throw e;
} finally {
- MetadataLockManager.INSTANCE.compactEnd(dataverseName, dataverseName + "." + datasetName);
+ metadataProvider.getLocks().unlock();
ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
}
}
@@ -2270,12 +2259,11 @@
final IMetadataLocker locker = new IMetadataLocker() {
@Override
public void lock() {
- MetadataLockManager.INSTANCE.queryBegin(activeDataverse, query.getDataverses(), query.getDatasets());
}
@Override
public void unlock() {
- MetadataLockManager.INSTANCE.queryEnd(query.getDataverses(), query.getDatasets());
+ metadataProvider.getLocks().unlock();
// release external datasets' locks acquired during compilation of the query
ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
}
@@ -2389,11 +2377,11 @@
printer.print(jobId);
}
} finally {
+ locker.unlock();
// No matter the job succeeds or fails, removes it into the context.
if (ctx != null && clientContextId != null) {
ctx.removeJobIdFromClientContextId(clientContextId);
}
- locker.unlock();
}
}
@@ -2404,8 +2392,7 @@
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- MetadataLockManager.INSTANCE.acquireNodeGroupWriteLock(ngName);
-
+ MetadataLockManager.INSTANCE.acquireNodeGroupWriteLock(metadataProvider.getLocks(), ngName);
try {
NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, ngName);
if (ng != null) {
@@ -2425,7 +2412,7 @@
abort(e, e, mdTxnCtx);
throw e;
} finally {
- MetadataLockManager.INSTANCE.releaseNodeGroupWriteLock(ngName);
+ metadataProvider.getLocks().unlock();
}
}
@@ -2436,7 +2423,6 @@
String datasetName = stmtRefresh.getDatasetName().getValue();
TransactionState transactionState = TransactionState.COMMIT;
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
- MetadataLockManager.INSTANCE.refreshDatasetBegin(dataverseName, dataverseName + "." + datasetName);
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
JobSpecification spec = null;
@@ -2449,10 +2435,10 @@
Dataset transactionDataset = null;
boolean lockAquired = false;
boolean success = false;
+ MetadataLockManager.INSTANCE.refreshDatasetBegin(metadataProvider.getLocks(), dataverseName,
+ dataverseName + "." + datasetName);
try {
- ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
- datasetName);
-
+ ds = metadataProvider.findDataset(dataverseName, datasetName);
// Dataset exists ?
if (ds == null) {
throw new AlgebricksException(
@@ -2649,7 +2635,7 @@
if (lockAquired) {
ExternalDatasetsRegistry.INSTANCE.refreshEnd(ds, success);
}
- MetadataLockManager.INSTANCE.refreshDatasetEnd(dataverseName, dataverseName + "." + datasetName);
+ metadataProvider.getLocks().unlock();
}
}
@@ -2676,15 +2662,11 @@
String dataverseNameTo = getActiveDataverse(pregelixStmt.getDataverseNameTo());
String datasetNameFrom = pregelixStmt.getDatasetNameFrom().getValue();
String datasetNameTo = pregelixStmt.getDatasetNameTo().getValue();
-
+ String fullyQualifiedDatasetNameTo =
+ DatasetUtil.isFullyQualifiedName(datasetNameTo) ? datasetNameTo : dataverseNameTo + '.' + datasetNameTo;
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- List<String> readDataverses = new ArrayList<>();
- readDataverses.add(dataverseNameFrom);
- List<String> readDatasets = new ArrayList<>();
- readDatasets.add(datasetNameFrom);
- MetadataLockManager.INSTANCE.insertDeleteUpsertBegin(dataverseNameTo, datasetNameTo, readDataverses,
- readDatasets);
+ MetadataLockManager.INSTANCE.insertDeleteUpsertBegin(metadataProvider.getLocks(), fullyQualifiedDatasetNameTo);
try {
prepareRunExternalRuntime(metadataProvider, hcc, pregelixStmt, dataverseNameFrom, dataverseNameTo,
datasetNameFrom, datasetNameTo, mdTxnCtx);
@@ -2729,8 +2711,7 @@
}
throw e;
} finally {
- MetadataLockManager.INSTANCE.insertDeleteUpsertEnd(dataverseNameTo, datasetNameTo, readDataverses,
- readDatasets);
+ metadataProvider.getLocks().unlock();
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
index 002e270..7bd1e62 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
@@ -111,93 +111,98 @@
for (Dataverse dataverse : dataverses) {
if (!dataverse.getDataverseName().equals(MetadataConstants.METADATA_DATAVERSE_NAME)) {
MetadataProvider metadataProvider = new MetadataProvider(dataverse, componentProvider);
- List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx,
- dataverse.getDataverseName());
- for (Dataset dataset : datasets) {
- if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
- // External dataset
- // Get indexes
- List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx,
- dataset.getDataverseName(), dataset.getDatasetName());
- if (!indexes.isEmpty()) {
- // Get the state of the dataset
- ExternalDatasetDetails dsd =
- (ExternalDatasetDetails) dataset.getDatasetDetails();
- TransactionState datasetState = dsd.getState();
- if (datasetState == TransactionState.BEGIN) {
- List<ExternalFile> files = MetadataManager.INSTANCE
- .getDatasetExternalFiles(mdTxnCtx, dataset);
- // if persumed abort, roll backward
- // 1. delete all pending files
- for (ExternalFile file : files) {
- if (file.getPendingOp() != ExternalFilePendingOp.NO_OP) {
- MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
- }
- }
- // 2. clean artifacts in NCs
- metadataProvider.setMetadataTxnContext(mdTxnCtx);
- JobSpecification jobSpec = ExternalIndexingOperations
- .buildAbortOp(dataset, indexes, metadataProvider);
- executeHyracksJob(jobSpec);
- // 3. correct the dataset state
- ((ExternalDatasetDetails) dataset.getDatasetDetails())
- .setState(TransactionState.COMMIT);
- MetadataManager.INSTANCE.updateDataset(mdTxnCtx, dataset);
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
- } else if (datasetState == TransactionState.READY_TO_COMMIT) {
- List<ExternalFile> files = MetadataManager.INSTANCE
- .getDatasetExternalFiles(mdTxnCtx, dataset);
- // if ready to commit, roll forward
- // 1. commit indexes in NCs
- metadataProvider.setMetadataTxnContext(mdTxnCtx);
- JobSpecification jobSpec = ExternalIndexingOperations
- .buildRecoverOp(dataset, indexes, metadataProvider);
- executeHyracksJob(jobSpec);
- // 2. add pending files in metadata
- for (ExternalFile file : files) {
- if (file.getPendingOp() == ExternalFilePendingOp.ADD_OP) {
- MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
- file.setPendingOp(ExternalFilePendingOp.NO_OP);
- MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file);
- } else if (file.getPendingOp() == ExternalFilePendingOp.DROP_OP) {
- // find original file
- for (ExternalFile originalFile : files) {
- if (originalFile.getFileName()
- .equals(file.getFileName())) {
- MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx,
- file);
- MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx,
- originalFile);
- break;
- }
+ try {
+ List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx,
+ dataverse.getDataverseName());
+ for (Dataset dataset : datasets) {
+ if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+ // External dataset
+ // Get indexes
+ List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx,
+ dataset.getDataverseName(), dataset.getDatasetName());
+ if (!indexes.isEmpty()) {
+ // Get the state of the dataset
+ ExternalDatasetDetails dsd =
+ (ExternalDatasetDetails) dataset.getDatasetDetails();
+ TransactionState datasetState = dsd.getState();
+ if (datasetState == TransactionState.BEGIN) {
+ List<ExternalFile> files = MetadataManager.INSTANCE
+ .getDatasetExternalFiles(mdTxnCtx, dataset);
+ // if persumed abort, roll backward
+ // 1. delete all pending files
+ for (ExternalFile file : files) {
+ if (file.getPendingOp() != ExternalFilePendingOp.NO_OP) {
+ MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
}
- } else if (file
- .getPendingOp() == ExternalFilePendingOp.APPEND_OP) {
- // find original file
- for (ExternalFile originalFile : files) {
- if (originalFile.getFileName()
- .equals(file.getFileName())) {
- MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx,
- file);
- MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx,
- originalFile);
- originalFile.setSize(file.getSize());
- MetadataManager.INSTANCE.addExternalFile(mdTxnCtx,
- originalFile);
+ }
+ // 2. clean artifacts in NCs
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ JobSpecification jobSpec = ExternalIndexingOperations
+ .buildAbortOp(dataset, indexes, metadataProvider);
+ executeHyracksJob(jobSpec);
+ // 3. correct the dataset state
+ ((ExternalDatasetDetails) dataset.getDatasetDetails())
+ .setState(TransactionState.COMMIT);
+ MetadataManager.INSTANCE.updateDataset(mdTxnCtx, dataset);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ } else if (datasetState == TransactionState.READY_TO_COMMIT) {
+ List<ExternalFile> files = MetadataManager.INSTANCE
+ .getDatasetExternalFiles(mdTxnCtx, dataset);
+ // if ready to commit, roll forward
+ // 1. commit indexes in NCs
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ JobSpecification jobSpec = ExternalIndexingOperations
+ .buildRecoverOp(dataset, indexes, metadataProvider);
+ executeHyracksJob(jobSpec);
+ // 2. add pending files in metadata
+ for (ExternalFile file : files) {
+ if (file.getPendingOp() == ExternalFilePendingOp.ADD_OP) {
+ MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
+ file.setPendingOp(ExternalFilePendingOp.NO_OP);
+ MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file);
+ } else if (file
+ .getPendingOp() == ExternalFilePendingOp.DROP_OP) {
+ // find original file
+ for (ExternalFile originalFile : files) {
+ if (originalFile.getFileName()
+ .equals(file.getFileName())) {
+ MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx,
+ file);
+ MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx,
+ originalFile);
+ break;
+ }
+ }
+ } else if (file
+ .getPendingOp() == ExternalFilePendingOp.APPEND_OP) {
+ // find original file
+ for (ExternalFile originalFile : files) {
+ if (originalFile.getFileName()
+ .equals(file.getFileName())) {
+ MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx,
+ file);
+ MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx,
+ originalFile);
+ originalFile.setSize(file.getSize());
+ MetadataManager.INSTANCE.addExternalFile(mdTxnCtx,
+ originalFile);
+ }
}
}
}
+ // 3. correct the dataset state
+ ((ExternalDatasetDetails) dataset.getDatasetDetails())
+ .setState(TransactionState.COMMIT);
+ MetadataManager.INSTANCE.updateDataset(mdTxnCtx, dataset);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
}
- // 3. correct the dataset state
- ((ExternalDatasetDetails) dataset.getDatasetDetails())
- .setState(TransactionState.COMMIT);
- MetadataManager.INSTANCE.updateDataset(mdTxnCtx, dataset);
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
}
}
}
+ } finally {
+ metadataProvider.getLocks().unlock();
}
}
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java
index 7da3b32..fe08b8c 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java
@@ -169,12 +169,16 @@
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
// Retrieves file splits of the dataset.
MetadataProvider metadataProvider = new MetadataProvider(null, new StorageComponentProvider());
- metadataProvider.setMetadataTxnContext(mdTxnCtx);
- Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
- ARecordType recordType =
- (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
- // Metadata transaction commits.
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- return recordType;
+ try {
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
+ ARecordType recordType = (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(),
+ dataset.getItemTypeName());
+ // Metadata transaction commits.
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ return recordType;
+ } finally {
+ metadataProvider.getLocks().unlock();
+ }
}
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 53f5f62..4b2af80 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -162,11 +162,10 @@
}
public Pair<LSMInsertDeleteOperatorNodePushable, CommitRuntime> getInsertPipeline(IHyracksTaskContext ctx,
- Dataset dataset, IAType[] primaryKeyTypes,
- ARecordType recordType, ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory,
- Map<String, String> mergePolicyProperties, int[] filterFields, int[] primaryKeyIndexes,
- List<Integer> primaryKeyIndicators, StorageComponentProvider storageComponentProvider)
- throws AlgebricksException, HyracksDataException {
+ Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType,
+ ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties, int[] filterFields,
+ int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators,
+ StorageComponentProvider storageComponentProvider) throws AlgebricksException, HyracksDataException {
PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
mergePolicyFactory, mergePolicyProperties, filterFields, primaryKeyIndexes, primaryKeyIndicators,
storageComponentProvider);
@@ -301,8 +300,13 @@
MetadataUtil.PENDING_NO_OP);
Index index = primaryIndexInfo.getIndex();
MetadataProvider mdProvider = new MetadataProvider(dataverse, storageComponentProvider);
- return dataset.getIndexDataflowHelperFactory(mdProvider, index, primaryIndexInfo.recordType,
- primaryIndexInfo.metaType, primaryIndexInfo.mergePolicyFactory, primaryIndexInfo.mergePolicyProperties);
+ try {
+ return dataset.getIndexDataflowHelperFactory(mdProvider, index, primaryIndexInfo.recordType,
+ primaryIndexInfo.metaType, primaryIndexInfo.mergePolicyFactory,
+ primaryIndexInfo.mergePolicyProperties);
+ } finally {
+ mdProvider.getLocks().unlock();
+ }
}
public IIndexDataflowHelper getPrimaryIndexDataflowHelper(Dataset dataset, IAType[] primaryKeyTypes,
diff --git a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
index f3bebbe..1d8ae5e 100644
--- a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
+++ b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
@@ -41,10 +41,8 @@
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.entities.Feed;
-import org.apache.asterix.metadata.entities.FeedConnection;
import org.apache.asterix.metadata.entities.Function;
import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
/**
* Represents the AQL statement for subscribing to a feed.
@@ -70,9 +68,9 @@
public void initialize(MetadataTransactionContext mdTxnCtx) throws MetadataException {
this.query = new Query(false);
EntityId sourceFeedId = connectionRequest.getReceivingFeedId();
- Feed subscriberFeed = MetadataManager.INSTANCE.getFeed(mdTxnCtx,
- connectionRequest.getReceivingFeedId().getDataverse(),
- connectionRequest.getReceivingFeedId().getEntityName());
+ Feed subscriberFeed =
+ MetadataManager.INSTANCE.getFeed(mdTxnCtx, connectionRequest.getReceivingFeedId().getDataverse(),
+ connectionRequest.getReceivingFeedId().getEntityName());
if (subscriberFeed == null) {
throw new IllegalStateException(" Subscriber feed " + subscriberFeed + " not found.");
}
diff --git a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/visitor/AqlDeleteRewriteVisitor.java b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/visitor/AqlDeleteRewriteVisitor.java
index 9268422..57370c5 100644
--- a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/visitor/AqlDeleteRewriteVisitor.java
+++ b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/visitor/AqlDeleteRewriteVisitor.java
@@ -41,17 +41,17 @@
@Override
public Void visit(DeleteStatement deleteStmt, Void visitArg) {
- List<Expression> arguments = new ArrayList<Expression>();
+ List<Expression> arguments = new ArrayList<>();
Identifier dataverseName = deleteStmt.getDataverseName();
Identifier datasetName = deleteStmt.getDatasetName();
String arg = dataverseName == null ? datasetName.getValue()
: dataverseName.getValue() + "." + datasetName.getValue();
LiteralExpr argumentLiteral = new LiteralExpr(new StringLiteral(arg));
arguments.add(argumentLiteral);
- CallExpr callExpression = new CallExpr(new FunctionSignature(FunctionConstants.ASTERIX_NS, "dataset", 1),
- arguments);
+ CallExpr callExpression =
+ new CallExpr(new FunctionSignature(FunctionConstants.ASTERIX_NS, "dataset", 1), arguments);
- List<Clause> clauseList = new ArrayList<Clause>();
+ List<Clause> clauseList = new ArrayList<>();
VariableExpr var = deleteStmt.getVariableExpr();
Clause forClause = new ForClause(var, callExpression);
clauseList.add(forClause);
diff --git a/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj b/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
index ba699f0..b2909c4 100644
--- a/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
+++ b/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
@@ -1015,10 +1015,6 @@
VariableExpr var = null;
Expression condition = null;
Pair<Identifier, Identifier> nameComponents;
- // This is related to the new metadata lock management
- setDataverses(new ArrayList<String>());
- setDatasets(new ArrayList<String>());
-
}
{
<DELETE> var = Variable()
@@ -1029,13 +1025,8 @@
(<WHERE> condition = Expression())?
{
// First we get the dataverses and datasets that we want to lock
- List<String> dataverses = getDataverses();
- List<String> datasets = getDatasets();
- // we remove the pointer to the dataverses and datasets
- setDataverses(null);
- setDatasets(null);
return new DeleteStatement(var, nameComponents.first, nameComponents.second,
- condition, getVarCounter(), dataverses, datasets);
+ condition, getVarCounter());
}
}
@@ -1621,9 +1612,6 @@
Query Query() throws ParseException:
{
Query query = new Query(false);
- // we set the pointers to the dataverses and datasets lists to fill them with entities to be locked
- setDataverses(query.getDataverses());
- setDatasets(query.getDatasets());
Expression expr;
}
{
@@ -1631,9 +1619,6 @@
{
query.setBody(expr);
query.setVarCounter(getVarCounter());
- // we remove the pointers to the locked entities before we return the query object
- setDataverses(null);
- setDatasets(null);
return query;
}
}
@@ -2282,12 +2267,6 @@
LiteralExpr ds = new LiteralExpr();
ds.setValue( new StringLiteral(name) );
nameArg = ds;
- if(arg2 != null){
- addDataverse(arg1.toString());
- addDataset(name);
- } else {
- addDataset(defaultDataverse + "." + name);
- }
}
| ( <LEFTPAREN> nameArg = Expression() <RIGHTPAREN> ) )
{
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/parser/ScopeChecker.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/parser/ScopeChecker.java
index 1195d37..771ae16 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/parser/ScopeChecker.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/parser/ScopeChecker.java
@@ -18,7 +18,6 @@
*/
package org.apache.asterix.lang.common.parser;
-import java.util.List;
import java.util.Stack;
import org.apache.asterix.common.functions.FunctionSignature;
@@ -35,17 +34,14 @@
protected Counter varCounter = new Counter(-1);
- protected Stack<Scope> scopeStack = new Stack<Scope>();
+ protected Stack<Scope> scopeStack = new Stack<>();
- protected Stack<Scope> forbiddenScopeStack = new Stack<Scope>();
+ protected Stack<Scope> forbiddenScopeStack = new Stack<>();
protected String[] inputLines;
protected String defaultDataverse;
- private List<String> dataverses;
- private List<String> datasets;
-
public ScopeChecker() {
scopeStack.push(RootScopeFactory.createRootScope(this));
}
@@ -323,32 +319,4 @@
extract.append(inputLines[endLine - 1].substring(0, endColumn - 1));
return extract.toString().trim();
}
-
- public void addDataverse(String dataverseName) {
- if (dataverses != null) {
- dataverses.add(dataverseName);
- }
- }
-
- public void addDataset(String datasetName) {
- if (datasets != null) {
- datasets.add(datasetName);
- }
- }
-
- public void setDataverses(List<String> dataverses) {
- this.dataverses = dataverses;
- }
-
- public void setDatasets(List<String> datasets) {
- this.datasets = datasets;
- }
-
- public List<String> getDataverses() {
- return dataverses;
- }
-
- public List<String> getDatasets() {
- return datasets;
- }
}
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DeleteStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DeleteStatement.java
index 3bd309a..7bf0db2 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DeleteStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DeleteStatement.java
@@ -18,8 +18,6 @@
*/
package org.apache.asterix.lang.common.statement;
-import java.util.List;
-
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.lang.common.base.Expression;
import org.apache.asterix.lang.common.base.Statement;
@@ -35,19 +33,15 @@
private Identifier datasetName;
private Expression condition;
private int varCounter;
- private List<String> dataverses;
- private List<String> datasets;
private Query rewrittenQuery;
public DeleteStatement(VariableExpr vars, Identifier dataverseName, Identifier datasetName, Expression condition,
- int varCounter, List<String> dataverses, List<String> datasets) {
+ int varCounter) {
this.vars = vars;
this.dataverseName = dataverseName;
this.datasetName = datasetName;
this.condition = condition;
this.varCounter = varCounter;
- this.dataverses = dataverses;
- this.datasets = datasets;
}
@Override
@@ -88,18 +82,9 @@
return visitor.visit(this, arg);
}
- public List<String> getDataverses() {
- return dataverses;
- }
-
- public List<String> getDatasets() {
- return datasets;
- }
-
@Override
public int hashCode() {
- return ObjectUtils.hashCodeMulti(condition, datasetName, datasets, dataverseName, dataverses, rewrittenQuery,
- vars);
+ return ObjectUtils.hashCodeMulti(condition, datasetName, dataverseName, rewrittenQuery, vars);
}
@Override
@@ -111,11 +96,11 @@
return false;
}
DeleteStatement target = (DeleteStatement) object;
- boolean equals = ObjectUtils.equals(condition, target.condition)
- && ObjectUtils.equals(datasetName, target.datasetName) && ObjectUtils.equals(datasets, target.datasets)
- && ObjectUtils.equals(dataverseName, target.dataverseName);
- return equals && ObjectUtils.equals(dataverses, target.dataverses)
- && ObjectUtils.equals(rewrittenQuery, target.rewrittenQuery) && ObjectUtils.equals(vars, target.vars);
+ boolean equals =
+ ObjectUtils.equals(condition, target.condition) && ObjectUtils.equals(datasetName, target.datasetName)
+ && ObjectUtils.equals(dataverseName, target.dataverseName);
+ return equals && ObjectUtils.equals(rewrittenQuery, target.rewrittenQuery)
+ && ObjectUtils.equals(vars, target.vars);
}
@Override
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/Query.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/Query.java
index 2aee087..7274ae3 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/Query.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/Query.java
@@ -18,7 +18,6 @@
*/
package org.apache.asterix.lang.common.statement;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -34,21 +33,16 @@
private boolean topLevel = true;
private Expression body;
private int varCounter;
- private List<String> dataverses = new ArrayList<>();
- private List<String> datasets = new ArrayList<>();
public Query(boolean explain) {
this.explain = explain;
}
- public Query(boolean explain, boolean topLevel, Expression body, int varCounter, List<String> dataverses,
- List<String> datasets) {
+ public Query(boolean explain, boolean topLevel, Expression body, int varCounter) {
this.explain = explain;
this.topLevel = topLevel;
this.body = body;
this.varCounter = varCounter;
- this.dataverses.addAll(dataverses);
- this.datasets.addAll(datasets);
}
@Override
@@ -99,25 +93,9 @@
return visitor.visit(this, arg);
}
- public void setDataverses(List<String> dataverses) {
- this.dataverses = dataverses;
- }
-
- public void setDatasets(List<String> datasets) {
- this.datasets = datasets;
- }
-
- public List<String> getDataverses() {
- return dataverses;
- }
-
- public List<String> getDatasets() {
- return datasets;
- }
-
@Override
public int hashCode() {
- return ObjectUtils.hashCodeMulti(body, datasets, dataverses, topLevel, explain);
+ return ObjectUtils.hashCodeMulti(body, topLevel, explain);
}
@Override
@@ -129,9 +107,7 @@
return false;
}
Query target = (Query) object;
- return explain == target.explain && ObjectUtils.equals(body, target.body)
- && ObjectUtils.equals(datasets, target.datasets) && ObjectUtils.equals(dataverses, target.dataverses)
- && topLevel == target.topLevel;
+ return explain == target.explain && ObjectUtils.equals(body, target.body) && topLevel == target.topLevel;
}
@Override
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/CloneAndSubstituteVariablesVisitor.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/CloneAndSubstituteVariablesVisitor.java
index 9b419f1..58c81a6 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/CloneAndSubstituteVariablesVisitor.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/CloneAndSubstituteVariablesVisitor.java
@@ -102,7 +102,7 @@
if (gc.hasGroupFieldList()) {
for (Pair<Expression, Identifier> varId : gc.getGroupFieldList()) {
Expression newExpr = (Expression) varId.first.accept(this, env).first;
- newGroupFieldList.add(new Pair<Expression, Identifier>(newExpr, varId.second));
+ newGroupFieldList.add(new Pair<>(newExpr, varId.second));
}
}
GroupbyClause newGroup = new GroupbyClause(newGbyList, newDecorList, newWithMap, newGroupVar, newGroupFieldList,
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/SqlppGroupBySugarVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/SqlppGroupBySugarVisitor.java
index a1c17fc..6346994 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/SqlppGroupBySugarVisitor.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/SqlppGroupBySugarVisitor.java
@@ -123,15 +123,14 @@
// Reference to a field in the group variable.
if (fieldVars.contains(usedVar)) {
// Rewrites to a reference to a field in the group variable.
- varExprMap.put(usedVar,
- new FieldAccessor(fromBindingVar, SqlppVariableUtil.toUserDefinedVariableName(usedVar
- .getVar())));
+ varExprMap.put(usedVar, new FieldAccessor(fromBindingVar,
+ SqlppVariableUtil.toUserDefinedVariableName(usedVar.getVar())));
}
}
// Select clause.
- SelectElement selectElement = new SelectElement(
- SqlppRewriteUtil.substituteExpression(expr, varExprMap, context));
+ SelectElement selectElement =
+ new SelectElement(SqlppRewriteUtil.substituteExpression(expr, varExprMap, context));
SelectClause selectClause = new SelectClause(selectElement, null, false);
// Construct the select expression.
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/SqlppRewriteUtil.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/SqlppRewriteUtil.java
index a066110..a8c5ddc 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/SqlppRewriteUtil.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/SqlppRewriteUtil.java
@@ -42,8 +42,8 @@
// Applying sugar rewriting for group-by.
public static Expression rewriteExpressionUsingGroupVariable(VariableExpr groupVar,
- Collection<VariableExpr> fieldVars, ILangExpression expr,
- LangRewritingContext context) throws CompilationException {
+ Collection<VariableExpr> fieldVars, ILangExpression expr, LangRewritingContext context)
+ throws CompilationException {
SqlppGroupBySugarVisitor visitor = new SqlppGroupBySugarVisitor(context, groupVar, fieldVars);
return expr.accept(visitor, null);
}
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/DeepCopyVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/DeepCopyVisitor.java
index 7c2a1747..37e4e26 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/DeepCopyVisitor.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/DeepCopyVisitor.java
@@ -132,8 +132,7 @@
@Override
public Projection visit(Projection projection, Void arg) throws CompilationException {
return new Projection(projection.star() ? null : (Expression) projection.getExpression().accept(this, arg),
- projection.getName(),
- projection.star(), projection.exprStar());
+ projection.getName(), projection.star(), projection.exprStar());
}
@Override
@@ -233,8 +232,7 @@
@Override
public Query visit(Query q, Void arg) throws CompilationException {
- return new Query(q.isExplain(), q.isTopLevel(), (Expression) q.getBody().accept(this, arg), q.getVarCounter(),
- q.getDataverses(), q.getDatasets());
+ return new Query(q.isExplain(), q.isTopLevel(), (Expression) q.getBody().accept(this, arg), q.getVarCounter());
}
@Override
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppCloneAndSubstituteVariablesVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppCloneAndSubstituteVariablesVisitor.java
index 9579741..8904241 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppCloneAndSubstituteVariablesVisitor.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppCloneAndSubstituteVariablesVisitor.java
@@ -86,8 +86,8 @@
VariableSubstitutionEnvironment env) throws CompilationException {
VariableExpr leftVar = fromTerm.getLeftVariable();
VariableExpr newLeftVar = generateNewVariable(context, leftVar);
- VariableExpr newLeftPosVar = fromTerm.hasPositionalVariable() ? generateNewVariable(context,
- fromTerm.getPositionalVariable()) : null;
+ VariableExpr newLeftPosVar = fromTerm.hasPositionalVariable()
+ ? generateNewVariable(context, fromTerm.getPositionalVariable()) : null;
Expression newLeftExpr = (Expression) visitUnnesBindingExpression(fromTerm.getLeftExpression(), env).first;
List<AbstractBinaryCorrelateClause> newCorrelateClauses = new ArrayList<>();
@@ -123,8 +123,8 @@
VariableSubstitutionEnvironment env) throws CompilationException {
VariableExpr rightVar = joinClause.getRightVariable();
VariableExpr newRightVar = generateNewVariable(context, rightVar);
- VariableExpr newRightPosVar = joinClause.hasPositionalVariable() ? generateNewVariable(context,
- joinClause.getPositionalVariable()) : null;
+ VariableExpr newRightPosVar = joinClause.hasPositionalVariable()
+ ? generateNewVariable(context, joinClause.getPositionalVariable()) : null;
// Visits the right expression.
Expression newRightExpr = (Expression) visitUnnesBindingExpression(joinClause.getRightExpression(), env).first;
@@ -138,8 +138,8 @@
// The condition can refer to the newRightVar and newRightPosVar.
Expression conditionExpr = (Expression) joinClause.getConditionExpression().accept(this, currentEnv).first;
- JoinClause newJoinClause = new JoinClause(joinClause.getJoinType(), newRightExpr, newRightVar, newRightPosVar,
- conditionExpr);
+ JoinClause newJoinClause =
+ new JoinClause(joinClause.getJoinType(), newRightExpr, newRightVar, newRightPosVar, conditionExpr);
return new Pair<>(newJoinClause, currentEnv);
}
@@ -148,8 +148,8 @@
VariableSubstitutionEnvironment env) throws CompilationException {
VariableExpr rightVar = nestClause.getRightVariable();
VariableExpr newRightVar = generateNewVariable(context, rightVar);
- VariableExpr newRightPosVar = nestClause.hasPositionalVariable() ? generateNewVariable(context,
- nestClause.getPositionalVariable()) : null;
+ VariableExpr newRightPosVar = nestClause.hasPositionalVariable()
+ ? generateNewVariable(context, nestClause.getPositionalVariable()) : null;
// Visits the right expression.
Expression rightExpr = (Expression) nestClause.getRightExpression().accept(this, env).first;
@@ -163,8 +163,8 @@
// The condition can refer to the newRightVar and newRightPosVar.
Expression conditionExpr = (Expression) nestClause.getConditionExpression().accept(this, currentEnv).first;
- NestClause newJoinClause = new NestClause(nestClause.getJoinType(), rightExpr, newRightVar, newRightPosVar,
- conditionExpr);
+ NestClause newJoinClause =
+ new NestClause(nestClause.getJoinType(), rightExpr, newRightVar, newRightPosVar, conditionExpr);
return new Pair<>(newJoinClause, currentEnv);
}
@@ -173,8 +173,8 @@
VariableSubstitutionEnvironment env) throws CompilationException {
VariableExpr rightVar = unnestClause.getRightVariable();
VariableExpr newRightVar = generateNewVariable(context, rightVar);
- VariableExpr newRightPosVar = unnestClause.hasPositionalVariable() ? generateNewVariable(context,
- unnestClause.getPositionalVariable()) : null;
+ VariableExpr newRightPosVar = unnestClause.hasPositionalVariable()
+ ? generateNewVariable(context, unnestClause.getPositionalVariable()) : null;
// Visits the right expression.
Expression rightExpr = (Expression) visitUnnesBindingExpression(unnestClause.getRightExpression(), env).first;
@@ -186,8 +186,8 @@
currentEnv.removeSubstitution(newRightPosVar);
}
// The condition can refer to the newRightVar and newRightPosVar.
- UnnestClause newJoinClause = new UnnestClause(unnestClause.getJoinType(), rightExpr, newRightVar,
- newRightPosVar);
+ UnnestClause newJoinClause =
+ new UnnestClause(unnestClause.getJoinType(), rightExpr, newRightVar, newRightPosVar);
return new Pair<>(newJoinClause, currentEnv);
}
@@ -265,13 +265,13 @@
VariableSubstitutionEnvironment env) throws CompilationException {
boolean distinct = selectClause.distinct();
if (selectClause.selectElement()) {
- Pair<ILangExpression, VariableSubstitutionEnvironment> newSelectElement = selectClause.getSelectElement()
- .accept(this, env);
+ Pair<ILangExpression, VariableSubstitutionEnvironment> newSelectElement =
+ selectClause.getSelectElement().accept(this, env);
return new Pair<>(new SelectClause((SelectElement) newSelectElement.first, null, distinct),
newSelectElement.second);
} else {
- Pair<ILangExpression, VariableSubstitutionEnvironment> newSelectRegular = selectClause.getSelectRegular()
- .accept(this, env);
+ Pair<ILangExpression, VariableSubstitutionEnvironment> newSelectRegular =
+ selectClause.getSelectRegular().accept(this, env);
return new Pair<>(new SelectClause(null, (SelectRegular) newSelectRegular.first, distinct),
newSelectRegular.second);
}
@@ -280,8 +280,8 @@
@Override
public Pair<ILangExpression, VariableSubstitutionEnvironment> visit(SelectElement selectElement,
VariableSubstitutionEnvironment env) throws CompilationException {
- Pair<ILangExpression, VariableSubstitutionEnvironment> newExpr = selectElement.getExpression()
- .accept(this, env);
+ Pair<ILangExpression, VariableSubstitutionEnvironment> newExpr =
+ selectElement.getExpression().accept(this, env);
return new Pair<>(new SelectElement((Expression) newExpr.first), newExpr.second);
}
@@ -318,12 +318,12 @@
SetOperationInput newRightInput;
SetOperationInput rightInput = right.getSetOperationRightInput();
if (rightInput.selectBlock()) {
- Pair<ILangExpression, VariableSubstitutionEnvironment> rightResult = rightInput.getSelectBlock()
- .accept(this, env);
+ Pair<ILangExpression, VariableSubstitutionEnvironment> rightResult =
+ rightInput.getSelectBlock().accept(this, env);
newRightInput = new SetOperationInput((SelectBlock) rightResult.first, null);
} else {
- Pair<ILangExpression, VariableSubstitutionEnvironment> rightResult = rightInput.getSubquery()
- .accept(this, env);
+ Pair<ILangExpression, VariableSubstitutionEnvironment> rightResult =
+ rightInput.getSubquery().accept(this, env);
newRightInput = new SetOperationInput(null, (SelectExpression) rightResult.first);
}
newRightInputs.add(new SetOperationRight(right.getSetOpType(), right.isSetSemantics(), newRightInput));
@@ -392,10 +392,10 @@
public Pair<ILangExpression, VariableSubstitutionEnvironment> visit(CaseExpression caseExpr,
VariableSubstitutionEnvironment env) throws CompilationException {
Expression conditionExpr = (Expression) caseExpr.getConditionExpr().accept(this, env).first;
- List<Expression> whenExprList = VariableCloneAndSubstitutionUtil.visitAndCloneExprList(caseExpr.getWhenExprs(),
- env, this);
- List<Expression> thenExprList = VariableCloneAndSubstitutionUtil.visitAndCloneExprList(caseExpr.getThenExprs(),
- env, this);
+ List<Expression> whenExprList =
+ VariableCloneAndSubstitutionUtil.visitAndCloneExprList(caseExpr.getWhenExprs(), env, this);
+ List<Expression> thenExprList =
+ VariableCloneAndSubstitutionUtil.visitAndCloneExprList(caseExpr.getThenExprs(), env, this);
Expression elseExpr = (Expression) caseExpr.getElseExpr().accept(this, env).first;
CaseExpression newCaseExpr = new CaseExpression(conditionExpr, whenExprList, thenExprList, elseExpr);
return new Pair<>(newCaseExpr, env);
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppDeleteRewriteVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppDeleteRewriteVisitor.java
index 503437c..680ce55 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppDeleteRewriteVisitor.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppDeleteRewriteVisitor.java
@@ -58,8 +58,8 @@
: dataverseName.getValue() + "." + datasetName.getValue();
LiteralExpr argumentLiteral = new LiteralExpr(new StringLiteral(arg));
arguments.add(argumentLiteral);
- CallExpr callExpression = new CallExpr(new FunctionSignature(FunctionConstants.ASTERIX_NS, "dataset", 1),
- arguments);
+ CallExpr callExpression =
+ new CallExpr(new FunctionSignature(FunctionConstants.ASTERIX_NS, "dataset", 1), arguments);
// From clause.
VariableExpr var = deleteStmt.getVariableExpr();
@@ -84,7 +84,7 @@
SelectBlock selectBlock = new SelectBlock(selectClause, fromClause, null, whereClause, null, null, null);
SelectSetOperation selectSetOperation = new SelectSetOperation(new SetOperationInput(selectBlock, null), null);
SelectExpression selectExpression = new SelectExpression(null, selectSetOperation, null, null, false);
- Query query = new Query(false, false, selectExpression, 0, new ArrayList<>(), new ArrayList<>());
+ Query query = new Query(false, false, selectExpression, 0);
query.setBody(selectExpression);
// return the delete statement.
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
index e553c4d..e6a2951 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
+++ b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
@@ -1029,10 +1029,6 @@
VariableExpr varExpr = null;
Expression condition = null;
Pair<Identifier, Identifier> nameComponents;
- // This is related to the new metadata lock management
- setDataverses(new ArrayList<String>());
- setDatasets(new ArrayList<String>());
-
}
{
<DELETE>
@@ -1040,20 +1036,13 @@
((<AS>)? varExpr = Variable())?
(<WHERE> condition = Expression())?
{
- // First we get the dataverses and datasets that we want to lock
- List<String> dataverses = getDataverses();
- List<String> datasets = getDatasets();
- // we remove the pointer to the dataverses and datasets
- setDataverses(null);
- setDatasets(null);
-
if(varExpr == null){
varExpr = new VariableExpr();
VarIdentifier var = SqlppVariableUtil.toInternalVariableIdentifier(nameComponents.second.getValue());
varExpr.setVar(var);
}
return new DeleteStatement(varExpr, nameComponents.first, nameComponents.second,
- condition, getVarCounter(), dataverses, datasets);
+ condition, getVarCounter());
}
}
@@ -1738,9 +1727,6 @@
Query Query(boolean explain) throws ParseException:
{
Query query = new Query(explain);
- // we set the pointers to the dataverses and datasets lists to fill them with entities to be locked
- setDataverses(query.getDataverses());
- setDatasets(query.getDatasets());
Expression expr;
}
{
@@ -1751,9 +1737,6 @@
)
{
query.setBody(expr);
- // we remove the pointers to the locked entities before we return the query object
- setDataverses(null);
- setDatasets(null);
return query;
}
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
index 5d44d0b..b2c2e81 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
@@ -400,7 +400,7 @@
ARecordType aRecType = (ARecordType) datatype.getDatatype();
return new Datatype(
datatype.getDataverseName(), datatype.getDatatypeName(), new ARecordType(aRecType.getTypeName(),
- aRecType.getFieldNames(), aRecType.getFieldTypes(), aRecType.isOpen()),
+ aRecType.getFieldNames(), aRecType.getFieldTypes(), aRecType.isOpen()),
datatype.getIsAnonymous());
}
try {
@@ -917,7 +917,7 @@
@Override
public ExternalFile getExternalFile(MetadataTransactionContext ctx, String dataverseName, String datasetName,
- Integer fileNumber) throws MetadataException {
+ Integer fileNumber) throws MetadataException {
ExternalFile file;
try {
file = metadataNode.getExternalFile(ctx.getJobId(), dataverseName, datasetName, fileNumber);
@@ -985,8 +985,7 @@
@Override
public <T extends IExtensionMetadataEntity> List<T> getEntities(MetadataTransactionContext mdTxnCtx,
- IExtensionMetadataSearchKey searchKey)
- throws MetadataException {
+ IExtensionMetadataSearchKey searchKey) throws MetadataException {
try {
return metadataNode.getEntities(mdTxnCtx.getJobId(), searchKey);
} catch (RemoteException e) {
@@ -1021,14 +1020,14 @@
return;
}
try {
- metadataNode = proxy.waitForMetadataNode(metadataProperties.getRegistrationTimeoutSecs(),
- TimeUnit.SECONDS);
+ metadataNode =
+ proxy.waitForMetadataNode(metadataProperties.getRegistrationTimeoutSecs(), TimeUnit.SECONDS);
if (metadataNode != null) {
rebindMetadataNode = false;
} else {
throw new HyracksDataException("The MetadataNode failed to bind before the configured timeout ("
- + metadataProperties.getRegistrationTimeoutSecs() + " seconds); the MetadataNode was " +
- "configured to run on NC: " + metadataProperties.getMetadataNodeName());
+ + metadataProperties.getRegistrationTimeoutSecs() + " seconds); the MetadataNode was "
+ + "configured to run on NC: " + metadataProperties.getMetadataNodeName());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
index bd1c7d1..8ce53d0 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
@@ -695,11 +695,10 @@
/**
* Feed Connection Related Metadata operations
*/
- void addFeedConnection(MetadataTransactionContext ctx, FeedConnection feedConnection)
- throws MetadataException;
+ void addFeedConnection(MetadataTransactionContext ctx, FeedConnection feedConnection) throws MetadataException;
- void dropFeedConnection(MetadataTransactionContext ctx, String dataverseName, String feedName,
- String datasetName) throws MetadataException;
+ void dropFeedConnection(MetadataTransactionContext ctx, String dataverseName, String feedName, String datasetName)
+ throws MetadataException;
FeedConnection getFeedConnection(MetadataTransactionContext ctx, String dataverseName, String feedName,
String datasetName) throws MetadataException;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeDataflowHelperFactoryProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeDataflowHelperFactoryProvider.java
index 0560bd0..f3913e1 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeDataflowHelperFactoryProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeDataflowHelperFactoryProvider.java
@@ -28,7 +28,7 @@
import org.apache.asterix.metadata.api.IIndexDataflowHelperFactoryProvider;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
+import org.apache.asterix.metadata.lock.ExternalDatasetsRegistry;
import org.apache.asterix.metadata.utils.IndexUtil;
import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index f8f4a0e..aa76122 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -71,6 +71,8 @@
import org.apache.asterix.metadata.entities.Index;
import org.apache.asterix.metadata.entities.InternalDatasetDetails;
import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
+import org.apache.asterix.metadata.lock.LockList;
+import org.apache.asterix.metadata.lock.MetadataLockManager;
import org.apache.asterix.metadata.utils.DatasetUtil;
import org.apache.asterix.metadata.utils.MetadataConstants;
import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
@@ -150,6 +152,7 @@
private final StorageProperties storageProperties;
private final ILibraryManager libraryManager;
private final Dataverse defaultDataverse;
+ private final LockList locks;
private MetadataTransactionContext mdTxnCtx;
private boolean isWriteTransaction;
@@ -160,7 +163,7 @@
private ResultSetId resultSetId;
private IResultSerializerFactoryProvider resultSerializerFactoryProvider;
private JobId jobId;
- private Map<String, Integer> locks;
+ private Map<String, Integer> externalDataLocks;
private boolean isTemporaryDatasetWriteJob = true;
private boolean blockingOperatorDisabled = false;
@@ -171,6 +174,7 @@
libraryManager = AppContextInfo.INSTANCE.getLibraryManager();
metadataPageManagerFactory = componentProvider.getMetadataPageManagerFactory();
primitiveValueProviderFactory = componentProvider.getPrimitiveValueProviderFactory();
+ locks = new LockList();
}
public String getPropertyValue(String propertyName) {
@@ -280,12 +284,12 @@
return storageProperties;
}
- public Map<String, Integer> getLocks() {
- return locks;
+ public Map<String, Integer> getExternalDataLocks() {
+ return externalDataLocks;
}
- public void setLocks(Map<String, Integer> locks) {
- this.locks = locks;
+ public void setExternalDataLocks(Map<String, Integer> locks) {
+ this.externalDataLocks = locks;
}
/**
@@ -302,6 +306,9 @@
if (dv == null) {
return null;
}
+ String fqName = dv + '.' + dataset;
+ MetadataLockManager.INSTANCE.acquireDataverseReadLock(locks, dv);
+ MetadataLockManager.INSTANCE.acquireDatasetReadLock(locks, fqName);
return MetadataManagerUtil.findDataset(mdTxnCtx, dv, dataset);
}
@@ -917,7 +924,8 @@
public FileSplit[] splitsForDataset(MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName,
String targetIdxName, boolean temp) throws AlgebricksException {
- return SplitsAndConstraintsUtil.getDatasetSplits(mdTxnCtx, dataverseName, datasetName, targetIdxName, temp);
+ return SplitsAndConstraintsUtil.getDatasetSplits(findDataset(dataverseName, datasetName), mdTxnCtx,
+ targetIdxName, temp);
}
public DatasourceAdapter getAdapter(MetadataTransactionContext mdTxnCtx, String dataverseName, String adapterName)
@@ -939,8 +947,8 @@
public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForFilesIndex(
String dataverseName, String datasetName, String targetIdxName, boolean create) throws AlgebricksException {
- return SplitsAndConstraintsUtil.getFilesIndexSplitProviderAndConstraints(mdTxnCtx, dataverseName, datasetName,
- targetIdxName, create);
+ return SplitsAndConstraintsUtil.getFilesIndexSplitProviderAndConstraints(
+ findDataset(dataverseName, datasetName), mdTxnCtx, targetIdxName, create);
}
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDataLookupRuntime(
@@ -1827,16 +1835,14 @@
splitsAndConstraint.first, appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits,
tokenComparatorFactories, invListsTypeTraits, invListComparatorFactories, tokenizerFactory,
fieldPermutation, indexDataFlowFactory, filterFactory, modificationCallbackFactory,
- searchCallbackFactory, indexName,
- prevFieldPermutation, metadataPageManagerFactory);
+ searchCallbackFactory, indexName, prevFieldPermutation, metadataPageManagerFactory);
} else {
op = new LSMInvertedIndexInsertDeleteOperatorDescriptor(spec, recordDesc,
appContext.getStorageManager(), splitsAndConstraint.first,
appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
invListsTypeTraits, invListComparatorFactories, tokenizerFactory, fieldPermutation, indexOp,
indexDataFlowFactory, filterFactory, modificationCallbackFactory, searchCallbackFactory,
- indexName,
- metadataPageManagerFactory);
+ indexName, metadataPageManagerFactory);
}
return new Pair<>(op, splitsAndConstraint.second);
} catch (Exception e) {
@@ -2064,4 +2070,8 @@
ds.getDatasetDetails().isTemp());
return StoragePathUtil.splitProviderAndPartitionConstraints(splits);
}
+
+ public LockList getLocks() {
+ return locks;
+ }
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index e0607a6..572130d 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -50,8 +50,8 @@
import org.apache.asterix.metadata.api.IMetadataEntity;
import org.apache.asterix.metadata.declared.BTreeDataflowHelperFactoryProvider;
import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.lock.ExternalDatasetsRegistry;
import org.apache.asterix.metadata.utils.DatasetUtil;
-import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
import org.apache.asterix.metadata.utils.ExternalIndexingOperations;
import org.apache.asterix.metadata.utils.IndexUtil;
import org.apache.asterix.metadata.utils.InvertedIndexDataflowHelperFactoryProvider;
@@ -283,9 +283,7 @@
// prepare job spec(s) that would disconnect any active feeds involving the dataset.
IActiveEntityEventsListener[] activeListeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners();
for (IActiveEntityEventsListener listener : activeListeners) {
- IDataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(),
- dataverseName, datasetName);
- if (listener.isEntityUsingDataset(ds)) {
+ if (listener.isEntityUsingDataset(this)) {
throw new CompilationException(ErrorCode.COMPILATION_CANT_DROP_ACTIVE_DATASET,
RecordUtil.toFullyQualifiedName(dataverseName, datasetName),
listener.getEntityId().toString());
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
index 45e358f..584aead 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
@@ -39,6 +39,7 @@
import org.apache.asterix.metadata.MetadataException;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.DatasourceAdapter;
import org.apache.asterix.metadata.entities.Datatype;
@@ -47,7 +48,6 @@
import org.apache.asterix.metadata.utils.MetadataConstants;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.IAType;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Triple;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
@@ -59,17 +59,16 @@
*/
public class FeedMetadataUtil {
- public static Dataset validateIfDatasetExists(String dataverse, String datasetName, MetadataTransactionContext ctx)
- throws CompilationException {
- Dataset dataset = MetadataManager.INSTANCE.getDataset(ctx, dataverse, datasetName);
+ public static Dataset validateIfDatasetExists(MetadataProvider metadataProvider, String dataverse,
+ String datasetName, MetadataTransactionContext ctx) throws AlgebricksException {
+ Dataset dataset = metadataProvider.findDataset(dataverse, datasetName);
if (dataset == null) {
throw new CompilationException("Unknown target dataset :" + datasetName);
}
if (!dataset.getDatasetType().equals(DatasetType.INTERNAL)) {
throw new CompilationException("Statement not applicable. Dataset " + datasetName
- + " is not of required type "
- + DatasetType.INTERNAL);
+ + " is not of required type " + DatasetType.INTERNAL);
}
return dataset;
}
@@ -87,8 +86,8 @@
MetadataTransactionContext ctx) throws CompilationException {
FeedPolicyEntity feedPolicy = MetadataManager.INSTANCE.getFeedPolicy(ctx, dataverse, policyName);
if (feedPolicy == null) {
- feedPolicy = MetadataManager.INSTANCE.getFeedPolicy(ctx, MetadataConstants.METADATA_DATAVERSE_NAME,
- policyName);
+ feedPolicy =
+ MetadataManager.INSTANCE.getFeedPolicy(ctx, MetadataConstants.METADATA_DATAVERSE_NAME, policyName);
if (feedPolicy == null) {
throw new CompilationException("Unknown feed policy" + policyName);
}
@@ -155,7 +154,7 @@
}
}
} catch (Exception e) {
- throw new AsterixException("Invalid feed parameters. Exception Message:" + e.getMessage() , e);
+ throw new AsterixException("Invalid feed parameters. Exception Message:" + e.getMessage(), e);
}
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java
new file mode 100644
index 0000000..ffcff78
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.lock;
+
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.asterix.metadata.lock.IMetadataLock.Mode;
+import org.apache.asterix.om.base.AMutableInt32;
+
+public class DatasetLock implements IMetadataLock {
+
+ private ReentrantReadWriteLock dsLock;
+ private ReentrantReadWriteLock dsModifyLock;
+ private AMutableInt32 indexBuildCounter;
+
+ public DatasetLock() {
+ dsLock = new ReentrantReadWriteLock(true);
+ dsModifyLock = new ReentrantReadWriteLock(true);
+ indexBuildCounter = new AMutableInt32(0);
+ }
+
+ private void acquireReadLock() {
+ // query
+ // build index
+ // insert
+ dsLock.readLock().lock();
+ }
+
+ private void releaseReadLock() {
+ // query
+ // build index
+ // insert
+ dsLock.readLock().unlock();
+ }
+
+ private void acquireWriteLock() {
+ // create ds
+ // delete ds
+ // drop index
+ dsLock.writeLock().lock();
+ }
+
+ private void releaseWriteLock() {
+ // create ds
+ // delete ds
+ // drop index
+ dsLock.writeLock().unlock();
+ }
+
+ private void acquireReadModifyLock() {
+ // insert
+ dsModifyLock.readLock().lock();
+ }
+
+ private void releaseReadModifyLock() {
+ // insert
+ dsModifyLock.readLock().unlock();
+ }
+
+ private void acquireWriteModifyLock() {
+ // Build index statement
+ synchronized (indexBuildCounter) {
+ if (indexBuildCounter.getIntegerValue() > 0) {
+ indexBuildCounter.setValue(indexBuildCounter.getIntegerValue() + 1);
+ } else {
+ dsModifyLock.writeLock().lock();
+ indexBuildCounter.setValue(1);
+ }
+ }
+ }
+
+ private void releaseWriteModifyLock() {
+ // Build index statement
+ synchronized (indexBuildCounter) {
+ if (indexBuildCounter.getIntegerValue() == 1) {
+ dsModifyLock.writeLock().unlock();
+ }
+ indexBuildCounter.setValue(indexBuildCounter.getIntegerValue() - 1);
+ }
+ }
+
+ private void acquireRefreshLock() {
+ // Refresh External Dataset statement
+ dsModifyLock.writeLock().lock();
+ }
+
+ private void releaseRefreshLock() {
+ // Refresh External Dataset statement
+ dsModifyLock.writeLock().unlock();
+ }
+
+ @Override
+ public void acquire(IMetadataLock.Mode mode) {
+ switch (mode) {
+ case INDEX_BUILD:
+ acquireReadLock();
+ acquireWriteModifyLock();
+ break;
+ case MODIFY:
+ acquireReadLock();
+ acquireReadModifyLock();
+ break;
+ case REFRESH:
+ acquireReadLock();
+ acquireRefreshLock();
+ break;
+ case INDEX_DROP:
+ case WRITE:
+ acquireWriteLock();
+ break;
+ default:
+ acquireReadLock();
+ break;
+ }
+ }
+
+ @Override
+ public void release(IMetadataLock.Mode mode) {
+ switch (mode) {
+ case INDEX_BUILD:
+ releaseWriteModifyLock();
+ releaseReadLock();
+ break;
+ case MODIFY:
+ releaseReadModifyLock();
+ releaseReadLock();
+ break;
+ case REFRESH:
+ releaseRefreshLock();
+ releaseReadLock();
+ break;
+ case INDEX_DROP:
+ case WRITE:
+ releaseWriteLock();
+ break;
+ default:
+ releaseReadLock();
+ break;
+ }
+ }
+}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalDatasetsRegistry.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/ExternalDatasetsRegistry.java
similarity index 94%
rename from asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalDatasetsRegistry.java
rename to asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/ExternalDatasetsRegistry.java
index 2e35ed4..4d8bacf 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalDatasetsRegistry.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/ExternalDatasetsRegistry.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.metadata.utils;
+package org.apache.asterix.metadata.lock;
import java.util.HashMap;
import java.util.Map;
@@ -26,6 +26,7 @@
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.utils.ExternalDatasetAccessManager;
/**
* This is a singelton class used to maintain the version of each external dataset with indexes
@@ -62,10 +63,10 @@
Map<String, Integer> locks;
String lockKey = dataset.getDataverseName() + "." + dataset.getDatasetName();
// check first if the lock was aquired already
- locks = metadataProvider.getLocks();
+ locks = metadataProvider.getExternalDataLocks();
if (locks == null) {
locks = new HashMap<>();
- metadataProvider.setLocks(locks);
+ metadataProvider.setExternalDataLocks(locks);
} else {
// if dataset was accessed already by this job, return the registered version
Integer version = locks.get(lockKey);
@@ -123,7 +124,7 @@
}
public void releaseAcquiredLocks(MetadataProvider metadataProvider) {
- Map<String, Integer> locks = metadataProvider.getLocks();
+ Map<String, Integer> locks = metadataProvider.getExternalDataLocks();
if (locks == null) {
return;
} else {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/IMetadataLock.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/IMetadataLock.java
new file mode 100644
index 0000000..8af29ba
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/IMetadataLock.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.lock;
+
+/**
+ * A Metadata lock local to compilation node
+ */
+public interface IMetadataLock {
+
+ enum Mode {
+ READ,
+ MODIFY,
+ REFRESH,
+ INDEX_BUILD,
+ INDEX_DROP,
+ WRITE
+ }
+
+ /**
+ * Acquire a lock
+ *
+ * @param mode
+ * lock mode
+ */
+ void acquire(IMetadataLock.Mode mode);
+
+ /**
+ * Release a lock
+ *
+ * @param mode
+ * lock mode
+ */
+ void release(IMetadataLock.Mode mode);
+}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/LockList.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/LockList.java
new file mode 100644
index 0000000..6e6f086
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/LockList.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.lock;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.metadata.lock.IMetadataLock.Mode;
+import org.apache.commons.lang3.tuple.Pair;
+
+public class LockList {
+ List<Pair<IMetadataLock.Mode, IMetadataLock>> locks = new ArrayList<>();
+
+ public void add(IMetadataLock.Mode mode, IMetadataLock lock) {
+ lock.acquire(mode);
+ locks.add(Pair.of(mode, lock));
+ }
+
+ public void unlock() {
+ for (int i = locks.size() - 1; i >= 0; i--) {
+ Pair<IMetadataLock.Mode, IMetadataLock> pair = locks.get(i);
+ pair.getRight().release(pair.getLeft());
+ }
+ }
+
+ public void reset() {
+ locks.clear();
+ }
+}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLock.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLock.java
new file mode 100644
index 0000000..c8fd64f
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLock.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.lock;
+
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.asterix.metadata.lock.IMetadataLock.Mode;
+
+public class MetadataLock implements IMetadataLock {
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+ @Override
+ public void acquire(IMetadataLock.Mode mode) {
+ switch (mode) {
+ case WRITE:
+ lock.writeLock().lock();
+ break;
+ default:
+ lock.readLock().lock();
+ break;
+ }
+ }
+
+ @Override
+ public void release(IMetadataLock.Mode mode) {
+ switch (mode) {
+ case WRITE:
+ lock.writeLock().unlock();
+ break;
+ default:
+ lock.readLock().unlock();
+ break;
+ }
+ }
+}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLockManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLockManager.java
new file mode 100644
index 0000000..5ae3aa4
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLockManager.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.lock;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+
+import org.apache.asterix.metadata.entities.FeedConnection;
+import org.apache.asterix.metadata.utils.DatasetUtil;
+
+public class MetadataLockManager {
+
+ public static final MetadataLockManager INSTANCE = new MetadataLockManager();
+ private static final Function<String, MetadataLock> LOCK_FUNCTION = key -> new MetadataLock();
+ private static final Function<String, DatasetLock> DATASET_LOCK_FUNCTION = key -> new DatasetLock();
+
+ private final ConcurrentHashMap<String, MetadataLock> dataversesLocks;
+ private final ConcurrentHashMap<String, DatasetLock> datasetsLocks;
+ private final ConcurrentHashMap<String, MetadataLock> functionsLocks;
+ private final ConcurrentHashMap<String, MetadataLock> nodeGroupsLocks;
+ private final ConcurrentHashMap<String, MetadataLock> feedsLocks;
+ private final ConcurrentHashMap<String, MetadataLock> feedPolicyLocks;
+ private final ConcurrentHashMap<String, MetadataLock> compactionPolicyLocks;
+ private final ConcurrentHashMap<String, MetadataLock> dataTypeLocks;
+ private final ConcurrentHashMap<String, MetadataLock> extensionLocks;
+
+ private MetadataLockManager() {
+ dataversesLocks = new ConcurrentHashMap<>();
+ datasetsLocks = new ConcurrentHashMap<>();
+ functionsLocks = new ConcurrentHashMap<>();
+ nodeGroupsLocks = new ConcurrentHashMap<>();
+ feedsLocks = new ConcurrentHashMap<>();
+ feedPolicyLocks = new ConcurrentHashMap<>();
+ compactionPolicyLocks = new ConcurrentHashMap<>();
+ dataTypeLocks = new ConcurrentHashMap<>();
+ extensionLocks = new ConcurrentHashMap<>();
+ }
+
+ // Dataverse
+ public void acquireDataverseReadLock(LockList locks, String dataverseName) {
+ MetadataLock lock = dataversesLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION);
+ locks.add(IMetadataLock.Mode.READ, lock);
+ }
+
+ public void acquireDataverseWriteLock(LockList locks, String dataverseName) {
+ MetadataLock lock = dataversesLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION);
+ locks.add(IMetadataLock.Mode.WRITE, lock);
+ }
+
+ // Dataset
+ public void acquireDatasetReadLock(LockList locks, String datasetName) {
+ DatasetLock lock = datasetsLocks.computeIfAbsent(datasetName, DATASET_LOCK_FUNCTION);
+ locks.add(IMetadataLock.Mode.READ, lock);
+ }
+
+ public void acquireDatasetWriteLock(LockList locks, String datasetName) {
+ DatasetLock lock = datasetsLocks.computeIfAbsent(datasetName, DATASET_LOCK_FUNCTION);
+ locks.add(IMetadataLock.Mode.WRITE, lock);
+ }
+
+ public void acquireDatasetModifyLock(LockList locks, String datasetName) {
+ DatasetLock lock = datasetsLocks.computeIfAbsent(datasetName, DATASET_LOCK_FUNCTION);
+ locks.add(IMetadataLock.Mode.MODIFY, lock);
+ }
+
+ public void acquireDatasetCreateIndexLock(LockList locks, String datasetName) {
+ DatasetLock lock = datasetsLocks.computeIfAbsent(datasetName, DATASET_LOCK_FUNCTION);
+ locks.add(IMetadataLock.Mode.INDEX_BUILD, lock);
+ }
+
+ public void acquireExternalDatasetRefreshLock(LockList locks, String datasetName) {
+ DatasetLock lock = datasetsLocks.computeIfAbsent(datasetName, DATASET_LOCK_FUNCTION);
+ locks.add(IMetadataLock.Mode.INDEX_BUILD, lock);
+ }
+
+ // Function
+ public void acquireFunctionReadLock(LockList locks, String dataverseName) {
+ MetadataLock lock = functionsLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION);
+ locks.add(IMetadataLock.Mode.READ, lock);
+ }
+
+ public void acquireFunctionWriteLock(LockList locks, String dataverseName) {
+ MetadataLock lock = functionsLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION);
+ locks.add(IMetadataLock.Mode.WRITE, lock);
+ }
+
+ // Node Group
+ public void acquireNodeGroupReadLock(LockList locks, String dataverseName) {
+ MetadataLock lock = nodeGroupsLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION);
+ locks.add(IMetadataLock.Mode.READ, lock);
+ }
+
+ public void acquireNodeGroupWriteLock(LockList locks, String dataverseName) {
+ MetadataLock lock = nodeGroupsLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION);
+ locks.add(IMetadataLock.Mode.WRITE, lock);
+ }
+
+ // Feeds
+ public void acquireFeedReadLock(LockList locks, String dataverseName) {
+ MetadataLock lock = feedsLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION);
+ locks.add(IMetadataLock.Mode.READ, lock);
+ }
+
+ public void acquireFeedWriteLock(LockList locks, String dataverseName) {
+ MetadataLock lock = feedsLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION);
+ locks.add(IMetadataLock.Mode.WRITE, lock);
+ }
+
+ // Feed Policies
+ public void acquireFeedPolicyReadLock(LockList locks, String dataverseName) {
+ MetadataLock lock = feedPolicyLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION);
+ locks.add(IMetadataLock.Mode.READ, lock);
+ }
+
+ public void acquireFeedPolicyWriteLock(LockList locks, String dataverseName) {
+ MetadataLock lock = feedPolicyLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION);
+ locks.add(IMetadataLock.Mode.WRITE, lock);
+ }
+
+ // CompactionPolicy
+ public void acquireCompactionPolicyReadLock(LockList locks, String dataverseName) {
+ MetadataLock lock = compactionPolicyLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION);
+ locks.add(IMetadataLock.Mode.READ, lock);
+ }
+
+ public void acquireCompactionPolicyWriteLock(LockList locks, String dataverseName) {
+ MetadataLock lock = compactionPolicyLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION);
+ locks.add(IMetadataLock.Mode.WRITE, lock);
+ }
+
+ // DataType
+ public void acquireDataTypeReadLock(LockList locks, String dataverseName) {
+ MetadataLock lock = dataTypeLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION);
+ locks.add(IMetadataLock.Mode.READ, lock);
+ }
+
+ public void acquireDataTypeWriteLock(LockList locks, String dataverseName) {
+ MetadataLock lock = dataTypeLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION);
+ locks.add(IMetadataLock.Mode.WRITE, lock);
+ }
+
+ // Extensions
+ public void acquireExtensionReadLock(LockList locks, String dataverseName) {
+ MetadataLock lock = extensionLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION);
+ locks.add(IMetadataLock.Mode.READ, lock);
+ }
+
+ public void acquireExtensionWriteLock(LockList locks, String dataverseName) {
+ MetadataLock lock = extensionLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION);
+ locks.add(IMetadataLock.Mode.WRITE, lock);
+ }
+
+ public void createDatasetBegin(LockList locks, String dataverseName, String itemTypeDataverseName,
+ String itemTypeFullyQualifiedName, String metaItemTypeDataverseName, String metaItemTypeFullyQualifiedName,
+ String nodeGroupName, String compactionPolicyName, String datasetFullyQualifiedName,
+ boolean isDefaultCompactionPolicy) {
+ acquireDataverseReadLock(locks, dataverseName);
+ if (!dataverseName.equals(itemTypeDataverseName)) {
+ acquireDataverseReadLock(locks, itemTypeDataverseName);
+ }
+ if (metaItemTypeDataverseName != null && !metaItemTypeDataverseName.equals(dataverseName)
+ && !metaItemTypeDataverseName.equals(itemTypeDataverseName)) {
+ acquireDataverseReadLock(locks, metaItemTypeDataverseName);
+ }
+ acquireDataTypeReadLock(locks, itemTypeFullyQualifiedName);
+ if (metaItemTypeFullyQualifiedName != null
+ && !metaItemTypeFullyQualifiedName.equals(itemTypeFullyQualifiedName)) {
+ acquireDataTypeReadLock(locks, metaItemTypeFullyQualifiedName);
+ }
+ acquireNodeGroupReadLock(locks, nodeGroupName);
+ if (!isDefaultCompactionPolicy) {
+ acquireCompactionPolicyReadLock(locks, compactionPolicyName);
+ }
+ acquireDatasetWriteLock(locks, datasetFullyQualifiedName);
+ }
+
+ public void createIndexBegin(LockList locks, String dataverseName, String datasetFullyQualifiedName) {
+ acquireDataverseReadLock(locks, dataverseName);
+ acquireDatasetCreateIndexLock(locks, datasetFullyQualifiedName);
+ }
+
+ public void createTypeBegin(LockList locks, String dataverseName, String itemTypeFullyQualifiedName) {
+ acquireDataverseReadLock(locks, dataverseName);
+ acquireDataTypeWriteLock(locks, itemTypeFullyQualifiedName);
+ }
+
+ public void dropDatasetBegin(LockList locks, String dataverseName, String datasetFullyQualifiedName) {
+ acquireDataverseReadLock(locks, dataverseName);
+ acquireDatasetWriteLock(locks, datasetFullyQualifiedName);
+ }
+
+ public void dropIndexBegin(LockList locks, String dataverseName, String datasetFullyQualifiedName) {
+ acquireDataverseReadLock(locks, dataverseName);
+ acquireDatasetWriteLock(locks, datasetFullyQualifiedName);
+ }
+
+ public void dropTypeBegin(LockList locks, String dataverseName, String dataTypeFullyQualifiedName) {
+ acquireDataverseReadLock(locks, dataverseName);
+ acquireDataTypeWriteLock(locks, dataTypeFullyQualifiedName);
+ }
+
+ public void functionStatementBegin(LockList locks, String dataverseName, String functionFullyQualifiedName) {
+ acquireDataverseReadLock(locks, dataverseName);
+ acquireFunctionWriteLock(locks, functionFullyQualifiedName);
+ }
+
+ public void modifyDatasetBegin(LockList locks, String dataverseName, String datasetFullyQualifiedName) {
+ acquireDataverseReadLock(locks, dataverseName);
+ acquireDatasetModifyLock(locks, datasetFullyQualifiedName);
+ }
+
+ public void insertDeleteUpsertBegin(LockList locks, String datasetFullyQualifiedName) {
+ acquireDataverseReadLock(locks, DatasetUtil.getDataverseFromFullyQualifiedName(datasetFullyQualifiedName));
+ acquireDatasetModifyLock(locks, datasetFullyQualifiedName);
+ }
+
+ public void dropFeedBegin(LockList locks, String dataverseName, String feedFullyQualifiedName) {
+ acquireDataverseReadLock(locks, dataverseName);
+ acquireFeedWriteLock(locks, feedFullyQualifiedName);
+ }
+
+ public void dropFeedPolicyBegin(LockList locks, String dataverseName, String policyName) {
+ acquireFeedWriteLock(locks, policyName);
+ acquireDataverseReadLock(locks, dataverseName);
+ }
+
+ public void startFeedBegin(LockList locks, String dataverseName, String feedName,
+ List<FeedConnection> feedConnections) {
+ acquireDataverseReadLock(locks, dataverseName);
+ acquireFeedReadLock(locks, feedName);
+ for (FeedConnection feedConnection : feedConnections) {
+ // what if the dataset is in a different dataverse
+ String fqName = dataverseName + "." + feedConnection.getDatasetName();
+ acquireDatasetReadLock(locks, fqName);
+ }
+ }
+
+ public void stopFeedBegin(LockList locks, String dataverseName, String feedName) {
+ // TODO: dataset lock?
+ // Dataset locks are not required here since datasets are protected by the active event listener
+ acquireDataverseReadLock(locks, dataverseName);
+ acquireFeedReadLock(locks, feedName);
+ }
+
+ public void createFeedBegin(LockList locks, String dataverseName, String feedFullyQualifiedName) {
+ acquireDataverseReadLock(locks, dataverseName);
+ acquireFeedWriteLock(locks, feedFullyQualifiedName);
+ }
+
+ public void connectFeedBegin(LockList locks, String dataverseName, String datasetFullyQualifiedName,
+ String feedFullyQualifiedName) {
+ acquireDataverseReadLock(locks, dataverseName);
+ acquireDatasetReadLock(locks, datasetFullyQualifiedName);
+ acquireFeedReadLock(locks, feedFullyQualifiedName);
+ }
+
+ public void createFeedPolicyBegin(LockList locks, String dataverseName, String policyName) {
+ acquireDataverseReadLock(locks, dataverseName);
+ acquireFeedPolicyWriteLock(locks, policyName);
+ }
+
+ public void disconnectFeedBegin(LockList locks, String dataverseName, String datasetFullyQualifiedName,
+ String feedFullyQualifiedName) {
+ acquireDataverseReadLock(locks, dataverseName);
+ acquireDatasetReadLock(locks, datasetFullyQualifiedName);
+ acquireFeedReadLock(locks, feedFullyQualifiedName);
+ }
+
+ public void compactBegin(LockList locks, String dataverseName, String datasetFullyQualifiedName) {
+ acquireDataverseReadLock(locks, dataverseName);
+ acquireDatasetReadLock(locks, datasetFullyQualifiedName);
+ }
+
+ public void refreshDatasetBegin(LockList locks, String dataverseName, String datasetFullyQualifiedName) {
+ acquireDataverseReadLock(locks, dataverseName);
+ acquireExternalDatasetRefreshLock(locks, datasetFullyQualifiedName);
+ }
+}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetLock.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetLock.java
deleted file mode 100644
index d53191c..0000000
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetLock.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.utils;
-
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.asterix.om.base.AMutableInt32;
-
-public class DatasetLock {
-
- private ReentrantReadWriteLock dsLock;
- private ReentrantReadWriteLock dsModifyLock;
- private AMutableInt32 indexBuildCounter;
-
- public DatasetLock() {
- dsLock = new ReentrantReadWriteLock(true);
- dsModifyLock = new ReentrantReadWriteLock(true);
- indexBuildCounter = new AMutableInt32(0);
- }
-
- public void acquireReadLock() {
- // query
- // build index
- // insert
- dsLock.readLock().lock();
- }
-
- public void releaseReadLock() {
- // query
- // build index
- // insert
- dsLock.readLock().unlock();
- }
-
- public void acquireWriteLock() {
- // create ds
- // delete ds
- // drop index
- dsLock.writeLock().lock();
- }
-
- public void releaseWriteLock() {
- // create ds
- // delete ds
- // drop index
- dsLock.writeLock().unlock();
- }
-
- public void acquireReadModifyLock() {
- // insert
- dsModifyLock.readLock().lock();
- }
-
- public void releaseReadModifyLock() {
- // insert
- dsModifyLock.readLock().unlock();
- }
-
- public void acquireWriteModifyLock() {
- // Build index statement
- synchronized (indexBuildCounter) {
- if (indexBuildCounter.getIntegerValue() > 0) {
- indexBuildCounter.setValue(indexBuildCounter.getIntegerValue() + 1);
- } else {
- dsModifyLock.writeLock().lock();
- indexBuildCounter.setValue(1);
- }
- }
- }
-
- public void releaseWriteModifyLock() {
- // Build index statement
- synchronized (indexBuildCounter) {
- if (indexBuildCounter.getIntegerValue() == 1) {
- dsModifyLock.writeLock().unlock();
- }
- indexBuildCounter.setValue(indexBuildCounter.getIntegerValue() - 1);
- }
- }
-
- public void acquireRefreshLock() {
- // Refresh External Dataset statement
- dsModifyLock.writeLock().lock();
- }
-
- public void releaseRefreshLock() {
- // Refresh External Dataset statement
- dsModifyLock.writeLock().unlock();
- }
-}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 572cc75..ca56cc3 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -94,8 +94,8 @@
private DatasetUtil() {
}
- public static IBinaryComparatorFactory[] computeKeysBinaryComparatorFactories(Dataset dataset,
- ARecordType itemType, ARecordType metaItemType, IBinaryComparatorFactoryProvider comparatorFactoryProvider)
+ public static IBinaryComparatorFactory[] computeKeysBinaryComparatorFactories(Dataset dataset, ARecordType itemType,
+ ARecordType metaItemType, IBinaryComparatorFactoryProvider comparatorFactoryProvider)
throws AlgebricksException {
List<List<String>> partitioningKeys = getPartitioningKeys(dataset);
IBinaryComparatorFactory[] bcfs = new IBinaryComparatorFactory[partitioningKeys.size()];
@@ -181,8 +181,7 @@
public static List<List<String>> getPartitioningKeys(Dataset dataset) {
if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
- return IndexingConstants
- .getRIDKeys(((ExternalDatasetDetails) dataset.getDatasetDetails()).getProperties());
+ return IndexingConstants.getRIDKeys(((ExternalDatasetDetails) dataset.getDatasetDetails()).getProperties());
}
return ((InternalDatasetDetails) dataset.getDatasetDetails()).getPartitioningKey();
}
@@ -276,8 +275,7 @@
String compactionPolicyFactoryClassName = compactionPolicy.getClassName();
ILSMMergePolicyFactory mergePolicyFactory;
try {
- mergePolicyFactory =
- (ILSMMergePolicyFactory) Class.forName(compactionPolicyFactoryClassName).newInstance();
+ mergePolicyFactory = (ILSMMergePolicyFactory) Class.forName(compactionPolicyFactoryClassName).newInstance();
if (mergePolicyFactory.getName().compareTo("correlated-prefix") == 0) {
((CorrelatedPrefixMergePolicyFactory) mergePolicyFactory).setDatasetID(dataset.getDatasetId());
}
@@ -338,8 +336,8 @@
metadataProvider.getSplitProviderAndConstraints(dataset);
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
- IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(
- metadataProvider, primaryIndex, itemType, metaType, compactionInfo.first, compactionInfo.second);
+ IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(metadataProvider,
+ primaryIndex, itemType, metaType, compactionInfo.first, compactionInfo.second);
IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
IndexDropOperatorDescriptor primaryBtreeDrop =
new IndexDropOperatorDescriptor(specPrimary, storageComponentProvider.getStorageManager(),
@@ -393,8 +391,8 @@
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
- IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(
- metadataProvider, primaryIndex, itemType, metaType, compactionInfo.first, compactionInfo.second);
+ IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(metadataProvider,
+ primaryIndex, itemType, metaType, compactionInfo.first, compactionInfo.second);
IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
IndexDropOperatorDescriptor primaryBtreeDrop =
new IndexDropOperatorDescriptor(specPrimary, storageComponentProvider.getStorageManager(),
@@ -467,8 +465,8 @@
index, itemType, metaItemType, compactionInfo.first, compactionInfo.second);
TreeIndexCreateOperatorDescriptor indexCreateOp = new TreeIndexCreateOperatorDescriptor(spec,
RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER,
- splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields,
- dataflowHelperFactory, localResourceFactoryProvider, NoOpOperationCallbackFactory.INSTANCE,
+ splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields, dataflowHelperFactory,
+ localResourceFactoryProvider, NoOpOperationCallbackFactory.INSTANCE,
storageComponentProvider.getMetadataPageManagerFactory());
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexCreateOp,
splitsAndConstraint.second);
@@ -518,4 +516,13 @@
spec.addRoot(compactOp);
return spec;
}
+
+ public static boolean isFullyQualifiedName(String datasetName) {
+ return datasetName.indexOf('.') > 0; //NOSONAR a fully qualified name can't start with a .
+ }
+
+ public static String getDataverseFromFullyQualifiedName(String datasetName) {
+ int idx = datasetName.indexOf('.');
+ return datasetName.substring(0, idx);
+ }
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java
deleted file mode 100644
index 9292008..0000000
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java
+++ /dev/null
@@ -1,587 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.utils;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Function;
-
-import org.apache.asterix.metadata.entities.Dataverse;
-import org.apache.asterix.metadata.entities.FeedConnection;
-
-public class MetadataLockManager {
-
- public static final MetadataLockManager INSTANCE = new MetadataLockManager();
- private static final Function<String, ReentrantReadWriteLock> LOCK_FUNCTION = key -> new ReentrantReadWriteLock();
- private static final Function<String, DatasetLock> DATASET_LOCK_FUNCTION = key -> new DatasetLock();
- private final ConcurrentHashMap<String, ReentrantReadWriteLock> dataversesLocks;
- private final ConcurrentHashMap<String, DatasetLock> datasetsLocks;
- private final ConcurrentHashMap<String, ReentrantReadWriteLock> functionsLocks;
- private final ConcurrentHashMap<String, ReentrantReadWriteLock> nodeGroupsLocks;
- private final ConcurrentHashMap<String, ReentrantReadWriteLock> feedsLocks;
- private final ConcurrentHashMap<String, ReentrantReadWriteLock> feedPolicyLocks;
- private final ConcurrentHashMap<String, ReentrantReadWriteLock> compactionPolicyLocks;
- private final ConcurrentHashMap<String, ReentrantReadWriteLock> dataTypeLocks;
- private final ConcurrentHashMap<String, ReentrantReadWriteLock> extensionLocks;
-
- private MetadataLockManager() {
- dataversesLocks = new ConcurrentHashMap<>();
- datasetsLocks = new ConcurrentHashMap<>();
- functionsLocks = new ConcurrentHashMap<>();
- nodeGroupsLocks = new ConcurrentHashMap<>();
- feedsLocks = new ConcurrentHashMap<>();
- feedPolicyLocks = new ConcurrentHashMap<>();
- compactionPolicyLocks = new ConcurrentHashMap<>();
- dataTypeLocks = new ConcurrentHashMap<>();
- extensionLocks = new ConcurrentHashMap<>();
- }
-
- public void acquireDataverseReadLock(String dataverseName) {
- ReentrantReadWriteLock dvLock = dataversesLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION);
- dvLock.readLock().lock();
- }
-
- public void releaseDataverseReadLock(String dataverseName) {
- dataversesLocks.get(dataverseName).readLock().unlock();
- }
-
- public void acquireDataverseWriteLock(String dataverseName) {
- ReentrantReadWriteLock dvLock = dataversesLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION);
- dvLock.writeLock().lock();
- }
-
- public void releaseDataverseWriteLock(String dataverseName) {
- dataversesLocks.get(dataverseName).writeLock().unlock();
- }
-
- public void acquireDatasetReadLock(String datasetName) {
- DatasetLock dsLock = datasetsLocks.computeIfAbsent(datasetName, DATASET_LOCK_FUNCTION);
- dsLock.acquireReadLock();
- }
-
- public void releaseDatasetReadLock(String datasetName) {
- datasetsLocks.get(datasetName).releaseReadLock();
- }
-
- public void acquireDatasetWriteLock(String datasetName) {
- DatasetLock dsLock = datasetsLocks.computeIfAbsent(datasetName, DATASET_LOCK_FUNCTION);
- dsLock.acquireWriteLock();
- }
-
- public void releaseDatasetWriteLock(String datasetName) {
- datasetsLocks.get(datasetName).releaseWriteLock();
- }
-
- public void acquireDatasetModifyLock(String datasetName) {
- DatasetLock dsLock = datasetsLocks.computeIfAbsent(datasetName, DATASET_LOCK_FUNCTION);
- dsLock.acquireReadLock();
- dsLock.acquireReadModifyLock();
- }
-
- public void releaseDatasetModifyLock(String datasetName) {
- DatasetLock dsLock = datasetsLocks.get(datasetName);
- dsLock.releaseReadModifyLock();
- dsLock.releaseReadLock();
- }
-
- public void acquireDatasetCreateIndexLock(String datasetName) {
- DatasetLock dsLock = datasetsLocks.computeIfAbsent(datasetName, DATASET_LOCK_FUNCTION);
- dsLock.acquireReadLock();
- dsLock.acquireWriteModifyLock();
- }
-
- public void releaseDatasetCreateIndexLock(String datasetName) {
- DatasetLock dsLock = datasetsLocks.get(datasetName);
- dsLock.releaseWriteModifyLock();
- dsLock.releaseReadLock();
- }
-
- public void acquireExternalDatasetRefreshLock(String datasetName) {
- DatasetLock dsLock = datasetsLocks.computeIfAbsent(datasetName, DATASET_LOCK_FUNCTION);
- dsLock.acquireReadLock();
- dsLock.acquireRefreshLock();
- }
-
- public void releaseExternalDatasetRefreshLock(String datasetName) {
- DatasetLock dsLock = datasetsLocks.get(datasetName);
- dsLock.releaseRefreshLock();
- dsLock.releaseReadLock();
- }
-
- public void acquireFunctionReadLock(String functionName) {
- ReentrantReadWriteLock fLock = functionsLocks.computeIfAbsent(functionName, LOCK_FUNCTION);
- fLock.readLock().lock();
- }
-
- public void releaseFunctionReadLock(String functionName) {
- functionsLocks.get(functionName).readLock().unlock();
- }
-
- public void acquireFunctionWriteLock(String functionName) {
- ReentrantReadWriteLock fLock = functionsLocks.computeIfAbsent(functionName, LOCK_FUNCTION);
- fLock.writeLock().lock();
- }
-
- public void releaseFunctionWriteLock(String functionName) {
- functionsLocks.get(functionName).writeLock().unlock();
- }
-
- public void acquireNodeGroupReadLock(String nodeGroupName) {
- ReentrantReadWriteLock ngLock = nodeGroupsLocks.computeIfAbsent(nodeGroupName, LOCK_FUNCTION);
- ngLock.readLock().lock();
- }
-
- public void releaseNodeGroupReadLock(String nodeGroupName) {
- nodeGroupsLocks.get(nodeGroupName).readLock().unlock();
- }
-
- public void acquireNodeGroupWriteLock(String nodeGroupName) {
- ReentrantReadWriteLock ngLock = nodeGroupsLocks.computeIfAbsent(nodeGroupName, LOCK_FUNCTION);
- ngLock.writeLock().lock();
- }
-
- public void releaseNodeGroupWriteLock(String nodeGroupName) {
- nodeGroupsLocks.get(nodeGroupName).writeLock().unlock();
- }
-
- public void acquireFeedReadLock(String feedName) {
- ReentrantReadWriteLock fLock = feedsLocks.computeIfAbsent(feedName, LOCK_FUNCTION);
- fLock.readLock().lock();
- }
-
- public void releaseFeedReadLock(String feedName) {
- feedsLocks.get(feedName).readLock().unlock();
- }
-
- public void acquireFeedWriteLock(String feedName) {
- ReentrantReadWriteLock fLock = feedsLocks.computeIfAbsent(feedName, LOCK_FUNCTION);
- fLock.writeLock().lock();
- }
-
- public void releaseFeedWriteLock(String feedName) {
- feedsLocks.get(feedName).writeLock().unlock();
- }
-
- public void acquireFeedPolicyWriteLock(String policyName) {
- ReentrantReadWriteLock fLock = feedPolicyLocks.computeIfAbsent(policyName, LOCK_FUNCTION);
- fLock.writeLock().lock();
- }
-
- public void releaseFeedPolicyWriteLock(String policyName) {
- feedPolicyLocks.get(policyName).writeLock().unlock();
- }
-
- public void acquireCompactionPolicyReadLock(String compactionPolicyName) {
- ReentrantReadWriteLock compactionPolicyLock =
- compactionPolicyLocks.computeIfAbsent(compactionPolicyName, LOCK_FUNCTION);
- compactionPolicyLock.readLock().lock();
- }
-
- public void releaseCompactionPolicyReadLock(String compactionPolicyName) {
- compactionPolicyLocks.get(compactionPolicyName).readLock().unlock();
- }
-
- public void acquireCompactionPolicyWriteLock(String compactionPolicyName) {
- ReentrantReadWriteLock compactionPolicyLock =
- compactionPolicyLocks.computeIfAbsent(compactionPolicyName, LOCK_FUNCTION);
- compactionPolicyLock.writeLock().lock();
- }
-
- public void releaseCompactionPolicyWriteLock(String compactionPolicyName) {
- compactionPolicyLocks.get(compactionPolicyName).writeLock().unlock();
- }
-
- public void acquireDataTypeReadLock(String dataTypeName) {
- ReentrantReadWriteLock dataTypeLock = dataTypeLocks.computeIfAbsent(dataTypeName, LOCK_FUNCTION);
- dataTypeLock.readLock().lock();
- }
-
- public void releaseDataTypeReadLock(String dataTypeName) {
- dataTypeLocks.get(dataTypeName).readLock().unlock();
- }
-
- public void acquireDataTypeWriteLock(String dataTypeName) {
- ReentrantReadWriteLock dataTypeLock = dataTypeLocks.computeIfAbsent(dataTypeName, LOCK_FUNCTION);
- dataTypeLock.writeLock().lock();
- }
-
- public void releaseDataTypeWriteLock(String dataTypeName) {
- dataTypeLocks.get(dataTypeName).writeLock().unlock();
- }
-
- public void createDatasetBegin(String dataverseName, String itemTypeDataverseName,
- String itemTypeFullyQualifiedName, String metaItemTypeDataverseName, String metaItemTypeFullyQualifiedName,
- String nodeGroupName, String compactionPolicyName, String datasetFullyQualifiedName,
- boolean isDefaultCompactionPolicy) {
- acquireDataverseReadLock(dataverseName);
- if (!dataverseName.equals(itemTypeDataverseName)) {
- acquireDataverseReadLock(itemTypeDataverseName);
- }
- if (metaItemTypeDataverseName != null && !metaItemTypeDataverseName.equals(dataverseName)
- && !metaItemTypeDataverseName.equals(itemTypeDataverseName)) {
- acquireDataverseReadLock(metaItemTypeDataverseName);
- }
- acquireDataTypeReadLock(itemTypeFullyQualifiedName);
- if (metaItemTypeFullyQualifiedName != null
- && !metaItemTypeFullyQualifiedName.equals(itemTypeFullyQualifiedName)) {
- acquireDataTypeReadLock(metaItemTypeFullyQualifiedName);
- }
- acquireNodeGroupReadLock(nodeGroupName);
- if (!isDefaultCompactionPolicy) {
- acquireCompactionPolicyReadLock(compactionPolicyName);
- }
- acquireDatasetWriteLock(datasetFullyQualifiedName);
- }
-
- public void createDatasetEnd(String dataverseName, String itemTypeDataverseName, String itemTypeFullyQualifiedName,
- String metaItemTypeDataverseName, String metaItemTypeFullyQualifiedName, String nodeGroupName,
- String compactionPolicyName, String datasetFullyQualifiedName, boolean isDefaultCompactionPolicy) {
- releaseDatasetWriteLock(datasetFullyQualifiedName);
- if (!isDefaultCompactionPolicy) {
- releaseCompactionPolicyReadLock(compactionPolicyName);
- }
- releaseNodeGroupReadLock(nodeGroupName);
- if (metaItemTypeFullyQualifiedName != null
- && !metaItemTypeFullyQualifiedName.equals(itemTypeFullyQualifiedName)) {
- releaseDataTypeReadLock(metaItemTypeFullyQualifiedName);
- }
- releaseDataTypeReadLock(itemTypeFullyQualifiedName);
- if (metaItemTypeDataverseName != null && !metaItemTypeDataverseName.equals(dataverseName)
- && !metaItemTypeDataverseName.equals(itemTypeDataverseName)) {
- releaseDataverseReadLock(metaItemTypeDataverseName);
- }
- if (!dataverseName.equals(itemTypeDataverseName)) {
- releaseDataverseReadLock(itemTypeDataverseName);
- }
- releaseDataverseReadLock(dataverseName);
- }
-
- public void createIndexBegin(String dataverseName, String datasetFullyQualifiedName) {
- acquireDataverseReadLock(dataverseName);
- acquireDatasetCreateIndexLock(datasetFullyQualifiedName);
- }
-
- public void createIndexEnd(String dataverseName, String datasetFullyQualifiedName) {
- releaseDatasetCreateIndexLock(datasetFullyQualifiedName);
- releaseDataverseReadLock(dataverseName);
- }
-
- public void createTypeBegin(String dataverseName, String itemTypeFullyQualifiedName) {
- acquireDataverseReadLock(dataverseName);
- acquireDataTypeWriteLock(itemTypeFullyQualifiedName);
- }
-
- public void createTypeEnd(String dataverseName, String itemTypeFullyQualifiedName) {
- releaseDataTypeWriteLock(itemTypeFullyQualifiedName);
- releaseDataverseReadLock(dataverseName);
- }
-
- public void dropDatasetBegin(String dataverseName, String datasetFullyQualifiedName) {
- acquireDataverseReadLock(dataverseName);
- acquireDatasetWriteLock(datasetFullyQualifiedName);
- }
-
- public void dropDatasetEnd(String dataverseName, String datasetFullyQualifiedName) {
- releaseDatasetWriteLock(datasetFullyQualifiedName);
- releaseDataverseReadLock(dataverseName);
- }
-
- public void dropIndexBegin(String dataverseName, String datasetFullyQualifiedName) {
- acquireDataverseReadLock(dataverseName);
- acquireDatasetWriteLock(datasetFullyQualifiedName);
- }
-
- public void dropIndexEnd(String dataverseName, String datasetFullyQualifiedName) {
- releaseDatasetWriteLock(datasetFullyQualifiedName);
- releaseDataverseReadLock(dataverseName);
- }
-
- public void dropTypeBegin(String dataverseName, String dataTypeFullyQualifiedName) {
- acquireDataverseReadLock(dataverseName);
- acquireDataTypeWriteLock(dataTypeFullyQualifiedName);
- }
-
- public void dropTypeEnd(String dataverseName, String dataTypeFullyQualifiedName) {
- releaseDataTypeWriteLock(dataTypeFullyQualifiedName);
- releaseDataverseReadLock(dataverseName);
- }
-
- public void functionStatementBegin(String dataverseName, String functionFullyQualifiedName) {
- acquireDataverseReadLock(dataverseName);
- acquireFunctionWriteLock(functionFullyQualifiedName);
- }
-
- public void functionStatementEnd(String dataverseName, String functionFullyQualifiedName) {
- releaseFunctionWriteLock(functionFullyQualifiedName);
- releaseDataverseReadLock(dataverseName);
- }
-
- public void modifyDatasetBegin(String dataverseName, String datasetFullyQualifiedName) {
- acquireDataverseReadLock(dataverseName);
- acquireDatasetModifyLock(datasetFullyQualifiedName);
- }
-
- public void modifyDatasetEnd(String dataverseName, String datasetFullyQualifiedName) {
- releaseDatasetModifyLock(datasetFullyQualifiedName);
- releaseDataverseReadLock(dataverseName);
- }
-
- public void insertDeleteUpsertBegin(String dataverseName, String datasetFullyQualifiedName, List<String> dataverses,
- List<String> datasets) {
- dataverses.add(dataverseName);
- datasets.add(datasetFullyQualifiedName);
- Collections.sort(dataverses);
- Collections.sort(datasets);
-
- String previous = null;
- for (int i = 0; i < dataverses.size(); i++) {
- String current = dataverses.get(i);
- if (!current.equals(previous)) {
- acquireDataverseReadLock(current);
- previous = current;
- }
- }
-
- for (int i = 0; i < datasets.size(); i++) {
- String current = datasets.get(i);
- if (!current.equals(previous)) {
- if (current.equals(datasetFullyQualifiedName)) {
- acquireDatasetModifyLock(current);
- } else {
- acquireDatasetReadLock(current);
- }
- previous = current;
- }
- }
- }
-
- public void insertDeleteUpsertEnd(String dataverseName, String datasetFullyQualifiedName, List<String> dataverses,
- List<String> datasets) {
- String previous = null;
- for (int i = dataverses.size() - 1; i >= 0; i--) {
- String current = dataverses.get(i);
- if (!current.equals(previous)) {
- releaseDataverseReadLock(current);
- previous = current;
- }
- }
- for (int i = datasets.size() - 1; i >= 0; i--) {
- String current = datasets.get(i);
- if (!current.equals(previous)) {
- if (current.equals(datasetFullyQualifiedName)) {
- releaseDatasetModifyLock(current);
- } else {
- releaseDatasetReadLock(current);
- }
- previous = current;
- }
- }
- }
-
- public void dropFeedBegin(String dataverseName, String feedFullyQualifiedName) {
- acquireDataverseReadLock(dataverseName);
- acquireFeedWriteLock(feedFullyQualifiedName);
- }
-
- public void dropFeedEnd(String dataverseName, String feedFullyQualifiedName) {
- releaseFeedWriteLock(feedFullyQualifiedName);
- releaseDataverseReadLock(dataverseName);
- }
-
- public void dropFeedPolicyBegin(String dataverseName, String policyName) {
- acquireFeedWriteLock(policyName);
- acquireDataverseReadLock(dataverseName);
- }
-
- public void dropFeedPolicyEnd(String dataverseName, String policyName) {
- releaseFeedWriteLock(policyName);
- releaseDataverseReadLock(dataverseName);
- }
-
- public void startFeedBegin(String dataverseName, String feedName, List<FeedConnection> feedConnections) {
- acquireDataverseReadLock(dataverseName);
- acquireFeedReadLock(feedName);
- for (FeedConnection feedConnection : feedConnections) {
- acquireDatasetReadLock(dataverseName + "." + feedConnection.getDatasetName());
- }
- }
-
- public void startFeedEnd(String dataverseName, String feedName, List<FeedConnection> feedConnections) {
- releaseDataverseReadLock(dataverseName);
- releaseFeedReadLock(feedName);
- for (FeedConnection feedConnection : feedConnections) {
- releaseDatasetReadLock(dataverseName + "." + feedConnection.getDatasetName());
- }
- }
-
- public void StopFeedBegin(String dataverseName, String feedName) {
- // TODO: dataset lock?
- acquireDataverseReadLock(dataverseName);
- acquireFeedReadLock(feedName);
- }
-
- public void StopFeedEnd(String dataverseName, String feedName) {
- releaseDataverseReadLock(dataverseName);
- releaseFeedReadLock(feedName);
- }
-
- public void createFeedBegin(String dataverseName, String feedFullyQualifiedName) {
- acquireDataverseReadLock(dataverseName);
- acquireFeedWriteLock(feedFullyQualifiedName);
- }
-
- public void createFeedEnd(String dataverseName, String feedFullyQualifiedName) {
- releaseFeedWriteLock(feedFullyQualifiedName);
- releaseDataverseReadLock(dataverseName);
- }
-
- public void connectFeedBegin(String dataverseName, String datasetFullyQualifiedName,
- String feedFullyQualifiedName) {
- acquireDataverseReadLock(dataverseName);
- acquireDatasetReadLock(datasetFullyQualifiedName);
- acquireFeedReadLock(feedFullyQualifiedName);
- }
-
- public void connectFeedEnd(String dataverseName, String datasetFullyQualifiedName, String feedFullyQualifiedName) {
- releaseFeedReadLock(feedFullyQualifiedName);
- releaseDatasetReadLock(datasetFullyQualifiedName);
- releaseDataverseReadLock(dataverseName);
- }
-
- public void createFeedPolicyBegin(String dataverseName, String policyName) {
- acquireDataverseReadLock(dataverseName);
- acquireFeedPolicyWriteLock(policyName);
- }
-
- public void createFeedPolicyEnd(String dataverseName, String policyName) {
- releaseFeedPolicyWriteLock(policyName);
- releaseDataverseReadLock(dataverseName);
- }
-
- public void disconnectFeedBegin(String dataverseName, String datasetFullyQualifiedName,
- String feedFullyQualifiedName) {
- acquireDataverseReadLock(dataverseName);
- acquireDatasetReadLock(datasetFullyQualifiedName);
- acquireFeedReadLock(feedFullyQualifiedName);
- }
-
- public void disconnectFeedEnd(String dataverseName, String datasetFullyQualifiedName,
- String feedFullyQualifiedName) {
- releaseFeedReadLock(feedFullyQualifiedName);
- releaseDatasetReadLock(datasetFullyQualifiedName);
- releaseDataverseReadLock(dataverseName);
- }
-
- public void subscribeFeedBegin(String dataverseName, String datasetFullyQualifiedName,
- String feedFullyQualifiedName) {
- acquireDataverseReadLock(dataverseName);
- acquireDatasetReadLock(datasetFullyQualifiedName);
- acquireFeedReadLock(feedFullyQualifiedName);
- }
-
- public void subscribeFeedEnd(String dataverseName, String datasetFullyQualifiedName,
- String feedFullyQualifiedName) {
- releaseFeedReadLock(feedFullyQualifiedName);
- releaseDatasetReadLock(datasetFullyQualifiedName);
- releaseDataverseReadLock(dataverseName);
- }
-
- public void compactBegin(String dataverseName, String datasetFullyQualifiedName) {
- acquireDataverseReadLock(dataverseName);
- acquireDatasetReadLock(datasetFullyQualifiedName);
- }
-
- public void compactEnd(String dataverseName, String datasetFullyQualifiedName) {
- releaseDatasetReadLock(datasetFullyQualifiedName);
- releaseDataverseReadLock(dataverseName);
- }
-
- public void queryBegin(Dataverse dataverse, List<String> dataverses, List<String> datasets) {
- if (dataverse != null) {
- dataverses.add(dataverse.getDataverseName());
- }
- Collections.sort(dataverses);
- Collections.sort(datasets);
-
- String previous = null;
- for (int i = 0; i < dataverses.size(); i++) {
- String current = dataverses.get(i);
- if (!current.equals(previous)) {
- acquireDataverseReadLock(current);
- previous = current;
- }
- }
-
- for (int i = 0; i < datasets.size(); i++) {
- String current = datasets.get(i);
- if (!current.equals(previous)) {
- acquireDatasetReadLock(current);
- previous = current;
- }
- }
- }
-
- public void queryEnd(List<String> dataverses, List<String> datasets) {
- String previous = null;
- for (int i = dataverses.size() - 1; i >= 0; i--) {
- String current = dataverses.get(i);
- if (!current.equals(previous)) {
- releaseDataverseReadLock(current);
- previous = current;
- }
- }
- for (int i = datasets.size() - 1; i >= 0; i--) {
- String current = datasets.get(i);
- if (!current.equals(previous)) {
- releaseDatasetReadLock(current);
- previous = current;
- }
- }
- }
-
- public void refreshDatasetBegin(String dataverseName, String datasetFullyQualifiedName) {
- acquireDataverseReadLock(dataverseName);
- acquireExternalDatasetRefreshLock(datasetFullyQualifiedName);
- }
-
- public void refreshDatasetEnd(String dataverseName, String datasetFullyQualifiedName) {
- releaseExternalDatasetRefreshLock(datasetFullyQualifiedName);
- releaseDataverseReadLock(dataverseName);
- }
-
- public void acquireExtensionReadLock(String entityName) {
- ReentrantReadWriteLock entityLock = extensionLocks.computeIfAbsent(entityName, LOCK_FUNCTION);
- entityLock.readLock().lock();
- }
-
- public void releaseExtensionReadLock(String entityName) {
- extensionLocks.get(entityName).readLock().unlock();
- }
-
- public void acquireExtensionWriteLock(String entityName) {
- ReentrantReadWriteLock entityLock = extensionLocks.computeIfAbsent(entityName, LOCK_FUNCTION);
- entityLock.writeLock().lock();
- }
-
- public void releaseExtensionWriteLock(String entityName) {
- extensionLocks.get(entityName).writeLock().unlock();
- }
-}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/RTreeDataflowHelperFactoryProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/RTreeDataflowHelperFactoryProvider.java
index 8859b9d..45034de 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/RTreeDataflowHelperFactoryProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/RTreeDataflowHelperFactoryProvider.java
@@ -32,6 +32,7 @@
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.Index;
import org.apache.asterix.metadata.entities.InternalDatasetDetails;
+import org.apache.asterix.metadata.lock.ExternalDatasetsRegistry;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.IAType;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
index 35e7acb..a4b849f 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
@@ -48,19 +48,19 @@
ClusterPartition[] clusterPartition = ClusterStateManager.INSTANCE.getClusterPartitons();
String storageDirName = ClusterProperties.INSTANCE.getStorageDirectoryName();
for (int j = 0; j < clusterPartition.length; j++) {
- File f = new File(StoragePathUtil.prepareStoragePartitionPath(storageDirName,
- clusterPartition[j].getPartitionId()) + File.separator + relPathFile);
- splits.add(StoragePathUtil.getFileSplitForClusterPartition(clusterPartition[j], f.getPath()));
+ File f = new File(
+ StoragePathUtil.prepareStoragePartitionPath(storageDirName, clusterPartition[j].getPartitionId())
+ + File.separator + relPathFile);
+ splits.add(StoragePathUtil.getFileSplitForClusterPartition(clusterPartition[j], f.getPath()));
}
return splits.toArray(new FileSplit[] {});
}
- public static FileSplit[] getDatasetSplits(MetadataTransactionContext mdTxnCtx, String dataverseName,
- String datasetName, String targetIdxName, boolean temp) throws AlgebricksException {
+ public static FileSplit[] getDatasetSplits(Dataset dataset, MetadataTransactionContext mdTxnCtx,
+ String targetIdxName, boolean temp) throws AlgebricksException {
try {
- File relPathFile =
- new File(StoragePathUtil.prepareDataverseIndexName(dataverseName, datasetName, targetIdxName));
- Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+ File relPathFile = new File(StoragePathUtil.prepareDataverseIndexName(dataset.getDataverseName(),
+ dataset.getDatasetName(), targetIdxName));
List<String> nodeGroup =
MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName()).getNodeNames();
if (nodeGroup == null) {
@@ -92,12 +92,11 @@
}
}
- private static FileSplit[] getFilesIndexSplits(MetadataTransactionContext mdTxnCtx, String dataverseName,
- String datasetName, String targetIdxName, boolean create) throws AlgebricksException {
+ private static FileSplit[] getFilesIndexSplits(Dataset dataset, MetadataTransactionContext mdTxnCtx,
+ String targetIdxName, boolean create) throws AlgebricksException {
try {
- File relPathFile =
- new File(StoragePathUtil.prepareDataverseIndexName(dataverseName, datasetName, targetIdxName));
- Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+ File relPathFile = new File(StoragePathUtil.prepareDataverseIndexName(dataset.getDataverseName(),
+ dataset.getDatasetName(), targetIdxName));
List<String> nodeGroup =
MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName()).getNodeNames();
if (nodeGroup == null) {
@@ -114,14 +113,14 @@
// Only the first partition when create
File f = new File(StoragePathUtil.prepareStoragePartitionPath(storageDirName,
nodePartitions[firstPartition].getPartitionId()) + File.separator + relPathFile);
- splits.add(StoragePathUtil.getFileSplitForClusterPartition(nodePartitions[firstPartition], f
- .getPath()));
+ splits.add(StoragePathUtil.getFileSplitForClusterPartition(nodePartitions[firstPartition],
+ f.getPath()));
} else {
for (int k = 0; k < nodePartitions.length; k++) {
File f = new File(StoragePathUtil.prepareStoragePartitionPath(storageDirName,
nodePartitions[firstPartition].getPartitionId()) + File.separator + relPathFile);
- splits.add(StoragePathUtil.getFileSplitForClusterPartition(nodePartitions[firstPartition], f
- .getPath()));
+ splits.add(StoragePathUtil.getFileSplitForClusterPartition(nodePartitions[firstPartition],
+ f.getPath()));
}
}
}
@@ -138,9 +137,9 @@
}
public static Pair<IFileSplitProvider, AlgebricksPartitionConstraint> getFilesIndexSplitProviderAndConstraints(
- MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName, String targetIdxName,
- boolean create) throws AlgebricksException {
- FileSplit[] splits = getFilesIndexSplits(mdTxnCtx, dataverseName, datasetName, targetIdxName, create);
+ Dataset dataset, MetadataTransactionContext mdTxnCtx, String targetIdxName, boolean create)
+ throws AlgebricksException {
+ FileSplit[] splits = getFilesIndexSplits(dataset, mdTxnCtx, targetIdxName, create);
return StoragePathUtil.splitProviderAndPartitionConstraints(splits);
}