Add a REST endpoint for query cancellation.

This change also includes the following parts:
- Fix failure handling and potentially thread leakage in
  MaterializingPipelinedPartition;
- Fix failure handling in PartitionDataWriter;
- Add a new test suite: SqlppExecutionWithCancellationTest.

Change-Id: I2936ac83f71bbef533e2695ed0a2b220c23fc483
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1564
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
index 2066f73..92c487b 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
@@ -77,21 +77,7 @@
     }
 
     /**
-     * Compiles and execute a list of statements.
-     *
-     * @param hcc
-     *            A Hyracks client connection that is used to submit a jobspec to Hyracks.
-     * @param hdc
-     *            A Hyracks dataset client object that is used to read the results.
-     * @param resultDelivery
-     *            The {@code ResultDelivery} kind required for queries in the list of statements
-     * @throws Exception
-     */
-    void compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery)
-            throws Exception;
-
-    /**
-     * Compiles and execute a list of statements.
+     * Compiles and execute a list of statements, without passing in client context id and context.
      *
      * @param hcc
      *            A Hyracks client connection that is used to submit a jobspec to Hyracks.
@@ -107,6 +93,26 @@
             Stats stats) throws Exception;
 
     /**
+     * Compiles and execute a list of statements, with passing in client context id and context.
+     *
+     * @param hcc
+     *            A Hyracks client connection that is used to submit a jobspec to Hyracks.
+     * @param hdc
+     *            A Hyracks dataset client object that is used to read the results.
+     * @param resultDelivery
+     *            The {@code ResultDelivery} kind required for queries in the list of statements
+     * @param stats
+     *            a reference to write the stats of executed queries
+     * @param clientContextId
+     *            the client context id for the query
+     * @param ctx
+     *            the context that contains the meta information for all queries
+     * @throws Exception
+     */
+    void compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery,
+            Stats stats, String clientContextId, IStatementExecutorContext ctx) throws Exception;
+
+    /**
      * rewrites and compiles query into a hyracks job specifications
      *
      * @param clusterInfoCollector
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java
new file mode 100644
index 0000000..81e1ebf
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.translator;
+
+import org.apache.hyracks.api.job.JobId;
+
+/**
+ * The context for statement executors, which maintains the meta information of all queries.
+ * TODO(yingyi): also maintain the mapping from server generated request ids to jobs.
+ */
+public interface IStatementExecutorContext {
+
+    /**
+     * Gets the Hyracks JobId from the user-provided client context id.
+     *
+     * @param clientContextId,
+     *            a user provided client context id.
+     * @return the Hyracks job id of class {@link org.apache.hyracks.api.job.JobId}.
+     */
+    JobId getJobIdFromClientContextId(String clientContextId);
+
+    /**
+     * Puts a client context id for a statement and the corresponding Hyracks job id.
+     *
+     * @param clientContextId,
+     *            a user provided client context id.
+     * @param jobId,
+     *            the Hyracks job id of class {@link org.apache.hyracks.api.job.JobId}.
+     */
+    void put(String clientContextId, JobId jobId);
+
+    /**
+     * Removes the information about the query corresponding to a user-provided client context id.
+     *
+     * @param clientContextId,
+     *            a user provided client context id.
+     */
+    JobId removeJobIdFromClientContextId(String clientContextId);
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java
new file mode 100644
index 0000000..7c06762
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.api.http.ctx;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.asterix.translator.IStatementExecutorContext;
+import org.apache.hyracks.api.job.JobId;
+
+public class StatementExecutorContext implements IStatementExecutorContext {
+
+    private final Map<String, JobId> runningQueries = new ConcurrentHashMap<>();
+
+    @Override
+    public JobId getJobIdFromClientContextId(String clientContextId) {
+        return runningQueries.get(clientContextId);
+    }
+
+    @Override
+    public void put(String clientContextId, JobId jobId) {
+        runningQueries.put(clientContextId, jobId);
+    }
+
+    @Override
+    public JobId removeJobIdFromClientContextId(String clientContextId) {
+        return runningQueries.remove(clientContextId);
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
index 7a33c0e..d91d5fc 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
@@ -145,7 +145,8 @@
                     compilationProvider, componentProvider);
             double duration;
             long startTime = System.currentTimeMillis();
-            translator.compileAndExecute(hcc, hds, IStatementExecutor.ResultDelivery.IMMEDIATE);
+            translator.compileAndExecute(hcc, hds, IStatementExecutor.ResultDelivery.IMMEDIATE,
+                    new IStatementExecutor.Stats());
             long endTime = System.currentTimeMillis();
             duration = (endTime - startTime) / 1000.00;
             out.println(HTML_STATEMENT_SEPARATOR);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
index de227eb..788927f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
@@ -83,7 +83,7 @@
     private ObjectNode getClusterDiagnosticsJSON() throws Exception {
         ObjectMapper om = new ObjectMapper();
         IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR);
-        ExecutorService executor = (ExecutorService) ctx.get(ServletConstants.EXECUTOR_SERVICE);
+        ExecutorService executor = (ExecutorService) ctx.get(ServletConstants.EXECUTOR_SERVICE_ATTR);
         Map<String, Future<ObjectNode>> ccFutureData = new HashMap<>();
         ccFutureData.put("threaddump",
                 executor.submit(() -> fixupKeys((ObjectNode) om.readTree(hcc.getThreadDump(null)))));
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryCancellationServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryCancellationServlet.java
new file mode 100644
index 0000000..4405d29
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryCancellationServlet.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.api.http.server;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.api.http.servlet.ServletConstants;
+import org.apache.asterix.translator.IStatementExecutorContext;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
+import org.apache.hyracks.http.server.AbstractServlet;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * The servlet provides a REST API for cancelling an on-going query.
+ */
+public class QueryCancellationServlet extends AbstractServlet {
+    private static final Logger LOGGER = Logger.getLogger(QueryCancellationServlet.class.getName());
+    private static final String CLIENT_CONTEXT_ID = "client_context_id";
+
+    public QueryCancellationServlet(ConcurrentMap<String, Object> ctx, String... paths) {
+        super(ctx, paths);
+    }
+
+    @Override
+    protected void delete(IServletRequest request, IServletResponse response) throws IOException {
+        // gets the parameter client_context_id from the request.
+        String clientContextId = request.getParameter(CLIENT_CONTEXT_ID);
+        if (clientContextId == null) {
+            response.setStatus(HttpResponseStatus.BAD_REQUEST);
+            return;
+        }
+
+        // Retrieves the corresponding Hyracks job id.
+        IStatementExecutorContext runningQueries = (IStatementExecutorContext) ctx
+                .get(ServletConstants.RUNNING_QUERIES_ATTR);
+        IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(ServletConstants.HYRACKS_CONNECTION_ATTR);
+        JobId jobId = runningQueries.getJobIdFromClientContextId(clientContextId);
+
+        if (jobId == null) {
+            // response: NOT FOUND
+            response.setStatus(HttpResponseStatus.NOT_FOUND);
+            return;
+        }
+        try {
+            // Cancels the on-going job.
+            hcc.cancelJob(jobId);
+            // Removes the cancelled query from the map activeQueries.
+            runningQueries.removeJobIdFromClientContextId(clientContextId);
+            // response: OK
+            response.setStatus(HttpResponseStatus.OK);
+        } catch (Exception e) {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.log(Level.WARNING, e.getMessage(), e);
+            }
+            // response: INTERNAL SERVER ERROR
+            response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+        }
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index 9d22452..42bb4f9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -29,6 +29,8 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.asterix.api.http.ctx.StatementExecutorContext;
+import org.apache.asterix.api.http.servlet.ServletConstants;
 import org.apache.asterix.app.result.ResultUtil;
 import org.apache.asterix.common.api.IClusterManagementWork;
 import org.apache.asterix.common.config.GlobalConfig;
@@ -43,6 +45,7 @@
 import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.asterix.translator.IStatementExecutorContext;
 import org.apache.asterix.translator.IStatementExecutorFactory;
 import org.apache.asterix.translator.SessionConfig;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -65,6 +68,7 @@
     private final ILangCompilationProvider compilationProvider;
     private final IStatementExecutorFactory statementExecutorFactory;
     private final IStorageComponentProvider componentProvider;
+    private final IStatementExecutorContext queryCtx = new StatementExecutorContext();
 
     public QueryServiceServlet(ConcurrentMap<String, Object> ctx, String[] paths,
             ILangCompilationProvider compilationProvider, IStatementExecutorFactory statementExecutorFactory,
@@ -73,6 +77,7 @@
         this.compilationProvider = compilationProvider;
         this.statementExecutorFactory = statementExecutorFactory;
         this.componentProvider = componentProvider;
+        ctx.put(ServletConstants.RUNNING_QUERIES_ATTR, queryCtx);
     }
 
     @Override
@@ -425,7 +430,8 @@
             IStatementExecutor translator =
                     statementExecutorFactory.create(statements, sessionConfig, compilationProvider, componentProvider);
             execStart = System.nanoTime();
-            translator.compileAndExecute(getHyracksClientConnection(), getHyracksDataset(), delivery, stats);
+            translator.compileAndExecute(getHyracksClientConnection(), getHyracksDataset(), delivery, stats,
+                    param.clientContextID, queryCtx);
             execEnd = System.nanoTime();
             printStatus(resultWriter, ResultDelivery.ASYNC == delivery ? ResultStatus.RUNNING : ResultStatus.SUCCESS);
         } catch (AsterixException | TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
@@ -459,4 +465,4 @@
             LOGGER.warning("Error flushing output writer");
         }
     }
-}
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
index bdc9d62..d7edb23 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
@@ -203,7 +203,7 @@
             MetadataManager.INSTANCE.init();
             IStatementExecutor translator = statementExecutorFactory.create(aqlStatements, sessionConfig,
                     compilationProvider, componentProvider);
