[ASTERIXDB-3189][*DB] Allow queries to be canceled during compilation

- user model changes: no
- storage format changes: no
- interface changes: no

Details:

- Allow queries to be canceled during compilation.
- Allow queries to be interrupted while waiting for
  dataset upgrade lock which could potentially be
  held by the rebalance.
- Log before and after rebalance acquires dataset
  upgrade lock.

Change-Id: I6031f36df583ed790a0ec89885071c27ae8efdb9
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17543
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 1729b50..298468c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -4176,9 +4176,9 @@
     }
 
     private interface IMetadataLocker {
-        void lock() throws AlgebricksException;
+        void lock() throws HyracksDataException, AlgebricksException, InterruptedException;
 
-        void unlock() throws AlgebricksException;
+        void unlock() throws HyracksDataException, AlgebricksException;
     }
 
     private interface IResultPrinter {
@@ -4193,10 +4193,19 @@
             IResultSet resultSet, ResultDelivery resultDelivery, ResultMetadata outMetadata, Stats stats,
             IRequestParameters requestParameters, Map<String, IAObject> stmtParams, IStatementRewriter stmtRewriter)
             throws Exception {
+        final IRequestTracker requestTracker = appCtx.getRequestTracker();
+        final ClientRequest clientRequest =
+                (ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid());
         final IMetadataLocker locker = new IMetadataLocker() {
             @Override
-            public void lock() {
-                compilationLock.readLock().lock();
+            public void lock() throws RuntimeDataException, InterruptedException {
+                try {
+                    compilationLock.readLock().lockInterruptibly();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    ensureNotCancelled(clientRequest);
+                    throw e;
+                }
             }
 
             @Override
@@ -4343,18 +4352,20 @@
         final IRequestTracker requestTracker = appCtx.getRequestTracker();
         final ClientRequest clientRequest =
                 (ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid());
+        if (cancellable) {
+            clientRequest.markCancellable();
+        }
         locker.lock();
         try {
             final JobSpecification jobSpec = compiler.compile();
             if (jobSpec == null) {
                 return;
             }
-            if (cancellable) {
-                clientRequest.markCancellable();
-            }
             final SchedulableClientRequest schedulableRequest =
                     SchedulableClientRequest.of(clientRequest, requestParameters, metadataProvider, jobSpec);
             appCtx.getReceptionist().ensureSchedulable(schedulableRequest);
+            // ensure request not cancelled before running job
+            ensureNotCancelled(clientRequest);
             final JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
             clientRequest.setJobId(jobId);
             if (jId != null) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
index b0dc162..e2d8e01 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -243,8 +243,10 @@
         ActiveNotificationHandler activeNotificationHandler =
                 (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
         IMetadataLockManager lockManager = appCtx.getMetadataLockManager();
+        LOGGER.debug("attempting to acquire dataset {} upgrade lock", source.getDatasetName());
         lockManager.upgradeDatasetLockToWrite(metadataProvider.getLocks(), source.getDataverseName(),
                 source.getDatasetName());
+        LOGGER.debug("acquired dataset {} upgrade lock", source.getDatasetName());
         LOGGER.info("Updating dataset {} node group from {} to {}", source.getDatasetName(), source.getNodeGroupName(),
                 target.getNodeGroupName());
         try {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLock.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLock.java
index 1f77aa0..d491ea4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLock.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLock.java
@@ -63,7 +63,7 @@
      * @param mode
      *            lock mode
      */
-    void lock(IMetadataLock.Mode mode);
+    void lock(IMetadataLock.Mode mode) throws InterruptedException;
 
     /**
      * Release a lock
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/LockList.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/LockList.java
index 43a1849..06a317e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/LockList.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/LockList.java
@@ -49,7 +49,12 @@
         if (isContained(mode, lock)) {
             return;
         }
-        lock.lock(mode);
+        try {
+            lock.lock(mode);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new AsterixException(e);
+        }
         indexes.put(lock.getKey(), locks.size());
         locks.add(MutablePair.of(lock, mode));
     }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java
index e0a6725..41d0e97 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java
@@ -79,8 +79,8 @@
         lock.writeLock().unlock();
     }
 
-    private void upgradeReadLock() {
-        upgradeLock.readLock().lock();
+    private void upgradeReadLock() throws InterruptedException {
+        upgradeLock.readLock().lockInterruptibly();
     }
 
     private void modifyReadLock() {
@@ -185,7 +185,7 @@
     }
 
     @Override
-    public void lock(IMetadataLock.Mode mode) {
+    public void lock(IMetadataLock.Mode mode) throws InterruptedException {
         switch (mode) {
             case INDEX_BUILD:
                 readLock();
@@ -203,8 +203,7 @@
                 writeLock();
                 break;
             case READ:
-                readLock();
-                upgradeReadLock();
+                atomicReadLock();
                 break;
             default:
                 throw new IllegalStateException("locking mode " + mode + " is not supported");
@@ -264,6 +263,17 @@
         return Objects.equals(key, ((DatasetLock) o).key);
     }
 
+    private void atomicReadLock() throws InterruptedException {
+        readLock();
+        try {
+            upgradeReadLock();
+        } catch (InterruptedException e) {
+            readUnlock();
+            Thread.currentThread().interrupt();
+            throw e;
+        }
+    }
+
     @Override
     public String toString() {
         return String.valueOf(key);