[ASTERIXDB-3189][*DB] Allow queries to be canceled during compilation
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Allow queries to be canceled during compilation.
- Allow queries to be interrupted while waiting for
dataset upgrade lock which could potentially be
held by the rebalance.
- Log before and after rebalance acquires dataset
upgrade lock.
Change-Id: I6031f36df583ed790a0ec89885071c27ae8efdb9
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17543
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 1729b50..298468c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -4176,9 +4176,9 @@
}
private interface IMetadataLocker {
- void lock() throws AlgebricksException;
+ void lock() throws HyracksDataException, AlgebricksException, InterruptedException;
- void unlock() throws AlgebricksException;
+ void unlock() throws HyracksDataException, AlgebricksException;
}
private interface IResultPrinter {
@@ -4193,10 +4193,19 @@
IResultSet resultSet, ResultDelivery resultDelivery, ResultMetadata outMetadata, Stats stats,
IRequestParameters requestParameters, Map<String, IAObject> stmtParams, IStatementRewriter stmtRewriter)
throws Exception {
+ final IRequestTracker requestTracker = appCtx.getRequestTracker();
+ final ClientRequest clientRequest =
+ (ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid());
final IMetadataLocker locker = new IMetadataLocker() {
@Override
- public void lock() {
- compilationLock.readLock().lock();
+ public void lock() throws RuntimeDataException, InterruptedException {
+ try {
+ compilationLock.readLock().lockInterruptibly();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ ensureNotCancelled(clientRequest);
+ throw e;
+ }
}
@Override
@@ -4343,18 +4352,20 @@
final IRequestTracker requestTracker = appCtx.getRequestTracker();
final ClientRequest clientRequest =
(ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid());
+ if (cancellable) {
+ clientRequest.markCancellable();
+ }
locker.lock();
try {
final JobSpecification jobSpec = compiler.compile();
if (jobSpec == null) {
return;
}
- if (cancellable) {
- clientRequest.markCancellable();
- }
final SchedulableClientRequest schedulableRequest =
SchedulableClientRequest.of(clientRequest, requestParameters, metadataProvider, jobSpec);
appCtx.getReceptionist().ensureSchedulable(schedulableRequest);
+ // ensure request not cancelled before running job
+ ensureNotCancelled(clientRequest);
final JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
clientRequest.setJobId(jobId);
if (jId != null) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
index b0dc162..e2d8e01 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -243,8 +243,10 @@
ActiveNotificationHandler activeNotificationHandler =
(ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
IMetadataLockManager lockManager = appCtx.getMetadataLockManager();
+ LOGGER.debug("attempting to acquire dataset {} upgrade lock", source.getDatasetName());
lockManager.upgradeDatasetLockToWrite(metadataProvider.getLocks(), source.getDataverseName(),
source.getDatasetName());
+ LOGGER.debug("acquired dataset {} upgrade lock", source.getDatasetName());
LOGGER.info("Updating dataset {} node group from {} to {}", source.getDatasetName(), source.getNodeGroupName(),
target.getNodeGroupName());
try {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLock.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLock.java
index 1f77aa0..d491ea4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLock.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLock.java
@@ -63,7 +63,7 @@
* @param mode
* lock mode
*/
- void lock(IMetadataLock.Mode mode);
+ void lock(IMetadataLock.Mode mode) throws InterruptedException;
/**
* Release a lock
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/LockList.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/LockList.java
index 43a1849..06a317e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/LockList.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/LockList.java
@@ -49,7 +49,12 @@
if (isContained(mode, lock)) {
return;
}
- lock.lock(mode);
+ try {
+ lock.lock(mode);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new AsterixException(e);
+ }
indexes.put(lock.getKey(), locks.size());
locks.add(MutablePair.of(lock, mode));
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java
index e0a6725..41d0e97 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java
@@ -79,8 +79,8 @@
lock.writeLock().unlock();
}
- private void upgradeReadLock() {
- upgradeLock.readLock().lock();
+ private void upgradeReadLock() throws InterruptedException {
+ upgradeLock.readLock().lockInterruptibly();
}
private void modifyReadLock() {
@@ -185,7 +185,7 @@
}
@Override
- public void lock(IMetadataLock.Mode mode) {
+ public void lock(IMetadataLock.Mode mode) throws InterruptedException {
switch (mode) {
case INDEX_BUILD:
readLock();
@@ -203,8 +203,7 @@
writeLock();
break;
case READ:
- readLock();
- upgradeReadLock();
+ atomicReadLock();
break;
default:
throw new IllegalStateException("locking mode " + mode + " is not supported");
@@ -264,6 +263,17 @@
return Objects.equals(key, ((DatasetLock) o).key);
}
+ private void atomicReadLock() throws InterruptedException {
+ readLock();
+ try {
+ upgradeReadLock();
+ } catch (InterruptedException e) {
+ readUnlock();
+ Thread.currentThread().interrupt();
+ throw e;
+ }
+ }
+
@Override
public String toString() {
return String.valueOf(key);