-            translator.compileAndExecute(hcc, hds, resultDelivery);
+            translator.compileAndExecute(hcc, hds, resultDelivery, new IStatementExecutor.Stats());
         } catch (AsterixException | TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
             response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
             GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, pe.getMessage(), pe);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java
index 5b96cab..a9d4e22 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java
@@ -22,7 +22,8 @@
     public static final String HYRACKS_CONNECTION_ATTR = "org.apache.asterix.HYRACKS_CONNECTION";
     public static final String HYRACKS_DATASET_ATTR = "org.apache.asterix.HYRACKS_DATASET";
     public static final String ASTERIX_APP_CONTEXT_INFO_ATTR = "org.apache.asterix.APP_CONTEXT_INFO";
-    public static final String EXECUTOR_SERVICE = "org.apache.asterix.EXECUTOR_SERVICE";
+    public static final String EXECUTOR_SERVICE_ATTR = "org.apache.asterix.EXECUTOR_SERVICE_ATTR";
+    public static final String RUNNING_QUERIES_ATTR = "org.apache.asterix.RUNINNG_QUERIES";
 
     private ServletConstants() {
     }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
index c09f8cb..d03e574 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
@@ -104,7 +104,8 @@
 
         IStatementExecutor translator =
                 statementExecutorFactory.create(statements, conf, compilationProvider, storageComponentProvider);
-        translator.compileAndExecute(hcc, null, QueryTranslator.ResultDelivery.IMMEDIATE);
+        translator.compileAndExecute(hcc, null, QueryTranslator.ResultDelivery.IMMEDIATE,
+                new IStatementExecutor.Stats());
         writer.flush();
     }
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 1e4d866..26a6ebd 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -156,6 +156,7 @@
 import org.apache.asterix.translator.CompiledStatements.CompiledUpsertStatement;
 import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
 import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutorContext;
 import org.apache.asterix.translator.SessionConfig;
 import org.apache.asterix.translator.TypeTranslator;
 import org.apache.asterix.translator.util.ValidateUtil;
@@ -232,27 +233,15 @@
         return functionDecls;
     }
 
-    /**
-     * Compiles and submits for execution a list of AQL statements.
-     *
-     * @param hcc
-     *            A Hyracks client connection that is used to submit a jobspec to Hyracks.
-     * @param hdc
-     *            A Hyracks dataset client object that is used to read the results.
-     * @param resultDelivery
-     *            True if the results should be read asynchronously or false if we should wait for results to be read.
-     * @return A List<QueryResult> containing a QueryResult instance corresponding to each submitted query.
-     * @throws Exception
-     */
     @Override
-    public void compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery)
-            throws Exception {
-        compileAndExecute(hcc, hdc, resultDelivery, new Stats());
+    public void compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery,
+            Stats stats) throws Exception {
+        compileAndExecute(hcc, hdc, resultDelivery, stats, null, null);
     }
 
     @Override
     public void compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery,
-            Stats stats) throws Exception {
+            Stats stats, String clientContextId, IStatementExecutorContext ctx) throws Exception {
         int resultSetIdCounter = 0;
         FileSplit outputFile = null;
         IAWriterFactory writerFactory = PrinterBasedWriterFactory.INSTANCE;
@@ -329,7 +318,8 @@
                             metadataProvider.setResultAsyncMode(resultDelivery == ResultDelivery.ASYNC
                                     || resultDelivery == ResultDelivery.DEFERRED);
                         }
-                        handleInsertUpsertStatement(metadataProvider, stmt, hcc, hdc, resultDelivery, stats, false);
+                        handleInsertUpsertStatement(metadataProvider, stmt, hcc, hdc, resultDelivery, stats, false,
+                                clientContextId, ctx);
                         break;
                     case Statement.Kind.DELETE:
                         handleDeleteStatement(metadataProvider, stmt, hcc, false);
@@ -362,7 +352,8 @@
                         metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++));
                         metadataProvider.setResultAsyncMode(
                                 resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.DEFERRED);
-                        handleQuery(metadataProvider, (Query) stmt, hcc, hdc, resultDelivery, stats);
+                        handleQuery(metadataProvider, (Query) stmt, hcc, hdc, resultDelivery, stats, clientContextId,
+                                ctx);
                         break;
                     case Statement.Kind.COMPACT:
                         handleCompactStatement(metadataProvider, stmt, hcc);
@@ -1809,8 +1800,8 @@
 
     public JobSpecification handleInsertUpsertStatement(MetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery,
-            IStatementExecutor.Stats stats, boolean compileOnly) throws Exception {
-
+            IStatementExecutor.Stats stats, boolean compileOnly, String clientContextId, IStatementExecutorContext ctx)
+            throws Exception {
         InsertStatement stmtInsertUpsert = (InsertStatement) stmt;
         String dataverseName = getActiveDataverse(stmtInsertUpsert.getDataverseName());
         Query query = stmtInsertUpsert.getQuery();
@@ -1852,7 +1843,7 @@
         }
 
         if (stmtInsertUpsert.getReturnExpression() != null) {
-            deliverResult(hcc, hdc, compiler, metadataProvider, locker, resultDelivery, stats);
+            deliverResult(hcc, hdc, compiler, metadataProvider, locker, resultDelivery, stats, clientContextId, ctx);
         } else {
             locker.lock();
             try {
@@ -2371,7 +2362,8 @@
     }
 
     protected void handleQuery(MetadataProvider metadataProvider, Query query, IHyracksClientConnection hcc,
-            IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats) throws Exception {
+            IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats, String clientContextId,
+            IStatementExecutorContext ctx) throws Exception {
         final IMetadataLocker locker = new IMetadataLocker() {
             @Override
             public void lock() {
@@ -2402,11 +2394,12 @@
                 throw e;
             }
         };
-        deliverResult(hcc, hdc, compiler, metadataProvider, locker, resultDelivery, stats);
+        deliverResult(hcc, hdc, compiler, metadataProvider, locker, resultDelivery, stats, clientContextId, ctx);
     }
 
     private void deliverResult(IHyracksClientConnection hcc, IHyracksDataset hdc, IStatementCompiler compiler,
-            MetadataProvider metadataProvider, IMetadataLocker locker, ResultDelivery resultDelivery, Stats stats)
+            MetadataProvider metadataProvider, IMetadataLocker locker, ResultDelivery resultDelivery, Stats stats,
+            String clientContextId, IStatementExecutorContext ctx)
             throws Exception {
         final ResultSetId resultSetId = metadataProvider.getResultSetId();
         switch (resultDelivery) {
@@ -2422,7 +2415,7 @@
                                 printed.setTrue();
                                 printed.notify();
                             }
-                        });
+                        }, clientContextId, ctx);
                     } catch (Exception e) {
                         GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE,
                                 resultDelivery.name() + " job " + "with id " + jobId + " failed", e);
@@ -2439,12 +2432,12 @@
                     final ResultReader resultReader = new ResultReader(hdc, id, resultSetId);
                     ResultUtil.printResults(resultReader, sessionConfig, stats,
                             metadataProvider.findOutputRecordType());
-                });
+                }, clientContextId, ctx);
                 break;
             case DEFERRED:
                 createAndRunJob(hcc, compiler, locker, resultDelivery, id -> {
                     ResultUtil.printResultHandle(new ResultHandle(id, resultSetId), sessionConfig);
-                });
+                }, clientContextId, ctx);
                 break;
             default:
                 break;
@@ -2452,7 +2445,8 @@
     }
 
     private static JobId createAndRunJob(IHyracksClientConnection hcc, IStatementCompiler compiler,
-            IMetadataLocker locker, ResultDelivery resultDelivery, IResultPrinter printer) throws Exception {
+            IMetadataLocker locker, ResultDelivery resultDelivery, IResultPrinter printer, String clientContextId,
+            IStatementExecutorContext ctx) throws Exception {
         locker.lock();
         try {
             final JobSpecification jobSpec = compiler.compile();
@@ -2460,6 +2454,10 @@
                 return JobId.INVALID;
             }
             final JobId jobId = JobUtils.runJob(hcc, jobSpec, false);
+
+            if (ctx != null && clientContextId != null) {
+                ctx.put(clientContextId, jobId); // Adds the running job into the context.
+            }
             if (ResultDelivery.ASYNC == resultDelivery) {
                 printer.print(jobId);
                 hcc.waitForCompletion(jobId);
@@ -2469,6 +2467,10 @@
             }
             return jobId;
         } finally {
+            // No matter the job succeeds or fails, removes it into the context.
+            if (ctx != null && clientContextId != null) {
+                ctx.removeJobIdFromClientContextId(clientContextId);
+            }
             locker.unlock();
         }
     }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index c831508..fcc6f1f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.asterix.hyracks.bootstrap;
 
 import static org.apache.asterix.api.http.servlet.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR;
@@ -38,6 +39,7 @@
 import org.apache.asterix.api.http.server.FullApiServlet;
 import org.apache.asterix.api.http.server.NodeControllerDetailsApiServlet;
 import org.apache.asterix.api.http.server.QueryApiServlet;
+import org.apache.asterix.api.http.server.QueryCancellationServlet;
 import org.apache.asterix.api.http.server.QueryResultApiServlet;
 import org.apache.asterix.api.http.server.QueryServiceServlet;
 import org.apache.asterix.api.http.server.QueryStatusApiServlet;
@@ -184,7 +186,7 @@
         IHyracksClientConnection hcc = getHcc();
         jsonAPIServer.setAttribute(HYRACKS_CONNECTION_ATTR, hcc);
         jsonAPIServer.setAttribute(ASTERIX_APP_CONTEXT_INFO_ATTR, AppContextInfo.INSTANCE);
