[ASTERIXDB-2058][HYR] Only Complete job cancellation after cleanup

- user model changes: no
- storage format changes: no
- interface changes: yes
  --IJobManager.cancel now takes a callback

details:
- Job cancellation now completes only after the job cleanup work
  has completed and not merely when the abort tasks are executed.
- The NCQueryServiceServlet actively cancels requests that passes
  5 minutes.
- Cancellation of timedout jobs is not done through the Http API
  but through message broker.
- Typically, requests might timeout when the servers are
  overloaded. When that is the case, there is a high chance http
  requests are to be rejected including requests to cancel
  previously submitted queries. This is the reason for using
  Message broker for this task.
- ExecuteStatementRequest used to execute the statement in
  a different executor thread even though it is itself is being
  executed in an executor thread and is not blocking anyone.
  This was fixed as well.

Change-Id: I14b4bbd512cc88e489254d8bf82edba0fd3a3db5
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1961
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
index a4e72f7..c38b3fc 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
@@ -18,8 +18,8 @@
  */
 package org.apache.asterix.api.http.server;
 
-import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
-import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_DATASET_ATTR;
+import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
+import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_DATASET_ATTR;
 
 import java.io.PrintWriter;
 import java.util.UUID;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
index 7874aa3..58c282f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
@@ -18,8 +18,8 @@
  */
 package org.apache.asterix.api.http.server;
 
-import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
-import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_DATASET_ATTR;
+import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
+import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_DATASET_ATTR;
 
 import java.awt.image.BufferedImage;
 import java.io.BufferedReader;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
index b24a9a1..4faab1e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.api.http.server;
 
-import static org.apache.asterix.api.http.servlet.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR;
+import static org.apache.asterix.api.http.server.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR;
 
 import java.io.IOException;
 import java.io.PrintWriter;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java
index 52d4d67..6dea30c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.api.http.server;
 
-import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
+import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
 
 import java.io.IOException;
 import java.io.PrintWriter;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
index d9a63f7..52af643 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.api.http.server;
 
-import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
+import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
 
 import java.io.IOException;
 import java.io.PrintWriter;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
index dcd0e70..9d2415d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.api.http.server;
 
-import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
+import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
 
 import java.io.IOException;
 import java.io.PrintWriter;
