Enable HTTP API processing on NCs

- Query/Status/Result are answered by NC nodes
- other HTTP requests are proxied to the CC node
- SessionConfig refactoring – split into config and output (SessionOutput)
- TestExecutor now can send http requests do multiple nodes (round robin)

Change-Id: I19414a23e163fc4deef9805c8f9089609f1ebe07
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1709
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
index 92c487b..19f0dcc 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
@@ -18,17 +18,24 @@
  */
 package org.apache.asterix.translator;
 
+import java.io.Serializable;
 import java.rmi.RemoteException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.lang.common.statement.Query;
 import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
+import org.apache.commons.lang3.tuple.Triple;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.client.IClusterInfoCollector;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 
 /**
@@ -54,6 +61,16 @@
         ASYNC
     }
 
+    class ResultMetadata implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        private final List<Triple<JobId, ResultSetId, ARecordType>> resultSets = new ArrayList<>();
+
+        public List<Triple<JobId, ResultSetId, ARecordType>> getResultSets() {
+            return resultSets;
+        }
+    }
+
     public static class Stats {
         private long count;
         private long size;
@@ -85,12 +102,14 @@
      *            A Hyracks dataset client object that is used to read the results.
      * @param resultDelivery
      *            The {@code ResultDelivery} kind required for queries in the list of statements
+     * @param outMetadata
+     *            a reference to write the metadata of executed queries
      * @param stats
      *            a reference to write the stats of executed queries
      * @throws Exception
      */
     void compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery,
-            Stats stats) throws Exception;
+            ResultMetadata outMetadata, Stats stats) throws Exception;
 
     /**
      * Compiles and execute a list of statements, with passing in client context id and context.
@@ -101,6 +120,8 @@
      *            A Hyracks dataset client object that is used to read the results.
      * @param resultDelivery
      *            The {@code ResultDelivery} kind required for queries in the list of statements
+     * @param outMetadata
+     *            a reference to write the metadata of executed queries
      * @param stats
      *            a reference to write the stats of executed queries
      * @param clientContextId
@@ -110,7 +131,8 @@
      * @throws Exception
      */
     void compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery,
-            Stats stats, String clientContextId, IStatementExecutorContext ctx) throws Exception;
+            ResultMetadata outMetadata, Stats stats, String clientContextId, IStatementExecutorContext ctx)
+            throws Exception;
 
     /**
      * rewrites and compiles query into a hyracks job specifications
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorFactory.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorFactory.java
index 23365de..b244c0c 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorFactory.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorFactory.java
@@ -37,14 +37,14 @@
      *
      * @param statements
      *            Statements to execute
-     * @param conf
-     *            request configuration
+     * @param output
+     *            output and request configuration
      * @param compilationProvider
      *            provides query language related components
      * @param storageComponentProvider
      *            provides storage related components
      * @return an implementation of {@code IStatementExecutor} thaxt is used to execute the passed list of statements
      */