-        jsonAPIServer.setAttribute(ServletConstants.EXECUTOR_SERVICE,
+        jsonAPIServer.setAttribute(ServletConstants.EXECUTOR_SERVICE_ATTR,
                 ((ClusterControllerService) ccServiceCtx.getControllerService()).getExecutor());
 
         // AQL rest APIs.
@@ -203,6 +205,7 @@
         addServlet(jsonAPIServer, Servlets.QUERY_STATUS);
         addServlet(jsonAPIServer, Servlets.QUERY_RESULT);
         addServlet(jsonAPIServer, Servlets.QUERY_SERVICE);
+        addServlet(jsonAPIServer, Servlets.RUNNING_REQUESTS);
         addServlet(jsonAPIServer, Servlets.CONNECTOR);
         addServlet(jsonAPIServer, Servlets.SHUTDOWN);
         addServlet(jsonAPIServer, Servlets.VERSION);
@@ -260,6 +263,8 @@
             case Servlets.SQLPP_DDL:
                 return new DdlApiServlet(ctx, paths, ccExtensionManager.getSqlppCompilationProvider(),
                         getStatementExecutorFactory(), componentProvider);
+            case Servlets.RUNNING_REQUESTS:
+                return new QueryCancellationServlet(ctx, paths);
             case Servlets.QUERY_STATUS:
                 return new QueryStatusApiServlet(ctx, paths);
             case Servlets.QUERY_RESULT:
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
new file mode 100644
index 0000000..5f40a85
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.api.http.servlet;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.asterix.api.http.ctx.StatementExecutorContext;
+import org.apache.asterix.api.http.server.QueryCancellationServlet;
+import org.apache.asterix.translator.IStatementExecutorContext;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpResponseStatus;
+
+public class QueryCancellationServletTest {
+
+    @Test
+    public void testDelete() throws Exception {
+        // Creates a query cancellation servlet.
+        QueryCancellationServlet cancellationServlet = new QueryCancellationServlet(new ConcurrentHashMap<>(),
+                new String[] { "/" });
+        // Adds mocked Hyracks client connection into the servlet context.
+        IHyracksClientConnection mockHcc = mock(IHyracksClientConnection.class);
+        cancellationServlet.ctx().put(ServletConstants.HYRACKS_CONNECTION_ATTR, mockHcc);
+        // Adds a query context into the servlet context.
+        IStatementExecutorContext queryCtx = new StatementExecutorContext();
+        cancellationServlet.ctx().put(ServletConstants.RUNNING_QUERIES_ATTR, queryCtx);
+
+        // Tests the case that query is not in the map.
+        IServletRequest mockRequest = mockRequest("1");
+        IServletResponse mockResponse = mock(IServletResponse.class);
+        cancellationServlet.handle(mockRequest, mockResponse);
+        verify(mockResponse, times(1)).setStatus(HttpResponseStatus.NOT_FOUND);
+
+        // Tests the case that query is in the map.
+        queryCtx.put("1", new JobId(1));
+        cancellationServlet.handle(mockRequest, mockResponse);
+        verify(mockResponse, times(1)).setStatus(HttpResponseStatus.OK);
+
+        // Tests the case the client_context_id is not provided.
+        mockRequest = mockRequest(null);
+        cancellationServlet.handle(mockRequest, mockResponse);
+        verify(mockResponse, times(1)).setStatus(HttpResponseStatus.BAD_REQUEST);
+
+        // Tests the case that the job cancellation hit some exception from Hyracks.
+        queryCtx.put("2", new JobId(2));
+        Mockito.doThrow(new Exception()).when(mockHcc).cancelJob(any());
+        mockRequest = mockRequest("2");
+        cancellationServlet.handle(mockRequest, mockResponse);
+        verify(mockResponse, times(1)).setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+    }
+
+    private IServletRequest mockRequest(String clientContextId) {
+        IServletRequest mockRequest = mock(IServletRequest.class);
+        FullHttpRequest mockHttpRequest = mock(FullHttpRequest.class);
+        when(mockRequest.getHttpRequest()).thenReturn(mockHttpRequest);
+        when(mockHttpRequest.method()).thenReturn(HttpMethod.DELETE);
+        if (clientContextId != null) {
+            when(mockRequest.getParameter("client_context_id")).thenReturn(clientContextId);
+        }
+        return mockRequest;
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
new file mode 100644
index 0000000..97101ba
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.test.common;
+
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.asterix.common.utils.Servlets;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.asterix.testframework.xml.TestCase;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.client.methods.RequestBuilder;
+import org.junit.Assert;
+
+public class CancellationTestExecutor extends TestExecutor {
+
+    private final ExecutorService executor = Executors.newSingleThreadExecutor();
+
+    @Override
+    public InputStream executeQueryService(String str, TestCaseContext.OutputFormat fmt, URI uri,
+            List<TestCase.CompilationUnit.Parameter> params, boolean jsonEncoded, boolean cancellable)
+            throws Exception {
+        String clientContextId = UUID.randomUUID().toString();
+        if (cancellable) {
+            setParam(params, "client_context_id", clientContextId);
+        }
+        Callable<InputStream> query = () -> {
+            try {
+                return CancellationTestExecutor.super.executeQueryService(str, fmt, uri, params, jsonEncoded, true);
+            } catch (Exception e) {
+                e.printStackTrace();
+                throw e;
+            }
+        };
+        Future<InputStream> future = executor.submit(query);
+        if (cancellable) {
+            Thread.sleep(20);
+            // Cancels the query request while the query is executing.
+            int rc = cancelQuery(getEndpoint(Servlets.RUNNING_REQUESTS), params);
+            Assert.assertTrue(rc == 200 || rc == 404);
+        }
+        InputStream inputStream = future.get();
+        // Since the current cancellation (i.e., abort) implementation is based on thread.interrupt and we did not
+        // track if all task threads are terminated or not, a timed wait here can reduce false alarms.
+        // TODO(yingyi): investigate if we need synchronized cancellation.
+        Thread.sleep(50);
+        return inputStream;
+    }
+
+    // Cancels a submitted query through the cancellation REST API.
+    private int cancelQuery(URI uri, List<TestCase.CompilationUnit.Parameter> params) throws Exception {
+        HttpUriRequest method = constructDeleteMethodUrl(uri, params);
+        HttpResponse response = executeHttpRequest(method);
+        return response.getStatusLine().getStatusCode();
+    }
+
+    // Constructs a HTTP DELETE request.
+    private HttpUriRequest constructDeleteMethodUrl(URI uri, List<TestCase.CompilationUnit.Parameter> otherParams) {
+        RequestBuilder builder = RequestBuilder.delete(uri);
+        for (TestCase.CompilationUnit.Parameter param : otherParams) {
+            builder.addParameter(param.getName(), param.getValue());
+        }
+        builder.setCharset(StandardCharsets.UTF_8);
+        return builder.build();
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
index 3531211..53a0f6c 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
@@ -19,7 +19,6 @@
 package org.apache.asterix.test.common;
 
 import java.io.InputStream;
-import java.io.StringWriter;
 import java.nio.charset.Charset;
 import java.util.Iterator;
 import java.util.logging.Logger;
@@ -60,6 +59,8 @@
             switch (field) {
                 case "requestID":
                     break;
+                case "clientContextID":
+                    break;
                 case "signature":
                     break;
                 case "status":
@@ -106,7 +107,7 @@
                     }
                     break;
                 default:
-                    throw new AsterixException(field + "unanticipated field");
+                    throw new AsterixException(field + " unanticipated field");
             }
         }
 
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index 7765572..608547c 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -423,7 +423,6 @@
             String exceptionMsg;
             try {
                 // First try to parse the response for a JSON error response.
-
                 ObjectMapper om = new ObjectMapper();
                 JsonNode result = om.readTree(errorBody);
                 String[] errors = { result.get("error-code").asText(), result.get("summary").asText(),
@@ -457,6 +456,11 @@
 
     public InputStream executeQueryService(String str, OutputFormat fmt, URI uri,
             List<CompilationUnit.Parameter> params, boolean jsonEncoded) throws Exception {
+        return executeQueryService(str, fmt, uri, params, jsonEncoded, false);
+    }
+
+    protected InputStream executeQueryService(String str, OutputFormat fmt, URI uri,
+            List<CompilationUnit.Parameter> params, boolean jsonEncoded, boolean cancellable) throws Exception {
         setParam(params, "format", fmt.mimeType());
         HttpUriRequest method = jsonEncoded ? constructPostMethodJson(str, uri, "statement", params)
                 : constructPostMethodUrl(str, uri, "statement", params);
@@ -830,7 +834,7 @@
                     }
                     final URI uri = getEndpoint(Servlets.QUERY_SERVICE);
                     if (DELIVERY_IMMEDIATE.equals(delivery)) {
-                        resultStream = executeQueryService(statement, fmt, uri, params, true);
+                        resultStream = executeQueryService(statement, fmt, uri, params, true, true);
                         resultStream = ResultExtractor.extract(resultStream);
                     } else {
                         String handleVar = getHandleVariable(statement);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/APIExecutionTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/APIExecutionTest.java
index 82f90ec..dd87455 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/APIExecutionTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/APIExecutionTest.java
@@ -21,6 +21,7 @@
 
 import java.util.Collection;
 
+import org.apache.asterix.test.common.TestExecutor;
 import org.apache.asterix.testframework.context.TestCaseContext;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -38,7 +39,7 @@
 
     @BeforeClass
     public static void setUp() throws Exception {
-        LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME);
+        LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, new TestExecutor());
     }
 
     @AfterClass
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/AqlExecutionFullParallelismIT.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/AqlExecutionFullParallelismIT.java
index 8df93ba..6cc5a9c 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/AqlExecutionFullParallelismIT.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/AqlExecutionFullParallelismIT.java
@@ -21,6 +21,7 @@
 
 import java.util.Collection;
 
+import org.apache.asterix.test.common.TestExecutor;
 import org.apache.asterix.testframework.context.TestCaseContext;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -38,7 +39,7 @@
 
     @BeforeClass
     public static void setUp() throws Exception {
-        LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME);
+        LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, new TestExecutor());
     }
 
     @AfterClass
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/AqlExecutionLessParallelismIT.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/AqlExecutionLessParallelismIT.java
index 2f3c395..dc03626 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/AqlExecutionLessParallelismIT.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/AqlExecutionLessParallelismIT.java
@@ -21,6 +21,7 @@
 
 import java.util.Collection;
 
+import org.apache.asterix.test.common.TestExecutor;
 import org.apache.asterix.testframework.context.TestCaseContext;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -38,7 +39,7 @@
 
     @BeforeClass
     public static void setUp() throws Exception {
-        LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME);
+        LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, new TestExecutor());
     }
 
     @AfterClass
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/AqlExecutionTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/AqlExecutionTest.java
index 4169a07..abc9f2f 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/AqlExecutionTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/AqlExecutionTest.java
@@ -21,6 +21,7 @@
 
 import java.util.Collection;
 