@@ -34,7 +34,6 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.api.http.servlet.ServletConstants;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.http.api.IServletRequest;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/Duration.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/Duration.java
new file mode 100644
index 0000000..bdda750
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/Duration.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.api.http.server;
+
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.tuple.Triple;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public enum Duration {
+    SEC("s", 9),
+    MILLI("ms", 6),
+    MICRO("µs", 3),
+    NANO("ns", 0);
+
+    static final long NANOSECONDS = 1;
+    static final long MICROSECONDS = 1000 * NANOSECONDS;
+    static final long MILLISECONDS = 1000 * MICROSECONDS;
+    static final long SECONDS = 1000 * MILLISECONDS;
+    static final long MINUTES = 60 * SECONDS;
+    static final long HOURS = 60 * MINUTES;
+
+    String unit;
+    int nanoDigits;
+
+    Duration(String unit, int nanoDigits) {
+        this.unit = unit;
+        this.nanoDigits = nanoDigits;
+    }
+
+    public static String formatNanos(long nanoTime) {
+        final String strTime = String.valueOf(nanoTime);
+        final int len = strTime.length();
+        for (Duration tu : Duration.values()) {
+            if (len > tu.nanoDigits) {
+                final String integer = strTime.substring(0, len - tu.nanoDigits);
+                final String fractional = strTime.substring(len - tu.nanoDigits);
+                return integer + (fractional.length() > 0 ? "." + fractional : "") + tu.unit;
+            }
+        }
+        return "illegal string value: " + strTime;
+    }
+
+    // ParseDuration parses a duration string.
+    // A duration string is a possibly signed sequence of
+    // decimal numbers, each with optional fraction and a unit suffix,
+    // such as "300ms", "-1.5h" or "2h45m".
+    // Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
+    // returns the duration in nano seconds
+    public static long parseDurationStringToNanos(String orig) throws HyracksDataException {
+        // [-+]?([0-9]*(\.[0-9]*)?[a-z]+)+
+        String s = orig;
+        long d = 0;
+        boolean neg = false;
+        char c;
+        // Consume [-+]?
+        if (!s.isEmpty()) {
+            c = s.charAt(0);
+            if (c == '-' || c == '+') {
+                neg = c == '-';
+                s = s.substring(1);
+            }
+        }
+
+        // Special case: if all that is left is "0", this is zero.
+        if ("0".equals(s)) {
+            return 0L;
+        }
+
+        if (s.isEmpty()) {
+            throw new RuntimeDataException(ErrorCode.INVALID_DURATION, orig);
+        }
+
+        while (!s.isEmpty()) {
+            long v = 0L; // integers before decimal
+            long f = 0L; // integers after decimal
+            double scale = 1.0; // value = v + f/scale
+            // The next character must be [0-9.]
+            if (!(s.charAt(0) == '.' || '0' <= s.charAt(0) && s.charAt(0) <= '9')) {
+                throw new RuntimeDataException(ErrorCode.INVALID_DURATION, orig);
+            }
+            // Consume [0-9]*
+            int pl = s.length();
+            Pair<Long, String> pair = leadingInt(s);
+            v = pair.getLeft();
+            s = pair.getRight();
+            boolean pre = pl != s.length(); // whether we consumed anything before a period
+
+            // Consume (\.[0-9]*)?
+            boolean post = false;
+            if (!s.isEmpty() && s.charAt(0) == '.') {
+                s = s.substring(1);
+                pl = s.length();
+                Triple<Long, Double, String> triple = leadingFraction(s);
+                f = triple.getLeft();
+                scale = triple.getMiddle();
+                s = triple.getRight();
+                post = pl != s.length();
+            }
+            if (!pre && !post) {
+                // no digits (e.g. ".s" or "-.s")
+                throw new RuntimeDataException(ErrorCode.INVALID_DURATION, orig);
+            }
+
+            // Consume unit.
+            int i = 0;
+            for (; i < s.length(); i++) {
+                c = s.charAt(i);
+                if (c == '.' || '0' <= c && c <= '9') {
+                    break;
+                }
+            }
+            if (i == 0) {
+                throw new RuntimeDataException(ErrorCode.INVALID_DURATION, orig);
+            }
+            String u = s.substring(0, i);
+            s = s.substring(i);
+            long unit = getUnit(u);
+            if (v > Long.MAX_VALUE / unit) {
+                // overflow
+                throw new RuntimeDataException(ErrorCode.INVALID_DURATION, orig);
+            }
+            v *= unit;
+            if (f > 0) {
+                // float64 is needed to be nanosecond accurate for fractions of hours.
+                // v >= 0 && (f*unit/scale) <= 3.6e+12 (ns/h, h is the largest unit)
+                v += (long) (((double) f * (double) unit) / scale);
+                if (v < 0) {
+                    // overflow
+                    throw new RuntimeDataException(ErrorCode.INVALID_DURATION, orig);
+                }
+            }
+            d += v;
+            if (d < 0) {
+                // overflow
+                throw new RuntimeDataException(ErrorCode.INVALID_DURATION, orig);
+            }
+        }
+
+        if (neg) {
+            d = -d;
+        }
+        return d;
+    }
+
+    private static final long getUnit(String unit) throws HyracksDataException {
+        switch (unit) {
+            case "ns":
+                return NANOSECONDS;
+            case "us":
+            case "µs":// U+00B5 = micro symbol
+            case "μs":// U+03BC = Greek letter mu
+                return MICROSECONDS;
+            case "ms":
+                return MILLISECONDS;
+            case "s":
+                return SECONDS;
+            case "m":
+                return MINUTES;
+            case "h":
+                return HOURS;
+            default:
+                throw new RuntimeDataException(ErrorCode.UNKNOWN_DURATION_UNIT, unit);
+        }
+    }
+
+    // leadingInt consumes the leading [0-9]* from s.
+    static Pair<Long, String> leadingInt(String origin) throws HyracksDataException {
+        String s = origin;
+        long x = 0L;
+        int i = 0;
+        for (; i < s.length(); i++) {
+            char c = s.charAt(i);
+            if (c < '0' || c > '9') {
+                break;
+            }
+            if (x > Long.MAX_VALUE / 10) {
+                throw new RuntimeDataException(ErrorCode.INVALID_DURATION, origin);
+            }
+            x = x * 10 + Character.getNumericValue(c);
+            if (x < 0) {
+                throw new RuntimeDataException(ErrorCode.INVALID_DURATION, origin);
+            }
+        }
+        return Pair.of(x, s.substring(i));
+    }
+
+    // leadingFraction consumes the leading [0-9]* from s.
+    // It is used only for fractions, so does not return an error on overflow,
+    // it just stops accumulating precision.
+    static Triple<Long, Double, String> leadingFraction(String s) {
+        int i = 0;
+        long x = 0L;
+        double scale = 1.0;
+        boolean overflow = false;
+        for (; i < s.length(); i++) {
+            char c = s.charAt(i);
+            if (c < '0' || c > '9') {
+                break;
+            }
+            if (overflow) {
+                continue;
+            }
+            if (x > (1 << 63 - 1) / 10) {
+                // It's possible for overflow to give a positive number, so take care.
+                overflow = true;
+                continue;
+            }
+            long y = x * 10 + Character.getNumericValue(c);
+            if (y < 0) {
+                overflow = true;
+                continue;
+            }
+            x = y;
+            scale *= 10;
+        }
+        return Triple.of(x, scale, s.substring(i));
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
index 9547514..ef49c35 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -19,17 +19,20 @@
 
 package org.apache.asterix.api.http.server;
 
+import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeoutException;
 import java.util.logging.Level;
 
-import io.netty.handler.codec.http.HttpResponseStatus;
 import org.apache.asterix.algebra.base.ILangExtension;
+import org.apache.asterix.app.message.CancelQueryRequest;
 import org.apache.asterix.app.message.ExecuteStatementRequestMessage;
 import org.apache.asterix.app.message.ExecuteStatementResponseMessage;
 import org.apache.asterix.app.result.ResultReader;
 import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.common.messaging.api.MessageFuture;
 import org.apache.asterix.om.types.ARecordType;
@@ -41,11 +44,14 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.ipc.exceptions.IPCException;
 
+import io.netty.handler.codec.http.HttpResponseStatus;
+
 /**
  * Query service servlet that can run on NC nodes.
  * Delegates query execution to CC, then serves the result.
  */
 public class NCQueryServiceServlet extends QueryServiceServlet {
+
     public NCQueryServiceServlet(ConcurrentMap<String, Object> ctx, String[] paths, IApplicationContext appCtx,
             ILangExtension.Language queryLanguage) {
         super(ctx, paths, appCtx, queryLanguage, null, null, null);
@@ -63,13 +69,28 @@
         ExecuteStatementResponseMessage responseMsg;
         MessageFuture responseFuture = ncMb.registerMessageFuture();
         try {
+            if (param.clientContextID == null) {
+                param.clientContextID = UUID.randomUUID().toString();
+            }
+            long timeout = ExecuteStatementRequestMessage.DEFAULT_NC_TIMEOUT_MILLIS;
+            if (param.timeout != null) {
+                timeout = java.util.concurrent.TimeUnit.NANOSECONDS
+                        .toMillis(Duration.parseDurationStringToNanos(param.timeout));
+            }
             ExecuteStatementRequestMessage requestMsg =
                     new ExecuteStatementRequestMessage(ncCtx.getNodeId(), responseFuture.getFutureId(), queryLanguage,
                             statementsText, sessionOutput.config(), ccDelivery, param.clientContextID, handleUrl);
             outExecStartEnd[0] = System.nanoTime();
             ncMb.sendMessageToCC(requestMsg);
-            responseMsg = (ExecuteStatementResponseMessage) responseFuture.get(
-                    ExecuteStatementResponseMessage.DEFAULT_TIMEOUT_MILLIS, java.util.concurrent.TimeUnit.MILLISECONDS);
+            try {
+                responseMsg = (ExecuteStatementResponseMessage) responseFuture.get(timeout,
+                        java.util.concurrent.TimeUnit.MILLISECONDS);
+            } catch (TimeoutException exception) {
+                RuntimeDataException hde = new RuntimeDataException(ErrorCode.QUERY_TIMEOUT, exception);
+                // cancel query
+                cancelQuery(ncMb, ncCtx.getNodeId(), param.clientContextID, hde);
+                throw hde;
+            }
             outExecStartEnd[1] = System.nanoTime();
         } finally {
             ncMb.deregisterMessageFuture(responseFuture.getFutureId());
@@ -97,6 +118,22 @@
         }
     }
 
+    private void cancelQuery(INCMessageBroker messageBroker, String nodeId, String clientContextID,
+            Exception exception) {
+        MessageFuture cancelQueryFuture = messageBroker.registerMessageFuture();
+        try {
+            CancelQueryRequest cancelQueryMessage =
+                    new CancelQueryRequest(nodeId, cancelQueryFuture.getFutureId(), clientContextID);
+            messageBroker.sendMessageToCC(cancelQueryMessage);
+            cancelQueryFuture.get(ExecuteStatementRequestMessage.DEFAULT_QUERY_CANCELLATION_TIMEOUT_MILLIS,
+                    java.util.concurrent.TimeUnit.MILLISECONDS);
+        } catch (Exception e) {
+            exception.addSuppressed(e);
+        } finally {
+            messageBroker.deregisterMessageFuture(cancelQueryFuture.getFutureId());
+        }
+    }
+
     @Override
     protected HttpResponseStatus handleExecuteStatementException(Throwable t) {
         if (t instanceof IPCException || t instanceof TimeoutException) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
index d9757c7..6291869 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.api.http.server;
 
-import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
+import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
 
 import java.io.IOException;
 import java.io.PrintWriter;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryCancellationServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryCancellationServlet.java
index bfec146..3f07151 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryCancellationServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryCancellationServlet.java
@@ -23,7 +23,6 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.api.http.servlet.ServletConstants;
 import org.apache.asterix.translator.IStatementExecutorContext;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.job.JobId;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index 1f1d282..c630636 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -27,7 +27,6 @@
 import java.util.logging.Logger;
 
 import org.apache.asterix.algebra.base.ILangExtension;
-import org.apache.asterix.api.http.servlet.ServletConstants;
 import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.api.IClusterManagementWork;
 import org.apache.asterix.common.config.GlobalConfig;
@@ -108,7 +107,8 @@
         FORMAT("format"),
         CLIENT_ID("client_context_id"),
         PRETTY("pretty"),
-        MODE("mode");
+        MODE("mode"),
+        TIMEOUT("timeout");
 
         private final String str;
 
@@ -154,39 +154,12 @@
         }
     }
 
-    public enum TimeUnit {
-        SEC("s", 9),
-        MILLI("ms", 6),
-        MICRO("µs", 3),
-        NANO("ns", 0);
-
-        String unit;
-        int nanoDigits;
-
-        TimeUnit(String unit, int nanoDigits) {
-            this.unit = unit;
-            this.nanoDigits = nanoDigits;
-        }
-
-        public static String formatNanos(long nanoTime) {
-            final String strTime = String.valueOf(nanoTime);
-            final int len = strTime.length();
-            for (TimeUnit tu : TimeUnit.values()) {
-                if (len > tu.nanoDigits) {
-                    final String integer = strTime.substring(0, len - tu.nanoDigits);
-                    final String fractional = strTime.substring(len - tu.nanoDigits);
-                    return integer + (fractional.length() > 0 ? "." + fractional : "") + tu.unit;
-                }
-            }
-            return "illegal string value: " + strTime;
-        }
-    }
-
     static class RequestParameters {
         String host;
         String path;
         String statement;
         String format;
+        String timeout;
         boolean pretty;
         String clientContextID;
         String mode;
@@ -202,6 +175,7 @@
                 on.put("pretty", pretty);
                 on.put("mode", mode);
                 on.put("clientContextID", clientContextID);
+                on.put("format", format);
                 return om.writer(new MinimalPrettyPrinter()).writeValueAsString(on);
             } catch (JsonProcessingException e) { // NOSONAR
                 return e.getMessage();
@@ -297,9 +271,9 @@
         pw.print(ResultFields.METRICS.str());
         pw.print("\": {\n");
         pw.print("\t");
-        ResultUtil.printField(pw, Metrics.ELAPSED_TIME.str(), TimeUnit.formatNanos(elapsedTime));
+        ResultUtil.printField(pw, Metrics.ELAPSED_TIME.str(), Duration.formatNanos(elapsedTime));
         pw.print("\t");
-        ResultUtil.printField(pw, Metrics.EXECUTION_TIME.str(), TimeUnit.formatNanos(executionTime));
+        ResultUtil.printField(pw, Metrics.EXECUTION_TIME.str(), Duration.formatNanos(executionTime));
         pw.print("\t");
         ResultUtil.printField(pw, Metrics.RESULT_COUNT.str(), resultCount, true);
         pw.print("\t");
@@ -334,6 +308,7 @@
                 param.pretty = getOptBoolean(jsonRequest, Parameter.PRETTY.str(), false);
                 param.mode = toLower(getOptText(jsonRequest, Parameter.MODE.str()));
                 param.clientContextID = getOptText(jsonRequest, Parameter.CLIENT_ID.str());
+                param.timeout = getOptText(jsonRequest, Parameter.TIMEOUT.str());
             } catch (JsonParseException | JsonMappingException e) {
                 // if the JSON parsing fails, the statement is empty and we get an empty statement error
                 GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, e.getMessage(), e);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
index 27e2806..8536571 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.api.http.server;
 
-import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
+import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
 
 import java.io.PrintWriter;
 import java.util.ArrayDeque;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
index 18aae8e..1a7918c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
@@ -18,8 +18,8 @@
  */
 package org.apache.asterix.api.http.server;
 
-import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
-import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_DATASET_ATTR;
+import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
+import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_DATASET_ATTR;
 
 import java.io.IOException;
 import java.util.List;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ServletConstants.java
similarity index 96%
rename from asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java
rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ServletConstants.java
index b815d76..2fe37c3 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ServletConstants.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.api.http.servlet;
+package org.apache.asterix.api.http.server;
 
 public class ServletConstants {
     public static final String HYRACKS_CONNECTION_ATTR = "org.apache.asterix.HYRACKS_CONNECTION";
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java
index fdd106d..06e2383 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.api.http.server;
 
-import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
+import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
 
 import java.io.IOException;
 import java.io.PrintWriter;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/VersionApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/VersionApiServlet.java
index 5acba381..eeef8e8 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/VersionApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/VersionApiServlet.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.api.http.server;
 
-import static org.apache.asterix.api.http.servlet.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR;
+import static org.apache.asterix.api.http.server.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR;
 
 import java.io.IOException;
 import java.io.PrintWriter;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index a16f678..3b4b974 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -34,14 +34,11 @@
 import org.apache.asterix.active.ActivityState;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.active.IActiveEntityEventSubscriber;
-import org.apache.asterix.active.IRetryPolicy;
 import org.apache.asterix.active.IRetryPolicyFactory;
 import org.apache.asterix.active.NoRetryPolicyFactory;
 import org.apache.asterix.active.message.ActivePartitionMessage;
 import org.apache.asterix.active.message.ActivePartitionMessage.Event;
 import org.apache.asterix.active.message.StatsRequestMessage;
-import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
-import org.apache.asterix.common.api.IMetadataLockManager;
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.ErrorCode;
@@ -53,8 +50,6 @@
 import org.apache.asterix.metadata.api.IActiveEntityController;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.metadata.utils.DatasetUtil;
-import org.apache.asterix.metadata.utils.MetadataLockUtil;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -94,8 +89,7 @@
     protected boolean isFetchingStats;
     protected int numRegistered;
     protected int numDeRegistered;
-    protected volatile Future<Void> recoveryTask;
-    protected volatile boolean cancelRecovery;
+    protected volatile RecoveryTask rt;
     protected volatile boolean suspended = false;
     // failures
     protected Exception jobFailure;
@@ -199,7 +193,8 @@
             jobFailure = exceptions.isEmpty() ? new RuntimeDataException(ErrorCode.UNREPORTED_TASK_FAILURE_EXCEPTION)
                     : exceptions.get(0);
             setState(ActivityState.TEMPORARILY_FAILED);
-            if (prevState != ActivityState.SUSPENDING && prevState != ActivityState.RECOVERING) {
+            if (prevState != ActivityState.SUSPENDING && prevState != ActivityState.RECOVERING
+                    && prevState != ActivityState.RESUMING) {
                 recover();
             }
         } else {
@@ -356,114 +351,18 @@
     @Override
     public synchronized void recover() throws HyracksDataException {
         LOGGER.log(level, "Recover is called on " + entityId);
-        if (recoveryTask != null) {
-            LOGGER.log(level, "But recovery task for " + entityId + " is already there!! throwing an exception");
-            throw new RuntimeDataException(ErrorCode.DOUBLE_RECOVERY_ATTEMPTS);
-        }
         if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) {
             LOGGER.log(level, "But it has no recovery policy, so it is set to permanent failure");
             setState(ActivityState.PERMANENTLY_FAILED);
         } else {
             ExecutorService executor = appCtx.getServiceContext().getControllerService().getExecutor();
-            IRetryPolicy policy = retryPolicyFactory.create(this);
-            cancelRecovery = false;
             setState(ActivityState.TEMPORARILY_FAILED);
             LOGGER.log(level, "Recovery task has been submitted");
-            recoveryTask = executor.submit(() -> {
-                String nameBefore = Thread.currentThread().getName();
-                try {
-                    Thread.currentThread().setName("RecoveryTask (" + entityId + ")");
-                    doRecover(policy);
-                } finally {
-                    Thread.currentThread().setName(nameBefore);
-                }
-                return null;
-            });
+            rt = new RecoveryTask(appCtx, this, retryPolicyFactory);
+            executor.submit(rt.recover());
         }
     }
 
-    protected Void doRecover(IRetryPolicy policy)
-            throws AlgebricksException, HyracksDataException, InterruptedException {
-        LOGGER.log(level, "Actual Recovery task has started");
-        if (getState() != ActivityState.TEMPORARILY_FAILED) {
-            LOGGER.log(level, "but its state is not temp failure and so we're just returning");
-            return null;
-        }
-        LOGGER.log(level, "calling the policy");
-        while (policy.retry()) {
-            synchronized (this) {
-                if (cancelRecovery) {
-                    recoveryTask = null;
-                    notifyAll();
-                    return null;
-                }
-                while (clusterStateManager.getState() != ClusterState.ACTIVE) {
-                    if (cancelRecovery) {
-                        recoveryTask = null;
-                        notifyAll();
-                        return null;
-                    }
-                    wait();
-                }
-            }
-            waitForNonTransitionState();
-            IMetadataLockManager lockManager = metadataProvider.getApplicationContext().getMetadataLockManager();
-            lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(),
-                    entityId.getDataverse() + '.' + entityId.getEntityName());
-            for (Dataset dataset : getDatasets()) {
-                MetadataLockUtil.modifyDatasetBegin(lockManager, metadataProvider.getLocks(),
-                        dataset.getDataverseName(), DatasetUtil.getFullyQualifiedName(dataset));
-            }
-            synchronized (this) {
-                try {
-                    if (cancelRecovery) {
-                        recoveryTask = null;
-                        notifyAll();
-                        return null;
-                    }
-                    setState(ActivityState.RECOVERING);
-                    doStart(metadataProvider);
-                    recoveryTask = null;
-                    notifyAll();
-                    return null;
-                } catch (Exception e) {
-                    LOGGER.log(level, "Attempt to revive " + entityId + " failed", e);
-                    setState(ActivityState.TEMPORARILY_FAILED);
-                    recoverFailure = e;
-                } finally {
-                    metadataProvider.getLocks().reset();
-                }
-                notifyAll();
-            }
-        }
-        // Recovery task is essntially over now either through failure or through cancellation(stop)
-        synchronized (this) {
-            recoveryTask = null;
-            notifyAll();
-            if (state != ActivityState.TEMPORARILY_FAILED) {
-                return null;
-            }
-        }
-        IMetadataLockManager lockManager = metadataProvider.getApplicationContext().getMetadataLockManager();
-        try {
-            lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(),
-                    entityId.getDataverse() + '.' + entityId.getEntityName());
-            for (Dataset dataset : getDatasets()) {
-                MetadataLockUtil.modifyDatasetBegin(lockManager, metadataProvider.getLocks(), dataset.getDatasetName(),
-                        DatasetUtil.getFullyQualifiedName(dataset));
-            }
-            synchronized (this) {
-                if (state == ActivityState.TEMPORARILY_FAILED) {
-                    setState(ActivityState.PERMANENTLY_FAILED);
-                }
-                notifyAll();
-            }
-        } finally {
-            metadataProvider.getLocks().reset();
-        }
-        return null;
-    }
-
     @Override
     public synchronized void start(MetadataProvider metadataProvider)
             throws HyracksDataException, InterruptedException {
@@ -503,13 +402,10 @@
             throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_CANNOT_BE_STOPPED, entityId, state);
         }
         if (state == ActivityState.TEMPORARILY_FAILED || state == ActivityState.PERMANENTLY_FAILED) {
-            if (recoveryTask != null) {
+            if (rt != null) {
                 setState(ActivityState.STOPPING);
-                cancelRecovery = true;
-                recoveryTask.cancel(true);
-                while (recoveryTask != null) {
-                    wait();
-                }
+                rt.cancel();
+                rt = null;
             }
             setState(ActivityState.STOPPED);
             try {
@@ -533,6 +429,10 @@
         }
     }
 
+    public RecoveryTask getRecoveryTask() {
+        return rt;
+    }
+
     @Override
     public void suspend(MetadataProvider metadataProvider) throws HyracksDataException, InterruptedException {
         WaitForStateSubscriber subscriber;
@@ -602,8 +502,9 @@
             setState(ActivityState.RESUMING);
             WaitForStateSubscriber subscriber = new WaitForStateSubscriber(this,
                     EnumSet.of(ActivityState.RUNNING, ActivityState.TEMPORARILY_FAILED));
-            recoveryTask = metadataProvider.getApplicationContext().getServiceContext().getControllerService()
-                    .getExecutor().submit(() -> resumeOrRecover(metadataProvider));
+            rt = new RecoveryTask(appCtx, this, retryPolicyFactory);
+            metadataProvider.getApplicationContext().getServiceContext().getControllerService().getExecutor()
+                    .submit(() -> rt.resumeOrRecover(metadataProvider));
             try {
                 subscriber.sync();
             } catch (Exception e) {
@@ -616,28 +517,6 @@
         }
     }
 
-    protected Void resumeOrRecover(MetadataProvider metadataProvider)
-            throws HyracksDataException, AlgebricksException, InterruptedException {
-        try {
-            doResume(metadataProvider);
-            synchronized (this) {
-                setState(ActivityState.RUNNING);
-                recoveryTask = null;
-            }
-        } catch (Exception e) {
-            LOGGER.log(Level.WARNING, "First attempt to resume " + entityId + " Failed", e);
-            setState(ActivityState.TEMPORARILY_FAILED);
-            if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) {
-                setState(ActivityState.PERMANENTLY_FAILED);
-            } else {
-                IRetryPolicy policy = retryPolicyFactory.create(this);
-                cancelRecovery = false;
-                doRecover(policy);
-            }
-        }
-        return null;
-    }
-
     @Override
     public boolean isActive() {
         return state != ActivityState.STOPPED && state != ActivityState.PERMANENTLY_FAILED;
@@ -652,15 +531,6 @@
         this.locations = locations;
     }
 
