Add Active Partition Event Message
Enable active runtimes to send messages to the listener. In addition,
this change introduces extension locks to the metadata lock manager.
Change-Id: I7b4629752e912614927b816d4ce3422ac89c5426
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1596
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
index e4a57e6..3c7aa06 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
@@ -31,6 +31,7 @@
public static final byte ACTIVE_RUNTIME_REGISTERED = 0x00;
public static final byte ACTIVE_RUNTIME_DEREGISTERED = 0x01;
+ public static final byte GENERIC_EVENT = 0x02;
private static final long serialVersionUID = 1L;
private final ActiveRuntimeId activeRuntimeId;
private final JobId jobId;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index e64cf14..bd5c214 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -208,9 +208,8 @@
protected final IStorageComponentProvider componentProvider;
protected final ExecutorService executorService;
- public QueryTranslator(List<Statement> statements, SessionConfig conf,
- ILangCompilationProvider compliationProvider, IStorageComponentProvider componentProvider,
- ExecutorService executorService) {
+ public QueryTranslator(List<Statement> statements, SessionConfig conf, ILangCompilationProvider compliationProvider,
+ IStorageComponentProvider componentProvider, ExecutorService executorService) {
this.statements = statements;
this.sessionConfig = conf;
this.componentProvider = componentProvider;
@@ -457,8 +456,9 @@
}
}
- protected void validateCompactionPolicy(String compactionPolicy, Map<String, String> compactionPolicyProperties,
- MetadataTransactionContext mdTxnCtx, boolean isExternalDataset) throws CompilationException, Exception {
+ protected static void validateCompactionPolicy(String compactionPolicy,
+ Map<String, String> compactionPolicyProperties, MetadataTransactionContext mdTxnCtx,
+ boolean isExternalDataset) throws CompilationException, Exception {
CompactionPolicy compactionPolicyEntity = MetadataManager.INSTANCE.getCompactionPolicy(mdTxnCtx,
MetadataConstants.METADATA_DATAVERSE_NAME, compactionPolicy);
if (compactionPolicyEntity == null) {
@@ -534,8 +534,8 @@
if (dt == null) {
throw new AlgebricksException(": type " + itemTypeName + " could not be found.");
}
- String ngName =
- ngNameId != null ? ngNameId.getValue() : configureNodegroupForDataset(dd, dataverseName, mdTxnCtx);
+ String ngName = ngNameId != null ? ngNameId.getValue()
+ : configureNodegroupForDataset(dataverseName, datasetName, dd.getHints(), mdTxnCtx);
if (compactionPolicy == null) {
compactionPolicy = GlobalConfig.DEFAULT_COMPACTION_POLICY_NAME;
@@ -585,8 +585,7 @@
break;
case EXTERNAL:
String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter();
- Map<String, String> properties =
- ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties();
+ Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties();
datasetDetails =
new ExternalDatasetDetails(adapter, properties, new Date(), TransactionState.COMMIT);
@@ -711,22 +710,22 @@
}
}
- protected String configureNodegroupForDataset(DatasetDecl dd, String dataverse,
- MetadataTransactionContext mdTxnCtx) throws CompilationException {
+ protected static String configureNodegroupForDataset(String dataverseName, String datasetName,
+ Map<String, String> hints, MetadataTransactionContext mdTxnCtx) throws CompilationException {
int nodegroupCardinality;
String nodegroupName;
- String hintValue = dd.getHints().get(DatasetNodegroupCardinalityHint.NAME);
+ String hintValue = hints.get(DatasetNodegroupCardinalityHint.NAME);
if (hintValue == null) {
nodegroupName = MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME;
return nodegroupName;
} else {
int numChosen = 0;
boolean valid = DatasetHints.validate(DatasetNodegroupCardinalityHint.NAME,
- dd.getHints().get(DatasetNodegroupCardinalityHint.NAME)).first;
+ hints.get(DatasetNodegroupCardinalityHint.NAME)).first;
if (!valid) {
throw new CompilationException("Incorrect use of hint:" + DatasetNodegroupCardinalityHint.NAME);
} else {
- nodegroupCardinality = Integer.parseInt(dd.getHints().get(DatasetNodegroupCardinalityHint.NAME));
+ nodegroupCardinality = Integer.parseInt(hints.get(DatasetNodegroupCardinalityHint.NAME));
}
List<String> nodeNames = AppContextInfo.INSTANCE.getMetadataProperties().getNodeNames();
List<String> nodeNamesClone = new ArrayList<>(nodeNames);
@@ -753,7 +752,7 @@
b[selected] = temp;
}
}
- nodegroupName = dataverse + ":" + dd.getName().getValue();
+ nodegroupName = dataverseName + ":" + datasetName;
MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(nodegroupName, selectedNodes));
return nodegroupName;
}
@@ -921,11 +920,10 @@
// Get snapshot from External File System
externalFilesSnapshot = ExternalIndexingOperations.getSnapshotFromExternalFileSystem(ds);
// Add an entry for the files index
- filesIndex =
- new Index(dataverseName, datasetName, IndexingConstants.getFilesIndexName(datasetName),
- IndexType.BTREE, ExternalIndexingOperations.FILE_INDEX_FIELD_NAMES, null,
- ExternalIndexingOperations.FILE_INDEX_FIELD_TYPES, false, false,
- MetadataUtil.PENDING_ADD_OP);
+ filesIndex = new Index(dataverseName, datasetName, IndexingConstants.getFilesIndexName(datasetName),
+ IndexType.BTREE, ExternalIndexingOperations.FILE_INDEX_FIELD_NAMES, null,
+ ExternalIndexingOperations.FILE_INDEX_FIELD_TYPES, false, false,
+ MetadataUtil.PENDING_ADD_OP);
MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), filesIndex);
// Add files to the external files index
for (ExternalFile file : externalFilesSnapshot) {
@@ -960,8 +958,8 @@
// #. add a new index with PendingAddOp
index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(), indexFields,
- keySourceIndicators, indexFieldTypes, stmtCreateIndex.getGramLength(),
- stmtCreateIndex.isEnforced(), false, MetadataUtil.PENDING_ADD_OP);
+ keySourceIndicators, indexFieldTypes, stmtCreateIndex.getGramLength(), stmtCreateIndex.isEnforced(),
+ false, MetadataUtil.PENDING_ADD_OP);
MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
ARecordType enforcedType = null;
@@ -1013,8 +1011,8 @@
// add another new files index with PendingNoOp after deleting the index with
// PendingAddOp
if (firstExternalDatasetIndex) {
- MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
- datasetName, filesIndex.getIndexName());
+ MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
+ filesIndex.getIndexName());
filesIndex.setPendingOp(MetadataUtil.PENDING_NO_OP);
MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), filesIndex);
// update transaction timestamp
@@ -1186,8 +1184,8 @@
stopFeedBeforeDelete(new Pair<>(dvId, new Identifier(activeEntityId.getEntityName())),
metadataProvider);
// prepare job to remove feed log storage
- jobsToExecute.add(FeedOperations.buildRemoveFeedStorageJob(MetadataManager.INSTANCE
- .getFeed(mdTxnCtx, dataverseName, activeEntityId.getEntityName())));
+ jobsToExecute.add(FeedOperations.buildRemoveFeedStorageJob(
+ MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverseName, activeEntityId.getEntityName())));
}
}
@@ -1201,8 +1199,8 @@
MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
for (int k = 0; k < indexes.size(); k++) {
if (indexes.get(k).isSecondaryIndex()) {
- jobsToExecute.add(IndexUtil.buildDropSecondaryIndexJobSpec(indexes.get(k),
- metadataProvider, datasets.get(j)));
+ jobsToExecute.add(IndexUtil.buildDropSecondaryIndexJobSpec(indexes.get(k), metadataProvider,
+ datasets.get(j)));
}
}
Index primaryIndex =
@@ -1217,8 +1215,8 @@
jobsToExecute.add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider,
datasets.get(j)));
} else {
- jobsToExecute.add(IndexUtil.buildDropSecondaryIndexJobSpec(indexes.get(k),
- metadataProvider, datasets.get(j)));
+ jobsToExecute.add(IndexUtil.buildDropSecondaryIndexJobSpec(indexes.get(k), metadataProvider,
+ datasets.get(j)));
}
}
ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(datasets.get(j));
@@ -1311,6 +1309,11 @@
DropDatasetStatement stmtDelete = (DropDatasetStatement) stmt;
String dataverseName = getActiveDataverse(stmtDelete.getDataverseName());
String datasetName = stmtDelete.getDatasetName().getValue();
+ doDropDataset(dataverseName, datasetName, metadataProvider, stmtDelete.getIfExists(), hcc);
+ }
+
+ public static void doDropDataset(String dataverseName, String datasetName, MetadataProvider metadataProvider,
+ boolean ifExists, IHyracksClientConnection hcc) throws Exception {
MutableObject<ProgressState> progress = new MutableObject<>(ProgressState.NO_PROGRESS);
MutableObject<MetadataTransactionContext> mdTxnCtx =
new MutableObject<>(MetadataManager.INSTANCE.beginTransaction());
@@ -1321,12 +1324,12 @@
try {
Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx.getValue(), dataverseName, datasetName);
if (ds == null) {
- if (stmtDelete.getIfExists()) {
+ if (ifExists) {
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
return;
} else {
- throw new AlgebricksException("There is no dataset with this name " + datasetName
- + " in dataverse " + dataverseName + ".");
+ throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
+ + dataverseName + ".");
}
}
ds.drop(metadataProvider, mdTxnCtx, jobsToExecute, bActiveTxn, progress, hcc);
@@ -1362,7 +1365,6 @@
+ "." + datasetName + ") couldn't be removed from the metadata", e);
}
}
-
throw e;
} finally {
MetadataLockManager.INSTANCE.dropDatasetEnd(dataverseName, dataverseName + "." + datasetName);
@@ -1370,104 +1372,6 @@
}
}
- protected void doDropDataset(Dataset ds, String datasetName, MetadataProvider metadataProvider,
- MutableObject<MetadataTransactionContext> mdTxnCtx, List<JobSpecification> jobsToExecute,
- String dataverseName, MutableBoolean bActiveTxn, MutableObject<ProgressState> progress,
- IHyracksClientConnection hcc) throws Exception {
- Map<FeedConnectionId, Pair<JobSpecification, Boolean>> disconnectJobList = new HashMap<>();
- if (ds.getDatasetType() == DatasetType.INTERNAL) {
- // prepare job spec(s) that would disconnect any active feeds involving the dataset.
- IActiveEntityEventsListener[] activeListeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners();
- for (IActiveEntityEventsListener listener : activeListeners) {
- if (listener.isEntityUsingDataset(ds)) {
- throw new CompilationException(
- "Can't drop dataset since it is connected to active entity: " + listener.getEntityId());
- }
- }
-
- // #. prepare jobs to drop the datatset and the indexes in NC
- List<Index> indexes =
- MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx.getValue(), dataverseName, datasetName);
- for (int j = 0; j < indexes.size(); j++) {
- if (indexes.get(j).isSecondaryIndex()) {
- jobsToExecute.add(IndexUtil.buildDropSecondaryIndexJobSpec(indexes.get(j), metadataProvider, ds));
- }
- }
- Index primaryIndex =
- MetadataManager.INSTANCE.getIndex(mdTxnCtx.getValue(), dataverseName, datasetName, datasetName);
- jobsToExecute.add(DatasetUtil.dropDatasetJobSpec(ds, primaryIndex, metadataProvider));
- // #. mark the existing dataset as PendingDropOp
- MetadataManager.INSTANCE.dropDataset(mdTxnCtx.getValue(), dataverseName, datasetName);
- MetadataManager.INSTANCE.addDataset(mdTxnCtx.getValue(),
- new Dataset(dataverseName, datasetName, ds.getItemTypeDataverseName(), ds.getItemTypeName(),
- ds.getMetaItemTypeDataverseName(), ds.getMetaItemTypeName(), ds.getNodeGroupName(),
- ds.getCompactionPolicy(), ds.getCompactionPolicyProperties(), ds.getDatasetDetails(),
- ds.getHints(), ds.getDatasetType(), ds.getDatasetId(), MetadataUtil.PENDING_DROP_OP));
-
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
- bActiveTxn.setValue(false);
- progress.setValue(ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA);
-
- // # disconnect the feeds
- for (Pair<JobSpecification, Boolean> p : disconnectJobList.values()) {
- JobUtils.runJob(hcc, p.first, true);
- }
-
- // #. run the jobs
- for (JobSpecification jobSpec : jobsToExecute) {
- JobUtils.runJob(hcc, jobSpec, true);
- }
-
- mdTxnCtx.setValue(MetadataManager.INSTANCE.beginTransaction());
- bActiveTxn.setValue(true);
- metadataProvider.setMetadataTxnContext(mdTxnCtx.getValue());
- } else {
- // External dataset
- ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(ds);
- // #. prepare jobs to drop the datatset and the indexes in NC
- List<Index> indexes =
- MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx.getValue(), dataverseName, datasetName);
- for (int j = 0; j < indexes.size(); j++) {
- if (ExternalIndexingOperations.isFileIndex(indexes.get(j))) {
- jobsToExecute.add(IndexUtil.buildDropSecondaryIndexJobSpec(indexes.get(j), metadataProvider, ds));
- } else {
- jobsToExecute.add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider, ds));
- }
- }
-
- // #. mark the existing dataset as PendingDropOp
- MetadataManager.INSTANCE.dropDataset(mdTxnCtx.getValue(), dataverseName, datasetName);
- MetadataManager.INSTANCE.addDataset(mdTxnCtx.getValue(),
- new Dataset(dataverseName, datasetName, ds.getItemTypeDataverseName(), ds.getItemTypeName(),
- ds.getNodeGroupName(), ds.getCompactionPolicy(), ds.getCompactionPolicyProperties(),
- ds.getDatasetDetails(), ds.getHints(), ds.getDatasetType(), ds.getDatasetId(),
- MetadataUtil.PENDING_DROP_OP));
-
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
- bActiveTxn.setValue(false);
- progress.setValue(ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA);
-
- // #. run the jobs
- for (JobSpecification jobSpec : jobsToExecute) {
- JobUtils.runJob(hcc, jobSpec, true);
- }
- if (!indexes.isEmpty()) {
- ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(ds);
- }
- mdTxnCtx.setValue(MetadataManager.INSTANCE.beginTransaction());
- bActiveTxn.setValue(true);
- metadataProvider.setMetadataTxnContext(mdTxnCtx.getValue());
- }
-
- // #. finally, delete the dataset.
- MetadataManager.INSTANCE.dropDataset(mdTxnCtx.getValue(), dataverseName, datasetName);
- // Drop the associated nodegroup
- String nodegroup = ds.getNodeGroupName();
- if (!nodegroup.equalsIgnoreCase(MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME)) {
- MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx.getValue(), dataverseName + ":" + datasetName);
- }
- }
-
protected void handleIndexDropStatement(MetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc) throws Exception {
@@ -1524,10 +1428,9 @@
// #. mark PendingDropOp on the existing index
MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
MetadataManager.INSTANCE.addIndex(mdTxnCtx,
- new Index(dataverseName, datasetName, indexName, index.getIndexType(),
- index.getKeyFieldNames(), index.getKeyFieldSourceIndicators(),
- index.getKeyFieldTypes(), index.isEnforcingKeyFileds(), index.isPrimaryIndex(),
- MetadataUtil.PENDING_DROP_OP));
+ new Index(dataverseName, datasetName, indexName, index.getIndexType(), index.getKeyFieldNames(),
+ index.getKeyFieldSourceIndicators(), index.getKeyFieldTypes(),
+ index.isEnforcingKeyFileds(), index.isPrimaryIndex(), MetadataUtil.PENDING_DROP_OP));
// #. commit the existing transaction before calling runJob.
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -1586,10 +1489,9 @@
// #. mark PendingDropOp on the existing index
MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
MetadataManager.INSTANCE.addIndex(mdTxnCtx,
- new Index(dataverseName, datasetName, indexName, index.getIndexType(),
- index.getKeyFieldNames(), index.getKeyFieldSourceIndicators(),
- index.getKeyFieldTypes(), index.isEnforcingKeyFileds(), index.isPrimaryIndex(),
- MetadataUtil.PENDING_DROP_OP));
+ new Index(dataverseName, datasetName, indexName, index.getIndexType(), index.getKeyFieldNames(),
+ index.getKeyFieldSourceIndicators(), index.getKeyFieldTypes(),
+ index.isEnforcingKeyFileds(), index.isPrimaryIndex(), MetadataUtil.PENDING_DROP_OP));
// #. commit the existing transaction before calling runJob.
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -1648,8 +1550,8 @@
} catch (Exception e2) {
e.addSuppressed(e2);
abort(e, e2, mdTxnCtx);
- throw new IllegalStateException("System is inconsistent state: pending index(" + dataverseName
- + "." + datasetName + "." + indexName + ") couldn't be removed from the metadata", e);
+ throw new IllegalStateException("System is inconsistent state: pending index(" + dataverseName + "."
+ + datasetName + "." + indexName + ") couldn't be removed from the metadata", e);
}
}
@@ -1783,8 +1685,7 @@
CompiledLoadFromFileStatement cls =
new CompiledLoadFromFileStatement(dataverseName, loadStmt.getDatasetName().getValue(),
loadStmt.getAdapter(), loadStmt.getProperties(), loadStmt.dataIsAlreadySorted());
- JobSpecification spec =
- apiFramework.compileQuery(hcc, metadataProvider, null, 0, null, sessionConfig, cls);
+ JobSpecification spec = apiFramework.compileQuery(hcc, metadataProvider, null, 0, null, sessionConfig, cls);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
if (spec != null) {
@@ -1989,8 +1890,8 @@
try {
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- FeedPolicyEntity feedPolicy = MetadataManager.INSTANCE
- .getFeedPolicy(metadataProvider.getMetadataTxnContext(), dataverse, policy);
+ FeedPolicyEntity feedPolicy =
+ MetadataManager.INSTANCE.getFeedPolicy(metadataProvider.getMetadataTxnContext(), dataverse, policy);
if (feedPolicy != null) {
if (cfps.getIfNotExists()) {
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -2002,8 +1903,8 @@
boolean extendingExisting = cfps.getSourcePolicyName() != null;
String description = cfps.getDescription() == null ? "" : cfps.getDescription();
if (extendingExisting) {
- FeedPolicyEntity sourceFeedPolicy = MetadataManager.INSTANCE.getFeedPolicy(
- metadataProvider.getMetadataTxnContext(), dataverse, cfps.getSourcePolicyName());
+ FeedPolicyEntity sourceFeedPolicy = MetadataManager.INSTANCE
+ .getFeedPolicy(metadataProvider.getMetadataTxnContext(), dataverse, cfps.getSourcePolicyName());
if (sourceFeedPolicy == null) {
sourceFeedPolicy = MetadataManager.INSTANCE.getFeedPolicy(metadataProvider.getMetadataTxnContext(),
MetadataConstants.METADATA_DATAVERSE_NAME, cfps.getSourcePolicyName());
@@ -2135,8 +2036,7 @@
}
// Start
try {
- MetadataLockManager.INSTANCE.startFeedBegin(dataverseName, dataverseName + "." + feedName,
- feedConnections);
+ MetadataLockManager.INSTANCE.startFeedBegin(dataverseName, dataverseName + "." + feedName, feedConnections);
// Prepare policy
List<IDataset> datasets = new ArrayList<>();
for (FeedConnection connection : feedConnections) {
@@ -2761,8 +2661,8 @@
handlePregelixStatement(metadataProvider, runStmt, hcc);
break;
default:
- throw new AlgebricksException("The system \"" + runStmt.getSystem()
- + "\" specified in your run statement is not supported.");
+ throw new AlgebricksException(
+ "The system \"" + runStmt.getSystem() + "\" specified in your run statement is not supported.");
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 55de9c7..a73a5cc 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -302,7 +302,7 @@
private IStatementExecutorFactory getStatementExecutorFactory() {
return ccExtensionManager.getStatementExecutorFactory(
- ((ClusterControllerService) ccServiceCtx.getControllerService()).getExecutorService());
+ ((ClusterControllerService) ccServiceCtx.getControllerService()).getExecutor());
}
@Override
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index ad75b6b..cd8cf3b 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -609,4 +609,8 @@
public IFrameOperationCallbackFactory getFrameOpCallbackFactory() {
return NoOpFrameOperationCallbackFactory.INSTANCE;
}
+
+ public boolean isTemp() {
+ return getDatasetDetails().isTemp();
+ }
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java
index 81eaa9b..9292008 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java
@@ -22,14 +22,16 @@
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
import org.apache.asterix.metadata.entities.Dataverse;
-import org.apache.asterix.metadata.entities.Feed;
import org.apache.asterix.metadata.entities.FeedConnection;
public class MetadataLockManager {
public static final MetadataLockManager INSTANCE = new MetadataLockManager();
+ private static final Function<String, ReentrantReadWriteLock> LOCK_FUNCTION = key -> new ReentrantReadWriteLock();
+ private static final Function<String, DatasetLock> DATASET_LOCK_FUNCTION = key -> new DatasetLock();
private final ConcurrentHashMap<String, ReentrantReadWriteLock> dataversesLocks;
private final ConcurrentHashMap<String, DatasetLock> datasetsLocks;
private final ConcurrentHashMap<String, ReentrantReadWriteLock> functionsLocks;
@@ -38,6 +40,7 @@
private final ConcurrentHashMap<String, ReentrantReadWriteLock> feedPolicyLocks;
private final ConcurrentHashMap<String, ReentrantReadWriteLock> compactionPolicyLocks;
private final ConcurrentHashMap<String, ReentrantReadWriteLock> dataTypeLocks;
+ private final ConcurrentHashMap<String, ReentrantReadWriteLock> extensionLocks;
private MetadataLockManager() {
dataversesLocks = new ConcurrentHashMap<>();
@@ -48,14 +51,11 @@
feedPolicyLocks = new ConcurrentHashMap<>();
compactionPolicyLocks = new ConcurrentHashMap<>();
dataTypeLocks = new ConcurrentHashMap<>();
+ extensionLocks = new ConcurrentHashMap<>();
}
public void acquireDataverseReadLock(String dataverseName) {
- ReentrantReadWriteLock dvLock = dataversesLocks.get(dataverseName);
- if (dvLock == null) {
- dataversesLocks.putIfAbsent(dataverseName, new ReentrantReadWriteLock());
- dvLock = dataversesLocks.get(dataverseName);
- }
+ ReentrantReadWriteLock dvLock = dataversesLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION);
dvLock.readLock().lock();
}
@@ -64,11 +64,7 @@
}
public void acquireDataverseWriteLock(String dataverseName) {
- ReentrantReadWriteLock dvLock = dataversesLocks.get(dataverseName);
- if (dvLock == null) {
- dataversesLocks.putIfAbsent(dataverseName, new ReentrantReadWriteLock());
- dvLock = dataversesLocks.get(dataverseName);
- }
+ ReentrantReadWriteLock dvLock = dataversesLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION);
dvLock.writeLock().lock();
}
@@ -77,11 +73,7 @@
}
public void acquireDatasetReadLock(String datasetName) {
- DatasetLock dsLock = datasetsLocks.get(datasetName);
- if (dsLock == null) {
- datasetsLocks.putIfAbsent(datasetName, new DatasetLock());
- dsLock = datasetsLocks.get(datasetName);
- }
+ DatasetLock dsLock = datasetsLocks.computeIfAbsent(datasetName, DATASET_LOCK_FUNCTION);
dsLock.acquireReadLock();
}
@@ -90,11 +82,7 @@
}
public void acquireDatasetWriteLock(String datasetName) {
- DatasetLock dsLock = datasetsLocks.get(datasetName);
- if (dsLock == null) {
- datasetsLocks.putIfAbsent(datasetName, new DatasetLock());
- dsLock = datasetsLocks.get(datasetName);
- }
+ DatasetLock dsLock = datasetsLocks.computeIfAbsent(datasetName, DATASET_LOCK_FUNCTION);
dsLock.acquireWriteLock();
}
@@ -103,11 +91,7 @@
}
public void acquireDatasetModifyLock(String datasetName) {
- DatasetLock dsLock = datasetsLocks.get(datasetName);
- if (dsLock == null) {
- datasetsLocks.putIfAbsent(datasetName, new DatasetLock());
- dsLock = datasetsLocks.get(datasetName);
- }
+ DatasetLock dsLock = datasetsLocks.computeIfAbsent(datasetName, DATASET_LOCK_FUNCTION);
dsLock.acquireReadLock();
dsLock.acquireReadModifyLock();
}
@@ -119,11 +103,7 @@
}
public void acquireDatasetCreateIndexLock(String datasetName) {
- DatasetLock dsLock = datasetsLocks.get(datasetName);
- if (dsLock == null) {
- datasetsLocks.putIfAbsent(datasetName, new DatasetLock());
- dsLock = datasetsLocks.get(datasetName);
- }
+ DatasetLock dsLock = datasetsLocks.computeIfAbsent(datasetName, DATASET_LOCK_FUNCTION);
dsLock.acquireReadLock();
dsLock.acquireWriteModifyLock();
}
@@ -135,11 +115,7 @@
}
public void acquireExternalDatasetRefreshLock(String datasetName) {
- DatasetLock dsLock = datasetsLocks.get(datasetName);
- if (dsLock == null) {
- datasetsLocks.putIfAbsent(datasetName, new DatasetLock());
- dsLock = datasetsLocks.get(datasetName);
- }
+ DatasetLock dsLock = datasetsLocks.computeIfAbsent(datasetName, DATASET_LOCK_FUNCTION);
dsLock.acquireReadLock();
dsLock.acquireRefreshLock();
}
@@ -151,11 +127,7 @@
}
public void acquireFunctionReadLock(String functionName) {
- ReentrantReadWriteLock fLock = functionsLocks.get(functionName);
- if (fLock == null) {
- functionsLocks.putIfAbsent(functionName, new ReentrantReadWriteLock());
- fLock = functionsLocks.get(functionName);
- }
+ ReentrantReadWriteLock fLock = functionsLocks.computeIfAbsent(functionName, LOCK_FUNCTION);
fLock.readLock().lock();
}
@@ -164,11 +136,7 @@
}
public void acquireFunctionWriteLock(String functionName) {
- ReentrantReadWriteLock fLock = functionsLocks.get(functionName);
- if (fLock == null) {
- functionsLocks.putIfAbsent(functionName, new ReentrantReadWriteLock());
- fLock = functionsLocks.get(functionName);
- }
+ ReentrantReadWriteLock fLock = functionsLocks.computeIfAbsent(functionName, LOCK_FUNCTION);
fLock.writeLock().lock();
}
@@ -177,11 +145,7 @@
}
public void acquireNodeGroupReadLock(String nodeGroupName) {
- ReentrantReadWriteLock ngLock = nodeGroupsLocks.get(nodeGroupName);
- if (ngLock == null) {
- nodeGroupsLocks.putIfAbsent(nodeGroupName, new ReentrantReadWriteLock());
- ngLock = nodeGroupsLocks.get(nodeGroupName);
- }
+ ReentrantReadWriteLock ngLock = nodeGroupsLocks.computeIfAbsent(nodeGroupName, LOCK_FUNCTION);
ngLock.readLock().lock();
}
@@ -190,11 +154,7 @@
}
public void acquireNodeGroupWriteLock(String nodeGroupName) {
- ReentrantReadWriteLock ngLock = nodeGroupsLocks.get(nodeGroupName);
- if (ngLock == null) {
- nodeGroupsLocks.putIfAbsent(nodeGroupName, new ReentrantReadWriteLock());
- ngLock = nodeGroupsLocks.get(nodeGroupName);
- }
+ ReentrantReadWriteLock ngLock = nodeGroupsLocks.computeIfAbsent(nodeGroupName, LOCK_FUNCTION);
ngLock.writeLock().lock();
}
@@ -203,11 +163,7 @@
}
public void acquireFeedReadLock(String feedName) {
- ReentrantReadWriteLock fLock = feedsLocks.get(feedName);
- if (fLock == null) {
- feedsLocks.putIfAbsent(feedName, new ReentrantReadWriteLock());
- fLock = feedsLocks.get(feedName);
- }
+ ReentrantReadWriteLock fLock = feedsLocks.computeIfAbsent(feedName, LOCK_FUNCTION);
fLock.readLock().lock();
}
@@ -216,11 +172,7 @@
}
public void acquireFeedWriteLock(String feedName) {
- ReentrantReadWriteLock fLock = feedsLocks.get(feedName);
- if (fLock == null) {
- feedsLocks.putIfAbsent(feedName, new ReentrantReadWriteLock());
- fLock = feedsLocks.get(feedName);
- }
+ ReentrantReadWriteLock fLock = feedsLocks.computeIfAbsent(feedName, LOCK_FUNCTION);
fLock.writeLock().lock();
}
@@ -229,11 +181,7 @@
}
public void acquireFeedPolicyWriteLock(String policyName) {
- ReentrantReadWriteLock fLock = feedPolicyLocks.get(policyName);
- if (fLock == null) {
- feedPolicyLocks.putIfAbsent(policyName, new ReentrantReadWriteLock());
- fLock = feedPolicyLocks.get(policyName);
- }
+ ReentrantReadWriteLock fLock = feedPolicyLocks.computeIfAbsent(policyName, LOCK_FUNCTION);
fLock.writeLock().lock();
}
@@ -242,11 +190,8 @@
}
public void acquireCompactionPolicyReadLock(String compactionPolicyName) {
- ReentrantReadWriteLock compactionPolicyLock = compactionPolicyLocks.get(compactionPolicyName);
- if (compactionPolicyLock == null) {
- compactionPolicyLocks.putIfAbsent(compactionPolicyName, new ReentrantReadWriteLock());
- compactionPolicyLock = compactionPolicyLocks.get(compactionPolicyName);
- }
+ ReentrantReadWriteLock compactionPolicyLock =
+ compactionPolicyLocks.computeIfAbsent(compactionPolicyName, LOCK_FUNCTION);
compactionPolicyLock.readLock().lock();
}
@@ -255,11 +200,8 @@
}
public void acquireCompactionPolicyWriteLock(String compactionPolicyName) {
- ReentrantReadWriteLock compactionPolicyLock = compactionPolicyLocks.get(compactionPolicyName);
- if (compactionPolicyLock == null) {
- compactionPolicyLocks.putIfAbsent(compactionPolicyName, new ReentrantReadWriteLock());
- compactionPolicyLock = compactionPolicyLocks.get(compactionPolicyName);
- }
+ ReentrantReadWriteLock compactionPolicyLock =
+ compactionPolicyLocks.computeIfAbsent(compactionPolicyName, LOCK_FUNCTION);
compactionPolicyLock.writeLock().lock();
}
@@ -268,11 +210,7 @@
}
public void acquireDataTypeReadLock(String dataTypeName) {
- ReentrantReadWriteLock dataTypeLock = dataTypeLocks.get(dataTypeName);
- if (dataTypeLock == null) {
- dataTypeLocks.putIfAbsent(dataTypeName, new ReentrantReadWriteLock());
- dataTypeLock = dataTypeLocks.get(dataTypeName);
- }
+ ReentrantReadWriteLock dataTypeLock = dataTypeLocks.computeIfAbsent(dataTypeName, LOCK_FUNCTION);
dataTypeLock.readLock().lock();
}
@@ -281,11 +219,7 @@
}
public void acquireDataTypeWriteLock(String dataTypeName) {
- ReentrantReadWriteLock dataTypeLock = dataTypeLocks.get(dataTypeName);
- if (dataTypeLock == null) {
- dataTypeLocks.putIfAbsent(dataTypeName, new ReentrantReadWriteLock());
- dataTypeLock = dataTypeLocks.get(dataTypeName);
- }
+ ReentrantReadWriteLock dataTypeLock = dataTypeLocks.computeIfAbsent(dataTypeName, LOCK_FUNCTION);
dataTypeLock.writeLock().lock();
}
@@ -410,8 +344,8 @@
releaseDataverseReadLock(dataverseName);
}
- public void insertDeleteUpsertBegin(String dataverseName, String datasetFullyQualifiedName,
- List<String> dataverses, List<String> datasets) {
+ public void insertDeleteUpsertBegin(String dataverseName, String datasetFullyQualifiedName, List<String> dataverses,
+ List<String> datasets) {
dataverses.add(dataverseName);
datasets.add(datasetFullyQualifiedName);
Collections.sort(dataverses);
@@ -632,4 +566,22 @@
releaseExternalDatasetRefreshLock(datasetFullyQualifiedName);
releaseDataverseReadLock(dataverseName);
}
+
+ public void acquireExtensionReadLock(String entityName) {
+ ReentrantReadWriteLock entityLock = extensionLocks.computeIfAbsent(entityName, LOCK_FUNCTION);
+ entityLock.readLock().lock();
+ }
+
+ public void releaseExtensionReadLock(String entityName) {
+ extensionLocks.get(entityName).readLock().unlock();
+ }
+
+ public void acquireExtensionWriteLock(String entityName) {
+ ReentrantReadWriteLock entityLock = extensionLocks.computeIfAbsent(entityName, LOCK_FUNCTION);
+ entityLock.writeLock().lock();
+ }
+
+ public void releaseExtensionWriteLock(String entityName) {
+ extensionLocks.get(entityName).writeLock().unlock();
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/service/IControllerService.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/service/IControllerService.java
index aecc643..540de3a 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/service/IControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/service/IControllerService.java
@@ -18,6 +18,8 @@
*/
package org.apache.hyracks.api.service;
+import java.util.concurrent.ExecutorService;
+
import org.apache.hyracks.api.application.IServiceContext;
public interface IControllerService {
@@ -27,5 +29,7 @@
IServiceContext getContext();
+ ExecutorService getExecutor();
+
Object getApplicationContext();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index 0835551..c47a612 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -34,7 +34,6 @@
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;
-import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
@@ -325,14 +324,11 @@
return workQueue;
}
- public ExecutorService getExecutorService() {
+ @Override
+ public ExecutorService getExecutor() {
return executor;
}
- public Executor getExecutor() {
- return getExecutorService();
- }
-
public CCConfig getConfig() {
return ccConfig;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
index 46aa992..d7b4be0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
@@ -217,7 +217,7 @@
LOGGER.warning("Freeing leaked " + stillAllocated + " bytes");
serviceCtx.getMemoryManager().deallocate(stillAllocated);
}
- nodeController.getExecutorService().execute(new Runnable() {
+ nodeController.getExecutor().execute(new Runnable() {
@Override
public void run() {
deallocatableRegistry.close();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
index e6278be..c416942 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
@@ -118,12 +118,12 @@
case SHUTDOWN_REQUEST:
final CCNCFunctions.ShutdownRequestFunction sdrf = (CCNCFunctions.ShutdownRequestFunction) fn;
- ncs.getExecutorService().submit(new ShutdownTask(ncs, sdrf.isTerminateNCService()));
+ ncs.getExecutor().submit(new ShutdownTask(ncs, sdrf.isTerminateNCService()));
return;
case THREAD_DUMP_REQUEST:
final CCNCFunctions.ThreadDumpRequestFunction tdrf = (CCNCFunctions.ThreadDumpRequestFunction) fn;
- ncs.getExecutorService().submit(new ThreadDumpTask(ncs, tdrf.getRequestId()));
+ ncs.getExecutor().submit(new ThreadDumpTask(ncs, tdrf.getRequestId()));
return;
default:
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 14d221e..9c28c16 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -416,7 +416,8 @@
return nodeParameters;
}
- public ExecutorService getExecutorService() {
+ @Override
+ public ExecutorService getExecutor() {
return executor;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
index 8f68e76..670ce06 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
@@ -53,7 +53,7 @@
}
final List<IPartition> unregisteredPartitions = new ArrayList<IPartition>();
ncs.getPartitionManager().unregisterPartitions(jobId, unregisteredPartitions);
- ncs.getExecutorService().execute(new Runnable() {
+ ncs.getExecutor().execute(new Runnable() {
@Override
public void run() {
for (IPartition p : unregisteredPartitions) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index 692eac6..02e8051 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -131,7 +131,7 @@
}
final int partition = tid.getPartition();
List<IConnectorDescriptor> inputs = ac.getActivityInputMap().get(aid);
- task = new Task(joblet, taId, han.getClass().getName(), ncs.getExecutorService(), ncs,
+ task = new Task(joblet, taId, han.getClass().getName(), ncs.getExecutor(), ncs,
createInputChannels(td, inputs));
IOperatorNodePushable operator = han.createPushRuntime(task, rdp, partition, td.getPartitionCount());
@@ -207,7 +207,7 @@
.getInputPartitionCounts()[i], td.getPartitionCount());
if (cPolicy.materializeOnReceiveSide()) {
return new ReceiveSideMaterializingCollector(task, ncs.getPartitionManager(), collector, task
- .getTaskAttemptId(), ncs.getExecutorService());
+ .getTaskAttemptId(), ncs.getExecutor());
} else {
return collector;
}
@@ -223,7 +223,7 @@
@Override
public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
return new MaterializedPartitionWriter(ctx, ncs.getPartitionManager(), new PartitionId(jobId,
- conn.getConnectorId(), senderIndex, receiverIndex), taId, ncs.getExecutorService());
+ conn.getConnectorId(), senderIndex, receiverIndex), taId, ncs.getExecutor());
}
};
} else {
@@ -233,7 +233,7 @@
public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
return new MaterializingPipelinedPartition(ctx, ncs.getPartitionManager(), new PartitionId(
jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId, ncs
- .getExecutorService());
+ .getExecutor());
}
};
}