+import org.apache.asterix.test.common.TestExecutor;
 import org.apache.asterix.testframework.context.TestCaseContext;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -38,7 +39,7 @@
 
     @BeforeClass
     public static void setUp() throws Exception {
-        LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME);
+        LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, new TestExecutor());
     }
 
     @AfterClass
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ClusterStateDefaultParameterTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ClusterStateDefaultParameterTest.java
index 86a9639..17e88a6 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ClusterStateDefaultParameterTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ClusterStateDefaultParameterTest.java
@@ -25,6 +25,7 @@
 import java.net.URL;
 import java.util.Collection;
 
+import org.apache.asterix.test.common.TestExecutor;
 import org.apache.asterix.testframework.context.TestCaseContext;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -43,7 +44,7 @@
 
     @BeforeClass
     public static void setUp() throws Exception {
-        LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME);
+        LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, new TestExecutor());
     }
 
     @AfterClass
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ClusterStateExecutionFullParallelismTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ClusterStateExecutionFullParallelismTest.java
index b7b4312..e428c93 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ClusterStateExecutionFullParallelismTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ClusterStateExecutionFullParallelismTest.java
@@ -21,6 +21,7 @@
 
 import java.util.Collection;
 
+import org.apache.asterix.test.common.TestExecutor;
 import org.apache.asterix.testframework.context.TestCaseContext;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -38,7 +39,7 @@
 
     @BeforeClass
     public static void setUp() throws Exception {
-        LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME);
+        LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, new TestExecutor());
     }
 
     @AfterClass
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ClusterStateExecutionLessParallelismTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ClusterStateExecutionLessParallelismTest.java
index 9516d7d..346ae2f 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ClusterStateExecutionLessParallelismTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ClusterStateExecutionLessParallelismTest.java
@@ -21,6 +21,7 @@
 
 import java.util.Collection;
 
+import org.apache.asterix.test.common.TestExecutor;
 import org.apache.asterix.testframework.context.TestCaseContext;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -38,7 +39,7 @@
 
     @BeforeClass
     public static void setUp() throws Exception {
-        LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME);
+        LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, new TestExecutor());
     }
 
     @AfterClass
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
index 0e6be0f..7c2e472 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
@@ -19,6 +19,8 @@
 
 package org.apache.asterix.test.runtime;
 
+import static org.apache.hyracks.control.common.utils.ThreadDumpHelper.takeDumpJSON;
+
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
@@ -35,7 +37,7 @@
 import org.apache.asterix.testframework.context.TestCaseContext;
 import org.apache.commons.lang.SystemUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.junit.Assert;
+import org.apache.hyracks.control.common.utils.ThreadDumpHelper;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -52,12 +54,13 @@
     private static final boolean cleanupOnStart = true;
     private static final boolean cleanupOnStop = true;
     private static final List<String> badTestCases = new ArrayList<>();
-    private static final TestExecutor testExecutor = new TestExecutor();
+    private static TestExecutor testExecutor;
 
     private static TestLibrarian librarian;
     private static final int repeat = Integer.getInteger("test.repeat", 1);
 
-    public static void setUp(String configFile) throws Exception {
+    public static void setUp(String configFile, TestExecutor executor) throws Exception {
+        testExecutor = executor;
         File outdir = new File(PATH_ACTUAL);
         outdir.mkdirs();
         List<ILibraryManager> libraryManagers = ExecutionTestUtil.setUp(cleanupOnStart, configFile);
@@ -70,16 +73,20 @@
     }
 
     public static void tearDown() throws Exception {
-        // Check whether there are leaked open run file handles.
-        checkRunFileLeaks();
-
-        TestLibrarian.removeLibraryDir();
-        ExecutionTestUtil.tearDown(cleanupOnStop);
-        ExecutionTestUtil.integrationUtil.removeTestStorageFiles();
-        if (!badTestCases.isEmpty()) {
-            System.out.println("The following test cases left some data");
-            for (String testCase : badTestCases) {
-                System.out.println(testCase);
+        try {
+            // Check whether there are leaked open run file handles.
+            checkOpenRunFileLeaks();
+            // Check whether there are leaked threads.
+            checkThreadLeaks();
+        } finally {
+            TestLibrarian.removeLibraryDir();
+            ExecutionTestUtil.tearDown(cleanupOnStop);
+            ExecutionTestUtil.integrationUtil.removeTestStorageFiles();
+            if (!badTestCases.isEmpty()) {
+                System.out.println("The following test cases left some data");
+                for (String testCase : badTestCases) {
+                    System.out.println(testCase);
+                }
             }
         }
     }
@@ -128,7 +135,18 @@
         }
     }
 
-    private static void checkRunFileLeaks() throws IOException {
+    private static void checkThreadLeaks() throws IOException {
+        String threadDump = ThreadDumpHelper.takeDumpJSON(ManagementFactory.getThreadMXBean());
+        // Currently we only do sanity check for threads used in the execution engine.
+        // Later we should check if there are leaked storage threads as well.
+        if (threadDump.contains("Operator") || threadDump.contains("SuperActivity") || threadDump
+                .contains("PipelinedPartition")) {
+            System.out.print(threadDump);
+            throw new AssertionError("There are leaked threads in the execution engine.");
+        }
+    }
+
+    private static void checkOpenRunFileLeaks() throws IOException {
         if (SystemUtils.IS_OS_WINDOWS) {
             return;
         }
@@ -142,7 +160,22 @@
                 .exec(new String[] { "bash", "-c", "lsof -p " + processId + "|grep waf|wc -l" });
         try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
             int runFileCount = Integer.parseInt(reader.readLine().trim());
-            Assert.assertTrue(runFileCount == 0);
+            if (runFileCount != 0) {
+                System.out.print(takeDumpJSON(ManagementFactory.getThreadMXBean()));
+                outputLeakedOpenFiles(processId);
+                throw new AssertionError("There are " + runFileCount + " leaked run files.");
+            }
+        }
+    }
+
+    private static void outputLeakedOpenFiles(String processId) throws IOException {
+        Process process = Runtime.getRuntime()
+                .exec(new String[] { "bash", "-c", "lsof -p " + processId + "|grep waf" });
+        try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
+            String line;
+            while ((line = reader.readLine()) != null) {
+                System.err.println(line);
+            }
         }
     }
 }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionFullParallelismIT.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionFullParallelismIT.java
index f2372ed..3d93c36 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionFullParallelismIT.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionFullParallelismIT.java
@@ -21,6 +21,7 @@
 
 import java.util.Collection;
 
+import org.apache.asterix.test.common.TestExecutor;
 import org.apache.asterix.testframework.context.TestCaseContext;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -38,7 +39,7 @@
 
     @BeforeClass
     public static void setUp() throws Exception {
-        LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME);
+        LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, new TestExecutor());
     }
 
     @AfterClass
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionLessParallelismIT.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionLessParallelismIT.java
index c3cc058..53b068e 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionLessParallelismIT.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionLessParallelismIT.java
@@ -21,6 +21,7 @@
 
 import java.util.Collection;
 
+import org.apache.asterix.test.common.TestExecutor;
 import org.apache.asterix.testframework.context.TestCaseContext;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -38,7 +39,7 @@
 
     @BeforeClass
     public static void setUp() throws Exception {
-        LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME);
+        LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, new TestExecutor());
     }
 
     @AfterClass
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java
index f19ebbf..8ec1fe7 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java
@@ -20,6 +20,7 @@
 
 import java.util.Collection;
 