-    public Future<Void> getRecoveryTask() {
-        return recoveryTask;
-    }
-
-    public synchronized void cancelRecovery() {
-        cancelRecovery = true;
-        notifyAll();
-    }
-
     @Override
     public Exception getJobFailure() {
         return jobFailure;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
new file mode 100644
index 0000000..7b7de93
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.active;
+
+import java.util.concurrent.Callable;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.active.ActivityState;
+import org.apache.asterix.active.IRetryPolicy;
+import org.apache.asterix.active.IRetryPolicyFactory;
+import org.apache.asterix.active.NoRetryPolicyFactory;
+import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
+import org.apache.asterix.common.api.IMetadataLockManager;
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.utils.DatasetUtil;
+import org.apache.asterix.metadata.utils.MetadataLockUtil;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class RecoveryTask {
+
+    private static final Logger LOGGER = Logger.getLogger(RecoveryTask.class.getName());
+    private static final Level level = Level.INFO;
+    private final ActiveEntityEventsListener listener;
+    private volatile boolean cancelRecovery = false;
+    private final IRetryPolicyFactory retryPolicyFactory;
+    private final MetadataProvider metadataProvider;
+    private final IClusterStateManager clusterStateManager;
+    private Exception failure;
+
+    public RecoveryTask(ICcApplicationContext appCtx, ActiveEntityEventsListener listener,
+            IRetryPolicyFactory retryPolicyFactory) {
+        this.listener = listener;
+        this.retryPolicyFactory = retryPolicyFactory;
+        this.metadataProvider = new MetadataProvider(appCtx, null);
+        this.clusterStateManager = appCtx.getClusterStateManager();
+    }
+
+    public Callable<Void> recover() {
+        if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) {
+            return () -> {
+                return null;
+            };
+        }
+        IRetryPolicy policy = retryPolicyFactory.create(listener);
+        return () -> {
+            String nameBefore = Thread.currentThread().getName();
+            try {
+                Thread.currentThread().setName("RecoveryTask (" + listener.getEntityId() + ")");
+                doRecover(policy);
+            } finally {
+                Thread.currentThread().setName(nameBefore);
+            }
+            return null;
+        };
+    }
+
+    public void cancel() {
+        cancelRecovery = true;
+    }
+
+    protected Void resumeOrRecover(MetadataProvider metadataProvider)
+            throws HyracksDataException, AlgebricksException, InterruptedException {
+        try {
+            synchronized (listener) {
+                listener.doResume(metadataProvider);
+                listener.setState(ActivityState.RUNNING);
+            }
+        } catch (Exception e) {
+            LOGGER.log(Level.WARNING, "First attempt to resume " + listener.getEntityId() + " Failed", e);
+            synchronized (listener) {
+                if (listener.getState() == ActivityState.RESUMING) {
+                    // This will be the case if compilation failure
+                    // If the failure is a runtime failure, then the state
+                    // would've been set to temporarily failed already
+                    listener.setState(ActivityState.TEMPORARILY_FAILED);
+                }
+            }
+            if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) {
+                synchronized (listener) {
+                    if (!cancelRecovery) {
+                        listener.setState(ActivityState.PERMANENTLY_FAILED);
+                    }
+                }
+            } else {
+                IRetryPolicy policy = retryPolicyFactory.create(listener);
+                doRecover(policy);
+            }
+        }
+        return null;
+    }
+
+    protected Void doRecover(IRetryPolicy policy)
+            throws AlgebricksException, HyracksDataException, InterruptedException {
+        LOGGER.log(level, "Actual Recovery task has started");
+        if (listener.getState() != ActivityState.TEMPORARILY_FAILED) {
+            LOGGER.log(level, "but its state is not temp failure and so we're just returning");
+            return null;
+        }
+        LOGGER.log(level, "calling the policy");
+        while (policy.retry()) {
+            synchronized (listener) {
+                if (cancelRecovery) {
+                    return null;
+                }
+                while (clusterStateManager.getState() != ClusterState.ACTIVE) {
+                    if (cancelRecovery) {
+                        return null;
+                    }
+                    wait();
+                }
+            }
+            IMetadataLockManager lockManager = metadataProvider.getApplicationContext().getMetadataLockManager();
+            lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(),
+                    listener.getEntityId().getDataverse() + '.' + listener.getEntityId().getEntityName());
+            for (Dataset dataset : listener.getDatasets()) {
+                MetadataLockUtil.modifyDatasetBegin(lockManager, metadataProvider.getLocks(),
+                        dataset.getDataverseName(), DatasetUtil.getFullyQualifiedName(dataset));
+            }
+            synchronized (listener) {
+                try {
+                    if (cancelRecovery) {
+                        return null;
+                    }
+                    listener.setState(ActivityState.RECOVERING);
+                    listener.doStart(metadataProvider);
+                    return null;
+                } catch (Exception e) {
+                    LOGGER.log(level, "Attempt to revive " + listener.getEntityId() + " failed", e);
+                    listener.setState(ActivityState.TEMPORARILY_FAILED);
+                    failure = e;
+                } finally {
+                    metadataProvider.getLocks().reset();
+                }
+                listener.notifyAll();
+            }
+        }
+        // Recovery task is essntially over now either through failure or through cancellation(stop)
+        synchronized (listener) {
+            listener.notifyAll();
+            if (listener.getState() != ActivityState.TEMPORARILY_FAILED) {
+                return null;
+            }
+        }
+        IMetadataLockManager lockManager = metadataProvider.getApplicationContext().getMetadataLockManager();
+        try {
+            lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(),
+                    listener.getEntityId().getDataverse() + '.' + listener.getEntityId().getEntityName());
+            for (Dataset dataset : listener.getDatasets()) {
+                MetadataLockUtil.modifyDatasetBegin(lockManager, metadataProvider.getLocks(), dataset.getDatasetName(),
+                        DatasetUtil.getFullyQualifiedName(dataset));
+            }
+            synchronized (listener) {
+                if (listener.getState() == ActivityState.TEMPORARILY_FAILED) {
+                    listener.setState(ActivityState.PERMANENTLY_FAILED);
+                }
+                listener.notifyAll();
+            }
+        } finally {
+            metadataProvider.getLocks().reset();
+        }
+        return null;
+    }
+
+    public Exception getFailure() {
+        return failure;
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
new file mode 100644
index 0000000..fb6ec37
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.asterix.hyracks.bootstrap.CCApplication;
+import org.apache.asterix.messaging.CCMessageBroker;
+import org.apache.asterix.translator.IStatementExecutorContext;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+public class CancelQueryRequest implements ICcAddressedMessage {
+
+    private static final Logger LOGGER = Logger.getLogger(CancelQueryRequest.class.getName());
+    private static final long serialVersionUID = 1L;
+    private final String nodeId;
+    private final long reqId;
+    private final String contextId;
+
+    public CancelQueryRequest(String nodeId, long reqId, String contextId) {
+        this.nodeId = nodeId;
+        this.reqId = reqId;
+        this.contextId = contextId;
+    }
+
+    @Override
+    public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+        ClusterControllerService ccs = (ClusterControllerService) appCtx.getServiceContext().getControllerService();
+        CCApplication application = (CCApplication) ccs.getApplication();
+        IStatementExecutorContext executorsCtx = application.getStatementExecutorContext();
+        JobId jobId = executorsCtx.getJobIdFromClientContextId(contextId);
+
+        if (jobId == null) {
+            LOGGER.log(Level.WARN, "No job found for context id " + contextId);
+        } else {
+            try {
+                IHyracksClientConnection hcc = application.getHcc();
+                hcc.cancelJob(jobId);
+                executorsCtx.removeJobIdFromClientContextId(contextId);
+            } catch (Exception e) {
+                LOGGER.log(Level.WARN, "unexpected exception thrown from cancel", e);
+            }
+        }
+        CancelQueryResponse response = new CancelQueryResponse(reqId);
+        CCMessageBroker messageBroker = (CCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+        try {
+            messageBroker.sendApplicationMessageToNC(response, nodeId);
+        } catch (Exception e) {
+            LOGGER.log(Level.WARN, "Failure sending response to nc", e);
+        }
+    }
+
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java
new file mode 100644
index 0000000..4fbcf22
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.asterix.common.messaging.api.MessageFuture;
+import org.apache.asterix.messaging.NCMessageBroker;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class CancelQueryResponse implements INcAddressedMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final long reqId;
+
+    public CancelQueryResponse(long reqId) {
+        this.reqId = reqId;
+    }
+
+    @Override
+    public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+        NCMessageBroker mb = (NCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+        MessageFuture future = mb.deregisterMessageFuture(reqId);
+        if (future != null) {
+            future.complete(this);
+        }
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
index e7919fa..5cee3d9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
@@ -22,6 +22,7 @@
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -54,23 +55,18 @@
 
 public final class ExecuteStatementRequestMessage implements ICcAddressedMessage {
     private static final long serialVersionUID = 1L;
-
     private static final Logger LOGGER = Logger.getLogger(ExecuteStatementRequestMessage.class.getName());
-
+    //TODO: Make configurable: https://issues.apache.org/jira/browse/ASTERIXDB-2062
+    public static final long DEFAULT_NC_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(5);
+    //TODO: Make configurable: https://issues.apache.org/jira/browse/ASTERIXDB-2063
+    public static final long DEFAULT_QUERY_CANCELLATION_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(1);
     private final String requestNodeId;
-
     private final long requestMessageId;
-
     private final ILangExtension.Language lang;
-
     private final String statementsText;
-
     private final SessionConfig sessionConfig;
-
     private final IStatementExecutor.ResultDelivery delivery;
-
     private final String clientContextID;
-
     private final String handleUrl;
 
     public ExecuteStatementRequestMessage(String requestNodeId, long requestMessageId, ILangExtension.Language lang,
@@ -102,47 +98,41 @@
         IStorageComponentProvider storageComponentProvider = ccAppCtx.getStorageComponentProvider();
         IStatementExecutorFactory statementExecutorFactory = ccApp.getStatementExecutorFactory();
         IStatementExecutorContext statementExecutorContext = ccApp.getStatementExecutorContext();
-
-        ccSrv.getExecutor().submit(() -> {
-            ExecuteStatementResponseMessage responseMsg = new ExecuteStatementResponseMessage(requestMessageId);
-            try {
-                IParser parser = compilationProvider.getParserFactory().createParser(statementsText);
-                List<Statement> statements = parser.parse();
-                StringWriter outWriter = new StringWriter(256);
-                PrintWriter outPrinter = new PrintWriter(outWriter);
-                SessionOutput.ResultDecorator resultPrefix = ResultUtil.createPreResultDecorator();
-                SessionOutput.ResultDecorator resultPostfix = ResultUtil.createPostResultDecorator();
-                SessionOutput.ResultAppender appendHandle = ResultUtil.createResultHandleAppender(handleUrl);
-                SessionOutput.ResultAppender appendStatus = ResultUtil.createResultStatusAppender();
-                SessionOutput sessionOutput = new SessionOutput(sessionConfig, outPrinter, resultPrefix, resultPostfix,
-                        appendHandle, appendStatus);
-
-                IStatementExecutor.ResultMetadata outMetadata = new IStatementExecutor.ResultMetadata();
-
-                MetadataManager.INSTANCE.init();
-                IStatementExecutor translator = statementExecutorFactory.create(ccAppCtx, statements, sessionOutput,
-                        compilationProvider, storageComponentProvider);
-                translator.compileAndExecute(ccAppCtx.getHcc(), null, delivery, outMetadata,
-                        new IStatementExecutor.Stats(), clientContextID, statementExecutorContext);
-
-                outPrinter.close();
-                responseMsg.setResult(outWriter.toString());
-                responseMsg.setMetadata(outMetadata);
-            } catch (AlgebricksException | HyracksException | TokenMgrError
-                    | org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
-                // we trust that "our" exceptions are serializable and have a comprehensible error message
-                GlobalConfig.ASTERIX_LOGGER.log(Level.WARNING, pe.getMessage(), pe);
-                responseMsg.setError(pe);
-            } catch (Exception e) {
-                GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, "Unexpected exception", e);
-                responseMsg.setError(new Exception(e.toString()));
-            }
-            try {
-                messageBroker.sendApplicationMessageToNC(responseMsg, requestNodeId);
-            } catch (Exception e) {
-                LOGGER.log(Level.WARNING, e.toString(), e);
-            }
-        });
+        ExecuteStatementResponseMessage responseMsg = new ExecuteStatementResponseMessage(requestMessageId);
+        try {
+            IParser parser = compilationProvider.getParserFactory().createParser(statementsText);
+            List<Statement> statements = parser.parse();
+            StringWriter outWriter = new StringWriter(256);
+            PrintWriter outPrinter = new PrintWriter(outWriter);
+            SessionOutput.ResultDecorator resultPrefix = ResultUtil.createPreResultDecorator();
+            SessionOutput.ResultDecorator resultPostfix = ResultUtil.createPostResultDecorator();
+            SessionOutput.ResultAppender appendHandle = ResultUtil.createResultHandleAppender(handleUrl);
+            SessionOutput.ResultAppender appendStatus = ResultUtil.createResultStatusAppender();
+            SessionOutput sessionOutput = new SessionOutput(sessionConfig, outPrinter, resultPrefix, resultPostfix,
+                    appendHandle, appendStatus);
+            IStatementExecutor.ResultMetadata outMetadata = new IStatementExecutor.ResultMetadata();
+            MetadataManager.INSTANCE.init();
+            IStatementExecutor translator = statementExecutorFactory.create(ccAppCtx, statements, sessionOutput,
+                    compilationProvider, storageComponentProvider);
+            translator.compileAndExecute(ccAppCtx.getHcc(), null, delivery, outMetadata, new IStatementExecutor.Stats(),
+                    clientContextID, statementExecutorContext);
+            outPrinter.close();
+            responseMsg.setResult(outWriter.toString());
+            responseMsg.setMetadata(outMetadata);
+        } catch (AlgebricksException | HyracksException | TokenMgrError
+                | org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
+            // we trust that "our" exceptions are serializable and have a comprehensible error message
+            GlobalConfig.ASTERIX_LOGGER.log(Level.WARNING, pe.getMessage(), pe);
+            responseMsg.setError(pe);
+        } catch (Exception e) {
+            GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, "Unexpected exception", e);
+            responseMsg.setError(new Exception(e.toString()));
+        }
+        try {
+            messageBroker.sendApplicationMessageToNC(responseMsg, requestNodeId);
+        } catch (Exception e) {
+            LOGGER.log(Level.WARNING, e.toString(), e);
+        }
     }
 
     private String getRejectionReason(ClusterControllerService ccSrv) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java
index 4f9aa0c..54f0a4e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java
@@ -19,8 +19,6 @@
 
 package org.apache.asterix.app.message;
 
-import java.util.concurrent.TimeUnit;
-
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.messaging.api.INcAddressedMessage;
 import org.apache.asterix.common.messaging.api.MessageFuture;
@@ -31,8 +29,6 @@
 public final class ExecuteStatementResponseMessage implements INcAddressedMessage {
     private static final long serialVersionUID = 1L;
 
-    public static final long DEFAULT_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(5);
-
     private final long requestMessageId;
 
     private String result;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index e8636c8..9040ad1 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -21,8 +21,8 @@
 
 import static org.apache.asterix.algebra.base.ILangExtension.Language.AQL;
 import static org.apache.asterix.algebra.base.ILangExtension.Language.SQLPP;
-import static org.apache.asterix.api.http.servlet.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR;
-import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
+import static org.apache.asterix.api.http.server.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR;
+import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
 
 import java.util.Arrays;
 import java.util.List;
@@ -47,10 +47,10 @@
 import org.apache.asterix.api.http.server.QueryStatusApiServlet;
 import org.apache.asterix.api.http.server.QueryWebInterfaceServlet;
 import org.apache.asterix.api.http.server.RebalanceApiServlet;
+import org.apache.asterix.api.http.server.ServletConstants;
 import org.apache.asterix.api.http.server.ShutdownApiServlet;
 import org.apache.asterix.api.http.server.UpdateApiServlet;
 import org.apache.asterix.api.http.server.VersionApiServlet;
-import org.apache.asterix.api.http.servlet.ServletConstants;
 import org.apache.asterix.app.active.ActiveNotificationHandler;
 import org.apache.asterix.app.cc.CCExtensionManager;
 import org.apache.asterix.app.cc.ResourceIdManager;
@@ -106,6 +106,7 @@
     protected WebManager webManager;
     protected CcApplicationContext appCtx;
     private IJobCapacityController jobCapacityController;
+    private IHyracksClientConnection hcc;
 
     @Override
     public void start(IServiceContext serviceCtx, String[] args) throws Exception {
@@ -124,6 +125,9 @@
 
         ccServiceCtx.setThreadFactory(
                 new AsterixThreadFactory(ccServiceCtx.getThreadFactory(), new LifeCycleComponentManager()));
+        String strIP = ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetAddress();
+        int port = ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetPort();
+        hcc = new HyracksConnection(strIP, port);
         ILibraryManager libraryManager = new ExternalLibraryManager();
         ResourceIdManager resourceIdManager = new ResourceIdManager();
         IReplicationStrategy repStrategy = ClusterProperties.INSTANCE.getReplicationStrategy();
@@ -196,7 +200,6 @@
     protected HttpServer setupWebServer(ExternalProperties externalProperties) throws Exception {
         HttpServer webServer = new HttpServer(webManager.getBosses(), webManager.getWorkers(),
                 externalProperties.getWebInterfacePort());
-        IHyracksClientConnection hcc = getHcc();
         webServer.setAttribute(HYRACKS_CONNECTION_ATTR, hcc);
         webServer.addServlet(new ApiServlet(webServer.ctx(), new String[] { "/*" }, appCtx,
                 ccExtensionManager.getCompilationProvider(AQL), ccExtensionManager.getCompilationProvider(SQLPP),
@@ -207,7 +210,6 @@
     protected HttpServer setupJSONAPIServer(ExternalProperties externalProperties) throws Exception {
         HttpServer jsonAPIServer =
                 new HttpServer(webManager.getBosses(), webManager.getWorkers(), externalProperties.getAPIServerPort());
-        IHyracksClientConnection hcc = getHcc();
         jsonAPIServer.setAttribute(HYRACKS_CONNECTION_ATTR, hcc);
         jsonAPIServer.setAttribute(ASTERIX_APP_CONTEXT_INFO_ATTR, appCtx);
         jsonAPIServer.setAttribute(ServletConstants.EXECUTOR_SERVICE_ATTR,
@@ -252,7 +254,6 @@
     protected HttpServer setupQueryWebServer(ExternalProperties externalProperties) throws Exception {
         HttpServer queryWebServer = new HttpServer(webManager.getBosses(), webManager.getWorkers(),
                 externalProperties.getQueryWebInterfacePort());
-        IHyracksClientConnection hcc = getHcc();
         queryWebServer.setAttribute(HYRACKS_CONNECTION_ATTR, hcc);
         queryWebServer.addServlet(new QueryWebInterfaceServlet(appCtx, queryWebServer.ctx(), new String[] { "/*" }));
         return queryWebServer;
@@ -357,9 +358,7 @@
         return appCtx;
     }
 
-    protected IHyracksClientConnection getHcc() throws Exception {
-        String strIP = ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetAddress();
-        int port = ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetPort();
-        return new HyracksConnection(strIP, port);
+    public IHyracksClientConnection getHcc() {
+        return hcc;
     }
 }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java
index 5abbe40..cd58d8f 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java
@@ -29,6 +29,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.asterix.api.http.server.ConnectorApiServlet;
+import org.apache.asterix.api.http.server.ServletConstants;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
index 5f40a85..3cb46fe 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
@@ -29,6 +29,7 @@
 
 import org.apache.asterix.api.http.ctx.StatementExecutorContext;
 import org.apache.asterix.api.http.server.QueryCancellationServlet;
+import org.apache.asterix.api.http.server.ServletConstants;
 import org.apache.asterix.translator.IStatementExecutorContext;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.job.JobId;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryServiceServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryServiceServletTest.java
deleted file mode 100644
index e0539ac..0000000
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryServiceServletTest.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.api.http.servlet;
-
-import org.apache.asterix.api.http.server.QueryServiceServlet;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class QueryServiceServletTest {
-
-    @Test
-    public void testTimeUnitFormatNanos() throws Exception {
-        Assert.assertEquals("123.456789012s", QueryServiceServlet.TimeUnit.formatNanos(123456789012l));
-        Assert.assertEquals("12.345678901s", QueryServiceServlet.TimeUnit.formatNanos(12345678901l));
-        Assert.assertEquals("1.234567890s", QueryServiceServlet.TimeUnit.formatNanos(1234567890l));
-        Assert.assertEquals("123.456789ms", QueryServiceServlet.TimeUnit.formatNanos(123456789l));
-        Assert.assertEquals("12.345678ms", QueryServiceServlet.TimeUnit.formatNanos(12345678l));
-        Assert.assertEquals("1.234567ms", QueryServiceServlet.TimeUnit.formatNanos(1234567l));
-        Assert.assertEquals("123.456µs", QueryServiceServlet.TimeUnit.formatNanos(123456l));
-        Assert.assertEquals("12.345µs", QueryServiceServlet.TimeUnit.formatNanos(12345l));
-        Assert.assertEquals("1.234µs", QueryServiceServlet.TimeUnit.formatNanos(1234l));
-        Assert.assertEquals("123ns", QueryServiceServlet.TimeUnit.formatNanos(123l));
-        Assert.assertEquals("12ns", QueryServiceServlet.TimeUnit.formatNanos(12l));
-        Assert.assertEquals("1ns", QueryServiceServlet.TimeUnit.formatNanos(1l));
-        Assert.assertEquals("-123.456789012s", QueryServiceServlet.TimeUnit.formatNanos(-123456789012l));
-        Assert.assertEquals("120.000000000s", QueryServiceServlet.TimeUnit.formatNanos(120000000000l));
-        Assert.assertEquals("-12ns", QueryServiceServlet.TimeUnit.formatNanos(-12l));
-    }
-}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiServletTest.java
index f994e98..e583c75 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiServletTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiServletTest.java
@@ -19,8 +19,8 @@
 
 package org.apache.asterix.api.http.servlet;
 
-import static org.apache.asterix.api.http.servlet.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR;
-import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
+import static org.apache.asterix.api.http.server.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR;
+import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ParseDurationTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ParseDurationTest.java
new file mode 100644
index 0000000..12c61d6
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ParseDurationTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime;
+
+import org.apache.asterix.api.http.server.Duration;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ParseDurationTest {
+
+    @Test
+    public void test() throws Exception {
+        // simple
+        Assert.assertEquals(0, Duration.parseDurationStringToNanos("0"));
+        Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(5),
+                Duration.parseDurationStringToNanos("5s"));
+        Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(30),
+                Duration.parseDurationStringToNanos("30s"));
+        Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(1478),
+                Duration.parseDurationStringToNanos("1478s"));
+        Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(-5),
+                Duration.parseDurationStringToNanos("-5s"));
+        Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(5),
+                Duration.parseDurationStringToNanos("+5s"));
+        Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(0),
+                Duration.parseDurationStringToNanos("-0"));
+        Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(0),
+                Duration.parseDurationStringToNanos("+0"));
+        Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(5),
+                Duration.parseDurationStringToNanos("5.0s"));
+        Assert.assertEquals(
+                java.util.concurrent.TimeUnit.SECONDS.toNanos(5)
+                        + java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(600),
+                Duration.parseDurationStringToNanos("5.6s"));
+        Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(5),
+                Duration.parseDurationStringToNanos("5.s"));
+        Assert.assertEquals(java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(500),
+                Duration.parseDurationStringToNanos(".5s"));
+        Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(1),
+                Duration.parseDurationStringToNanos("1.0s"));
+        Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(1),
+                Duration.parseDurationStringToNanos("1.00s"));
+        Assert.assertEquals(
+                java.util.concurrent.TimeUnit.SECONDS.toNanos(1)
+                        + java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(4),
+                Duration.parseDurationStringToNanos("1.004s"));
+        Assert.assertEquals(
+                java.util.concurrent.TimeUnit.SECONDS.toNanos(1)
+                        + java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(4),
+                Duration.parseDurationStringToNanos("1.0040s"));
+        Assert.assertEquals(
+                java.util.concurrent.TimeUnit.SECONDS.toNanos(100)
+                        + java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(1),
+                Duration.parseDurationStringToNanos("100.00100s"));
+        Assert.assertEquals(java.util.concurrent.TimeUnit.NANOSECONDS.toNanos(10),
+                Duration.parseDurationStringToNanos("10ns"));
+        Assert.assertEquals(java.util.concurrent.TimeUnit.MICROSECONDS.toNanos(11),
+                Duration.parseDurationStringToNanos("11us"));
+        Assert.assertEquals(java.util.concurrent.TimeUnit.MICROSECONDS.toNanos(12),
+                Duration.parseDurationStringToNanos("12µs"));
+        Assert.assertEquals(java.util.concurrent.TimeUnit.MICROSECONDS.toNanos(12),
+                Duration.parseDurationStringToNanos("12μs"));
+        Assert.assertEquals(java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(13),
+                Duration.parseDurationStringToNanos("13ms"));
+        Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(14),
+                Duration.parseDurationStringToNanos("14s"));
+        Assert.assertEquals(java.util.concurrent.TimeUnit.MINUTES.toNanos(15),
+                Duration.parseDurationStringToNanos("15m"));
+        Assert.assertEquals(java.util.concurrent.TimeUnit.HOURS.toNanos(16),
+                Duration.parseDurationStringToNanos("16h"));
+        Assert.assertEquals(
+                java.util.concurrent.TimeUnit.HOURS.toNanos(3) + java.util.concurrent.TimeUnit.MINUTES.toNanos(30),
+                Duration.parseDurationStringToNanos("3h30m"));
+        Assert.assertEquals(
+                java.util.concurrent.TimeUnit.SECONDS.toNanos(10)
+                        + java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(500)
+                        + java.util.concurrent.TimeUnit.MINUTES.toNanos(4),
+                Duration.parseDurationStringToNanos("10.5s4m"));
+        Assert.assertEquals(
+                java.util.concurrent.TimeUnit.MINUTES.toNanos(-2) + java.util.concurrent.TimeUnit.SECONDS.toNanos(-3)
+                        + java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(-400),
+                Duration.parseDurationStringToNanos("-2m3.4s"));
+        Assert.assertEquals(
+                java.util.concurrent.TimeUnit.HOURS.toNanos(1) + java.util.concurrent.TimeUnit.MINUTES.toNanos(2)
+                        + java.util.concurrent.TimeUnit.SECONDS.toNanos(3)
+                        + java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(4)
+                        + java.util.concurrent.TimeUnit.MICROSECONDS.toNanos(5)
+                        + java.util.concurrent.TimeUnit.NANOSECONDS.toNanos(6),
+                Duration.parseDurationStringToNanos("1h2m3s4ms5us6ns"));
+        Assert.assertEquals(
+                java.util.concurrent.TimeUnit.HOURS.toNanos(39) + java.util.concurrent.TimeUnit.MINUTES.toNanos(9)
+                        + java.util.concurrent.TimeUnit.SECONDS.toNanos(14)
+                        + java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(425),
+                Duration.parseDurationStringToNanos("39h9m14.425s"));
+        Assert.assertEquals(java.util.concurrent.TimeUnit.NANOSECONDS.toNanos(52763797000L),
+                Duration.parseDurationStringToNanos("52763797000ns"));
+        Assert.assertEquals(1199999998800L, Duration.parseDurationStringToNanos("0.3333333333333333333333h"));
+        Assert.assertEquals(9007199254740993L, Duration.parseDurationStringToNanos("9007199254740993ns"));
+        Assert.assertEquals(9223372036854775807L, Duration.parseDurationStringToNanos("9223372036854775807ns"));
+        Assert.assertEquals(9223372036854775807L, Duration.parseDurationStringToNanos("9223372036854775.807us"));
+        Assert.assertEquals(9223372036854775807L, Duration.parseDurationStringToNanos("9223372036s854ms775us807ns"));
+        Assert.assertEquals(-9223372036854775807L, Duration.parseDurationStringToNanos("-9223372036854775807ns"));
+        assertFail("");
+        assertFail("3");
+        assertFail("-");
+        assertFail("s");
+        assertFail(".");
+        assertFail("-.");
+        assertFail(".s");
+        assertFail("+.s");
+        assertFail("3000000h");
+        assertFail("9223372036854775808ns");
+        assertFail("9223372036854775.808us");
+        assertFail("9223372036854ms775us808ns");
+        assertFail("-9223372036854775808ns");
+    }
+
+    @Test
+    public void testDurationFormatNanos() throws Exception {
+        Assert.assertEquals("123.456789012s", Duration.formatNanos(123456789012l));
+        Assert.assertEquals("12.345678901s", Duration.formatNanos(12345678901l));
+        Assert.assertEquals("1.234567890s", Duration.formatNanos(1234567890l));
+        Assert.assertEquals("123.456789ms", Duration.formatNanos(123456789l));
+        Assert.assertEquals("12.345678ms", Duration.formatNanos(12345678l));
+        Assert.assertEquals("1.234567ms", Duration.formatNanos(1234567l));
+        Assert.assertEquals("123.456µs", Duration.formatNanos(123456l));
+        Assert.assertEquals("12.345µs", Duration.formatNanos(12345l));
+        Assert.assertEquals("1.234µs", Duration.formatNanos(1234l));
+        Assert.assertEquals("123ns", Duration.formatNanos(123l));
+        Assert.assertEquals("12ns", Duration.formatNanos(12l));
+        Assert.assertEquals("1ns", Duration.formatNanos(1l));
+        Assert.assertEquals("-123.456789012s", Duration.formatNanos(-123456789012l));
+        Assert.assertEquals("120.000000000s", Duration.formatNanos(120000000000l));
+        Assert.assertEquals("-12ns", Duration.formatNanos(-12l));
+    }
+
+    private void assertFail(String duration) {
+        try {
+            Duration.parseDurationStringToNanos(duration);
+            Assert.fail("Expected parseDuration(" + duration + ") to fail but it didn't");
+        } catch (HyracksDataException hde) {
+        }
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 143f7d1..fcfb428 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -68,6 +68,9 @@
     public static final int POLYGON_3_POINTS = 25;
     public static final int POLYGON_INVALID = 26;
     public static final int OPERATION_NOT_SUPPORTED = 27;
+    public static final int INVALID_DURATION = 28;
+    public static final int UNKNOWN_DURATION_UNIT = 29;
+    public static final int QUERY_TIMEOUT = 30;
 
     public static final int INSTANTIATION_ERROR = 100;
 
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 715c27d..5bd5482 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -61,6 +61,9 @@
 25 = Polygon must have at least 3 points
 26 = %1$s can not be an instance of polygon
 27 = Operation not supported
+28 = Invalid duration %1$s
+29 = Unknown duration unit %1$s
+30 = Query timed out
 
 100 = Unable to instantiate class %1$s
 
diff --git a/asterixdb/asterix-metadata/pom.xml b/asterixdb/asterix-metadata/pom.xml
index 5497cf4..458bb67 100644
--- a/asterixdb/asterix-metadata/pom.xml
+++ b/asterixdb/asterix-metadata/pom.xml
@@ -16,7 +16,8 @@
  ! specific language governing permissions and limitations
  ! under the License.
  !-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <artifactId>apache-asterixdb</artifactId>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
index e2868ae..95479c1 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -130,6 +130,9 @@
 
         public CancelJobFunction(JobId jobId) {
             this.jobId = jobId;
+            if (jobId == null) {
+                throw new IllegalArgumentException("jobId");
+            }
         }
 
         @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
index dbbaf9f..a3078b6 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
@@ -65,6 +65,8 @@
 import org.apache.hyracks.control.cc.work.JobCleanupWork;
 import org.apache.hyracks.control.common.job.PartitionState;
 import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
+import org.apache.hyracks.control.common.work.NoOpCallback;
+import org.apache.hyracks.control.common.work.IResultCallback;
 
 public class JobExecutor {
     private static final Logger LOGGER = Logger.getLogger(JobExecutor.class.getName());
@@ -114,9 +116,10 @@
         ccs.getContext().notifyJobStart(jobRun.getJobId());
     }
 
-    public void cancelJob() throws HyracksException {
+    public void cancelJob(IResultCallback<Void> callback) throws HyracksException {
         // If the job is already terminated or failed, do nothing here.
         if (jobRun.getPendingStatus() != null) {
+            callback.setValue(null);
             return;
         }
         // Sets the cancelled flag.
@@ -124,7 +127,8 @@
         // Aborts on-ongoing task clusters.
         abortOngoingTaskClusters(ta -> false, ta -> null);
         // Aborts the whole job.
-        abortJob(Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, jobRun.getJobId())));
+        abortJob(Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, jobRun.getJobId())),
+                callback);
     }
 
     private void findRunnableTaskClusterRoots(Set<TaskCluster> frontier, Collection<ActivityCluster> roots)