-    IStatementExecutor create(ICcApplicationContext appCtx, List<Statement> statements, SessionConfig conf,
+    IStatementExecutor create(ICcApplicationContext appCtx, List<Statement> statements, SessionOutput output,
             ILangCompilationProvider compilationProvider, IStorageComponentProvider storageComponentProvider);
 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java
index a637e2f..cfd4e87 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java
@@ -18,13 +18,10 @@
  */
 package org.apache.asterix.translator;
 
-import java.io.PrintWriter;
+import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable;
-
 /**
  * SessionConfig captures several different parameters for controlling
  * the execution of an APIFramework call.
@@ -38,8 +35,10 @@
  * execution output - LOSSLESS_JSON, CSV, etc.
  * <li>It allows you to specify output format-specific parameters.
  */
+public class SessionConfig implements Serializable {
 
-public class SessionConfig {
+    private static final long serialVersionUID = 1L;
+
     /**
      * Used to specify the output format for the primary execution.
      */
@@ -105,53 +104,25 @@
      */
     public static final String FORMAT_QUOTE_RECORD = "quote-record";
 
-    @FunctionalInterface
-    public interface ResultDecorator {
-        AlgebricksAppendable append(AlgebricksAppendable app) throws AlgebricksException;
-    }
-
-    @FunctionalInterface
-    public interface ResultAppender {
-        AlgebricksAppendable append(AlgebricksAppendable app, String str) throws AlgebricksException;
-    }
+    // Output format.
+    private final OutputFormat fmt;
 
     // Standard execution flags.
     private final boolean executeQuery;
     private final boolean generateJobSpec;
     private final boolean optimize;
 
-    // Output path for primary execution.
-    private final PrintWriter out;
-
-    // Output format.
-    private final OutputFormat fmt;
-
-    private final ResultDecorator preResultDecorator;
-    private final ResultDecorator postResultDecorator;
-    private final ResultAppender handleAppender;
-    private final ResultAppender statusAppender;
-
     // Flags.
     private final Map<String, Boolean> flags;
 
-    public SessionConfig(PrintWriter out, OutputFormat fmt, ResultDecorator preResultDecorator,
-            ResultDecorator postResultDecorator, ResultAppender handleAppender, ResultAppender statusAppender) {
-        this(out, fmt, preResultDecorator, postResultDecorator, handleAppender, statusAppender,
-                true, true, true);
-    }
-
-    public SessionConfig(PrintWriter out, OutputFormat fmt, boolean optimize, boolean executeQuery,
-            boolean generateJobSpec) {
-        this(out, fmt, null, null, null, null, optimize, executeQuery, generateJobSpec);
+    public SessionConfig(OutputFormat fmt) {
+        this(fmt, true, true, true);
     }
 
     /**
      * Create a SessionConfig object with all optional values set to defaults:
      * - All format flags set to "false".
      * - All out-of-band outputs set to "false".
-     *
-     * @param out
-     *            PrintWriter for execution output.
      * @param fmt
      *            Output format for execution output.
      * @param optimize
@@ -160,17 +131,9 @@
      *            Whether to execute the query or not.
      * @param generateJobSpec
      *            Whether to generate the Hyracks job specification (if
-     *            false, job cannot be executed).
      */
-    public SessionConfig(PrintWriter out, OutputFormat fmt, ResultDecorator preResultDecorator,
-            ResultDecorator postResultDecorator, ResultAppender handleAppender, ResultAppender statusAppender,
-            boolean optimize, boolean executeQuery, boolean generateJobSpec) {
-        this.out = out;
+    public SessionConfig(OutputFormat fmt, boolean optimize, boolean executeQuery, boolean generateJobSpec) {
         this.fmt = fmt;
-        this.preResultDecorator = preResultDecorator;
-        this.postResultDecorator = postResultDecorator;
-        this.handleAppender = handleAppender;
-        this.statusAppender = statusAppender;
         this.optimize = optimize;
         this.executeQuery = executeQuery;
         this.generateJobSpec = generateJobSpec;
@@ -178,35 +141,12 @@
     }
 
     /**
-     * Retrieve the PrintWriter to produce output to.
-     */
-    public PrintWriter out() {
-        return this.out;
-    }
-
-    /**
      * Retrieve the OutputFormat for this execution.
      */
     public OutputFormat fmt() {
         return this.fmt;
     }
 
-    public AlgebricksAppendable resultPrefix(AlgebricksAppendable app) throws AlgebricksException {
-        return this.preResultDecorator != null ? this.preResultDecorator.append(app) : app;
-    }
-
-    public AlgebricksAppendable resultPostfix(AlgebricksAppendable app) throws AlgebricksException {
-        return this.postResultDecorator != null ? this.postResultDecorator.append(app) : app;
-    }
-
-    public AlgebricksAppendable appendHandle(AlgebricksAppendable app, String handle) throws AlgebricksException {
-        return this.handleAppender != null ? this.handleAppender.append(app, handle) : app;
-    }
-
-    public AlgebricksAppendable appendStatus(AlgebricksAppendable app, String status) throws AlgebricksException {
-        return this.statusAppender != null ? this.statusAppender.append(app, status) : app;
-    }
-
     /**
      * Retrieve the value of the "execute query" flag.
      */
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionOutput.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionOutput.java
new file mode 100644
index 0000000..b559df8
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionOutput.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.translator;
+
+import java.io.PrintWriter;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable;
+
+public class SessionOutput {
+    private final SessionConfig config;
+
+    // Output path for primary execution.
+    private final PrintWriter out;
+
+    private final SessionOutput.ResultDecorator preResultDecorator;
+    private final SessionOutput.ResultDecorator postResultDecorator;
+    private final SessionOutput.ResultAppender handleAppender;
+    private final SessionOutput.ResultAppender statusAppender;
+
+    public SessionOutput(SessionConfig config, PrintWriter out) {
+        this(config, out, null, null, null, null);
+    }
+
+    public SessionOutput(SessionConfig config, PrintWriter out, ResultDecorator preResultDecorator,
+            ResultDecorator postResultDecorator, ResultAppender handleAppender, ResultAppender statusAppender) {
+        this.config = config;
+        this.out = out;
+        this.preResultDecorator = preResultDecorator;
+        this.postResultDecorator = postResultDecorator;
+        this.handleAppender = handleAppender;
+        this.statusAppender = statusAppender;
+    }
+
+    /**
+     * Retrieve the PrintWriter to produce output to.
+     */
+    public PrintWriter out() {
+        return this.out;
+    }
+
+    public AlgebricksAppendable resultPrefix(AlgebricksAppendable app) throws AlgebricksException {
+        return this.preResultDecorator != null ? this.preResultDecorator.append(app) : app;
+    }
+
+    public AlgebricksAppendable resultPostfix(AlgebricksAppendable app) throws AlgebricksException {
+        return this.postResultDecorator != null ? this.postResultDecorator.append(app) : app;
+    }
+
+    public AlgebricksAppendable appendHandle(AlgebricksAppendable app, String handle) throws AlgebricksException {
+        return this.handleAppender != null ? this.handleAppender.append(app, handle) : app;
+    }
+
+    public AlgebricksAppendable appendStatus(AlgebricksAppendable app, String status) throws AlgebricksException {
+        return this.statusAppender != null ? this.statusAppender.append(app, status) : app;
+    }
+
+    public SessionConfig config() {
+        return config;
+    }
+
+    @FunctionalInterface
+    public interface ResultDecorator {
+        AlgebricksAppendable append(AlgebricksAppendable app) throws AlgebricksException;
+    }
+
+    @FunctionalInterface
+    public interface ResultAppender {
+        AlgebricksAppendable append(AlgebricksAppendable app, String str) throws AlgebricksException;
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index b12935d..583302b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -67,6 +67,7 @@
 import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
 import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
 import org.apache.asterix.utils.ResourceUtils;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -152,31 +153,33 @@
         }
     }
 
-    private void printPlanPrefix(SessionConfig conf, String planName) {
-        if (conf.is(SessionConfig.FORMAT_HTML)) {
-            conf.out().println("<h4>" + planName + ":</h4>");
-            conf.out().println("<pre>");
+    private void printPlanPrefix(SessionOutput output, String planName) {
+        if (output.config().is(SessionConfig.FORMAT_HTML)) {
+            output.out().println("<h4>" + planName + ":</h4>");
+            output.out().println("<pre>");
         } else {
-            conf.out().println("----------" + planName + ":");
+            output.out().println("----------" + planName + ":");
         }
     }
 
-    private void printPlanPostfix(SessionConfig conf) {
-        if (conf.is(SessionConfig.FORMAT_HTML)) {
-            conf.out().println("</pre>");
+    private void printPlanPostfix(SessionOutput output) {
+        if (output.config().is(SessionConfig.FORMAT_HTML)) {
+            output.out().println("</pre>");
         }
     }
 
     public Pair<IReturningStatement, Integer> reWriteQuery(List<FunctionDecl> declaredFunctions,
-            MetadataProvider metadataProvider, IReturningStatement q, SessionConfig conf) throws CompilationException {
+            MetadataProvider metadataProvider, IReturningStatement q, SessionOutput output)
+            throws CompilationException {
         if (q == null) {
             return null;
         }
+        SessionConfig conf = output.config();
         if (!conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS) && conf.is(SessionConfig.OOB_EXPR_TREE)) {
-            conf.out().println();
-            printPlanPrefix(conf, "Expression tree");
-            q.accept(astPrintVisitorFactory.createLangVisitor(conf.out()), 0);
-            printPlanPostfix(conf);
+            output.out().println();
+            printPlanPrefix(output, "Expression tree");
+            q.accept(astPrintVisitorFactory.createLangVisitor(output.out()), 0);
+            printPlanPostfix(output);
         }
         IQueryRewriter rw = rewriterFactory.createQueryRewriter();
         rw.rewrite(declaredFunctions, q, metadataProvider, new LangRewritingContext(q.getVarCounter()));
@@ -184,17 +187,18 @@
     }
 
     public JobSpecification compileQuery(IClusterInfoCollector clusterInfoCollector, MetadataProvider metadataProvider,
-            Query rwQ, int varCounter, String outputDatasetName, SessionConfig conf, ICompiledDmlStatement statement)
+            Query rwQ, int varCounter, String outputDatasetName, SessionOutput output, ICompiledDmlStatement statement)
             throws AlgebricksException, RemoteException, ACIDException {
 
+        SessionConfig conf = output.config();
         if (!conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS) && conf.is(SessionConfig.OOB_REWRITTEN_EXPR_TREE)) {
-            conf.out().println();
+            output.out().println();
 
-            printPlanPrefix(conf, "Rewritten expression tree");
+            printPlanPrefix(output, "Rewritten expression tree");
             if (rwQ != null) {
-                rwQ.accept(astPrintVisitorFactory.createLangVisitor(conf.out()), 0);
+                rwQ.accept(astPrintVisitorFactory.createLangVisitor(output.out()), 0);
             }
-            printPlanPostfix(conf);
+            printPlanPostfix(output);
         }
 
         org.apache.asterix.common.transactions.JobId asterixJobId = JobIdFactory.generateJobId();
@@ -211,14 +215,14 @@
         }
 
         if (!conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS) && conf.is(SessionConfig.OOB_LOGICAL_PLAN)) {
-            conf.out().println();
+            output.out().println();
 
-            printPlanPrefix(conf, "Logical plan");
+            printPlanPrefix(output, "Logical plan");
             if (rwQ != null || (statement != null && statement.getKind() == Statement.Kind.LOAD)) {
-                LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor(conf.out());
+                LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor(output.out());
                 PlanPrettyPrinter.printPlan(plan, pvisitor, 0);
             }
-            printPlanPostfix(conf);
+            printPlanPostfix(output);
         }
         CompilerProperties compilerProperties = metadataProvider.getApplicationContext().getCompilerProperties();
         int frameSize = compilerProperties.getFrameSize();
@@ -264,15 +268,16 @@
             if (conf.is(SessionConfig.OOB_OPTIMIZED_LOGICAL_PLAN)) {
                 if (conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS)) {
                     // For Optimizer tests.
-                    AlgebricksAppendable buffer = new AlgebricksAppendable(conf.out());
+                    AlgebricksAppendable buffer = new AlgebricksAppendable(output.out());
                     PlanPrettyPrinter.printPhysicalOps(plan, buffer, 0);
                 } else {
-                    printPlanPrefix(conf, "Optimized logical plan");
+                    printPlanPrefix(output, "Optimized logical plan");
                     if (rwQ != null || (statement != null && statement.getKind() == Statement.Kind.LOAD)) {
-                        LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor(conf.out());
+                        LogicalOperatorPrettyPrintVisitor pvisitor =
+                                new LogicalOperatorPrettyPrintVisitor(output.out());
                         PlanPrettyPrinter.printPlan(plan, pvisitor, 0);
                     }
-                    printPlanPostfix(conf);
+                    printPlanPostfix(output);
                 }
             }
         }
@@ -280,7 +285,7 @@
             try {
                 LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor();
                 PlanPrettyPrinter.printPlan(plan, pvisitor, 0);
-                ResultUtil.printResults(metadataProvider.getApplicationContext(), pvisitor.get().toString(), conf,
+                ResultUtil.printResults(metadataProvider.getApplicationContext(), pvisitor.get().toString(), output,
                         new Stats(), null);
                 return null;
             } catch (IOException e) {
@@ -336,17 +341,17 @@
         }
 
         if (conf.is(SessionConfig.OOB_HYRACKS_JOB)) {
-            printPlanPrefix(conf, "Hyracks job");
+            printPlanPrefix(output, "Hyracks job");
             if (rwQ != null) {
                 try {
-                    conf.out().println(
+                    output.out().println(
                             new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(spec.toJSON()));
                 } catch (IOException e) {
                     throw new AlgebricksException(e);
                 }
-                conf.out().println(spec.getUserConstraints());
+                output.out().println(spec.getUserConstraints());
             }
-            printPlanPostfix(conf);
+            printPlanPostfix(output);
         }
         return spec;
     }
@@ -459,9 +464,7 @@
 
     // Gets the frame limit.
     private static int getFrameLimit(String parameterName, String parameter, long memBudgetInConfiguration,
-            int frameSize,
-            int minFrameLimit)
-            throws AlgebricksException {
+            int frameSize, int minFrameLimit) throws AlgebricksException {
         IOptionType<Long> longBytePropertyInterpreter = OptionTypes.LONG_BYTE_UNIT;
         long memBudget = parameter == null ? memBudgetInConfiguration : longBytePropertyInterpreter.parse(parameter);
         int frameLimit = (int) (memBudget / frameSize);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
index a9c4b39..a4e72f7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
@@ -26,7 +26,7 @@
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.asterix.app.result.ResultReader;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
@@ -35,7 +35,7 @@
 import org.apache.hyracks.http.server.AbstractServlet;
 
 public class AbstractQueryApiServlet extends AbstractServlet {
-    protected final ICcApplicationContext appCtx;
+    protected final IApplicationContext appCtx;
 
     public enum ResultFields {
         REQUEST_ID("requestID"),
@@ -93,7 +93,7 @@
         }
     }
 
-    AbstractQueryApiServlet(ICcApplicationContext appCtx, ConcurrentMap<String, Object> ctx, String[] paths) {
+    AbstractQueryApiServlet(IApplicationContext appCtx, ConcurrentMap<String, Object> ctx, String[] paths) {
         super(ctx, paths);
         this.appCtx = appCtx;
     }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
index 3384332..7874aa3 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
@@ -50,6 +50,7 @@
 import org.apache.asterix.translator.IStatementExecutorFactory;
 import org.apache.asterix.translator.SessionConfig;
 import org.apache.asterix.translator.SessionConfig.OutputFormat;
+import org.apache.asterix.translator.SessionOutput;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.dataset.IHyracksDataset;
 import org.apache.hyracks.client.dataset.HyracksDataset;
@@ -137,19 +138,20 @@
             }
             IParser parser = parserFactory.createParser(query);
             List<Statement> aqlStatements = parser.parse();
-            SessionConfig sessionConfig = new SessionConfig(out, format, true, isSet(executeQuery), true);
+            SessionConfig sessionConfig = new SessionConfig(format, true, isSet(executeQuery), true);
             sessionConfig.set(SessionConfig.FORMAT_HTML, true);
             sessionConfig.set(SessionConfig.FORMAT_CSV_HEADER, csvAndHeader);
             sessionConfig.set(SessionConfig.FORMAT_WRAPPER_ARRAY, isSet(wrapperArray));
             sessionConfig.setOOBData(isSet(printExprParam), isSet(printRewrittenExprParam),
                     isSet(printLogicalPlanParam), isSet(printOptimizedLogicalPlanParam), isSet(printJob));
+            SessionOutput sessionOutput = new SessionOutput(sessionConfig, out);
             MetadataManager.INSTANCE.init();
-            IStatementExecutor translator = statementExectorFactory.create(appCtx, aqlStatements, sessionConfig,
+            IStatementExecutor translator = statementExectorFactory.create(appCtx, aqlStatements, sessionOutput,
                     compilationProvider, componentProvider);
             double duration;
             long startTime = System.currentTimeMillis();
             translator.compileAndExecute(hcc, hds, IStatementExecutor.ResultDelivery.IMMEDIATE,
-                    new IStatementExecutor.Stats());
+                    null, new IStatementExecutor.Stats());
             long endTime = System.currentTimeMillis();
             duration = (endTime - startTime) / 1000.00;
             out.println(HTML_STATEMENT_SEPARATOR);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
new file mode 100644
index 0000000..2b70685
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.api.http.server;
+
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.asterix.algebra.base.ILangExtension;
+import org.apache.asterix.app.message.ExecuteStatementRequestMessage;
+import org.apache.asterix.app.message.ExecuteStatementResponseMessage;
+import org.apache.asterix.app.result.ResultReader;
+import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.common.messaging.api.MessageFuture;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.SessionOutput;
+import org.apache.commons.lang3.tuple.Triple;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.job.JobId;
+
+/**
+ * Query service servlet that can run on NC nodes.
+ * Delegates query execution to CC, then serves the result.
+ */
+public class NCQueryServiceServlet extends QueryServiceServlet {
+    public NCQueryServiceServlet(ConcurrentMap<String, Object> ctx, String[] paths, IApplicationContext appCtx,
+            ILangExtension.Language queryLanguage) {
+        super(ctx, paths, appCtx, queryLanguage, null, null, null);
+    }
+
+    @Override
+    protected void executeStatement(String statementsText, SessionOutput sessionOutput,
+            IStatementExecutor.ResultDelivery delivery, IStatementExecutor.Stats stats, RequestParameters param,
+            String handleUrl, long[] outExecStartEnd) throws Exception {
+        // Running on NC -> send 'execute' message to CC
+        INCServiceContext ncCtx = (INCServiceContext) serviceCtx;
+        INCMessageBroker ncMb = (INCMessageBroker) ncCtx.getMessageBroker();
+        IStatementExecutor.ResultDelivery ccDelivery = delivery == IStatementExecutor.ResultDelivery.IMMEDIATE
+                ? IStatementExecutor.ResultDelivery.DEFERRED : delivery;
+        ExecuteStatementResponseMessage responseMsg;
+        MessageFuture responseFuture = ncMb.registerMessageFuture();
+        try {
+            ExecuteStatementRequestMessage requestMsg =
+                    new ExecuteStatementRequestMessage(ncCtx.getNodeId(), responseFuture.getFutureId(), queryLanguage,
+                            statementsText, sessionOutput.config(), ccDelivery, param.clientContextID, handleUrl);
+            outExecStartEnd[0] = System.nanoTime();
+            ncMb.sendMessageToCC(requestMsg);
+            responseMsg = (ExecuteStatementResponseMessage) responseFuture.get(
+                    ExecuteStatementResponseMessage.DEFAULT_TIMEOUT_MILLIS, java.util.concurrent.TimeUnit.MILLISECONDS);
+            outExecStartEnd[1] = System.nanoTime();
+        } finally {
+            ncMb.deregisterMessageFuture(responseFuture.getFutureId());
+        }
+
+        Throwable err = responseMsg.getError();
+        if (err != null) {
+            if (err instanceof Error) {
+                throw (Error) err;
+            } else if (err instanceof Exception) {
+                throw (Exception) err;
+            } else {
+                throw new Exception(err.toString(), err);
+            }
+        }
+
+        IStatementExecutor.ResultMetadata resultMetadata = responseMsg.getMetadata();
+        if (delivery == IStatementExecutor.ResultDelivery.IMMEDIATE && !resultMetadata.getResultSets().isEmpty()) {
+            for (Triple<JobId, ResultSetId, ARecordType> rsmd : resultMetadata.getResultSets()) {
+                ResultReader resultReader = new ResultReader(getHyracksDataset(), rsmd.getLeft(), rsmd.getMiddle());
+                ResultUtil.printResults(appCtx, resultReader, sessionOutput, stats, rsmd.getRight());
+            }
+        } else {
+            sessionOutput.out().append(responseMsg.getResult());
+        }
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
index 401f55e..42e23ba 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
@@ -25,9 +25,9 @@
 
 import org.apache.asterix.app.result.ResultHandle;
 import org.apache.asterix.app.result.ResultReader;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
-import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
 import org.apache.hyracks.api.dataset.DatasetJobRecord;
 import org.apache.hyracks.api.dataset.IHyracksDataset;
 import org.apache.hyracks.api.exceptions.ErrorCode;
@@ -41,7 +41,7 @@
 public class QueryResultApiServlet extends AbstractQueryApiServlet {
     private static final Logger LOGGER = Logger.getLogger(QueryResultApiServlet.class.getName());
 
-    public QueryResultApiServlet(ConcurrentMap<String, Object> ctx, String[] paths, ICcApplicationContext appCtx) {
+    public QueryResultApiServlet(ConcurrentMap<String, Object> ctx, String[] paths, IApplicationContext appCtx) {
         super(appCtx, ctx, paths);
     }
 
@@ -94,8 +94,8 @@
             // way to send the same OutputFormat value here as was
             // originally determined there. Need to save this value on
             // some object that we can obtain here.
-            SessionConfig sessionConfig = RestApiServlet.initResponse(request, response);
-            ResultUtil.printResults(appCtx, resultReader, sessionConfig, new Stats(), null);
+            SessionOutput sessionOutput = RestApiServlet.initResponse(request, response);
+            ResultUtil.printResults(appCtx, resultReader, sessionOutput, new Stats(), null);
         } catch (HyracksDataException e) {
             final int errorCode = e.getErrorCode();
             if (ErrorCode.NO_RESULTSET == errorCode) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index 20bffc4..c45f24a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -27,8 +27,9 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.api.http.ctx.StatementExecutorContext;
+import org.apache.asterix.algebra.base.ILangExtension;
 import org.apache.asterix.api.http.servlet.ServletConstants;
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.api.IClusterManagementWork;
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.context.IStorageComponentProvider;
@@ -46,8 +47,8 @@
 import org.apache.asterix.translator.IStatementExecutorContext;
 import org.apache.asterix.translator.IStatementExecutorFactory;
 import org.apache.asterix.translator.SessionConfig;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable;
+import org.apache.asterix.translator.SessionOutput;
+import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.http.api.IServletRequest;
 import org.apache.hyracks.http.api.IServletResponse;
 import org.apache.hyracks.http.server.utils.HttpUtil;
@@ -66,19 +67,23 @@
 
 public class QueryServiceServlet extends AbstractQueryApiServlet {
     private static final Logger LOGGER = Logger.getLogger(QueryServiceServlet.class.getName());
+    protected final ILangExtension.Language queryLanguage;
     private final ILangCompilationProvider compilationProvider;
     private final IStatementExecutorFactory statementExecutorFactory;
     private final IStorageComponentProvider componentProvider;
-    private final IStatementExecutorContext queryCtx = new StatementExecutorContext();
+    private final IStatementExecutorContext queryCtx;
+    protected final IServiceContext serviceCtx;
 
-    public QueryServiceServlet(ConcurrentMap<String, Object> ctx, String[] paths, ICcApplicationContext appCtx,
-            ILangCompilationProvider compilationProvider, IStatementExecutorFactory statementExecutorFactory,
-            IStorageComponentProvider componentProvider) {
+    public QueryServiceServlet(ConcurrentMap<String, Object> ctx, String[] paths, IApplicationContext appCtx,
+            ILangExtension.Language queryLanguage, ILangCompilationProvider compilationProvider,
+            IStatementExecutorFactory statementExecutorFactory, IStorageComponentProvider componentProvider) {
         super(appCtx, ctx, paths);
+        this.queryLanguage = queryLanguage;
         this.compilationProvider = compilationProvider;
         this.statementExecutorFactory = statementExecutorFactory;
         this.componentProvider = componentProvider;
-        ctx.put(ServletConstants.RUNNING_QUERIES_ATTR, queryCtx);
+        this.queryCtx = (IStatementExecutorContext) ctx.get(ServletConstants.RUNNING_QUERIES_ATTR);
+        this.serviceCtx = (IServiceContext) ctx.get(ServletConstants.SERVICE_CONTEXT_ATTR);
     }
 
     @Override
@@ -235,40 +240,22 @@
         return SessionConfig.OutputFormat.CLEAN_JSON;
     }
 
-    private static SessionConfig createSessionConfig(RequestParameters param, String handleUrl,
+    private static SessionOutput createSessionOutput(RequestParameters param, String handleUrl,
             PrintWriter resultWriter) {
-        SessionConfig.ResultDecorator resultPrefix = new SessionConfig.ResultDecorator() {
-            int resultNo = -1;
-
-            @Override
-            public AlgebricksAppendable append(AlgebricksAppendable app) throws AlgebricksException {
-                app.append("\t\"");
-                app.append(ResultFields.RESULTS.str());
-                if (resultNo >= 0) {
-                    app.append('-').append(String.valueOf(resultNo));
-                }
-                ++resultNo;
-                app.append("\": ");
-                return app;
-            }
-        };
-
-        SessionConfig.ResultDecorator resultPostfix = app -> app.append("\t,\n");
-        SessionConfig.ResultAppender appendHandle = (app, handle) -> app.append("\t\"")
-                .append(ResultFields.HANDLE.str()).append("\": \"").append(handleUrl).append(handle).append("\",\n");
-        SessionConfig.ResultAppender appendStatus = (app, status) -> app.append("\t\"")
-                .append(ResultFields.STATUS.str()).append("\": \"").append(status).append("\",\n");
+        SessionOutput.ResultDecorator resultPrefix = ResultUtil.createPreResultDecorator();
+        SessionOutput.ResultDecorator resultPostfix = ResultUtil.createPostResultDecorator();
+        SessionOutput.ResultAppender appendHandle = ResultUtil.createResultHandleAppender(handleUrl);
+        SessionOutput.ResultAppender appendStatus = ResultUtil.createResultStatusAppender();
 
         SessionConfig.OutputFormat format = getFormat(param.format);
-        SessionConfig sessionConfig =
-                new SessionConfig(resultWriter, format, resultPrefix, resultPostfix, appendHandle, appendStatus);
+        SessionConfig sessionConfig = new SessionConfig(format);
         sessionConfig.set(SessionConfig.FORMAT_WRAPPER_ARRAY, true);
         sessionConfig.set(SessionConfig.FORMAT_INDENT_JSON, param.pretty);
         sessionConfig.set(SessionConfig.FORMAT_QUOTE_RECORD,
                 format != SessionConfig.OutputFormat.CLEAN_JSON && format != SessionConfig.OutputFormat.LOSSLESS_JSON);
         sessionConfig.set(SessionConfig.FORMAT_CSV_HEADER, format == SessionConfig.OutputFormat.CSV
                 && "present".equals(getParameterValue(param.format, Attribute.HEADER.str())));
-        return sessionConfig;
+        return new SessionOutput(sessionConfig, resultWriter, resultPrefix, resultPostfix, appendHandle, appendStatus);
     }
 
     private static void printClientContextID(PrintWriter pw, RequestParameters params) {
@@ -406,13 +393,13 @@
         ResultDelivery delivery = parseResultDelivery(param.mode);
 
         String handleUrl = getHandleUrl(param.host, param.path, delivery);
-        SessionConfig sessionConfig = createSessionConfig(param, handleUrl, resultWriter);
+        SessionOutput sessionOutput = createSessionOutput(param, handleUrl, resultWriter);
+        SessionConfig sessionConfig = sessionOutput.config();
         HttpUtil.setContentType(response, HttpUtil.ContentType.APPLICATION_JSON, HttpUtil.Encoding.UTF8);
 
         HttpResponseStatus status = HttpResponseStatus.OK;
         Stats stats = new Stats();
-        long execStart = -1;
-        long execEnd = -1;
+        long[] execStartEnd = new long[] { -1, -1 };
 
         resultWriter.print("{\n");
         printRequestId(resultWriter);
@@ -420,46 +407,33 @@
         printSignature(resultWriter);
         printType(resultWriter, sessionConfig);
         try {
-            final IClusterManagementWork.ClusterState clusterState = ClusterStateManager.INSTANCE.getState();
-            if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) {
-                // using a plain IllegalStateException here to get into the right catch clause for a 500
-                throw new IllegalStateException("Cannot execute request, cluster is " + clusterState);
-            }
             if (param.statement == null || param.statement.isEmpty()) {
                 throw new AsterixException("Empty request, no statement provided");
             }
-            IParser parser = compilationProvider.getParserFactory().createParser(param.statement + ";");
-            List<Statement> statements = parser.parse();
-            MetadataManager.INSTANCE.init();
-            IStatementExecutor translator =
-                    statementExecutorFactory.create(appCtx, statements, sessionConfig, compilationProvider,
-                            componentProvider);
-            execStart = System.nanoTime();
-            translator.compileAndExecute(getHyracksClientConnection(), getHyracksDataset(), delivery, stats,
-                    param.clientContextID, queryCtx);
-            execEnd = System.nanoTime();
+            String statementsText = param.statement + ";";
+            executeStatement(statementsText, sessionOutput, delivery, stats, param, handleUrl, execStartEnd);
             if (ResultDelivery.IMMEDIATE == delivery || ResultDelivery.DEFERRED == delivery) {
-                ResultUtil.printStatus(sessionConfig, ResultStatus.SUCCESS);
+                ResultUtil.printStatus(sessionOutput, ResultStatus.SUCCESS);
             }
         } catch (AsterixException | TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
             GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, pe.getMessage(), pe);
             ResultUtil.printError(resultWriter, pe);
-            ResultUtil.printStatus(sessionConfig, ResultStatus.FATAL);
+            ResultUtil.printStatus(sessionOutput, ResultStatus.FATAL);
             status = HttpResponseStatus.BAD_REQUEST;
         } catch (Exception e) {
-            GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, e.getMessage(), e);
+            GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, e.toString(), e);
             ResultUtil.printError(resultWriter, e);
-            ResultUtil.printStatus(sessionConfig, ResultStatus.FATAL);
+            ResultUtil.printStatus(sessionOutput, ResultStatus.FATAL);
             status = HttpResponseStatus.INTERNAL_SERVER_ERROR;
         } finally {
-            if (execStart == -1) {
-                execEnd = -1;
-            } else if (execEnd == -1) {
-                execEnd = System.nanoTime();
+            if (execStartEnd[0] == -1) {
+                execStartEnd[1] = -1;
+            } else if (execStartEnd[1] == -1) {
+                execStartEnd[1] = System.nanoTime();
             }
         }
-        printMetrics(resultWriter, System.nanoTime() - elapsedStart, execEnd - execStart, stats.getCount(),
-                stats.getSize());
+        printMetrics(resultWriter, System.nanoTime() - elapsedStart, execStartEnd[1] - execStartEnd[0],
+                stats.getCount(), stats.getSize());
         resultWriter.print("}\n");
         resultWriter.flush();
         String result = stringWriter.toString();
@@ -472,4 +446,23 @@
             LOGGER.warning("Error flushing output writer");
         }
     }
+
+    protected void executeStatement(String statementsText, SessionOutput sessionOutput, ResultDelivery delivery,
+            IStatementExecutor.Stats stats, RequestParameters param, String handleUrl, long[] outExecStartEnd)
+            throws Exception {
+        IClusterManagementWork.ClusterState clusterState = ClusterStateManager.INSTANCE.getState();
+        if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) {
+            // using a plain IllegalStateException here to get into the right catch clause for a 500
+            throw new IllegalStateException("Cannot execute request, cluster is " + clusterState);
+        }
+        IParser parser = compilationProvider.getParserFactory().createParser(statementsText);
+        List<Statement> statements = parser.parse();
+        MetadataManager.INSTANCE.init();
+        IStatementExecutor translator = statementExecutorFactory.create((ICcApplicationContext) appCtx, statements,
+                sessionOutput, compilationProvider, componentProvider);
+        outExecStartEnd[0] = System.nanoTime();
+        translator.compileAndExecute(getHyracksClientConnection(), getHyracksDataset(), delivery, null, stats,
+                param.clientContextID, queryCtx);
+        outExecStartEnd[1] = System.nanoTime();
+    }
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
index d0c574e..71dddc0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
@@ -29,7 +29,7 @@
 
 import org.apache.asterix.app.result.ResultHandle;
 import org.apache.asterix.app.result.ResultReader;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.hyracks.api.dataset.DatasetJobRecord;
 import org.apache.hyracks.api.dataset.IHyracksDataset;
 import org.apache.hyracks.http.api.IServletRequest;
@@ -41,7 +41,7 @@
 public class QueryStatusApiServlet extends AbstractQueryApiServlet {
     private static final Logger LOGGER = Logger.getLogger(QueryStatusApiServlet.class.getName());
 
-    public QueryStatusApiServlet(ConcurrentMap<String, Object> ctx, String[] paths, ICcApplicationContext appCtx) {
+    public QueryStatusApiServlet(ConcurrentMap<String, Object> ctx, String[] paths, IApplicationContext appCtx) {
         super(appCtx, ctx, paths);
     }
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
index e339ba9..6b1e408 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
@@ -45,6 +45,7 @@
 import org.apache.asterix.translator.IStatementExecutorFactory;
 import org.apache.asterix.translator.SessionConfig;
 import org.apache.asterix.translator.SessionConfig.OutputFormat;
+import org.apache.asterix.translator.SessionOutput;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.dataset.IHyracksDataset;
 import org.apache.hyracks.client.dataset.HyracksDataset;
@@ -84,7 +85,7 @@
      * SessionConfig with the appropriate output writer and output-format
      * based on the Accept: header and other servlet parameters.
      */
-    static SessionConfig initResponse(IServletRequest request, IServletResponse response) throws IOException {
+    static SessionOutput initResponse(IServletRequest request, IServletResponse response) throws IOException {
         HttpUtil.setContentType(response, HttpUtil.ContentType.TEXT_PLAIN, HttpUtil.Encoding.UTF8);
         // CLEAN_JSON output is the default; most generally useful for a
         // programmatic HTTP API
@@ -114,9 +115,9 @@
             format = OutputFormat.LOSSLESS_JSON;
         }
 
-        SessionConfig.ResultAppender appendHandle = (app, handle) -> app.append("{ \"").append("handle")
+        SessionOutput.ResultAppender appendHandle = (app, handle) -> app.append("{ \"").append("handle")
                 .append("\":" + " \"").append(handle).append("\" }");
-        SessionConfig sessionConfig = new SessionConfig(response.writer(), format, null, null, appendHandle, null);
+        SessionConfig sessionConfig = new SessionConfig(format);
 
         // If it's JSON or ADM, check for the "wrapper-array" flag. Default is
         // "true" for JSON and "false" for ADM. (Not applicable for CSV.)
@@ -152,7 +153,7 @@
             default:
                 throw new IOException("Unknown format " + format);
         }
-        return sessionConfig;
+        return new SessionOutput(sessionConfig, response.writer(), null, null, appendHandle, null);
     }
 
     @Override
@@ -171,9 +172,9 @@
             // enable cross-origin resource sharing
             response.setHeader("Access-Control-Allow-Origin", "*");
             response.setHeader("Access-Control-Allow-Headers", "Origin, X-Requested-With, Content-Type, Accept");
-            SessionConfig sessionConfig = initResponse(request, response);
+            SessionOutput sessionOutput = initResponse(request, response);
             QueryTranslator.ResultDelivery resultDelivery = whichResultDelivery(request);
-            doHandle(response, query, sessionConfig, resultDelivery);
+            doHandle(response, query, sessionOutput, resultDelivery);
         } catch (Exception e) {
             response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
             LOGGER.log(Level.WARNING, "Failure handling request", e);
@@ -181,7 +182,7 @@
         }
     }
 
-    private void doHandle(IServletResponse response, String query, SessionConfig sessionConfig,
+    private void doHandle(IServletResponse response, String query, SessionOutput sessionOutput,
             ResultDelivery resultDelivery) throws JsonProcessingException {
         try {
             response.setStatus(HttpResponseStatus.OK);
@@ -201,20 +202,20 @@
             List<Statement> aqlStatements = parser.parse();
             validate(aqlStatements);
             MetadataManager.INSTANCE.init();
-            IStatementExecutor translator = statementExecutorFactory.create(appCtx, aqlStatements, sessionConfig,
+            IStatementExecutor translator = statementExecutorFactory.create(appCtx, aqlStatements, sessionOutput,
                     compilationProvider, componentProvider);
-            translator.compileAndExecute(hcc, hds, resultDelivery, new IStatementExecutor.Stats());
+            translator.compileAndExecute(hcc, hds, resultDelivery, null, new IStatementExecutor.Stats());
         } catch (AsterixException | TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
             response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
             GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, pe.getMessage(), pe);
             String errorMessage = ResultUtil.buildParseExceptionMessage(pe, query);
             ObjectNode errorResp =
                     ResultUtil.getErrorResponse(2, errorMessage, "", ResultUtil.extractFullStackTrace(pe));
-            sessionConfig.out().write(new ObjectMapper().writeValueAsString(errorResp));
+            sessionOutput.out().write(new ObjectMapper().writeValueAsString(errorResp));
         } catch (Exception e) {
             GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, e.getMessage(), e);
             response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
-            ResultUtil.apiErrorHandler(sessionConfig.out(), e);
+            ResultUtil.apiErrorHandler(sessionOutput.out(), e);
         }
     }
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java
index fe6fa89..3bcd670 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java
@@ -36,10 +36,10 @@
 import org.apache.asterix.app.result.ResultHandle;
 import org.apache.asterix.app.result.ResultPrinter;
 import org.apache.asterix.app.result.ResultReader;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
-import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
 import org.apache.http.ParseException;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable;
@@ -77,29 +77,29 @@
         return escaped;
     }
 
-    public static void printResults(ICcApplicationContext appCtx, ResultReader resultReader, SessionConfig conf,
+    public static void printResults(IApplicationContext appCtx, ResultReader resultReader, SessionOutput output,
             Stats stats, ARecordType recordType) throws HyracksDataException {
-        new ResultPrinter(appCtx, conf, stats, recordType).print(resultReader);
+        new ResultPrinter(appCtx, output, stats, recordType).print(resultReader);
     }
 
-    public static void printResults(ICcApplicationContext appCtx, String record, SessionConfig conf, Stats stats,
+    public static void printResults(IApplicationContext appCtx, String record, SessionOutput output, Stats stats,
             ARecordType recordType) throws HyracksDataException {
-        new ResultPrinter(appCtx, conf, stats, recordType).print(record);
+        new ResultPrinter(appCtx, output, stats, recordType).print(record);
     }
 
-    public static void printResultHandle(SessionConfig conf, ResultHandle handle) throws HyracksDataException {
+    public static void printResultHandle(SessionOutput output, ResultHandle handle) throws HyracksDataException {
         try {
-            final AlgebricksAppendable app = new AlgebricksAppendable(conf.out());
-            conf.appendHandle(app, handle.toString());
+            final AlgebricksAppendable app = new AlgebricksAppendable(output.out());
+            output.appendHandle(app, handle.toString());
         } catch (AlgebricksException e) {
             LOGGER.warn("error printing handle", e);
         }
     }
 
-    public static void printStatus(SessionConfig conf, AbstractQueryApiServlet.ResultStatus rs) {
+    public static void printStatus(SessionOutput output, AbstractQueryApiServlet.ResultStatus rs) {
         try {
-            final AlgebricksAppendable app = new AlgebricksAppendable(conf.out());
-            conf.appendStatus(app, rs.str());
+            final AlgebricksAppendable app = new AlgebricksAppendable(output.out());
+            output.appendStatus(app, rs.str());
         } catch (AlgebricksException e) {
             LOGGER.warn("error printing status", e);
         }
@@ -318,4 +318,35 @@
         return errorTemplate;
     }
 
+    public static SessionOutput.ResultDecorator createPreResultDecorator() {
+        return new SessionOutput.ResultDecorator() {
+            int resultNo = -1;
+
+            @Override
+            public AlgebricksAppendable append(AlgebricksAppendable app) throws AlgebricksException {
+                app.append("\t\"");
+                app.append(AbstractQueryApiServlet.ResultFields.RESULTS.str());
+                if (resultNo >= 0) {
+                    app.append('-').append(String.valueOf(resultNo));
+                }
+                ++resultNo;
+                app.append("\": ");
+                return app;
+            }
+        };
+    }
+
+    public static SessionOutput.ResultDecorator createPostResultDecorator() {
+        return app -> app.append("\t,\n");
+    }
+
+    public static SessionOutput.ResultAppender createResultHandleAppender(String handleUrl) {
+        return (app, handle) -> app.append("\t\"").append(AbstractQueryApiServlet.ResultFields.HANDLE.str())
+                .append("\": \"").append(handleUrl).append(handle).append("\",\n");
+    }
+
+    public static SessionOutput.ResultAppender createResultStatusAppender() {
+        return (app, status) -> app.append("\t\"").append(AbstractQueryApiServlet.ResultFields.STATUS.str())
+                .append("\": \"").append(status).append("\",\n");
+    }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java
index a9d4e22..b815d76 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java
@@ -22,8 +22,9 @@
     public static final String HYRACKS_CONNECTION_ATTR = "org.apache.asterix.HYRACKS_CONNECTION";
     public static final String HYRACKS_DATASET_ATTR = "org.apache.asterix.HYRACKS_DATASET";
     public static final String ASTERIX_APP_CONTEXT_INFO_ATTR = "org.apache.asterix.APP_CONTEXT_INFO";
-    public static final String EXECUTOR_SERVICE_ATTR = "org.apache.asterix.EXECUTOR_SERVICE_ATTR";
+    public static final String EXECUTOR_SERVICE_ATTR = "org.apache.asterix.EXECUTOR_SERVICE";
     public static final String RUNNING_QUERIES_ATTR = "org.apache.asterix.RUNINNG_QUERIES";
+    public static final String SERVICE_CONTEXT_ATTR = "org.apache.asterix.SERVICE_CONTEXT";
 
     private ServletConstants() {
     }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
index ecf2c53..a9d24b9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
@@ -36,6 +36,7 @@
 import org.apache.asterix.translator.IStatementExecutorFactory;
 import org.apache.asterix.translator.SessionConfig;
 import org.apache.asterix.translator.SessionConfig.OutputFormat;
+import org.apache.asterix.translator.SessionOutput;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.job.JobSpecification;
 
@@ -99,16 +100,17 @@
         List<Statement> statements = parser.parse();
         MetadataManager.INSTANCE.init();
 
-        SessionConfig conf = new SessionConfig(writer, OutputFormat.ADM, optimize, true, generateBinaryRuntime);
+        SessionConfig conf = new SessionConfig(OutputFormat.ADM, optimize, true, generateBinaryRuntime);
         conf.setOOBData(false, printRewrittenExpressions, printLogicalPlan, printOptimizedPlan, printJob);
         if (printPhysicalOpsOnly) {
             conf.set(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS, true);
         }
+        SessionOutput output = new SessionOutput(conf, writer);
 
-        IStatementExecutor translator = statementExecutorFactory.create(appCtx, statements, conf, compilationProvider,
+        IStatementExecutor translator = statementExecutorFactory.create(appCtx, statements, output, compilationProvider,
                 storageComponentProvider);
         translator.compileAndExecute(hcc, null, QueryTranslator.ResultDelivery.IMMEDIATE,
-                new IStatementExecutor.Stats());
+                null, new IStatementExecutor.Stats());
         writer.flush();
     }
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java
index 6c6f2af..0c6b2cc 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java
@@ -110,11 +110,11 @@
         return statementExecutorFactory;
     }
 
-    public ILangCompilationProvider getAqlCompilationProvider() {
-        return aqlCompilationProvider;
-    }
-
-    public ILangCompilationProvider getSqlppCompilationProvider() {
-        return sqlppCompilationProvider;
+    public ILangCompilationProvider getCompilationProvider(Language lang) {
+        switch (lang) {
+            case AQL: return aqlCompilationProvider;
+            case SQLPP: return sqlppCompilationProvider;
+            default: throw new IllegalArgumentException(String.valueOf(lang));
+        }
     }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
new file mode 100644
index 0000000..defa180
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.app.message;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.algebra.base.ILangExtension;
+import org.apache.asterix.api.http.server.ResultUtil;
+import org.apache.asterix.app.cc.CCExtensionManager;
+import org.apache.asterix.common.api.IClusterManagementWork;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.hyracks.bootstrap.CCApplication;
+import org.apache.asterix.lang.aql.parser.TokenMgrError;
+import org.apache.asterix.lang.common.base.IParser;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.messaging.CCMessageBroker;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.runtime.utils.ClusterStateManager;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutorContext;
+import org.apache.asterix.translator.IStatementExecutorFactory;
+import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
+import org.apache.hyracks.api.application.ICCServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+
+public final class ExecuteStatementRequestMessage implements ICcAddressedMessage {
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOGGER = Logger.getLogger(ExecuteStatementRequestMessage.class.getName());
+
+    private final String requestNodeId;
+
+    private final long requestMessageId;
+
+    private final ILangExtension.Language lang;
+
+    private final String statementsText;
+
+    private final SessionConfig sessionConfig;
+
+    private final IStatementExecutor.ResultDelivery delivery;
+
+    private final String clientContextID;
+
+    private final String handleUrl;
+
+    public ExecuteStatementRequestMessage(String requestNodeId, long requestMessageId, ILangExtension.Language lang,
+            String statementsText, SessionConfig sessionConfig, IStatementExecutor.ResultDelivery delivery,
+            String clientContextID, String handleUrl) {
+        this.requestNodeId = requestNodeId;
+        this.requestMessageId = requestMessageId;
+        this.lang = lang;
+        this.statementsText = statementsText;
+        this.sessionConfig = sessionConfig;
+        this.delivery = delivery;
+        this.clientContextID = clientContextID;
+        this.handleUrl = handleUrl;
+    }
+
+    @Override
+    public void handle(ICcApplicationContext ccAppCtx) throws HyracksDataException, InterruptedException {
+        ICCServiceContext ccSrvContext = ccAppCtx.getServiceContext();
+        ClusterControllerService ccSrv = (ClusterControllerService) ccSrvContext.getControllerService();
+        CCApplication ccApp = (CCApplication) ccSrv.getApplication();
+        CCMessageBroker messageBroker = (CCMessageBroker) ccSrvContext.getMessageBroker();
+        CCExtensionManager ccExtMgr = (CCExtensionManager) ccAppCtx.getExtensionManager();
+        ILangCompilationProvider compilationProvider = ccExtMgr.getCompilationProvider(lang);
+        IStorageComponentProvider storageComponentProvider = ccAppCtx.getStorageComponentProvider();
+        IStatementExecutorFactory statementExecutorFactory = ccApp.getStatementExecutorFactory();
+        IStatementExecutorContext statementExecutorContext = ccApp.getStatementExecutorContext();
+
+        ccSrv.getExecutor().submit(() -> {
+            ExecuteStatementResponseMessage responseMsg = new ExecuteStatementResponseMessage(requestMessageId);
+
+            try {
+                final IClusterManagementWork.ClusterState clusterState = ClusterStateManager.INSTANCE.getState();
+                if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) {
+                    throw new IllegalStateException("Cannot execute request, cluster is " + clusterState);
+                }
+
+                IParser parser = compilationProvider.getParserFactory().createParser(statementsText);
+                List<Statement> statements = parser.parse();
+
+                StringWriter outWriter = new StringWriter(256);
+                PrintWriter outPrinter = new PrintWriter(outWriter);
+                SessionOutput.ResultDecorator resultPrefix = ResultUtil.createPreResultDecorator();
+                SessionOutput.ResultDecorator resultPostfix = ResultUtil.createPostResultDecorator();
+                SessionOutput.ResultAppender appendHandle = ResultUtil.createResultHandleAppender(handleUrl);
+                SessionOutput.ResultAppender appendStatus = ResultUtil.createResultStatusAppender();
+                SessionOutput sessionOutput = new SessionOutput(sessionConfig, outPrinter, resultPrefix, resultPostfix,
+                        appendHandle, appendStatus);
+
+                IStatementExecutor.ResultMetadata outMetadata = new IStatementExecutor.ResultMetadata();
+
+                MetadataManager.INSTANCE.init();
+                IStatementExecutor translator = statementExecutorFactory.create(ccAppCtx, statements, sessionOutput,
+                        compilationProvider, storageComponentProvider);
+                translator.compileAndExecute(ccAppCtx.getHcc(), null, delivery, outMetadata,
+                        new IStatementExecutor.Stats(), clientContextID, statementExecutorContext);
+
+                outPrinter.close();
+                responseMsg.setResult(outWriter.toString());
+                responseMsg.setMetadata(outMetadata);
+            } catch (TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
+                GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, pe.getMessage(), pe);
+                responseMsg.setError(pe);
+            } catch (AsterixException pe) {
+                GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, pe.getMessage(), pe);
+                responseMsg.setError(new AsterixException(pe.getMessage()));
+            } catch (Exception e) {
+                String estr = e.toString();
+                GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, estr, e);
+                responseMsg.setError(new Exception(estr));
+            }
+
+            try {
+                messageBroker.sendApplicationMessageToNC(responseMsg, requestNodeId);
+            } catch (Exception e) {
+                LOGGER.log(Level.WARNING, e.toString(), e);
+            }
+        });
+    }
+
+    @Override
+    public String toString() {
+        return String.format("%s(id=%s, from=%s): %s", getClass().getSimpleName(), requestMessageId, requestNodeId,
+                statementsText);
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java
new file mode 100644
index 0000000..4f9aa0c
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.app.message;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.asterix.common.messaging.api.MessageFuture;
+import org.apache.asterix.messaging.NCMessageBroker;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public final class ExecuteStatementResponseMessage implements INcAddressedMessage {
+    private static final long serialVersionUID = 1L;
+
+    public static final long DEFAULT_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(5);
+
+    private final long requestMessageId;
+
+    private String result;
+
+    private IStatementExecutor.ResultMetadata metadata;
+
+    private Throwable error;
+
+    public ExecuteStatementResponseMessage(long requestMessageId) {
+        this.requestMessageId = requestMessageId;
+    }
+
+    @Override
+    public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+        NCMessageBroker mb = (NCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+        MessageFuture future = mb.deregisterMessageFuture(requestMessageId);
+        if (future != null) {
+            future.complete(this);
+        }
+    }
+
+    public Throwable getError() {
+        return error;
+    }
+
+    public void setError(Throwable error) {
+        this.error = error;
+    }
+
+    public String getResult() {
+        return result;
+    }
+
+    public void setResult(String result) {
+        this.result = result;
+    }
+
+    public IStatementExecutor.ResultMetadata getMetadata() {
+        return metadata;
+    }
+
+    public void setMetadata(IStatementExecutor.ResultMetadata metadata) {
+        this.metadata = metadata;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("%s(id=%s): %d characters", getClass().getSimpleName(), requestMessageId,
+                result != null ? result.length() : 0);
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java
index 7ed3aef..452d13e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java
@@ -24,10 +24,11 @@
 import java.io.StringWriter;
 import java.nio.ByteBuffer;
 
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
 import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable;
 import org.apache.hyracks.api.comm.IFrame;
@@ -47,6 +48,7 @@
 
     private final FrameManager resultDisplayFrameMgr;
 
+    private final SessionOutput output;
     private final SessionConfig conf;
     private final Stats stats;
     private final ARecordType recordType;
@@ -62,8 +64,9 @@
     private ObjectMapper om;
     private ObjectWriter ow;
 
-    public ResultPrinter(ICcApplicationContext appCtx, SessionConfig conf, Stats stats, ARecordType recordType) {
-        this.conf = conf;
+    public ResultPrinter(IApplicationContext appCtx, SessionOutput output, Stats stats, ARecordType recordType) {
+        this.output = output;
+        this.conf = output.config();
         this.stats = stats;
         this.recordType = recordType;
         this.indentJSON = conf.is(SessionConfig.FORMAT_INDENT_JSON);
@@ -112,18 +115,18 @@
         // If we're outputting CSV with a header, the HTML header was already
         // output by displayCSVHeader(), so skip it here
         if (conf.is(SessionConfig.FORMAT_HTML)) {
-            conf.out().println("<h4>Results:</h4>");
-            conf.out().println("<pre class=\"result-content\">");
+            output.out().println("<h4>Results:</h4>");
+            output.out().println("<pre class=\"result-content\">");
         }
 
         try {
-            conf.resultPrefix(new AlgebricksAppendable(conf.out()));
+            output.resultPrefix(new AlgebricksAppendable(output.out()));
         } catch (AlgebricksException e) {
             throw new HyracksDataException(e);
         }
 
         if (conf.is(SessionConfig.FORMAT_WRAPPER_ARRAY)) {
-            conf.out().print("[ ");
+            output.out().print("[ ");
             wrapArray = true;
         }
 
@@ -134,29 +137,29 @@
             if (quoteRecord) {
                 StringWriter sw = new StringWriter();
                 appendCSVHeader(sw, recordType);
-                conf.out().print(JSONUtil.quoteAndEscape(sw.toString()));
-                conf.out().print("\n");
+                output.out().print(JSONUtil.quoteAndEscape(sw.toString()));
+                output.out().print("\n");
                 notFirst = true;
             } else {
-                appendCSVHeader(conf.out(), recordType);
+                appendCSVHeader(output.out(), recordType);
             }
         }
     }
 
     private void printPostfix() throws HyracksDataException {
-        conf.out().flush();
+        output.out().flush();
         if (wrapArray) {
-            conf.out().println(" ]");
+            output.out().println(" ]");
         }
         try {
-            conf.resultPostfix(new AlgebricksAppendable(conf.out()));
+            output.resultPostfix(new AlgebricksAppendable(output.out()));
         } catch (AlgebricksException e) {
             throw new HyracksDataException(e);
         }
         if (conf.is(SessionConfig.FORMAT_HTML)) {
-            conf.out().println("</pre>");
+            output.out().println("</pre>");
         }
-        conf.out().flush();
+        output.out().flush();
     }
 
     private void displayRecord(String result) throws HyracksDataException {
@@ -177,7 +180,7 @@
             // TODO(tillw): this is inefficient as well
             record = JSONUtil.quoteAndEscape(record);
         }
-        conf.out().print(record);
+        output.out().print(record);
         stats.setCount(stats.getCount() + 1);
         // TODO(tillw) fix this approximation
         stats.setSize(stats.getSize() + record.length());
@@ -211,7 +214,7 @@
                 }
                 String result = new String(frameBytes, start, length, UTF_8);
                 if (wrapArray && notFirst) {
-                    conf.out().print(", ");
+                    output.out().print(", ");
                 }
                 notFirst = true;
                 displayRecord(result);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java
index e2f17ac..00fb1b0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java
@@ -28,7 +28,7 @@
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutorFactory;
-import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
 import org.apache.hyracks.control.common.utils.HyracksThreadFactory;
 
 public class DefaultStatementExecutorFactory implements IStatementExecutorFactory {
@@ -48,9 +48,9 @@
     }
 
     @Override
-    public IStatementExecutor create(ICcApplicationContext appCtx, List<Statement> statements, SessionConfig conf,
+    public IStatementExecutor create(ICcApplicationContext appCtx, List<Statement> statements, SessionOutput output,
             ILangCompilationProvider compilationProvider, IStorageComponentProvider storageComponentProvider) {
-        return new QueryTranslator(appCtx, statements, conf, compilationProvider, storageComponentProvider,
+        return new QueryTranslator(appCtx, statements, output, compilationProvider, storageComponentProvider,
                 executorService);
     }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index d37de0d..e0648ce 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -161,6 +161,7 @@
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutorContext;
 import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
 import org.apache.asterix.translator.TypeTranslator;
 import org.apache.asterix.translator.util.ValidateUtil;
 import org.apache.asterix.utils.DataverseUtil;
@@ -170,6 +171,7 @@
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.commons.lang3.tuple.Triple;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -203,6 +205,7 @@
     public static final boolean IS_DEBUG_MODE = false;// true
     protected final List<Statement> statements;
     protected final ICcApplicationContext appCtx;
+    protected final SessionOutput sessionOutput;
     protected final SessionConfig sessionConfig;
     protected Dataverse activeDataverse;
     protected final List<FunctionDecl> declaredFunctions;
@@ -211,12 +214,13 @@
     protected final IStorageComponentProvider componentProvider;
     protected final ExecutorService executorService;
 
-    public QueryTranslator(ICcApplicationContext appCtx, List<Statement> statements, SessionConfig conf,
+    public QueryTranslator(ICcApplicationContext appCtx, List<Statement> statements, SessionOutput output,
             ILangCompilationProvider compliationProvider, IStorageComponentProvider componentProvider,
             ExecutorService executorService) {
         this.appCtx = appCtx;
         this.statements = statements;
-        this.sessionConfig = conf;
+        this.sessionOutput = output;
+        this.sessionConfig = output.config();
         this.componentProvider = componentProvider;
         declaredFunctions = getDeclaredFunctions(statements);
         apiFramework = new APIFramework(compliationProvider);
@@ -241,13 +245,14 @@
 
     @Override
     public void compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery,
-            Stats stats) throws Exception {
-        compileAndExecute(hcc, hdc, resultDelivery, stats, null, null);
+            ResultMetadata outMetadata, Stats stats) throws Exception {
+        compileAndExecute(hcc, hdc, resultDelivery, outMetadata, stats, null, null);
     }
 
     @Override
     public void compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery,
-            Stats stats, String clientContextId, IStatementExecutorContext ctx) throws Exception {
+            ResultMetadata outMetadata, Stats stats, String clientContextId, IStatementExecutorContext ctx)
+            throws Exception {
         int resultSetIdCounter = 0;
         FileSplit outputFile = null;
         IAWriterFactory writerFactory = PrinterBasedWriterFactory.INSTANCE;
@@ -262,7 +267,7 @@
         try {
             for (Statement stmt : statements) {
                 if (sessionConfig.is(SessionConfig.FORMAT_HTML)) {
-                    sessionConfig.out().println(ApiServlet.HTML_STATEMENT_SEPARATOR);
+                    sessionOutput.out().println(ApiServlet.HTML_STATEMENT_SEPARATOR);
                 }
                 validateOperation(appCtx, activeDataverse, stmt);
                 rewriteStatement(stmt); // Rewrite the statement's AST.
@@ -324,8 +329,8 @@
                             metadataProvider.setResultAsyncMode(resultDelivery == ResultDelivery.ASYNC
                                     || resultDelivery == ResultDelivery.DEFERRED);
                         }
-                        handleInsertUpsertStatement(metadataProvider, stmt, hcc, hdc, resultDelivery, stats, false,
-                                clientContextId, ctx);
+                        handleInsertUpsertStatement(metadataProvider, stmt, hcc, hdc, resultDelivery, outMetadata,
+                                stats, false, clientContextId, ctx);
                         break;
                     case Statement.Kind.DELETE:
                         handleDeleteStatement(metadataProvider, stmt, hcc, false);
@@ -358,8 +363,8 @@
                         metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++));
                         metadataProvider.setResultAsyncMode(
                                 resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.DEFERRED);
-                        handleQuery(metadataProvider, (Query) stmt, hcc, hdc, resultDelivery, stats, clientContextId,
-                                ctx);
+                        handleQuery(metadataProvider, (Query) stmt, hcc, hdc, resultDelivery, outMetadata, stats,
+                                clientContextId, ctx);
                         break;
                     case Statement.Kind.COMPACT:
                         handleCompactStatement(metadataProvider, stmt, hcc);
@@ -1694,7 +1699,7 @@
             CompiledLoadFromFileStatement cls =
                     new CompiledLoadFromFileStatement(dataverseName, loadStmt.getDatasetName().getValue(),
                             loadStmt.getAdapter(), loadStmt.getProperties(), loadStmt.dataIsAlreadySorted());
-            JobSpecification spec = apiFramework.compileQuery(hcc, metadataProvider, null, 0, null, sessionConfig, cls);
+            JobSpecification spec = apiFramework.compileQuery(hcc, metadataProvider, null, 0, null, sessionOutput, cls);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
             if (spec != null) {
@@ -1712,8 +1717,8 @@
 
     public JobSpecification handleInsertUpsertStatement(MetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery,
-            IStatementExecutor.Stats stats, boolean compileOnly, String clientContextId, IStatementExecutorContext ctx)
-            throws Exception {
+            ResultMetadata outMetadata, Stats stats, boolean compileOnly, String clientContextId,
+            IStatementExecutorContext ctx) throws Exception {
         InsertStatement stmtInsertUpsert = (InsertStatement) stmt;
         String dataverseName = getActiveDataverse(stmtInsertUpsert.getDataverseName());
         final IMetadataLocker locker = new IMetadataLocker() {
@@ -1755,7 +1760,8 @@
         }
 
         if (stmtInsertUpsert.getReturnExpression() != null) {
-            deliverResult(hcc, hdc, compiler, metadataProvider, locker, resultDelivery, stats, clientContextId, ctx);
+            deliverResult(hcc, hdc, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats,
+                    clientContextId, ctx);
         } else {
             locker.lock();
             try {
@@ -1811,11 +1817,11 @@
 
         // Query Rewriting (happens under the same ongoing metadata transaction)
         Pair<IReturningStatement, Integer> rewrittenResult =
-                apiFramework.reWriteQuery(declaredFunctions, metadataProvider, query, sessionConfig);
+                apiFramework.reWriteQuery(declaredFunctions, metadataProvider, query, sessionOutput);
 
         // Query Compilation (happens under the same ongoing metadata transaction)
         return apiFramework.compileQuery(clusterInfoCollector, metadataProvider, (Query) rewrittenResult.first,
-                rewrittenResult.second, stmt == null ? null : stmt.getDatasetName(), sessionConfig, stmt);
+                rewrittenResult.second, stmt == null ? null : stmt.getDatasetName(), sessionOutput, stmt);
     }
 
     private JobSpecification rewriteCompileInsertUpsert(IClusterInfoCollector clusterInfoCollector,
@@ -1824,7 +1830,7 @@
 
         // Insert/upsert statement rewriting (happens under the same ongoing metadata transaction)
         Pair<IReturningStatement, Integer> rewrittenResult =
-                apiFramework.reWriteQuery(declaredFunctions, metadataProvider, insertUpsert, sessionConfig);
+                apiFramework.reWriteQuery(declaredFunctions, metadataProvider, insertUpsert, sessionOutput);
 
         InsertStatement rewrittenInsertUpsert = (InsertStatement) rewrittenResult.first;
         String dataverseName = getActiveDataverse(rewrittenInsertUpsert.getDataverseName());
@@ -1846,7 +1852,7 @@
         }
         // Insert/upsert statement compilation (happens under the same ongoing metadata transaction)
         return apiFramework.compileQuery(clusterInfoCollector, metadataProvider, rewrittenInsertUpsert.getQuery(),
-                rewrittenResult.second, datasetName, sessionConfig, clfrqs);
+                rewrittenResult.second, datasetName, sessionOutput, clfrqs);
     }
 
     protected void handleCreateFeedStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception {
@@ -2052,7 +2058,7 @@
                 datasets.add(ds);
             }
             org.apache.commons.lang3.tuple.Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> jobInfo =
-                    FeedOperations.buildStartFeedJob(sessionConfig, metadataProvider, feed, feedConnections,
+                    FeedOperations.buildStartFeedJob(sessionOutput, metadataProvider, feed, feedConnections,
                             compilationProvider, storageComponentProvider, qtFactory, hcc);
 
             JobSpecification feedJob = jobInfo.getLeft();
@@ -2287,8 +2293,8 @@
     }
 
     protected void handleQuery(MetadataProvider metadataProvider, Query query, IHyracksClientConnection hcc,
-            IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats, String clientContextId,
-            IStatementExecutorContext ctx) throws Exception {
+            IHyracksDataset hdc, ResultDelivery resultDelivery, ResultMetadata outMetadata, Stats stats,
+            String clientContextId, IStatementExecutorContext ctx) throws Exception {
         final IMetadataLocker locker = new IMetadataLocker() {
             @Override
             public void lock() {
@@ -2318,12 +2324,14 @@
                 throw e;
             }
         };
-        deliverResult(hcc, hdc, compiler, metadataProvider, locker, resultDelivery, stats, clientContextId, ctx);
+        deliverResult(hcc, hdc, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats, clientContextId,
+                ctx);
     }
 
     private void deliverResult(IHyracksClientConnection hcc, IHyracksDataset hdc, IStatementCompiler compiler,
-            MetadataProvider metadataProvider, IMetadataLocker locker, ResultDelivery resultDelivery, Stats stats,
-            String clientContextId, IStatementExecutorContext ctx) throws Exception {
+            MetadataProvider metadataProvider, IMetadataLocker locker, ResultDelivery resultDelivery,
+            ResultMetadata outMetadata, Stats stats, String clientContextId, IStatementExecutorContext ctx)
+            throws Exception {
         final ResultSetId resultSetId = metadataProvider.getResultSetId();
         switch (resultDelivery) {
             case ASYNC:
@@ -2339,13 +2347,17 @@
             case IMMEDIATE:
                 createAndRunJob(hcc, null, compiler, locker, resultDelivery, id -> {
                     final ResultReader resultReader = new ResultReader(hdc, id, resultSetId);
-                    ResultUtil.printResults(appCtx, resultReader, sessionConfig, stats,
+                    ResultUtil.printResults(appCtx, resultReader, sessionOutput, stats,
                             metadataProvider.findOutputRecordType());
                 }, clientContextId, ctx);
                 break;
             case DEFERRED:
                 createAndRunJob(hcc, null, compiler, locker, resultDelivery, id -> {
-                    ResultUtil.printResultHandle(sessionConfig, new ResultHandle(id, resultSetId));
+                    ResultUtil.printResultHandle(sessionOutput, new ResultHandle(id, resultSetId));
+                    if (outMetadata != null) {
+                        outMetadata.getResultSets()
+                                .add(Triple.of(id, resultSetId, metadataProvider.findOutputRecordType()));
+                    }
                 }, clientContextId, ctx);
                 break;
             default:
@@ -2360,8 +2372,8 @@
         try {
             createAndRunJob(hcc, jobId, compiler, locker, resultDelivery, id -> {
                 final ResultHandle handle = new ResultHandle(id, resultSetId);
-                ResultUtil.printStatus(sessionConfig, AbstractQueryApiServlet.ResultStatus.RUNNING);
-                ResultUtil.printResultHandle(sessionConfig, handle);
+                ResultUtil.printStatus(sessionOutput, AbstractQueryApiServlet.ResultStatus.RUNNING);
+                ResultUtil.printResultHandle(sessionOutput, handle);
                 synchronized (printed) {
                     printed.setTrue();
                     printed.notify();
@@ -2370,8 +2382,8 @@
         } catch (Exception e) {
             if (JobId.INVALID.equals(jobId.getValue())) {
                 // compilation failed
-                ResultUtil.printStatus(sessionConfig, AbstractQueryApiServlet.ResultStatus.FAILED);
-                ResultUtil.printError(sessionConfig.out(), e);
+                ResultUtil.printStatus(sessionOutput, AbstractQueryApiServlet.ResultStatus.FAILED);
+                ResultUtil.printError(sessionOutput.out(), e);
             } else {
                 GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE,
                         resultDelivery.name() + " job with id " + jobId.getValue() + " " + "failed", e);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index bf7d5eb..57cc340 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -19,17 +19,19 @@
 
 package org.apache.asterix.hyracks.bootstrap;
 
+import static org.apache.asterix.algebra.base.ILangExtension.Language.AQL;
+import static org.apache.asterix.algebra.base.ILangExtension.Language.SQLPP;
 import static org.apache.asterix.api.http.servlet.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR;
 import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
 
 import java.util.Arrays;
 import java.util.List;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.active.ActiveLifecycleListener;
+import org.apache.asterix.api.http.ctx.StatementExecutorContext;
 import org.apache.asterix.api.http.server.ApiServlet;
 import org.apache.asterix.api.http.server.ClusterApiServlet;
 import org.apache.asterix.api.http.server.ClusterControllerDetailsApiServlet;
@@ -73,6 +75,7 @@
 import org.apache.asterix.runtime.job.resource.JobCapacityController;
 import org.apache.asterix.runtime.utils.CcApplicationContext;
 import org.apache.asterix.runtime.utils.ClusterStateManager;
+import org.apache.asterix.translator.IStatementExecutorContext;
 import org.apache.asterix.translator.IStatementExecutorFactory;
 import org.apache.hyracks.api.application.ICCServiceContext;
 import org.apache.hyracks.api.application.IServiceContext;
@@ -95,6 +98,7 @@
     protected ICCServiceContext ccServiceCtx;
     protected CCExtensionManager ccExtensionManager;
     protected IStorageComponentProvider componentProvider;
+    protected StatementExecutorContext statementExecutorCtx;
     protected WebManager webManager;
     protected CcApplicationContext appCtx;
     private IJobCapacityController jobCapacityController;
@@ -124,9 +128,10 @@
         ExternalLibraryUtils.setUpExternaLibraries(libraryManager, false);
         componentProvider = new StorageComponentProvider();
         GlobalRecoveryManager.instantiate(ccServiceCtx, getHcc(), componentProvider);
+        statementExecutorCtx = new StatementExecutorContext();
         appCtx = new CcApplicationContext(ccServiceCtx, getHcc(), libraryManager, resourceIdManager,
                 () -> MetadataManager.INSTANCE, GlobalRecoveryManager.instance(), ftStrategy,
-                new ActiveLifecycleListener());
+                new ActiveLifecycleListener(), componentProvider);
         ClusterStateManager.INSTANCE.setCcAppCtx(appCtx);
         ccExtensionManager = new CCExtensionManager(getExtensions());
         appCtx.setExtensionManager(ccExtensionManager);
@@ -184,7 +189,7 @@
         IHyracksClientConnection hcc = getHcc();
         webServer.setAttribute(HYRACKS_CONNECTION_ATTR, hcc);
         webServer.addServlet(new ApiServlet(webServer.ctx(), new String[] { "/*" }, appCtx,
-                ccExtensionManager.getAqlCompilationProvider(), ccExtensionManager.getSqlppCompilationProvider(),
+                ccExtensionManager.getCompilationProvider(AQL), ccExtensionManager.getCompilationProvider(SQLPP),
                 getStatementExecutorFactory(), componentProvider));
         return webServer;
     }
@@ -197,6 +202,8 @@
         jsonAPIServer.setAttribute(ASTERIX_APP_CONTEXT_INFO_ATTR, appCtx);
         jsonAPIServer.setAttribute(ServletConstants.EXECUTOR_SERVICE_ATTR,
                 ccServiceCtx.getControllerService().getExecutor());
+        jsonAPIServer.setAttribute(ServletConstants.RUNNING_QUERIES_ATTR, statementExecutorCtx);
+        jsonAPIServer.setAttribute(ServletConstants.SERVICE_CONTEXT_ATTR, ccServiceCtx);
 
         // AQL rest APIs.
         addServlet(jsonAPIServer, Servlets.AQL_QUERY);
@@ -241,28 +248,28 @@
     protected IServlet createServlet(ConcurrentMap<String, Object> ctx, String key, String... paths) {
         switch (key) {
             case Servlets.AQL:
-                return new FullApiServlet(ctx, paths, appCtx, ccExtensionManager.getAqlCompilationProvider(),
+                return new FullApiServlet(ctx, paths, appCtx, ccExtensionManager.getCompilationProvider(AQL),
                         getStatementExecutorFactory(), componentProvider);
             case Servlets.AQL_QUERY:
-                return new QueryApiServlet(ctx, paths, appCtx, ccExtensionManager.getAqlCompilationProvider(),
+                return new QueryApiServlet(ctx, paths, appCtx, ccExtensionManager.getCompilationProvider(AQL),
                         getStatementExecutorFactory(), componentProvider);
             case Servlets.AQL_UPDATE:
-                return new UpdateApiServlet(ctx, paths, appCtx, ccExtensionManager.getAqlCompilationProvider(),
+                return new UpdateApiServlet(ctx, paths, appCtx, ccExtensionManager.getCompilationProvider(AQL),
                         getStatementExecutorFactory(), componentProvider);
             case Servlets.AQL_DDL:
-                return new DdlApiServlet(ctx, paths, appCtx, ccExtensionManager.getAqlCompilationProvider(),
+                return new DdlApiServlet(ctx, paths, appCtx, ccExtensionManager.getCompilationProvider(AQL),
                         getStatementExecutorFactory(), componentProvider);
             case Servlets.SQLPP:
-                return new FullApiServlet(ctx, paths, appCtx, ccExtensionManager.getSqlppCompilationProvider(),
+                return new FullApiServlet(ctx, paths, appCtx, ccExtensionManager.getCompilationProvider(SQLPP),
                         getStatementExecutorFactory(), componentProvider);
             case Servlets.SQLPP_QUERY:
-                return new QueryApiServlet(ctx, paths, appCtx, ccExtensionManager.getSqlppCompilationProvider(),
+                return new QueryApiServlet(ctx, paths, appCtx, ccExtensionManager.getCompilationProvider(SQLPP),
                         getStatementExecutorFactory(), componentProvider);
             case Servlets.SQLPP_UPDATE:
-                return new UpdateApiServlet(ctx, paths, appCtx, ccExtensionManager.getSqlppCompilationProvider(),
+                return new UpdateApiServlet(ctx, paths, appCtx, ccExtensionManager.getCompilationProvider(SQLPP),
                         getStatementExecutorFactory(), componentProvider);
             case Servlets.SQLPP_DDL:
-                return new DdlApiServlet(ctx, paths, appCtx, ccExtensionManager.getSqlppCompilationProvider(),
+                return new DdlApiServlet(ctx, paths, appCtx, ccExtensionManager.getCompilationProvider(SQLPP),
                         getStatementExecutorFactory(), componentProvider);
             case Servlets.RUNNING_REQUESTS:
                 return new QueryCancellationServlet(ctx, paths);
@@ -271,8 +278,9 @@
             case Servlets.QUERY_RESULT:
                 return new QueryResultApiServlet(ctx, paths, appCtx);
             case Servlets.QUERY_SERVICE:
-                return new QueryServiceServlet(ctx, paths, appCtx, ccExtensionManager.getSqlppCompilationProvider(),
-                        getStatementExecutorFactory(), componentProvider);
+                return new QueryServiceServlet(ctx, paths, appCtx, SQLPP,
+                        ccExtensionManager.getCompilationProvider(SQLPP), getStatementExecutorFactory(),
+                        componentProvider);
             case Servlets.CONNECTOR:
                 return new ConnectorApiServlet(ctx, paths, appCtx);
             case Servlets.SHUTDOWN:
@@ -292,13 +300,17 @@
         }
     }
 
-    private IStatementExecutorFactory getStatementExecutorFactory() {
+    public IStatementExecutorFactory getStatementExecutorFactory() {
         return ccExtensionManager.getStatementExecutorFactory(ccServiceCtx.getControllerService().getExecutor());
     }
 
+    public IStatementExecutorContext getStatementExecutorContext() {
+        return statementExecutorCtx;
+    }
+
     @Override
     public void startupCompleted() throws Exception {
-        ccServiceCtx.getControllerService().getExecutor().submit((Callable) () -> {
+        ccServiceCtx.getControllerService().getExecutor().submit(() -> {
             ClusterStateManager.INSTANCE.waitForState(ClusterState.ACTIVE);
             ClusterManagerProvider.getClusterManager().notifyStartupCompleted();
             return null;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 9c24acf..ec01e1c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -49,21 +49,26 @@
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.client.ClusterControllerInfo;
+import org.apache.hyracks.api.client.HyracksConnection;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.config.IConfigManager;
 import org.apache.hyracks.api.job.resource.NodeCapacity;
 import org.apache.hyracks.api.messages.IMessageBroker;
 import org.apache.hyracks.control.common.controllers.NCConfig;
 import org.apache.hyracks.control.nc.BaseNCApplication;
 import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.http.server.WebManager;
 
 public class NCApplication extends BaseNCApplication {
     private static final Logger LOGGER = Logger.getLogger(NCApplication.class.getName());
 
-    private INCServiceContext ncServiceCtx;
+    protected INCServiceContext ncServiceCtx;
     private INcApplicationContext runtimeContext;
     private String nodeId;
     private boolean stopInitiated = false;
     private SystemState systemState;
+    protected WebManager webManager;
 
     @Override
     public void registerConfig(IConfigManager configManager) {
@@ -122,6 +127,8 @@
             localResourceRepository.initializeNewUniverse(ClusterProperties.INSTANCE.getStorageDirectoryName());
         }
 
+        webManager = new WebManager();
+
         performLocalCleanUp();
     }
 
@@ -131,6 +138,10 @@
         Logger.getLogger("org.apache.asterix").setLevel(level);
     }
 
+    protected void configureServers() throws Exception {
+        // override to start web services on NC nodes
+    }
+
     protected List<AsterixExtension> getExtensions() {
         return Collections.emptyList();
     }
@@ -143,6 +154,9 @@
             if (LOGGER.isLoggable(Level.INFO)) {
                 LOGGER.info("Stopping Asterix node controller: " + nodeId);
             }
+
+            webManager.stop();
+
             //Clean any temporary files
             performLocalCleanUp();
 
@@ -163,6 +177,10 @@
 
     @Override
     public void startupCompleted() throws Exception {
+        // configure servlets after joining the cluster, so we can create HyracksClientConnection
+        configureServers();
+        webManager.start();
+
         // Since we don't pass initial run flag in AsterixHyracksIntegrationUtil, we use the virtualNC flag
         final NodeProperties nodeProperties = runtimeContext.getNodeProperties();
         if (systemState == SystemState.PERMANENT_DATA_LOSS
@@ -262,4 +280,10 @@
     public INcApplicationContext getApplicationContext() {
         return runtimeContext;
     }
+
+    protected IHyracksClientConnection getHcc() throws Exception {
+        NodeControllerService ncSrv = (NodeControllerService) ncServiceCtx.getControllerService();
+        ClusterControllerInfo ccInfo = ncSrv.getNodeParameters().getClusterControllerInfo();
+        return new HyracksConnection(ccInfo.getClientNetAddress(), ccInfo.getClientNetPort());
+    }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
index 630aabe..33e89f0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -30,12 +31,16 @@
 import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.asterix.common.messaging.api.MessageFuture;
 import org.apache.hyracks.api.comm.IChannelControlBlock;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.messages.IMessage;
 import org.apache.hyracks.api.util.JavaSerializationUtils;
 import org.apache.hyracks.control.nc.NodeControllerService;
 
+import io.netty.util.collection.LongObjectHashMap;
+import io.netty.util.collection.LongObjectMap;
+
 public class NCMessageBroker implements INCMessageBroker {
     private static final Logger LOGGER = Logger.getLogger(NCMessageBroker.class.getName());
 
@@ -44,6 +49,8 @@
     private final LinkedBlockingQueue<INcAddressedMessage> receivedMsgsQ;
     private final ConcurrentFramePool messagingFramePool;
     private final int maxMsgSize;
+    private final AtomicLong futureIdGenerator;
+    private final LongObjectMap<MessageFuture> futureMap;
 
     public NCMessageBroker(NodeControllerService ncs, MessagingProperties messagingProperties) {
         this.ncs = ncs;
@@ -53,6 +60,8 @@
         messagingFramePool = new ConcurrentFramePool(ncs.getId(), messagingMemoryBudget,
                 messagingProperties.getFrameSize());
         receivedMsgsQ = new LinkedBlockingQueue<>();
+        futureIdGenerator = new AtomicLong();
+        futureMap = new LongObjectHashMap<>();
         MessageDeliveryService msgDeliverySvc = new MessageDeliveryService();
         appContext.getThreadExecutor().execute(msgDeliverySvc);
     }
@@ -104,6 +113,26 @@
         ccb.getWriteInterface().getFullBufferAcceptor().accept(msgBuffer);
     }
 
+    @Override
+    public MessageFuture registerMessageFuture() {
+        long futureId = futureIdGenerator.incrementAndGet();
+        MessageFuture future = new MessageFuture(futureId);
+        synchronized (futureMap) {
+            if (futureMap.containsKey(futureId)) {
+                throw new IllegalStateException();
+            }
+            futureMap.put(futureId, future);
+        }
+        return future;
+    }
+
+    @Override
+    public MessageFuture deregisterMessageFuture(long futureId) {
+        synchronized (futureMap) {
+            return futureMap.remove(futureId);
+        }
+    }
+
     private class MessageDeliveryService implements Runnable {
         /*
          * TODO Currently this thread is not stopped when it is interrupted because
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index bffaeef..d1ff871 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -69,7 +69,7 @@
 import org.apache.asterix.runtime.utils.RuntimeUtils;
 import org.apache.asterix.translator.CompiledStatements;
 import org.apache.asterix.translator.IStatementExecutor;
-import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -151,7 +151,7 @@
         return spec;
     }
 
-    private static JobSpecification getConnectionJob(SessionConfig sessionConfig, MetadataProvider metadataProvider,
+    private static JobSpecification getConnectionJob(SessionOutput sessionOutput, MetadataProvider metadataProvider,
             FeedConnection feedConnection, String[] locations, ILangCompilationProvider compilationProvider,
             IStorageComponentProvider storageComponentProvider, DefaultStatementExecutorFactory qtFactory,
             IHyracksClientConnection hcc) throws AlgebricksException, RemoteException, ACIDException {
@@ -165,7 +165,7 @@
         statements.add(dataverseDecl);
         statements.add(subscribeStmt);
         IStatementExecutor translator = qtFactory.create(metadataProvider.getApplicationContext(), statements,
-                sessionConfig, compilationProvider, storageComponentProvider);
+                sessionOutput, compilationProvider, storageComponentProvider);
         // configure the metadata provider
         metadataProvider.getConfig().put(FunctionUtil.IMPORT_PRIVATE_FUNCTIONS, "" + Boolean.TRUE);
         metadataProvider.getConfig().put(FeedActivityDetails.FEED_POLICY_NAME, "" + subscribeStmt.getPolicy());
@@ -357,7 +357,7 @@
     }
 
     public static Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> buildStartFeedJob(
-            SessionConfig sessionConfig, MetadataProvider metadataProvider, Feed feed,
+            SessionOutput sessionOutput, MetadataProvider metadataProvider, Feed feed,
             List<FeedConnection> feedConnections, ILangCompilationProvider compilationProvider,
             IStorageComponentProvider storageComponentProvider, DefaultStatementExecutorFactory qtFactory,
             IHyracksClientConnection hcc) throws Exception {
@@ -372,7 +372,7 @@
         String[] ingestionLocations = ingestionAdaptorFactory.getPartitionConstraint().getLocations();
         // Add connection job
         for (FeedConnection feedConnection : feedConnections) {
-            JobSpecification connectionJob = getConnectionJob(sessionConfig, metadataProvider, feedConnection,
+            JobSpecification connectionJob = getConnectionJob(sessionOutput, metadataProvider, feedConnection,
                     ingestionLocations, compilationProvider, storageComponentProvider, qtFactory, hcc);
             jobsList.add(connectionJob);
         }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
index e2885b3..5e3da5f 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
@@ -39,7 +39,7 @@
 import org.apache.asterix.lang.common.statement.RunStatement;
 import org.apache.asterix.runtime.utils.CcApplicationContext;
 import org.apache.asterix.translator.IStatementExecutor;
-import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -51,7 +51,7 @@
     @Test
     public void test() throws Exception {
         List<Statement> statements = new ArrayList<>();
-        SessionConfig mockSessionConfig = mock(SessionConfig.class);
+        SessionOutput mockSessionOutput = mock(SessionOutput.class);
         RunStatement mockRunStatement = mock(RunStatement.class);
 
         // Mocks AppContextInfo.
@@ -70,7 +70,7 @@
         when(mockMasterNode.getClientIp()).thenReturn("127.0.0.1");
 
         IStatementExecutor aqlTranslator = new DefaultStatementExecutorFactory().create(mockAsterixAppContextInfo,
-                statements, mockSessionConfig, new AqlCompilationProvider(), new StorageComponentProvider());
+                statements, mockSessionOutput, new AqlCompilationProvider(), new StorageComponentProvider());
         List<String> parameters = new ArrayList<>();
         parameters.add("examples/pregelix-example-jar-with-dependencies.jar");
         parameters.add("org.apache.pregelix.example.PageRankVertex");
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index f1e6aca..9913493 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -32,11 +32,13 @@
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.Inet4Address;
+import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -118,19 +120,26 @@
     /*
      * Instance members
      */
-    protected final String host;
-    protected final int port;
+    protected final List<InetSocketAddress> endpoints;
+    protected int endpointSelector;
     protected ITestLibrarian librarian;
 
-    public TestExecutor(String host, int port) {
-        this.host = host;
-        this.port = port;
-    }
-
     public TestExecutor() {
         this(Inet4Address.getLoopbackAddress().getHostAddress(), 19002);
     }
 
+    public TestExecutor(String host, int port) {
+        this(InetSocketAddress.createUnresolved(host, port));
+    }
+
+    public TestExecutor(InetSocketAddress endpoint) {
+        this(Collections.singletonList(endpoint));
+    }
+
+    public TestExecutor(List<InetSocketAddress> endpoints) {
+        this.endpoints = endpoints;
+    }
+
     public void setLibrarian(ITestLibrarian librarian) {
         this.librarian = librarian;
     }
@@ -373,7 +382,7 @@
                     continue;
                 }
                 throw new Exception(
-                        "Result for " + scriptFile + ": expected pattern '" + expression + "' not found in result.");
+                        "Result for " + scriptFile + ": expected pattern '" + expression + "' not found in result: "+actual);
             }
         } catch (Exception e) {
             System.err.println("Actual results file: " + actualFile.toString());
@@ -811,6 +820,7 @@
                 break;
             case "mgx":
                 executeManagixCommand(stripLineComments(statement).trim());
+                Thread.sleep(8000);
                 break;
             case "txnqbc": // qbc represents query before crash
                 InputStream resultStream = executeQuery(statement, OutputFormat.forCompilationUnit(cUnit),
@@ -1165,7 +1175,7 @@
 
     protected InputStream executeHttp(String ctxType, String endpoint, OutputFormat fmt) throws Exception {
         String[] split = endpoint.split("\\?");
-        URI uri = new URI("http", null, host, port, split[0], split.length > 1 ? split[1] : null, null);
+        URI uri = createEndpointURI(split[0], split.length > 1 ? split[1] : null);
         return executeURI(ctxType, uri, fmt);
     }
 
@@ -1184,7 +1194,7 @@
         //get node process id
         OutputFormat fmt = OutputFormat.forCompilationUnit(cUnit);
         String endpoint = "/admin/cluster/node/" + nodeId + "/config";
-        InputStream executeJSONGet = executeJSONGet(fmt, new URI("http", null, host, port, endpoint, null, null));
+        InputStream executeJSONGet = executeJSONGet(fmt, createEndpointURI(endpoint, null));
         StringWriter actual = new StringWriter();
         IOUtils.copy(executeJSONGet, actual, StandardCharsets.UTF_8);
         String config = actual.toString();
@@ -1201,7 +1211,7 @@
     private void deleteNCTxnLogs(String nodeId, CompilationUnit cUnit) throws Exception {
         OutputFormat fmt = OutputFormat.forCompilationUnit(cUnit);
         String endpoint = "/admin/cluster/node/" + nodeId + "/config";
-        InputStream executeJSONGet = executeJSONGet(fmt, new URI("http://" + host + ":" + port + endpoint));
+        InputStream executeJSONGet = executeJSONGet(fmt, createEndpointURI(endpoint, null));
         StringWriter actual = new StringWriter();
         IOUtils.copy(executeJSONGet, actual, StandardCharsets.UTF_8);
         String config = actual.toString();
@@ -1280,8 +1290,16 @@
                         + cUnit.getName() + "_qbc.adm");
     }
 
+    protected URI createEndpointURI(String path, String query) throws URISyntaxException {
+        int endpointIdx = Math.abs(endpointSelector++ % endpoints.size());
+        InetSocketAddress endpoint = endpoints.get(endpointIdx);
+        URI uri = new URI("http", null, endpoint.getHostString(), endpoint.getPort(), path, query, null);
+        LOGGER.fine("Created endpoint URI: " + uri);
+        return uri;
+    }
+
     protected URI getEndpoint(String servlet) throws URISyntaxException {
-        return new URI("http", null, host, port, getPath(servlet).replaceAll("/\\*$", ""), null, null);
+        return createEndpointURI(getPath(servlet).replaceAll("/\\*$", ""), null);
     }
 
     public static String stripJavaComments(String text) {
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml
new file mode 100644
index 0000000..9a2ad3c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml
@@ -0,0 +1,52 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements.  See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership.  The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License.  You may obtain a copy of the License at
+ !
+ !   http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied.  See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<test-group name="async-deferred">
+    <test-case FilePath="async-deferred">
+        <compilation-unit name="async-failed">
+            <output-dir compare="Text">async-failed</output-dir>
+            <expected-error>Injected failure in asterix:inject-failure</expected-error>
+        </compilation-unit>
+    </test-case>
+    <test-case FilePath="async-deferred">
+        <compilation-unit name="async-compilation-failed">
+            <output-dir compare="Text">async-compilation-failed</output-dir>
+            <expected-error>Cannot find dataset gargel</expected-error>
+        </compilation-unit>
+    </test-case>
+    <test-case FilePath="async-deferred">
+        <compilation-unit name="deferred">
+            <output-dir compare="Text">deferred</output-dir>
+        </compilation-unit>
+    </test-case>
+    <test-case FilePath="async-deferred">
+        <compilation-unit name="async">
+            <output-dir compare="Text">async</output-dir>
+        </compilation-unit>
+    </test-case>
+    <test-case FilePath="async-deferred">
+        <compilation-unit name="async-repeated">
+            <output-dir compare="Text">async-repeated</output-dir>
+        </compilation-unit>
+    </test-case>
+    <test-case FilePath="async-deferred">
+        <compilation-unit name="async-running">
+            <output-dir compare="Text">async-running</output-dir>
+        </compilation-unit>
+    </test-case>
+</test-group>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 8f7ffc3..d5716d1 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -18,43 +18,10 @@
  !-->
 <!DOCTYPE test-suite [
   <!ENTITY RecordsQueries SYSTEM "queries_sqlpp/objects/ObjectsQueries.xml">
-
+  <!ENTITY AsyncDeferredQueries SYSTEM "queries_sqlpp/async-deferred/AsyncDeferredQueries.xml">
 ]>
 <test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp" QueryFileExtension=".sqlpp">
-  <test-group name="async-deferred">
-    <test-case FilePath="async-deferred">
-      <compilation-unit name="async-failed">
-        <output-dir compare="Text">async-failed</output-dir>
-        <expected-error>Injected failure in asterix:inject-failure</expected-error>
-      </compilation-unit>
-    </test-case>
-    <test-case FilePath="async-deferred">
-      <compilation-unit name="async-compilation-failed">
-        <output-dir compare="Text">async-compilation-failed</output-dir>
-        <expected-error>Cannot find dataset gargel</expected-error>
-      </compilation-unit>
-    </test-case>
-    <test-case FilePath="async-deferred">
-      <compilation-unit name="deferred">
-        <output-dir compare="Text">deferred</output-dir>
-      </compilation-unit>
-    </test-case>
-    <test-case FilePath="async-deferred">
-      <compilation-unit name="async">
-        <output-dir compare="Text">async</output-dir>
-      </compilation-unit>
-    </test-case>
-    <test-case FilePath="async-deferred">
-      <compilation-unit name="async-repeated">
-        <output-dir compare="Text">async-repeated</output-dir>
-      </compilation-unit>
-    </test-case>
-    <test-case FilePath="async-deferred">
-      <compilation-unit name="async-running">
-        <output-dir compare="Text">async-running</output-dir>
-      </compilation-unit>
-    </test-case>
-  </test-group>
+  &AsyncDeferredQueries;
   <test-group name="flwor">
     <test-case FilePath="flwor">
       <compilation-unit name="at00">
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
index a9e6448..e5b78cf 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
@@ -20,6 +20,7 @@
 
 import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
+import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.transactions.IResourceIdManager;
 import org.apache.hyracks.api.application.ICCServiceContext;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
@@ -72,5 +73,24 @@
      */
     public IHyracksClientConnection getHcc();
 
+    /**
+     * Returns the resource manager
+     *
+     * @return {@link IResourceIdManager} implementation instance
+     */
     public IResourceIdManager getResourceIdManager();
+
+    /**
+     * Returns the storage component provider
+     *
+     * @return {@link IStorageComponentProvider} implementation instance
+     */
+    public IStorageComponentProvider getStorageComponentProvider();
+
+    /**
+     * Returns the extension manager
+     *
+     * @return the extension manager instance
+     */
+    public Object getExtensionManager();
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
index e1101b3..86d1074 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
@@ -26,7 +26,6 @@
      * Sends application message from this NC to the CC.
      *
      * @param message
-     * @param callback
      * @throws Exception
      */
     public void sendMessageToCC(ICcAddressedMessage message) throws Exception;
@@ -35,7 +34,6 @@
      * Sends application message from this NC to another NC.
      *
      * @param message
-     * @param callback
      * @throws Exception
      */
     public void sendMessageToNC(String nodeId, INcAddressedMessage message)
@@ -47,4 +45,17 @@
      * @param msg
      */
     public void queueReceivedMessage(INcAddressedMessage msg);
+
+    /**
+     * Creates and registers a Future for a message that will be send through this broker
+     * @return new Future
+     */
+    MessageFuture registerMessageFuture();
+
+    /**
+     * Removes a previously registered Future
+     * @param futureId future identifier
+     * @return existing Future or {@code null} if there was no Future associated with this identifier
+     */
+    MessageFuture deregisterMessageFuture(long futureId);
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/MessageFuture.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/MessageFuture.java
new file mode 100644
index 0000000..f4ed1ff
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/MessageFuture.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.common.messaging.api;
+
+import org.apache.hyracks.api.messages.IMessage;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A {@link CompletableFuture} associated with an identifier
+ */
+public class MessageFuture extends CompletableFuture<IMessage> {
+
+    private final long futureId;
+
+    public MessageFuture(long futureId) {
+        this.futureId = futureId;
+    }
+
+    public long getFutureId() {
+        return futureId;
+    }
+}
diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml
index d9ca295..e341fef 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -281,6 +281,10 @@
           <groupId>commons-logging</groupId>
           <artifactId>commons-logging</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>xerces</groupId>
+          <artifactId>xercesImpl</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>
diff --git a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixLifecycleIT.java b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixLifecycleIT.java
index 3c65a83..814e109 100644
--- a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixLifecycleIT.java
+++ b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixLifecycleIT.java
@@ -88,6 +88,7 @@
             AsterixInstallerIntegrationUtil.transformIntoRequiredState(State.ACTIVE);
             String command = "stop -n " + AsterixInstallerIntegrationUtil.ASTERIX_INSTANCE_NAME;
             cmdHandler.processCommand(command.split(" "));
+            Thread.sleep(4000);
             AsterixInstance instance = ServiceProvider.INSTANCE.getLookupService()
                     .getAsterixInstance(AsterixInstallerIntegrationUtil.ASTERIX_INSTANCE_NAME);
             AsterixRuntimeState state = VerificationUtil.getAsterixRuntimeState(instance);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
index 8608d68..a4c271c 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
@@ -34,6 +34,7 @@
 import org.apache.asterix.common.config.ReplicationProperties;
 import org.apache.asterix.common.config.StorageProperties;
 import org.apache.asterix.common.config.TransactionProperties;
+import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.library.ILibraryManager;
@@ -54,6 +55,7 @@
 public class CcApplicationContext implements ICcApplicationContext {
 
     private ICCServiceContext ccServiceCtx;
+    private IStorageComponentProvider storageComponentProvider;
     private IGlobalRecoveryManager globalRecoveryManager;
     private ILibraryManager libraryManager;
     private IResourceIdManager resourceIdManager;
@@ -77,7 +79,8 @@
     public CcApplicationContext(ICCServiceContext ccServiceCtx, IHyracksClientConnection hcc,
             ILibraryManager libraryManager, IResourceIdManager resourceIdManager,
             Supplier<IMetadataBootstrap> metadataBootstrapSupplier, IGlobalRecoveryManager globalRecoveryManager,
-            IFaultToleranceStrategy ftStrategy, IJobLifecycleListener activeLifeCycleListener)
+            IFaultToleranceStrategy ftStrategy, IJobLifecycleListener activeLifeCycleListener,
+            IStorageComponentProvider storageComponentProvider)
             throws AsterixException, IOException {
         this.ccServiceCtx = ccServiceCtx;
         this.hcc = hcc;
@@ -102,6 +105,7 @@
         this.nodeProperties = new NodeProperties(propertiesAccessor);
         this.metadataBootstrapSupplier = metadataBootstrapSupplier;
         this.globalRecoveryManager = globalRecoveryManager;
+        this.storageComponentProvider = storageComponentProvider;
     }
 
     @Override
@@ -174,6 +178,7 @@
         return libraryManager;
     }
 
+    @Override
     public Object getExtensionManager() {
         return extensionManager;
     }
@@ -213,4 +218,9 @@
     public IJobLifecycleListener getActiveLifecycleListener() {
         return activeLifeCycleListener;
     }
+
+    @Override
+    public IStorageComponentProvider getStorageComponentProvider() {
+        return storageComponentProvider;
+    }
 }
diff --git a/asterixdb/asterix-server/src/main/assembly/filter.properties b/asterixdb/asterix-server/src/main/assembly/filter.properties
index b01047f..69c5b02 100644
--- a/asterixdb/asterix-server/src/main/assembly/filter.properties
+++ b/asterixdb/asterix-server/src/main/assembly/filter.properties
@@ -21,4 +21,5 @@
 NC_COMMAND=asterixnc
 HELPER_COMMAND=asterixhelper
 LISTEN_PORT=19002
-PRODUCT=AsterixDB
\ No newline at end of file
+PRODUCT=AsterixDB
+NC_BLUE_EXTRA=
\ No newline at end of file
diff --git a/asterixdb/asterix-server/src/main/opt/local/conf/cc.conf b/asterixdb/asterix-server/src/main/opt/local/conf/cc.conf
index d1f03bc..184728d 100644
--- a/asterixdb/asterix-server/src/main/opt/local/conf/cc.conf
+++ b/asterixdb/asterix-server/src/main/opt/local/conf/cc.conf
@@ -25,6 +25,7 @@
 txn.log.dir=data/blue/txnlog
 core.dump.dir=data/blue/coredump
 iodevices=data/blue
+${NC_BLUE_EXTRA}
 
 [nc]
 storage.subdir=storage
diff --git a/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java b/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java
index f34dc6a..ebd945e 100644
--- a/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java
+++ b/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java
@@ -20,14 +20,29 @@
 
 import java.io.File;
 
+import javax.xml.XMLConstants;
 import javax.xml.bind.JAXBContext;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.parsers.SAXParser;
+import javax.xml.parsers.SAXParserFactory;
+import javax.xml.transform.sax.SAXSource;
+
+import org.xml.sax.InputSource;
 
 public class TestSuiteParser {
     public TestSuiteParser() {
     }
 
     public org.apache.asterix.testframework.xml.TestSuite parse(File testSuiteCatalog) throws Exception {
+        SAXParserFactory saxParserFactory = SAXParserFactory.newInstance();
+        saxParserFactory.setNamespaceAware(true);
+        saxParserFactory.setXIncludeAware(true);
+        SAXParser saxParser = saxParserFactory.newSAXParser();
+        saxParser.setProperty(XMLConstants.ACCESS_EXTERNAL_DTD, "file");
+
         JAXBContext ctx = JAXBContext.newInstance(org.apache.asterix.testframework.xml.TestSuite.class);
-        return (org.apache.asterix.testframework.xml.TestSuite) ctx.createUnmarshaller().unmarshal(testSuiteCatalog);
+        Unmarshaller um = ctx.createUnmarshaller();
+        return (org.apache.asterix.testframework.xml.TestSuite) um.unmarshal(new SAXSource(saxParser.getXMLReader(),
+                new InputSource(testSuiteCatalog.toURI().toString())));
     }
 }
diff --git a/asterixdb/asterix-yarn/pom.xml b/asterixdb/asterix-yarn/pom.xml
index 0fed1bb..b06c4b5 100644
--- a/asterixdb/asterix-yarn/pom.xml
+++ b/asterixdb/asterix-yarn/pom.xml
@@ -212,7 +212,6 @@
             <usedDependency>org.apache.httpcomponents:httpclient</usedDependency>
             <usedDependency>org.apache.httpcomponents:httpcore</usedDependency>
             <usedDependency>org.slf4j:slf4j-simple</usedDependency>
-            <usedDependency>xerces:xercesImpl</usedDependency>
           </usedDependencies>
           <ignoredUnusedDeclaredDependencies>
             <ignoredUnusedDeclaredDependency>org.apache.asterix:asterix-external-data:zip:*</ignoredUnusedDeclaredDependency>
@@ -388,11 +387,6 @@
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>xerces</groupId>
-      <artifactId>xercesImpl</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-minicluster</artifactId>
       <scope>test</scope>
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
index cdfc492..ca20f4a 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
@@ -47,6 +47,8 @@
     // Constants
     private static final int LOW_WRITE_BUFFER_WATER_MARK = 8 * 1024;
     private static final int HIGH_WRITE_BUFFER_WATER_MARK = 32 * 1024;
+    protected static final WriteBufferWaterMark WRITE_BUFFER_WATER_MARK =
+            new WriteBufferWaterMark(LOW_WRITE_BUFFER_WATER_MARK, HIGH_WRITE_BUFFER_WATER_MARK);
     private static final Logger LOGGER = Logger.getLogger(HttpServer.class.getName());
     private static final int FAILED = -1;
     private static final int STOPPED = 0;
@@ -192,8 +194,7 @@
         Collections.sort(servlets, (l1, l2) -> l2.getPaths()[0].length() - l1.getPaths()[0].length());
         ServerBootstrap b = new ServerBootstrap();
         b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
-                .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
-                        new WriteBufferWaterMark(LOW_WRITE_BUFFER_WATER_MARK, HIGH_WRITE_BUFFER_WATER_MARK))
+                .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, WRITE_BUFFER_WATER_MARK)
                 .handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(new HttpServerInitializer(this));
         channel = b.bind(port).sync().channel();
     }
@@ -225,7 +226,6 @@
                 }
             }
         }
-        LOGGER.warning("No servlet for " + uri);
         return null;
     }
 
@@ -254,7 +254,15 @@
         return b && (path.length() == cpl || '/' == path.charAt(cpl));
     }
 
+    protected HttpServerHandler createHttpHandler(int chunkSize) {
+        return new HttpServerHandler<>(this, chunkSize);
+    }
+
     public ExecutorService getExecutor() {
         return executor;
     }
+
+    protected EventLoopGroup getWorkerGroup() {
+        return workerGroup;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
index 6f7ccba..00b3cb6 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
@@ -33,15 +33,16 @@
 import io.netty.handler.codec.http.DefaultHttpResponse;
 import io.netty.handler.codec.http.FullHttpRequest;
 import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
 
-public class HttpServerHandler extends SimpleChannelInboundHandler<Object> {
+public class HttpServerHandler<T extends HttpServer> extends SimpleChannelInboundHandler<Object> {
 
     private static final Logger LOGGER = Logger.getLogger(HttpServerHandler.class.getName());
-    protected final HttpServer server;
+    protected final T server;
     protected final int chunkSize;
     protected HttpRequestHandler handler;
 
-    public HttpServerHandler(HttpServer server, int chunkSize) {
+    public HttpServerHandler(T server, int chunkSize) {
         this.server = server;
         this.chunkSize = chunkSize;
     }
@@ -65,18 +66,18 @@
         try {
             IServlet servlet = server.getServlet(request);
             if (servlet == null) {
-                respond(ctx, request, HttpResponseStatus.NOT_FOUND);
+                handleServletNotFound(ctx, request);
             } else {
                 submit(ctx, servlet, request);
             }
         } catch (Exception e) {
             LOGGER.log(Level.SEVERE, "Failure Submitting HTTP Request", e);
-            respond(ctx, request, HttpResponseStatus.INTERNAL_SERVER_ERROR);
+            respond(ctx, request.protocolVersion(), HttpResponseStatus.INTERNAL_SERVER_ERROR);
         }
     }
 
-    private void respond(ChannelHandlerContext ctx, FullHttpRequest request, HttpResponseStatus status) {
-        DefaultHttpResponse response = new DefaultHttpResponse(request.protocolVersion(), status);
+    protected void respond(ChannelHandlerContext ctx, HttpVersion httpVersion, HttpResponseStatus status) {
+        DefaultHttpResponse response = new DefaultHttpResponse(httpVersion, status);
         ctx.write(response).addListener(ChannelFutureListener.CLOSE);
     }
 
@@ -86,7 +87,7 @@
             servletRequest = HttpUtil.toServletRequest(request);
         } catch (IllegalArgumentException e) {
             LOGGER.log(Level.WARNING, "Failure Decoding Request", e);
-            respond(ctx, request, HttpResponseStatus.BAD_REQUEST);
+            respond(ctx, request.protocolVersion(), HttpResponseStatus.BAD_REQUEST);
             return;
         }
         handler = new HttpRequestHandler(ctx, servlet, servletRequest, chunkSize);
@@ -102,6 +103,13 @@
         }
     }
 
+    protected void handleServletNotFound(ChannelHandlerContext ctx, FullHttpRequest request) {
+        if (LOGGER.isLoggable(Level.WARNING)) {
+            LOGGER.warning("No servlet for " + request.uri());
+        }
+        respond(ctx, request.protocolVersion(), HttpResponseStatus.NOT_FOUND);
+    }
+
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
         LOGGER.log(Level.SEVERE, "Failure handling HTTP Request", cause);
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
index bc67865..4f8655f 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
@@ -44,6 +44,6 @@
                 MAX_REQUEST_CHUNK_SIZE));
         p.addLast(new HttpResponseEncoder());
         p.addLast(new HttpObjectAggregator(Integer.MAX_VALUE));
-        p.addLast(new HttpServerHandler(server, RESPONSE_CHUNK_SIZE));
+        p.addLast(server.createHttpHandler(RESPONSE_CHUNK_SIZE));
     }
 }