+import org.apache.asterix.test.common.TestExecutor;
 import org.apache.asterix.testframework.context.TestCaseContext;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -37,7 +38,7 @@
 
     @BeforeClass
     public static void setUp() throws Exception {
-        LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME);
+        LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, new TestExecutor());
     }
 
     @AfterClass
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionWithCancellationTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionWithCancellationTest.java
new file mode 100644
index 0000000..edf5741
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionWithCancellationTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.runtime;
+
+import java.util.Collection;
+
+import org.apache.asterix.test.common.CancellationTestExecutor;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Runs the SQL++ runtime tests with a cancellation request for each read-only query.
+ */
+@RunWith(Parameterized.class)
+public class SqlppExecutionWithCancellationTest {
+    protected static final String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
+    private static int numCancelledQueries = 0;
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, new CancellationTestExecutor());
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        System.err.println(numCancelledQueries + " queries have been cancelled during the test.");
+        try {
+            // Makes sure that there are queries that have indeed been cancelled during the test.
+            Assert.assertTrue(numCancelledQueries > 0);
+        } finally {
+            LangExecutionUtil.tearDown();
+        }
+    }
+
+    @Parameters(name = "SqlppExecutionWithCancellationTest {index}: {0}")
+    public static Collection<Object[]> tests() throws Exception {
+        return LangExecutionUtil.tests("only_sqlpp.xml", "testsuite_sqlpp.xml");
+    }
+
+    protected TestCaseContext tcCtx;
+
+    public SqlppExecutionWithCancellationTest(TestCaseContext tcCtx) {
+        this.tcCtx = tcCtx;
+    }
+
+    @Test
+    public void test() throws Exception {
+        try {
+            LangExecutionUtil.test(tcCtx);
+        } catch (Exception e) {
+            Throwable cause = getRootCause(e);
+            String errorMsg = cause.getMessage();
+            if (errorMsg.startsWith("HYR0025") // Expected, "HYR0025" means a user cancelled the query.
+                    || errorMsg.contains("\"status\": ") // Expected, "status" results for cancelled queries can change.
+                    || errorMsg.contains("reference count = 1") // not expected, but is a false alarm.
+                    || errorMsg.contains("pinned and file is being closed") // not expected, but maybe a false alarm.
+            // happens after the test query: big_object_load_20M.
+            ) {
+                numCancelledQueries++;
+            } else {
+                // Re-throw other kinds of exceptions.
+                throw e;
+            }
+        }
+    }
+
+    // Finds the root cause of Throwable.
+    private Throwable getRootCause(Throwable e) {
+        Throwable current = e;
+        Throwable cause = e.getCause();
+        while (cause != null) {
+            Throwable nextCause = current.getCause();
+            current = cause;
+            cause = nextCause;
+        }
+        return current;
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/types/any-object/any-object.2.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/types/any-object/any-object.2.query.aql
index e7208ca..7d619b4 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/types/any-object/any-object.2.query.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/types/any-object/any-object.2.query.aql
@@ -18,4 +18,5 @@
  */
 
 for $x in dataset Metadata.Datatype
+where $x.DataverseName = "test"
 return $x;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/types/any-object/any-object.2.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/types/any-object/any-object.2.query.sqlpp
index afaa5b1..cf72d38 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/types/any-object/any-object.2.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/types/any-object/any-object.2.query.sqlpp
@@ -17,4 +17,6 @@
  * under the License.
  */
 
-select element x from Metadata.Datatype as x;
+select element x
+from Metadata.Datatype as x
+where x.DataverseName = "test";
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/types/any-object/any-object.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/types/any-object/any-object.2.adm
index 91df332..c8f7e91 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/types/any-object/any-object.2.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/types/any-object/any-object.2.adm
@@ -1,69 +1,3 @@
-{ "DataverseName": "Metadata", "DatatypeName": "AnyObject", "Derived": { "Tag": "RECORD", "IsAnonymous": false, "Record": { "IsOpen": true, "Fields": [  ] } }, "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "CompactionPolicyRecordType", "Derived": { "Tag": "RECORD", "IsAnonymous": false, "Record": { "IsOpen": true, "Fields": [ { "FieldName": "DataverseName", "FieldType": "string", "IsNullable": false }, { "FieldName": "CompactionPolicy", "FieldType": "string", "IsNullable": false }, { "FieldName": "Classname", "FieldType": "string", "IsNullable": false } ] } }, "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "DatasetRecordType", "Derived": { "Tag": "RECORD", "IsAnonymous": false, "Record": { "IsOpen": true, "Fields": [ { "FieldName": "DataverseName", "FieldType": "string", "IsNullable": false }, { "FieldName": "DatasetName", "FieldType": "string", "IsNullable": false }, { "FieldName": "DatatypeDataverseName", "FieldType": "string", "IsNullable": false }, { "FieldName": "DatatypeName", "FieldType": "string", "IsNullable": false }, { "FieldName": "DatasetType", "FieldType": "string", "IsNullable": false }, { "FieldName": "GroupName", "FieldType": "string", "IsNullable": false }, { "FieldName": "CompactionPolicy", "FieldType": "string", "IsNullable": false }, { "FieldName": "CompactionPolicyProperties", "FieldType": "DatasetRecordType_CompactionPolicyProperties", "IsNullable": false }, { "FieldName": "InternalDetails", "FieldType": "DatasetRecordType_InternalDetails", "IsNullable": true }, { "FieldName": "ExternalDetails", "FieldType": "DatasetRecordType_ExternalDetails", "IsNullable": true }, { "FieldName": "Hints", "FieldType": "DatasetRecordType_Hints", "IsNullable": false }, { "FieldName": "Timestamp", "FieldType": "string", "IsNullable": false }, { "FieldName": "DatasetId", "FieldType": "int32", "IsNullable": false }, { "FieldName": "PendingOp", "FieldType": "int32", "IsNullable": false } ] } }, "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "DatasetRecordType_CompactionPolicyProperties", "Derived": { "Tag": "ORDEREDLIST", "IsAnonymous": true, "OrderedList": "DatasetRecordType_CompactionPolicyProperties_Item" }, "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "DatasetRecordType_CompactionPolicyProperties_Item", "Derived": { "Tag": "RECORD", "IsAnonymous": true, "Record": { "IsOpen": true, "Fields": [ { "FieldName": "Name", "FieldType": "string", "IsNullable": false }, { "FieldName": "Value", "FieldType": "string", "IsNullable": false } ] } }, "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "DatasetRecordType_ExternalDetails", "Derived": { "Tag": "RECORD", "IsAnonymous": true, "Record": { "IsOpen": true, "Fields": [ { "FieldName": "DatasourceAdapter", "FieldType": "string", "IsNullable": false }, { "FieldName": "Properties", "FieldType": "DatasetRecordType_ExternalDetails_Properties", "IsNullable": false }, { "FieldName": "LastRefreshTime", "FieldType": "datetime", "IsNullable": false }, { "FieldName": "TransactionState", "FieldType": "int32", "IsNullable": false } ] } }, "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "DatasetRecordType_ExternalDetails_Properties", "Derived": { "Tag": "ORDEREDLIST", "IsAnonymous": true, "OrderedList": "DatasetRecordType_ExternalDetails_Properties_Item" }, "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "DatasetRecordType_ExternalDetails_Properties_Item", "Derived": { "Tag": "RECORD", "IsAnonymous": true, "Record": { "IsOpen": true, "Fields": [ { "FieldName": "Name", "FieldType": "string", "IsNullable": false }, { "FieldName": "Value", "FieldType": "string", "IsNullable": false } ] } }, "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "DatasetRecordType_Hints", "Derived": { "Tag": "UNORDEREDLIST", "IsAnonymous": true, "UnorderedList": "DatasetRecordType_Hints_Item" }, "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "DatasetRecordType_Hints_Item", "Derived": { "Tag": "RECORD", "IsAnonymous": true, "Record": { "IsOpen": true, "Fields": [ { "FieldName": "Name", "FieldType": "string", "IsNullable": false }, { "FieldName": "Value", "FieldType": "string", "IsNullable": false } ] } }, "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "DatasetRecordType_InternalDetails", "Derived": { "Tag": "RECORD", "IsAnonymous": true, "Record": { "IsOpen": true, "Fields": [ { "FieldName": "FileStructure", "FieldType": "string", "IsNullable": false }, { "FieldName": "PartitioningStrategy", "FieldType": "string", "IsNullable": false }, { "FieldName": "PartitioningKey", "FieldType": "DatasetRecordType_InternalDetails_PartitioningKey", "IsNullable": false }, { "FieldName": "PrimaryKey", "FieldType": "DatasetRecordType_InternalDetails_PrimaryKey", "IsNullable": false }, { "FieldName": "Autogenerated", "FieldType": "boolean", "IsNullable": false } ] } }, "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "DatasetRecordType_InternalDetails_PartitioningKey", "Derived": { "Tag": "ORDEREDLIST", "IsAnonymous": true, "OrderedList": "DatasetRecordType_InternalDetails_PartitioningKey_Item" }, "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "DatasetRecordType_InternalDetails_PartitioningKey_Item", "Derived": { "Tag": "ORDEREDLIST", "IsAnonymous": true, "OrderedList": "string" }, "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "DatasetRecordType_InternalDetails_PrimaryKey", "Derived": { "Tag": "ORDEREDLIST", "IsAnonymous": true, "OrderedList": "DatasetRecordType_InternalDetails_PrimaryKey_Item" }, "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "DatasetRecordType_InternalDetails_PrimaryKey_Item", "Derived": { "Tag": "ORDEREDLIST", "IsAnonymous": true, "OrderedList": "string" }, "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "DatasourceAdapterRecordType", "Derived": { "Tag": "RECORD", "IsAnonymous": false, "Record": { "IsOpen": true, "Fields": [ { "FieldName": "DataverseName", "FieldType": "string", "IsNullable": false }, { "FieldName": "Name", "FieldType": "string", "IsNullable": false }, { "FieldName": "Classname", "FieldType": "string", "IsNullable": false }, { "FieldName": "Type", "FieldType": "string", "IsNullable": false }, { "FieldName": "Timestamp", "FieldType": "string", "IsNullable": false } ] } }, "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "DatatypeRecordType", "Derived": { "Tag": "RECORD", "IsAnonymous": false, "Record": { "IsOpen": true, "Fields": [ { "FieldName": "DataverseName", "FieldType": "string", "IsNullable": false }, { "FieldName": "DatatypeName", "FieldType": "string", "IsNullable": false }, { "FieldName": "Derived", "FieldType": "DatatypeRecordType_Derived", "IsNullable": true }, { "FieldName": "Timestamp", "FieldType": "string", "IsNullable": false } ] } }, "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "DatatypeRecordType_Derived", "Derived": { "Tag": "RECORD", "IsAnonymous": true, "Record": { "IsOpen": true, "Fields": [ { "FieldName": "Tag", "FieldType": "string", "IsNullable": false }, { "FieldName": "IsAnonymous", "FieldType": "boolean", "IsNullable": false }, { "FieldName": "Record", "FieldType": "DatatypeRecordType_Derived_Record", "IsNullable": true }, { "FieldName": "UnorderedList", "FieldType": "string", "IsNullable": true }, { "FieldName": "OrderedList", "FieldType": "string", "IsNullable": true } ] } }, "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "DatatypeRecordType_Derived_Record", "Derived": { "Tag": "RECORD", "IsAnonymous": true, "Record": { "IsOpen": true, "Fields": [ { "FieldName": "IsOpen", "FieldType": "boolean", "IsNullable": false }, { "FieldName": "Fields", "FieldType": "DatatypeRecordType_Derived_Record_Fields", "IsNullable": false } ] } }, "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "DatatypeRecordType_Derived_Record_Fields", "Derived": { "Tag": "ORDEREDLIST", "IsAnonymous": true, "OrderedList": "DatatypeRecordType_Derived_Record_Fields_Item" }, "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "DatatypeRecordType_Derived_Record_Fields_Item", "Derived": { "Tag": "RECORD", "IsAnonymous": true, "Record": { "IsOpen": true, "Fields": [ { "FieldName": "FieldName", "FieldType": "string", "IsNullable": false }, { "FieldName": "FieldType", "FieldType": "string", "IsNullable": false }, { "FieldName": "IsNullable", "FieldType": "boolean", "IsNullable": false } ] } }, "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "DataverseRecordType", "Derived": { "Tag": "RECORD", "IsAnonymous": false, "Record": { "IsOpen": true, "Fields": [ { "FieldName": "DataverseName", "FieldType": "string", "IsNullable": false }, { "FieldName": "DataFormat", "FieldType": "string", "IsNullable": false }, { "FieldName": "Timestamp", "FieldType": "string", "IsNullable": false }, { "FieldName": "PendingOp", "FieldType": "int32", "IsNullable": false } ] } }, "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "ExternalFileRecordType", "Derived": { "Tag": "RECORD", "IsAnonymous": false, "Record": { "IsOpen": true, "Fields": [ { "FieldName": "DataverseName", "FieldType": "string", "IsNullable": false }, { "FieldName": "DatasetName", "FieldType": "string", "IsNullable": false }, { "FieldName": "FileNumber", "FieldType": "int32", "IsNullable": false }, { "FieldName": "FileName", "FieldType": "string", "IsNullable": false }, { "FieldName": "FileSize", "FieldType": "int64", "IsNullable": false }, { "FieldName": "FileModTime", "FieldType": "datetime", "IsNullable": false }, { "FieldName": "PendingOp", "FieldType": "int32", "IsNullable": false } ] } }, "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "FeedConnectionRecordType", "Derived": { "Tag": "RECORD", "IsAnonymous": false, "Record": { "IsOpen": true, "Fields": [ { "FieldName": "DataverseName", "FieldType": "string", "IsNullable": false }, { "FieldName": "FeedName", "FieldType": "string", "IsNullable": false }, { "FieldName": "DatasetName", "FieldType": "string", "IsNullable": false }, { "FieldName": "ReturnType", "FieldType": "string", "IsNullable": false }, { "FieldName": "AppliedFunctions", "FieldType": "FeedConnectionRecordType_AppliedFunctions", "IsNullable": false }, { "FieldName": "PolicyName", "FieldType": "string", "IsNullable": false } ] } }, "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "FeedConnectionRecordType_AppliedFunctions", "Derived": { "Tag": "UNORDEREDLIST", "IsAnonymous": true, "UnorderedList": "string" }, "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "FeedPolicyRecordType", "Derived": { "Tag": "RECORD", "IsAnonymous": false, "Record": { "IsOpen": true, "Fields": [ { "FieldName": "DataverseName", "FieldType": "string", "IsNullable": false }, { "FieldName": "PolicyName", "FieldType": "string", "IsNullable": false }, { "FieldName": "Description", "FieldType": "string", "IsNullable": false }, { "FieldName": "Properties", "FieldType": "FeedPolicyRecordType_Properties", "IsNullable": false } ] } }, "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "FeedPolicyRecordType_Properties", "Derived": { "Tag": "UNORDEREDLIST", "IsAnonymous": true, "UnorderedList": "FeedPolicyRecordType_Properties_Item" }, "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "FeedPolicyRecordType_Properties_Item", "Derived": { "Tag": "RECORD", "IsAnonymous": true, "Record": { "IsOpen": true, "Fields": [ { "FieldName": "Name", "FieldType": "string", "IsNullable": false }, { "FieldName": "Value", "FieldType": "string", "IsNullable": false } ] } }, "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "FeedRecordType", "Derived": { "Tag": "RECORD", "IsAnonymous": false, "Record": { "IsOpen": true, "Fields": [ { "FieldName": "DataverseName", "FieldType": "string", "IsNullable": false }, { "FieldName": "FeedName", "FieldType": "string", "IsNullable": false }, { "FieldName": "AdapterName", "FieldType": "string", "IsNullable": false }, { "FieldName": "AdapterConfiguration", "FieldType": "FeedRecordType_AdapterConfiguration", "IsNullable": false }, { "FieldName": "Timestamp", "FieldType": "string", "IsNullable": false } ] } }, "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "FeedRecordType_AdapterConfiguration", "Derived": { "Tag": "UNORDEREDLIST", "IsAnonymous": true, "UnorderedList": "FeedRecordType_AdapterConfiguration_Item" }, "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "FeedRecordType_AdapterConfiguration_Item", "Derived": { "Tag": "RECORD", "IsAnonymous": true, "Record": { "IsOpen": true, "Fields": [ { "FieldName": "Name", "FieldType": "string", "IsNullable": false }, { "FieldName": "Value", "FieldType": "string", "IsNullable": false } ] } }, "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "FunctionRecordType", "Derived": { "Tag": "RECORD", "IsAnonymous": false, "Record": { "IsOpen": true, "Fields": [ { "FieldName": "DataverseName", "FieldType": "string", "IsNullable": false }, { "FieldName": "Name", "FieldType": "string", "IsNullable": false }, { "FieldName": "Arity", "FieldType": "string", "IsNullable": false }, { "FieldName": "Params", "FieldType": "FunctionRecordType_Params", "IsNullable": false }, { "FieldName": "ReturnType", "FieldType": "string", "IsNullable": false }, { "FieldName": "Definition", "FieldType": "string", "IsNullable": false }, { "FieldName": "Language", "FieldType": "string", "IsNullable": false }, { "FieldName": "Kind", "FieldType": "string", "IsNullable": false } ] } }, "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "FunctionRecordType_Params", "Derived": { "Tag": "ORDEREDLIST", "IsAnonymous": true, "OrderedList": "string" }, "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "IndexRecordType", "Derived": { "Tag": "RECORD", "IsAnonymous": false, "Record": { "IsOpen": true, "Fields": [ { "FieldName": "DataverseName", "FieldType": "string", "IsNullable": false }, { "FieldName": "DatasetName", "FieldType": "string", "IsNullable": false }, { "FieldName": "IndexName", "FieldType": "string", "IsNullable": false }, { "FieldName": "IndexStructure", "FieldType": "string", "IsNullable": false }, { "FieldName": "SearchKey", "FieldType": "IndexRecordType_SearchKey", "IsNullable": false }, { "FieldName": "IsPrimary", "FieldType": "boolean", "IsNullable": false }, { "FieldName": "Timestamp", "FieldType": "string", "IsNullable": false }, { "FieldName": "PendingOp", "FieldType": "int32", "IsNullable": false } ] } }, "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "IndexRecordType_SearchKey", "Derived": { "Tag": "ORDEREDLIST", "IsAnonymous": true, "OrderedList": "IndexRecordType_SearchKey_Item" }, "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "IndexRecordType_SearchKey_Item", "Derived": { "Tag": "ORDEREDLIST", "IsAnonymous": true, "OrderedList": "string" }, "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "LibraryRecordType", "Derived": { "Tag": "RECORD", "IsAnonymous": false, "Record": { "IsOpen": true, "Fields": [ { "FieldName": "DataverseName", "FieldType": "string", "IsNullable": false }, { "FieldName": "Name", "FieldType": "string", "IsNullable": false }, { "FieldName": "Timestamp", "FieldType": "string", "IsNullable": false } ] } }, "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "NodeGroupRecordType", "Derived": { "Tag": "RECORD", "IsAnonymous": false, "Record": { "IsOpen": true, "Fields": [ { "FieldName": "GroupName", "FieldType": "string", "IsNullable": false }, { "FieldName": "NodeNames", "FieldType": "NodeGroupRecordType_NodeNames", "IsNullable": false }, { "FieldName": "Timestamp", "FieldType": "string", "IsNullable": false } ] } }, "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "NodeGroupRecordType_NodeNames", "Derived": { "Tag": "UNORDEREDLIST", "IsAnonymous": true, "UnorderedList": "string" }, "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "NodeRecordType", "Derived": { "Tag": "RECORD", "IsAnonymous": false, "Record": { "IsOpen": true, "Fields": [ { "FieldName": "NodeName", "FieldType": "string", "IsNullable": false }, { "FieldName": "NumberOfCores", "FieldType": "int64", "IsNullable": false }, { "FieldName": "WorkingMemorySize", "FieldType": "int64", "IsNullable": false } ] } }, "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "binary", "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "boolean", "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "circle", "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "date", "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "datetime", "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "day-time-duration", "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "double", "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "duration", "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "float", "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "int16", "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "int32", "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "int64", "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "int8", "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "interval", "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "line", "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "missing", "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "null", "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "point", "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "point3d", "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "polygon", "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "rectangle", "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "shortwithouttypeinfo", "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "string", "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "time", "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "uuid", "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "year-month-duration", "Timestamp": "Mon Oct 10 14:53:55 PDT 2016" }
-{ "DataverseName": "test", "DatatypeName": "AnyObject", "Derived": { "Tag": "RECORD", "IsAnonymous": true, "Record": { "IsOpen": true, "Fields": [  ] } }, "Timestamp": "Mon Oct 10 15:00:08 PDT 2016" }
-{ "DataverseName": "test", "DatatypeName": "kv1", "Derived": { "Tag": "RECORD", "IsAnonymous": false, "Record": { "IsOpen": true, "Fields": [ { "FieldName": "id", "FieldType": "int32", "IsNullable": false }, { "FieldName": "val", "FieldType": "AnyObject", "IsNullable": false } ] } }, "Timestamp": "Mon Oct 10 15:00:08 PDT 2016" }
-{ "DataverseName": "test", "DatatypeName": "kv2", "Derived": { "Tag": "RECORD", "IsAnonymous": false, "Record": { "IsOpen": true, "Fields": [ { "FieldName": "id", "FieldType": "int32", "IsNullable": false }, { "FieldName": "val", "FieldType": "AnyObject", "IsNullable": false } ] } }, "Timestamp": "Mon Oct 10 15:00:08 PDT 2016" }
+{ "DataverseName": "test", "DatatypeName": "AnyObject", "Derived": { "Tag": "RECORD", "IsAnonymous": true, "Record": { "IsOpen": true, "Fields": [  ] } }, "Timestamp": "Fri Mar 10 17:36:46 PST 2017" }
+{ "DataverseName": "test", "DatatypeName": "kv1", "Derived": { "Tag": "RECORD", "IsAnonymous": false, "Record": { "IsOpen": true, "Fields": [ { "FieldName": "id", "FieldType": "int32", "IsNullable": false }, { "FieldName": "val", "FieldType": "AnyObject", "IsNullable": false } ] } }, "Timestamp": "Fri Mar 10 17:36:46 PST 2017" }
+{ "DataverseName": "test", "DatatypeName": "kv2", "Derived": { "Tag": "RECORD", "IsAnonymous": false, "Record": { "IsOpen": true, "Fields": [ { "FieldName": "id", "FieldType": "int32", "IsNullable": false }, { "FieldName": "val", "FieldType": "AnyObject", "IsNullable": false } ] } }, "Timestamp": "Fri Mar 10 17:36:46 PST 2017" }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/Info.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/Info.java
index 999eb34..8afae0d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/Info.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/Info.java
@@ -39,10 +39,6 @@
         return referenceCount;
     }
 
-    public void setReferenceCount(int referenceCount) {
-        this.referenceCount = referenceCount;
-    }
-
     public boolean isOpen() {
         return isOpen;
     }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
index 5ffb334..6223f36 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
@@ -34,6 +34,7 @@
     public static final String CONNECTOR = "/connector";
     public static final String SHUTDOWN = "/admin/shutdown";
     public static final String VERSION = "/admin/version";
+    public static final String RUNNING_REQUESTS = "/admin/requests/running/*";
     public static final String CLUSTER_STATE = "/admin/cluster/*";
     public static final String CLUSTER_STATE_NODE_DETAIL = "/admin/cluster/node/*";
     public static final String CLUSTER_STATE_CC_DETAIL = "/admin/cluster/cc/*";
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 71beab4..e34e551 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -259,7 +259,6 @@
     public void run() {
         Thread ct = Thread.currentThread();
         String threadName = ct.getName();
-        ct.setName(displayName + ":" + taskAttemptId + ":" + 0);
         // Calls synchronized addPendingThread(..) to make sure that in the abort() method,
         // the thread is not escaped from interruption.
         if (!addPendingThread(ct)) {
@@ -268,6 +267,7 @@
             ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, this, exceptions));
             return;
         }