@@ -196,8 +200,8 @@
                     "Runnable TC roots: " + taskClusterRoots + ", inProgressTaskClusters: " + inProgressTaskClusters);
         }
         if (taskClusterRoots.isEmpty() && inProgressTaskClusters.isEmpty()) {
-            ccs.getWorkQueue()
-                    .schedule(new JobCleanupWork(ccs.getJobManager(), jobRun.getJobId(), JobStatus.TERMINATED, null));
+            ccs.getWorkQueue().schedule(new JobCleanupWork(ccs.getJobManager(), jobRun.getJobId(), JobStatus.TERMINATED,
+                    null, NoOpCallback.INSTANCE));
             return;
         }
         startRunnableTaskClusters(taskClusterRoots);
@@ -520,14 +524,14 @@
         }
     }
 
-    public void abortJob(List<Exception> exceptions) {
+    public void abortJob(List<Exception> exceptions, IResultCallback<Void> callback) {
         Set<TaskCluster> inProgressTaskClustersCopy = new HashSet<>(inProgressTaskClusters);
         for (TaskCluster tc : inProgressTaskClustersCopy) {
             abortTaskCluster(findLastTaskClusterAttempt(tc), TaskClusterAttempt.TaskClusterStatus.ABORTED);
         }
         assert inProgressTaskClusters.isEmpty();
-        ccs.getWorkQueue()
-                .schedule(new JobCleanupWork(ccs.getJobManager(), jobRun.getJobId(), JobStatus.FAILURE, exceptions));
+        ccs.getWorkQueue().schedule(
+                new JobCleanupWork(ccs.getJobManager(), jobRun.getJobId(), JobStatus.FAILURE, exceptions, callback));
     }
 
     private void abortTaskCluster(TaskClusterAttempt tcAttempt,
@@ -686,7 +690,7 @@
                         + " as failed and the number of max re-attempts = " + maxReattempts);
                 if (lastAttempt.getAttempt() >= maxReattempts || isCancelled()) {
                     LOGGER.log(Level.INFO, "Aborting the job of " + ta.getTaskAttemptId());
-                    abortJob(exceptions);
+                    abortJob(exceptions, NoOpCallback.INSTANCE);
                     return;
                 }
                 LOGGER.log(Level.INFO, "We will try to start runnable activity clusters of " + ta.getTaskAttemptId());
@@ -696,7 +700,7 @@
                         "Ignoring task failure notification: " + taId + " -- Current last attempt = " + lastAttempt);
             }
         } catch (Exception e) {
-            abortJob(Collections.singletonList(e));
+            abortJob(Collections.singletonList(e), NoOpCallback.INSTANCE);
         }
     }
 
@@ -720,7 +724,7 @@
                     ta -> HyracksException.create(ErrorCode.NODE_FAILED, ta.getNodeId()));
             startRunnableActivityClusters();
         } catch (Exception e) {
-            abortJob(Collections.singletonList(e));
+            abortJob(Collections.singletonList(e), NoOpCallback.INSTANCE);
         }
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
index 8fe542f..a9ddee3 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
@@ -25,6 +25,7 @@
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.control.common.work.IResultCallback;
 
 /**
  * This interface abstracts the job lifecycle management and job scheduling for a cluster.
@@ -47,10 +48,12 @@
     /**
      * Cancel a job with a given job id.
      *
+     * @param callback
+     *
      * @param jobId,
      *            the id of the job.
      */
-    void cancel(JobId jobId) throws HyracksException;
+    void cancel(JobId jobId, IResultCallback<Void> callback) throws HyracksException;
 
     /**
      * This method is called when the master process decides to complete job.
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index abf1d57..4ba847d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -46,6 +46,8 @@
 import org.apache.hyracks.control.cc.scheduler.FIFOJobQueue;
 import org.apache.hyracks.control.cc.scheduler.IJobQueue;
 import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.work.NoOpCallback;
+import org.apache.hyracks.control.common.work.IResultCallback;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -115,17 +117,14 @@
     }
 
     @Override
-    public void cancel(JobId jobId) throws HyracksException {
-        if (jobId == null) {
-            return;
-        }
+    public void cancel(JobId jobId, IResultCallback<Void> callback) throws HyracksException {
         // Cancels a running job.
         if (activeRunMap.containsKey(jobId)) {
             JobRun jobRun = activeRunMap.get(jobId);
             // The following call will abort all ongoing tasks and then consequently
             // trigger JobCleanupWork and JobCleanupNotificationWork which will update the lifecyle of the job.
             // Therefore, we do not remove the job out of activeRunMap here.
-            jobRun.getExecutor().cancelJob();
+            jobRun.getExecutor().cancelJob(callback);
             return;
         }
         // Removes a pending job.
@@ -138,6 +137,7 @@
             runMapArchive.put(jobId, jobRun);
             runMapHistory.put(jobId, exceptions);
         }
+        callback.setValue(null);
     }
 
     @Override
@@ -322,7 +322,7 @@
             // fail the job then abort it
             run.setStatus(JobStatus.FAILURE, exceptions);
             // abort job will trigger JobCleanupWork
-            run.getExecutor().abortJob(exceptions);
+            run.getExecutor().abortJob(exceptions, NoOpCallback.INSTANCE);
         }
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java
index f3b67c9..e3135df 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java
@@ -42,10 +42,7 @@
     @Override
     protected void doRun() throws Exception {
         try {
-            if (jobId != null) {
-                jobManager.cancel(jobId);
-            }
-            callback.setValue(null);
+            jobManager.cancel(jobId, callback);
         } catch (Exception e) {
             callback.setException(e);
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
index 502ac50..bb85c13 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
@@ -29,6 +29,7 @@
 import org.apache.hyracks.control.cc.job.IJobManager;
 import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.common.work.AbstractWork;
+import org.apache.hyracks.control.common.work.IResultCallback;
 
 public class JobCleanupWork extends AbstractWork {
     private static final Logger LOGGER = Logger.getLogger(JobCleanupWork.class.getName());
@@ -37,12 +38,15 @@
     private JobId jobId;
     private JobStatus status;
     private List<Exception> exceptions;
+    private IResultCallback<Void> callback;
 
-    public JobCleanupWork(IJobManager jobManager, JobId jobId, JobStatus status, List<Exception> exceptions) {
+    public JobCleanupWork(IJobManager jobManager, JobId jobId, JobStatus status, List<Exception> exceptions,
+            IResultCallback<Void> callback) {
         this.jobManager = jobManager;
         this.jobId = jobId;
         this.status = status;
         this.exceptions = exceptions;
+        this.callback = callback;
     }
 
     @Override
@@ -53,6 +57,7 @@
         try {
             JobRun jobRun = jobManager.get(jobId);
             jobManager.prepareComplete(jobRun, status, exceptions);
+            callback.setValue(null);
         } catch (HyracksException e) {
             // Fail the job with the caught exception during final completion.
             JobRun run = jobManager.get(jobId);
@@ -62,6 +67,7 @@
             }
             completionException.add(0, e);
             run.setStatus(JobStatus.FAILURE, completionException);
+            callback.setException(e);
         }
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
index ed2a740..0d46d64 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
@@ -48,6 +48,7 @@
 import org.apache.hyracks.control.common.base.INodeController;
 import org.apache.hyracks.control.common.controllers.CCConfig;
 import org.apache.hyracks.control.common.logs.LogFile;
+import org.apache.hyracks.control.common.work.NoOpCallback;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -207,7 +208,7 @@
     }
 
     @Test
-    public void testCancel() throws HyracksException {
+    public void testCancel() throws Exception {
         CCConfig ccConfig = new CCConfig();
         IJobCapacityController jobCapacityController = mock(IJobCapacityController.class);
         IJobManager jobManager = spy(new JobManager(ccConfig, mockClusterControllerService(), jobCapacityController));
@@ -247,12 +248,12 @@
 
         // Cancels deferred jobs.
         for (JobRun run : deferredRuns) {
-            jobManager.cancel(run.getJobId());
+            jobManager.cancel(run.getJobId(), NoOpCallback.INSTANCE);
         }
 
         // Cancels runnable jobs.
         for (JobRun run : acceptedRuns) {
-            jobManager.cancel(run.getJobId());
+            jobManager.cancel(run.getJobId(), NoOpCallback.INSTANCE);
         }
 
         Assert.assertTrue(jobManager.getPendingJobs().isEmpty());
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/FutureValue.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/FutureValue.java
index 150e0e8..ca0c7c4 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/FutureValue.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/FutureValue.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.control.common.work;
 
 public class FutureValue<T> implements IResultCallback<T> {
+
     private boolean done;
 
     private T value;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/NoOpCallback.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/NoOpCallback.java
new file mode 100644
index 0000000..041cee0
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/NoOpCallback.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.work;
+
+public class NoOpCallback implements IResultCallback<Void> {
+
+    public static final NoOpCallback INSTANCE = new NoOpCallback();
+
+    private NoOpCallback() {
+    }
+
+    @Override
+    public void setValue(Void result) {
+        // Dummy is used when no callback is provided
+    }
+
+    @Override
+    public void setException(Exception e) {
+        // Dummy is used when no callback is provided
+    }
+
+}