+        ct.setName(displayName + ":" + taskAttemptId + ":" + 0);
         try {
             try {
                 operator.initialize();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
index 33b8980..594b2c3 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
@@ -68,11 +68,14 @@
                 throw new IllegalArgumentException();
         }
         raf = new RandomAccessFile(fileRef.getFile(), mode);
-        channel = raf.getChannel();
     }
 
     public void close() throws IOException {
+        if (raf == null) {
+            return;
+        }
         raf.close();
+        raf = null;
     }
 
     public FileReference getFileReference() {
@@ -80,10 +83,10 @@
     }
 
     public FileChannel getFileChannel() {
+        if (channel == null) {
+            channel = raf.getChannel();
+        }
         return channel;
     }
 
-    public void sync(boolean metadata) throws IOException {
-        channel.force(metadata);
-    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index 5ccdaa8..d97a7b5 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -324,7 +324,7 @@
     @Override
     public void sync(IFileHandle fileHandle, boolean metadata) throws HyracksDataException {
         try {
-            ((FileHandle) fileHandle).sync(metadata);
+            ((FileHandle) fileHandle).getFileChannel().force(metadata);
         } catch (IOException e) {
             throw new HyracksDataException(e);
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
index 137ef37..af6ef98 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
@@ -51,7 +51,7 @@
 
     private FileReference fRef;
 
-    private IFileHandle handle;
+    private IFileHandle writeHandle;
 
     private long size;
 
@@ -63,6 +63,8 @@
 
     private Level openCloseLevel = Level.FINE;
 
+    private Thread dataConsumerThread;
+
     public MaterializingPipelinedPartition(IHyracksTaskContext ctx, PartitionManager manager, PartitionId pid,
             TaskAttemptId taId, Executor executor) {
         this.ctx = ctx;
@@ -79,9 +81,13 @@
     }
 
     @Override
-    public void deallocate() {
-        if (fRef != null) {
-            fRef.delete();
+    public synchronized void deallocate() {
+        // Makes sure that the data consumer thread will not wait for anything further. Since the receiver side could
+        // have be interrupted already, the data consumer thread can potentially hang on writer.nextFrame(...)
+        // or writer.close(...).  Note that Task.abort(...) cannot interrupt the dataConsumerThread.
+        // If the query runs successfully, the dataConsumer thread should have been completed by this time.
+        if (dataConsumerThread != null) {
+            dataConsumerThread.interrupt();
         }
     }
 
@@ -90,64 +96,84 @@
         executor.execute(new Runnable() {
             @Override
             public void run() {
+                Thread thread = Thread.currentThread();
+                setDataConsumerThread(thread); // Sets the data consumer thread to the current thread.
+                String oldName = thread.getName();
                 try {
+                    thread.setName(MaterializingPipelinedPartition.class.getName() + pid);
+                    FileReference fRefCopy;
                     synchronized (MaterializingPipelinedPartition.this) {
-                        while (fRef == null && eos == false) {
+                        while (fRef == null && !eos && !failed) {
                             MaterializingPipelinedPartition.this.wait();
                         }
+                        fRefCopy = fRef;
                     }
-                    IFileHandle fh = fRef == null ? null
-                            : ioManager.open(fRef, IIOManager.FileReadWriteMode.READ_ONLY,
-                                    IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+                    writer.open();
+                    IFileHandle readHandle = fRefCopy == null ? null
+                            : ioManager.open(fRefCopy, IIOManager.FileReadWriteMode.READ_ONLY,
+                                IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
                     try {
-                        writer.open();
+                        if (readHandle == null) {
+                            // Either fail() is called or close() is called with 0 tuples coming in.
+                            return;
+                        }
+                        long offset = 0;
+                        ByteBuffer buffer = ctx.allocateFrame();
+                        boolean done = false;
+                        while (!done) {
+                            boolean flush;
+                            boolean fail;
+                            synchronized (MaterializingPipelinedPartition.this) {
+                                while (offset >= size && !eos && !failed) {
+                                    MaterializingPipelinedPartition.this.wait();
+                                }
+                                flush = flushRequest;
+                                flushRequest = false; // Clears the flush flag.
+                                fail = failed;
+                                done = eos && offset >= size;
+                            }
+                            if (fail) {
+                                writer.fail(); // Exits the loop and the try-block if fail() is called.
+                                break;
+                            }
+                            if (!done) {
+                                buffer.clear();
+                                long readLen = ioManager.syncRead(readHandle, offset, buffer);
+                                if (readLen < buffer.capacity()) {
+                                    throw new HyracksDataException("Premature end of file");
+                                }
+                                offset += readLen;
+                                buffer.flip();
+                                writer.nextFrame(buffer);
+                            }
+                            if (flush) {
+                                writer.flush(); // Flushes the writer if flush() is called.
+                            }
+                        }
+                    } catch (Exception e) {
+                        writer.fail();
+                        throw e;
+                    } finally {
                         try {
-                            if (fh != null) {
-                                long offset = 0;
-                                ByteBuffer buffer = ctx.allocateFrame();
-                                boolean fail = false;
-                                boolean done = false;
-                                while (!fail && !done) {
-                                    synchronized (MaterializingPipelinedPartition.this) {
-                                        while (offset >= size && !eos && !failed) {
-                                            if (flushRequest) {
-                                                flushRequest = false;
-                                                writer.flush();
-                                            }
-                                            try {
-                                                MaterializingPipelinedPartition.this.wait();
-                                            } catch (InterruptedException e) {
-                                                throw new HyracksDataException(e);
-                                            }
-                                        }
-                                        flushRequest = false;
-                                        fail = failed;
-                                        done = eos && offset >= size;
-                                    }
-                                    if (fail) {
-                                        writer.fail();
-                                    } else if (!done) {
-                                        buffer.clear();
-                                        long readLen = ioManager.syncRead(fh, offset, buffer);
-                                        if (readLen < buffer.capacity()) {
-                                            throw new HyracksDataException("Premature end of file");
-                                        }
-                                        offset += readLen;
-                                        buffer.flip();
-                                        writer.nextFrame(buffer);
-                                    }
+                            writer.close();
+                        } finally {
+                            // Makes sure that the reader is always closed and the temp file is always deleted.
+                            try {
+                                if (readHandle != null) {
+                                    ioManager.close(readHandle);
+                                }
+                            } finally {
+                                if (fRef != null) {
+                                    fRef.delete();
                                 }
                             }
-                        } finally {
-                            writer.close();
-                        }
-                    } finally {
-                        if (fh != null) {
-                            ioManager.close(fh);
                         }
                     }
                 } catch (Exception e) {
-                    throw new RuntimeException(e);
+                    LOGGER.log(Level.SEVERE, e.getMessage(), e);
+                } finally {
+                    thread.setName(oldName);
+                    setDataConsumerThread(null); // Sets back the data consumer thread to null.
                 }
             }
         });
@@ -172,7 +198,7 @@
     private void checkOrCreateFile() throws HyracksDataException {
         if (fRef == null) {
             fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(pid.toString().replace(":", "$"));
-            handle = ctx.getIOManager().open(fRef, IIOManager.FileReadWriteMode.READ_WRITE,
+            writeHandle = ioManager.open(fRef, IIOManager.FileReadWriteMode.READ_WRITE,
                     IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
         }
     }
@@ -180,7 +206,7 @@
     @Override
     public synchronized void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         checkOrCreateFile();
-        size += ctx.getIOManager().syncWrite(handle, size, buffer);
+        size += ctx.getIOManager().syncWrite(writeHandle, size, buffer);
         notifyAll();
     }
 
@@ -195,12 +221,12 @@
         if (LOGGER.isLoggable(openCloseLevel)) {
             LOGGER.log(openCloseLevel, "close(" + pid + " by " + taId);
         }
+        if (writeHandle != null) {
+            ctx.getIOManager().close(writeHandle);
+        }
         synchronized (this) {
             eos = true;
-            if (handle != null) {
-                ctx.getIOManager().close(handle);
-            }
-            handle = null;
+            writeHandle = null;
             notifyAll();
         }
     }
@@ -210,4 +236,10 @@
         flushRequest = true;
         notifyAll();
     }
+
+    // Sets the data consumer thread.
+    private synchronized void setDataConsumerThread(Thread thread) {
+        dataConsumerThread = thread;
+    }
+
 }
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java
index 84d2283..fd434d7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java
@@ -76,12 +76,12 @@
         manager.registerPartition(pid, taId, this, PartitionState.STARTED, false);
         failed = false;
         pendingConnection = true;
+        ensureConnected();
     }
 
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         if (!failed) {
-            ensureConnected();
             delegate.nextFrame(buffer);
         }
     }
@@ -113,7 +113,6 @@
     @Override
     public void close() throws HyracksDataException {
         if (!failed) {
-            ensureConnected();
             delegate.close();
         }
     }
@@ -121,7 +120,6 @@
     @Override
     public void flush() throws HyracksDataException {
         if (!failed) {
-            ensureConnected();
             delegate.flush();
         }
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
index 29d4e18..8f68e76 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
@@ -57,7 +57,14 @@
             @Override
             public void run() {
                 for (IPartition p : unregisteredPartitions) {
-                    p.deallocate();
+                    try {
+                        // Put deallocate in a try block to make sure that every IPartition is de-allocated.
+                        p.deallocate();
+                    } catch (Exception e) {
+                        if (LOGGER.isLoggable(Level.WARNING)) {
+                            LOGGER.log(Level.WARNING, e.getMessage(), e);
+                        }
+                    }
                 }
             }
         });
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
index f0bd318..ffee1a6 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
@@ -84,6 +84,9 @@
 
     @Override
     public void close() throws HyracksDataException {
+        if (handle == null) {
+            return; // Makes sure the close operation is idempotent.
+        }
         if (deleteAfterClose) {
             try {
                 ioManager.close(handle);
@@ -94,6 +97,7 @@
         } else {
             ioManager.close(handle);
         }
+        handle = null;
     }
 
     public long getFileSize() {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java
index 5c642ba..1a9028d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java
@@ -224,7 +224,7 @@
                 public void initialize() throws HyracksDataException {
                     MaterializerTaskState state = (MaterializerTaskState) ctx.getStateObject(
                             new TaskId(new ActivityId(getOperatorId(), SPLITTER_MATERIALIZER_ACTIVITY_ID), partition));
-                    state.writeOut(writer, new VSizeFrame(ctx));
+                    state.writeOut(writer, new VSizeFrame(ctx), false);
                 }
 
             };
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
index dbd3afa..189ce9d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
@@ -74,19 +74,34 @@
     @Override
     public void close() throws HyracksDataException {
         HyracksDataException closeException = null;
+        if (!failed) {
+            boolean newFailure = false;
+            for (int i = 0; i < pWriters.length; ++i) {
+                try {
+                    if (isOpen[i] && allocatedFrames[i] && appenders[i].getTupleCount() > 0) {
+                        appenders[i].write(pWriters[i], true);
+                    }
+                } catch (Exception e) {
+                    newFailure = true;
+                    closeException = wrapException(closeException, e);
+                    break;
+                }
+            }
+            if (newFailure) {
+                try {
+                    fail(); // Fail all writers if any new failure happens.
+                } catch (Exception e) {
+                    closeException = wrapException(closeException, e);
+                }
+            }
+        }
         for (int i = 0; i < pWriters.length; ++i) {
             if (isOpen[i]) {
-                if (allocatedFrames[i] && appenders[i].getTupleCount() > 0 && !failed) {
-                    try {
-                        appenders[i].write(pWriters[i], true);
-                    } catch (Throwable th) {
-                        closeException = HyracksDataException.suppress(closeException, th);
-                    }
-                }
+                // The try-block make sures that every writer is closed.
                 try {
                     pWriters[i].close();
-                } catch (Throwable th) {
-                    closeException = HyracksDataException.suppress(closeException, th);
+                } catch (Exception e) {
+                    closeException = wrapException(closeException, e);
                 }
             }
         }
@@ -129,8 +144,8 @@
             if (isOpen[i]) {
                 try {
                     pWriters[i].fail();
-                } catch (Throwable th) {
-                    failException = HyracksDataException.suppress(failException, th);
+                } catch (Exception e) {
+                    failException = wrapException(failException, e);
                 }
             }
         }
@@ -147,4 +162,13 @@
             }
         }
     }
+
+    // Wraps the current encountered exception into the final exception.
+    private HyracksDataException wrapException(HyracksDataException finalException, Exception currentException) {
+        if (finalException == null) {
+            return HyracksDataException.create(currentException);
+        }
+        finalException.addSuppressed(currentException);
+        return finalException;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
index e7da174..2c3e16e 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
@@ -73,12 +73,16 @@
         out.nextFrame(buffer);
     }
 
-    public void writeOut(IFrameWriter writer, IFrame frame) throws HyracksDataException {
+    public void writeOut(IFrameWriter writer, IFrame frame, boolean failed) throws HyracksDataException {
         RunFileReader in = out.createReader();
+        writer.open();
         try {
-            writer.open();
+            if (failed) {
+                writer.fail();
+                return;
+            }
+            in.open();
             try {
-                in.open();
                 while (in.nextFrame(frame)) {
                     writer.nextFrame(frame.getBuffer());
                 }
@@ -89,9 +93,12 @@
             writer.fail();
             throw e;
         } finally {
-            writer.close();
-            if (numConsumers.decrementAndGet() == 0) {
-                out.getFileReference().delete();
+            try {
+                writer.close();
+            } finally {
+                if (numConsumers.decrementAndGet() == 0) {
+                    out.getFileReference().delete();
+                }
             }
         }
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
index 5922935..fd4b094 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
@@ -92,6 +92,7 @@
                 IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
             return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
                 private MaterializerTaskState state;
+                private boolean failed = false;
 
                 @Override
                 public void open() throws HyracksDataException {
@@ -107,12 +108,13 @@
 
                 @Override
                 public void fail() throws HyracksDataException {
+                    failed = true;
                 }
 
                 @Override
                 public void close() throws HyracksDataException {
                     state.close();
-                    state.writeOut(writer, new VSizeFrame(ctx));
+                    state.writeOut(writer, new VSizeFrame(ctx), failed);
                 }
             };
         }
@@ -171,7 +173,7 @@
                 public void initialize() throws HyracksDataException {
                     MaterializerTaskState state = (MaterializerTaskState) ctx.getStateObject(
                             new TaskId(new ActivityId(getOperatorId(), MATERIALIZER_ACTIVITY_ID), partition));
-                    state.writeOut(writer, new VSizeFrame(ctx));
+                    state.writeOut(writer, new VSizeFrame(ctx), false);
                 }
 
                 @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
index f4158ac..c8f9268 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
@@ -35,6 +35,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
+import org.apache.hyracks.dataflow.common.io.RunFileReader;
 import org.apache.hyracks.dataflow.common.io.RunFileWriter;
 import org.apache.hyracks.dataflow.std.sort.util.GroupVSizeFrame;
 
@@ -167,10 +168,22 @@
             if (finalWriter != null) {
                 finalWriter.fail();
             }
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         } finally {
-            if (finalWriter != null) {
-                finalWriter.close();
+            try {
+                if (finalWriter != null) {
+                    finalWriter.close();
+                }
+            } finally {
+                for (RunFileReader reader : runs) {
+                    try {
+                        reader.close(); // close is idempotent.
+                    } catch (Exception e) {
+                        if (LOGGER.isLoggable(Level.WARNING)) {
+                            LOGGER.log(Level.WARNING, e.getMessage(), e);
+                        }
+                    }
+                }
             }
         }
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
index 91d433c..0462909 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
@@ -94,8 +94,11 @@
             writer.fail();
             throw new HyracksDataException(e);
         } finally {
-            writer.close();
-            treeIndexHelper.close();
+            try {
+                writer.close();
+            } finally {
+                treeIndexHelper.close();
+            }
         }
     }
 }