[NO ISSUE][API] Return Async Query Results As JSON

- user model changes: yes
- storage format changes: no
- interface changes: yes

Details:
- Add ResultMetadata that can hold metadata about a job's
  result.
- Add required CC functions to get ResultMetadata.
- Add IJobResultCallback that is called when a job's result
  is written successfully.
- Calculate a job's processed objects on the job completion.
- Calculate a job's duration on the job completion.
- Use the async query request format when serving the result
  from QueryResultApiServlet.
- Return a proper JSON envelop in QueryResultApiServlet
  which includes the results as well as metrics fields.
- Add test case for returning JSON response for async
  query.

Change-Id: Ic4812a14925099a677f9e77a0040f881d2600724
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3401
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/base/ILangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/base/ILangExpressionToPlanTranslator.java
index 8e0214a..fea9340 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/base/ILangExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/base/ILangExpressionToPlanTranslator.java
@@ -22,6 +22,7 @@
 import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import org.apache.hyracks.api.result.IResultMetadata;
 
 /**
  * The interface is in charge of translating language expressions into logical query plans.
@@ -37,11 +38,13 @@
      *            the output dataset name (only for insert/delete).
      * @param stmt,
      *            the compiled dml statement (only for insert/delete).
+     * @param resultMetadata,
+     *            some result metadata that can be retrieved with the result
      * @return a logical query plan for the query.
      * @throws AlgebricksException
      */
-    public ILogicalPlan translate(Query query, String outputDatasetName, ICompiledDmlStatement stmt)
-            throws AlgebricksException;
+    public ILogicalPlan translate(Query query, String outputDatasetName, ICompiledDmlStatement stmt,
+            IResultMetadata resultMetadata) throws AlgebricksException;
 
     /**
      * Translates a load statement.
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index b7e9701..b8bfffd 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -156,6 +156,7 @@
 import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.api.io.FileSplit;
 import org.apache.hyracks.api.io.ManagedFileSplit;
+import org.apache.hyracks.api.result.IResultMetadata;
 
 /**
  * Each visit returns a pair of an operator and a variable. The variable
@@ -299,13 +300,13 @@
     }
 
     @Override
-    public ILogicalPlan translate(Query expr, String outputDatasetName, ICompiledDmlStatement stmt)
-            throws AlgebricksException {
-        return translate(expr, outputDatasetName, stmt, null);
+    public ILogicalPlan translate(Query expr, String outputDatasetName, ICompiledDmlStatement stmt,
+            IResultMetadata resultMetadata) throws AlgebricksException {
+        return translate(expr, outputDatasetName, stmt, null, resultMetadata);
     }
 
     public ILogicalPlan translate(Query expr, String outputDatasetName, ICompiledDmlStatement stmt,
-            ILogicalOperator baseOp) throws AlgebricksException {
+            ILogicalOperator baseOp, IResultMetadata resultMetadata) throws AlgebricksException {
         MutableObject<ILogicalOperator> base = new MutableObject<>(new EmptyTupleSourceOperator());
         if (baseOp != null) {
             base = new MutableObject<>(baseOp);
@@ -332,7 +333,7 @@
             writeExprList.add(new MutableObject<>(resVarRef));
             ResultSetSinkId rssId = new ResultSetSinkId(metadataProvider.getResultSetId());
             ResultSetDataSink sink = new ResultSetDataSink(rssId, null);
-            DistributeResultOperator newTop = new DistributeResultOperator(writeExprList, sink);
+            DistributeResultOperator newTop = new DistributeResultOperator(writeExprList, sink, resultMetadata);
             newTop.setSourceLocation(sourceLoc);
             newTop.getInputs().add(new MutableObject<>(topOp));
             topOp = newTop;
@@ -421,12 +422,12 @@
             switch (stmt.getKind()) {
                 case INSERT:
                     leafOperator = translateInsert(targetDatasource, varRef, varRefsForLoading,
-                            additionalFilteringExpressions, assign, stmt);
+                            additionalFilteringExpressions, assign, stmt, resultMetadata);
                     break;
                 case UPSERT:
                     leafOperator = translateUpsert(targetDatasource, varRef, varRefsForLoading,
                             additionalFilteringExpressions, assign, additionalFilteringField, unnestVar, topOp, exprs,
-                            resVar, additionalFilteringAssign, stmt);
+                            resVar, additionalFilteringAssign, stmt, resultMetadata);
                     break;
                 case DELETE:
                     leafOperator = translateDelete(targetDatasource, varRef, varRefsForLoading,
@@ -470,7 +471,7 @@
             List<Mutable<ILogicalExpression>> additionalFilteringExpressions, ILogicalOperator assign,
             List<String> additionalFilteringField, LogicalVariable unnestVar, ILogicalOperator topOp,
             List<Mutable<ILogicalExpression>> exprs, LogicalVariable resVar, AssignOperator additionalFilteringAssign,
-            ICompiledDmlStatement stmt) throws AlgebricksException {
+            ICompiledDmlStatement stmt, IResultMetadata resultMetadata) throws AlgebricksException {
         SourceLocation sourceLoc = stmt.getSourceLocation();
         if (!targetDatasource.getDataset().allow(topOp, DatasetUtil.OP_UPSERT)) {
             throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc,
@@ -577,13 +578,13 @@
         rootOperator = delegateOperator;
 
         // Compiles the return expression.
-        return processReturningExpression(rootOperator, upsertOp, compiledUpsert);
+        return processReturningExpression(rootOperator, upsertOp, compiledUpsert, resultMetadata);
     }
 
     private ILogicalOperator translateInsert(DatasetDataSource targetDatasource, Mutable<ILogicalExpression> varRef,
             List<Mutable<ILogicalExpression>> varRefsForLoading,
             List<Mutable<ILogicalExpression>> additionalFilteringExpressions, ILogicalOperator assign,
-            ICompiledDmlStatement stmt) throws AlgebricksException {
+            ICompiledDmlStatement stmt, IResultMetadata resultMetadata) throws AlgebricksException {
         SourceLocation sourceLoc = stmt.getSourceLocation();
         if (targetDatasource.getDataset().hasMetaPart()) {
             throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc,
@@ -605,13 +606,14 @@
         rootOperator.setSourceLocation(sourceLoc);
 
         // Compiles the return expression.
-        return processReturningExpression(rootOperator, insertOp, compiledInsert);
+        return processReturningExpression(rootOperator, insertOp, compiledInsert, resultMetadata);
     }
 
     // Stitches the translated operators for the returning expression into the query
     // plan.
     private ILogicalOperator processReturningExpression(ILogicalOperator inputOperator,
-            InsertDeleteUpsertOperator insertOp, CompiledInsertStatement compiledInsert) throws AlgebricksException {
+            InsertDeleteUpsertOperator insertOp, CompiledInsertStatement compiledInsert, IResultMetadata resultMetadata)
+            throws AlgebricksException {
         Expression returnExpression = compiledInsert.getReturnExpression();
         if (returnExpression == null) {
             return inputOperator;
@@ -644,7 +646,7 @@
         expressions.add(new MutableObject<>(new VariableReferenceExpression(resultVar)));
         ResultSetSinkId rssId = new ResultSetSinkId(metadataProvider.getResultSetId());
         ResultSetDataSink sink = new ResultSetDataSink(rssId, null);
-        DistributeResultOperator distResultOperator = new DistributeResultOperator(expressions, sink);
+        DistributeResultOperator distResultOperator = new DistributeResultOperator(expressions, sink, resultMetadata);
         distResultOperator.getInputs().add(new MutableObject<>(createResultAssignOperator));
 
         distResultOperator.setSourceLocation(sourceLoc);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index 81151d0..d558e0d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -212,8 +212,9 @@
         metadataProvider.setTxnId(txnId);
         ILangExpressionToPlanTranslator t =
                 translatorFactory.createExpressionToPlanTranslator(metadataProvider, varCounter, externalVars);
-
-        ILogicalPlan plan = isLoad ? t.translateLoad(statement) : t.translate(query, outputDatasetName, statement);
+        ResultMetadata resultMetadata = new ResultMetadata(output.config().fmt());
+        ILogicalPlan plan =
+                isLoad ? t.translateLoad(statement) : t.translate(query, outputDatasetName, statement, resultMetadata);
 
         if ((isQuery || isLoad) && !conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS)
                 && conf.is(SessionConfig.OOB_LOGICAL_PLAN)) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/ResultMetadata.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/ResultMetadata.java
new file mode 100644
index 0000000..69f46a4
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/ResultMetadata.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.api.common;
+
+import org.apache.asterix.translator.SessionConfig;
+import org.apache.hyracks.api.result.IResultMetadata;
+
+public class ResultMetadata implements IResultMetadata {
+
+    private final SessionConfig.OutputFormat format;
+    private long jobDuration;
+    private long processedObjects;
+
+    public ResultMetadata(SessionConfig.OutputFormat format) {
+        this.format = format;
+    }
+
+    public SessionConfig.OutputFormat getFormat() {
+        return format;
+    }
+
+    public long getProcessedObjects() {
+        return processedObjects;
+    }
+
+    public void setProcessedObjects(long processedObjects) {
+        this.processedObjects = processedObjects;
+    }
+
+    public void setJobDuration(long jobDuration) {
+        this.jobDuration = jobDuration;
+    }
+
+    public long getJobDuration() {
+        return jobDuration;
+    }
+
+    @Override
+    public String toString() {
+        return "ResultMetadata{" + "format=" + format + ", jobDuration=" + jobDuration + ", processedObjects="
+                + processedObjects + '}';
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
index 7f74c92..cda4d34 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.asterix.api.common.ResultMetadata;
 import org.apache.asterix.app.result.ResultHandle;
 import org.apache.asterix.app.result.ResultReader;
 import org.apache.asterix.common.api.IApplicationContext;
@@ -49,18 +50,16 @@
 
     @Override
     protected void get(IServletRequest request, IServletResponse response) throws Exception {
+        long elapsedStart = System.nanoTime();
         HttpUtil.setContentType(response, HttpUtil.ContentType.TEXT_HTML, request);
-
         final String strHandle = localPath(request);
         final ResultHandle handle = ResultHandle.parse(strHandle);
         if (handle == null) {
             response.setStatus(HttpResponseStatus.BAD_REQUEST);
             return;
         }
-
         IResultSet resultSet = getResultSet();
         ResultReader resultReader = new ResultReader(resultSet, handle.getJobId(), handle.getResultSetId());
-
         try {
             ResultJobRecord.Status status = resultReader.getStatus();
 
@@ -86,15 +85,20 @@
             if (httpStatus != HttpResponseStatus.OK) {
                 return;
             }
-
-            // QQQ The output format is determined by the initial
-            // query and cannot be modified here, so calling back to
-            // initResponse() is really an error. We need to find a
-            // way to send the same OutputFormat value here as was
-            // originally determined there. Need to save this value on
-            // some object that we can obtain here.
-            SessionOutput sessionOutput = initResponse(request, response);
-            ResultUtil.printResults(appCtx, resultReader, sessionOutput, new Stats(), null);
+            ResultMetadata metadata = (ResultMetadata) resultReader.getMetadata();
+            SessionOutput sessionOutput = initResponse(request, response, metadata.getFormat());
+            if (metadata.getFormat() == SessionConfig.OutputFormat.CLEAN_JSON
+                    || metadata.getFormat() == SessionConfig.OutputFormat.LOSSLESS_JSON) {
+                final Stats stats = new Stats();
+                sessionOutput.out().print("{\n");
+                ResultUtil.printResults(appCtx, resultReader, sessionOutput, stats, null);
+                QueryServiceServlet.printMetrics(sessionOutput.out(), System.nanoTime() - elapsedStart,
+                        metadata.getJobDuration(), stats.getCount(), stats.getSize(), metadata.getProcessedObjects(), 0,
+                        0, HttpUtil.getPreferredCharset(request));
+                sessionOutput.out().print("}\n");
+            } else {
+                ResultUtil.printResults(appCtx, resultReader, sessionOutput, new Stats(), null);
+            }
         } catch (HyracksDataException e) {
             final int errorCode = e.getErrorCode();
             if (ErrorCode.NO_RESULT_SET == errorCode) {
@@ -119,37 +123,15 @@
      * SessionConfig with the appropriate output writer and output-format
      * based on the Accept: header and other servlet parameters.
      */
-    static SessionOutput initResponse(IServletRequest request, IServletResponse response) throws IOException {
-        // CLEAN_JSON output is the default; most generally useful for a
-        // programmatic HTTP API
-        SessionConfig.OutputFormat format = SessionConfig.OutputFormat.CLEAN_JSON;
-        // First check the "output" servlet parameter.
-        String output = request.getParameter("output");
-        String accept = request.getHeader("Accept", "");
-        if (output != null) {
-            if ("CSV".equals(output)) {
-                format = SessionConfig.OutputFormat.CSV;
-            } else if ("ADM".equals(output)) {
-                format = SessionConfig.OutputFormat.ADM;
-            }
-        } else {
-            // Second check the Accept: HTTP header.
-            if (accept.contains("application/x-adm")) {
-                format = SessionConfig.OutputFormat.ADM;
-            } else if (accept.contains("text/csv")) {
-                format = SessionConfig.OutputFormat.CSV;
-            }
+    static SessionOutput initResponse(IServletRequest request, IServletResponse response,
+            SessionConfig.OutputFormat format) throws IOException {
+        String accept = request.getHeader("Accept");
+        if (accept == null) {
+            accept = "";
         }
         SessionConfig.PlanFormat planFormat = SessionConfig.PlanFormat.get(request.getParameter("plan-format"),
                 "plan format", SessionConfig.PlanFormat.STRING, LOGGER);
 
-        // If it's JSON, check for the "lossless" flag
-
-        if (format == SessionConfig.OutputFormat.CLEAN_JSON
-                && ("true".equals(request.getParameter("lossless")) || accept.contains("lossless=true"))) {
-            format = SessionConfig.OutputFormat.LOSSLESS_JSON;
-        }
-
         SessionOutput.ResultAppender appendHandle = (app, handle) -> app.append("{ \"").append("handle")
                 .append("\":" + " \"").append(handle).append("\" }");
         SessionConfig sessionConfig = new SessionConfig(format, planFormat);
@@ -168,6 +150,8 @@
         }
         sessionConfig.set(SessionConfig.FORMAT_WRAPPER_ARRAY, wrapperArray);
         // Now that format is set, output the content-type
+        SessionOutput.ResultDecorator resultPrefix = null;
+        SessionOutput.ResultDecorator resultPostfix = null;
         switch (format) {
             case ADM:
                 HttpUtil.setContentType(response, "application/x-adm", request);
@@ -176,6 +160,8 @@
                 // No need to reflect "clean-ness" in output type; fall through
             case LOSSLESS_JSON:
                 HttpUtil.setContentType(response, "application/json", request);
+                resultPrefix = ResultUtil.createPreResultDecorator();
+                resultPostfix = ResultUtil.createPostResultDecorator();
                 break;
             case CSV:
                 // Check for header parameter or in Accept:.
@@ -189,7 +175,7 @@
             default:
                 throw new IOException("Unknown format " + format);
         }
-        return new SessionOutput(sessionConfig, response.writer(), null, null, appendHandle, null);
+        return new SessionOutput(sessionConfig, response.writer(), resultPrefix, resultPostfix, appendHandle, null);
     }
 
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index 26e7430..06b75e3 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -322,7 +322,7 @@
         }
     }
 
-    private static void printMetrics(PrintWriter pw, long elapsedTime, long executionTime, long resultCount,
+    public static void printMetrics(PrintWriter pw, long elapsedTime, long executionTime, long resultCount,
             long resultSize, long processedObjects, long errorCount, long warnCount, Charset resultCharset) {
         boolean hasErrors = errorCount != 0;
         boolean hasWarnings = warnCount != 0;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
new file mode 100644
index 0000000..30814c6
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.result;
+
+import java.util.Collection;
+
+import org.apache.asterix.api.common.ResultMetadata;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.result.IJobResultCallback;
+import org.apache.hyracks.api.result.ResultJobRecord;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.job.IJobManager;
+import org.apache.hyracks.control.cc.job.JobRun;
+import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
+import org.apache.hyracks.control.common.job.profiling.om.JobletProfile;
+import org.apache.hyracks.control.common.job.profiling.om.TaskProfile;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class JobResultCallback implements IJobResultCallback {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final ICcApplicationContext appCtx;
+
+    public JobResultCallback(ICcApplicationContext appCtx) {
+        this.appCtx = appCtx;
+    }
+
+    @Override
+    public void completed(JobId jobId, ResultJobRecord resultJobRecord) {
+        try {
+            updateResultMetadata(jobId, resultJobRecord);
+        } catch (Exception e) {
+            LOGGER.error("failed to update result metadata for {}", jobId, e);
+        }
+    }
+
+    private void updateResultMetadata(JobId jobId, ResultJobRecord resultJobRecord) {
+        final ResultMetadata metadata = (ResultMetadata) resultJobRecord.getResultSetMetaData().getMetadata();
+        metadata.setJobDuration(resultJobRecord.getJobDuration());
+        metadata.setProcessedObjects(getJobProccssedObjects(jobId));
+    }
+
+    private long getJobProccssedObjects(JobId jobId) {
+        long processedObjects = 0;
+        IJobManager jobManager =
+                ((ClusterControllerService) appCtx.getServiceContext().getControllerService()).getJobManager();
+        final JobRun run = jobManager.get(jobId);
+        if (run != null) {
+            final JobProfile jobProfile = run.getJobProfile();
+            final Collection<JobletProfile> jobletProfiles = jobProfile.getJobletProfiles().values();
+            for (JobletProfile jp : jobletProfiles) {
+                final Collection<TaskProfile> jobletTasksProfile = jp.getTaskProfiles().values();
+                for (TaskProfile tp : jobletTasksProfile) {
+                    processedObjects += tp.getStatsCollector().getAggregatedStats().getTupleCounter().get();
+                }
+            }
+        }
+        return processedObjects;
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultReader.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultReader.java
index 09bb8a3..1acae87 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultReader.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultReader.java
@@ -22,6 +22,7 @@
 import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.result.IResultMetadata;
 import org.apache.hyracks.api.result.IResultSet;
 import org.apache.hyracks.api.result.IResultSetReader;
 import org.apache.hyracks.api.result.ResultJobRecord.Status;
@@ -52,4 +53,8 @@
     public IFrameTupleAccessor getFrameTupleAccessor() {
         return frameTupleAccessor;
     }
+
+    public IResultMetadata getMetadata() {
+        return reader.getResultMetadata();
+    }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 0c19303..8c0cc98 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -23,7 +23,6 @@
 import java.io.InputStream;
 import java.rmi.RemoteException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
 import java.util.EnumSet;
@@ -195,16 +194,10 @@
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.api.result.IResultSet;
 import org.apache.hyracks.api.result.ResultSetId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.cc.job.IJobManager;
-import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.common.controllers.CCConfig;
-import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
-import org.apache.hyracks.control.common.job.profiling.om.JobletProfile;
-import org.apache.hyracks.control.common.job.profiling.om.TaskProfile;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
@@ -2512,7 +2505,7 @@
             case IMMEDIATE:
                 createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> {
                     final ResultReader resultReader = new ResultReader(resultSet, id, resultSetId);
-                    updateJobStats(id, stats);
+                    updateJobStats(id, stats, metadataProvider.getResultSetId());
                     // stop buffering and allow for streaming result delivery
                     sessionOutput.release();
                     ResultUtil.printResults(appCtx, resultReader, sessionOutput, stats,
@@ -2521,7 +2514,7 @@
                 break;
             case DEFERRED:
                 createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> {
-                    updateJobStats(id, stats);
+                    updateJobStats(id, stats, metadataProvider.getResultSetId());
                     ResultUtil.printResultHandle(sessionOutput, new ResultHandle(id, resultSetId));
                     if (outMetadata != null) {
                         outMetadata.getResultSets()
@@ -2534,23 +2527,13 @@
         }
     }
 
-    private void updateJobStats(JobId jobId, Stats stats) {
-        final IJobManager jobManager =
-                ((ClusterControllerService) appCtx.getServiceContext().getControllerService()).getJobManager();
-        final JobRun run = jobManager.get(jobId);
-        if (run == null || run.getStatus() != JobStatus.TERMINATED) {
-            return;
-        }
-        final JobProfile jobProfile = run.getJobProfile();
-        final Collection<JobletProfile> jobletProfiles = jobProfile.getJobletProfiles().values();
-        long processedObjects = 0;
-        for (JobletProfile jp : jobletProfiles) {
-            final Collection<TaskProfile> jobletTasksProfile = jp.getTaskProfiles().values();
-            for (TaskProfile tp : jobletTasksProfile) {
-                processedObjects += tp.getStatsCollector().getAggregatedStats().getTupleCounter().get();
-            }
-        }
-        stats.setProcessedObjects(processedObjects);
+    private void updateJobStats(JobId jobId, Stats stats, ResultSetId rsId) throws HyracksDataException {
+        final ClusterControllerService controllerService =
+                (ClusterControllerService) appCtx.getServiceContext().getControllerService();
+        org.apache.asterix.api.common.ResultMetadata resultMetadata =
+                (org.apache.asterix.api.common.ResultMetadata) controllerService.getResultDirectoryService()
+                        .getResultMetadata(jobId, rsId);
+        stats.setProcessedObjects(resultMetadata.getProcessedObjects());
     }
 
     private void asyncCreateAndRunJob(IHyracksClientConnection hcc, IStatementCompiler compiler, IMetadataLocker locker,
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 99286a2..5998599 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -53,6 +53,7 @@
 import org.apache.asterix.app.config.ConfigValidator;
 import org.apache.asterix.app.external.ExternalLibraryUtils;
 import org.apache.asterix.app.replication.NcLifecycleCoordinator;
+import org.apache.asterix.app.result.JobResultCallback;
 import org.apache.asterix.common.api.AsterixThreadFactory;
 import org.apache.asterix.common.api.IConfigValidatorFactory;
 import org.apache.asterix.common.api.INodeJobTracker;
@@ -91,6 +92,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
+import org.apache.hyracks.api.result.IJobResultCallback;
 import org.apache.hyracks.control.cc.BaseCCApplication;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.common.controllers.CCConfig;
@@ -356,4 +358,9 @@
     public IGatekeeper getGatekeeper() {
         return getConfigManager().getAppConfig().getNCNames()::contains;
     }
+
+    @Override
+    public IJobResultCallback getJobResultCallback() {
+        return new JobResultCallback(appCtx);
+    }
 }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
index b8beccc..8969f18 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
@@ -95,7 +95,7 @@
         exchange.setPhysicalOperator(new OneToOneExchangePOperator());
         exchange.getInputs().add(new MutableObject<>(assignOperator));
 
-        DistributeResultOperator resultOperator = new DistributeResultOperator(null, null);
+        DistributeResultOperator resultOperator = new DistributeResultOperator(null, null, null);
         resultOperator.setExecutionMode(UNPARTITIONED);
         resultOperator.setPhysicalOperator(new DistributeResultPOperator());
         resultOperator.getInputs().add(new MutableObject<>(exchange));
@@ -137,7 +137,7 @@
         orderOperator.setPhysicalOperator(new StableSortPOperator());
         orderOperator.getInputs().add(new MutableObject<>(groupByOperator));
 
-        DistributeResultOperator resultOperator = new DistributeResultOperator(null, null);
+        DistributeResultOperator resultOperator = new DistributeResultOperator(null, null, null);
         resultOperator.setExecutionMode(PARTITIONED);
         resultOperator.setPhysicalOperator(new DistributeResultPOperator());
         resultOperator.getInputs().add(new MutableObject<>(orderOperator));
@@ -217,7 +217,7 @@
         secondJoin.getInputs().add(new MutableObject<>(exchangeOperator1));
         secondJoin.getInputs().add(new MutableObject<>(exchangeOperator2));
 
-        DistributeResultOperator resultOperator = new DistributeResultOperator(null, null);
+        DistributeResultOperator resultOperator = new DistributeResultOperator(null, null, null);
         resultOperator.setExecutionMode(PARTITIONED);
         resultOperator.setPhysicalOperator(new DistributeResultPOperator());
         resultOperator.getInputs().add(new MutableObject<>(secondJoin));
@@ -277,7 +277,7 @@
         secondJoin.getInputs().add(new MutableObject<>(order1));
         secondJoin.getInputs().add(new MutableObject<>(order2));
 
-        DistributeResultOperator resultOperator = new DistributeResultOperator(null, null);
+        DistributeResultOperator resultOperator = new DistributeResultOperator(null, null, null);
         resultOperator.setExecutionMode(PARTITIONED);
         resultOperator.setPhysicalOperator(new DistributeResultPOperator());
         resultOperator.getInputs().add(new MutableObject<>(secondJoin));
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index ab8f172..cbe41e8 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -145,6 +145,7 @@
     private static final Pattern MAX_RESULT_READS_PATTERN =
             Pattern.compile("maxresultreads=(\\d+)(\\D|$)", Pattern.MULTILINE);
     private static final Pattern HTTP_REQUEST_TYPE = Pattern.compile("requesttype=(.*)", Pattern.MULTILINE);
+    private static final Pattern EXTRACT_RESULT_TYPE = Pattern.compile("extractresult=(.*)", Pattern.MULTILINE);
     private static final String NC_ENDPOINT_PREFIX = "nc:";
     public static final int TRUNCATE_THRESHOLD = 16384;
     public static final Set<String> NON_CANCELLABLE =
@@ -1224,11 +1225,15 @@
         final List<Parameter> params = extractParameters(statement);
         final Optional<String> body = extractBody(statement);
         final Predicate<Integer> statusCodePredicate = extractStatusCodePredicate(statement);
+        final boolean extracResult = isExtracResult(statement);
         InputStream resultStream;
         if ("http".equals(extension)) {
             resultStream = executeHttp(reqType, variablesReplaced, fmt, params, statusCodePredicate, body);
         } else if ("uri".equals(extension)) {
             resultStream = executeURI(reqType, URI.create(variablesReplaced), fmt, params, statusCodePredicate, body);
+            if (extracResult) {
+                resultStream = ResultExtractor.extract(resultStream, UTF_8);
+            }
         } else {
             throw new IllegalArgumentException("Unexpected format for method " + reqType + ": " + extension);
         }
@@ -1574,6 +1579,11 @@
         return m.find() ? m.group(1) : null;
     }
 
+    private static boolean isExtracResult(String statement) {
+        Matcher m = EXTRACT_RESULT_TYPE.matcher(statement);
+        return m.find() ? Boolean.valueOf(m.group(1)) : false;
+    }
+
     private static boolean isJsonEncoded(String httpRequestType) throws Exception {
         if (httpRequestType == null || httpRequestType.isEmpty()) {
             return true;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml
index 5324a0d..ff75720 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml
@@ -56,4 +56,9 @@
             <source-location>false</source-location>
         </compilation-unit>
     </test-case>
+    <test-case FilePath="async-deferred">
+        <compilation-unit name="async-json">
+            <output-dir compare="Clean-JSON">async-json</output-dir>
+        </compilation-unit>
+    </test-case>
 </test-group>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-json/async-json.1.async.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-json/async-json.1.async.sqlpp
new file mode 100644
index 0000000..e24253a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-json/async-json.1.async.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+-- handlevariable=status
+
+select i, i * i as i2 from range(1, 5) i;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-json/async-json.2.pollget.uri b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-json/async-json.2.pollget.uri
new file mode 100644
index 0000000..d10aed9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-json/async-json.2.pollget.uri
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+-- polltimeoutsecs=10
+-- handlevariable=result
+
+$status
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-json/async-json.3.get.uri b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-json/async-json.3.get.uri
new file mode 100644
index 0000000..6496a4b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-json/async-json.3.get.uri
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+-- extractresult=true
+$result
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-json/async-json.1.ignore b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-json/async-json.1.ignore
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-json/async-json.1.ignore
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-json/async-json.2.regex b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-json/async-json.2.regex
new file mode 100644
index 0000000..4308ba2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-json/async-json.2.regex
@@ -0,0 +1,2 @@
+/"status": "success"/
+/"handle": ".*"/
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-json/async-json.3.json b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-json/async-json.3.json
new file mode 100644
index 0000000..d4e1a2d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-json/async-json.3.json
@@ -0,0 +1 @@
+{"i":1,"i2":1}{"i":2,"i2":4}{"i":3,"i2":9}{"i":4,"i2":16}{"i":5,"i2":25}
\ No newline at end of file
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 418ef4b..0a72ceb 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -131,6 +131,7 @@
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.io.FileSplit;
 import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.result.IResultMetadata;
 import org.apache.hyracks.api.result.ResultSetId;
 import org.apache.hyracks.data.std.primitive.ShortPointable;
 import org.apache.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer;
@@ -597,8 +598,8 @@
 
     @Override
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
-            int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc, boolean ordered,
-            JobSpecification spec) throws AlgebricksException {
+            int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc,
+            IResultMetadata metadata, JobSpecification spec) throws AlgebricksException {
         ResultSetDataSink rsds = (ResultSetDataSink) sink;
         ResultSetSinkId rssId = rsds.getId();
         ResultSetId rsId = rssId.getResultSetId();
@@ -606,7 +607,7 @@
         try {
             IResultSerializerFactory resultSerializedAppenderFactory = resultSerializerFactoryProvider
                     .getAqlResultSerializerFactoryProvider(printColumns, printerFactories, getWriterFactory());
-            resultWriter = new ResultWriterOperatorDescriptor(spec, rsId, ordered, getResultAsyncMode(),
+            resultWriter = new ResultWriterOperatorDescriptor(spec, rsId, metadata, getResultAsyncMode(),
                     resultSerializedAppenderFactory, getMaxResultReads());
         } catch (IOException e) {
             throw new AlgebricksException(e);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 3d004a2..6d001dc 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -36,6 +36,7 @@
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.result.IResultMetadata;
 import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
 
 public interface IMetadataProvider<S, I> {
@@ -58,8 +59,8 @@
             throws AlgebricksException;
 
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
-            int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc, boolean ordered,
-            JobSpecification spec) throws AlgebricksException;
+            int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc,
+            IResultMetadata metadata, JobSpecification spec) throws AlgebricksException;
 
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(IDataSource<S> dataSource,
             IOperatorSchema propagatedSchema, List<LogicalVariable> keys, LogicalVariable payLoadVar,
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/DistributeResultOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/DistributeResultOperator.java
index c40e035..22ee1c6 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/DistributeResultOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/DistributeResultOperator.java
@@ -32,14 +32,18 @@
 import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+import org.apache.hyracks.api.result.IResultMetadata;
 
 public class DistributeResultOperator extends AbstractLogicalOperator {
     private List<Mutable<ILogicalExpression>> expressions;
     private IDataSink dataSink;
+    private IResultMetadata resultMetadata;
 
-    public DistributeResultOperator(List<Mutable<ILogicalExpression>> expressions, IDataSink dataSink) {
+    public DistributeResultOperator(List<Mutable<ILogicalExpression>> expressions, IDataSink dataSink,
+            IResultMetadata resultMetadata) {
         this.expressions = expressions;
         this.dataSink = dataSink;
+        this.resultMetadata = resultMetadata;
     }
 
     public List<Mutable<ILogicalExpression>> getExpressions() {
@@ -93,4 +97,7 @@
         return createPropagatingAllInputsTypeEnvironment(ctx);
     }
 
+    public IResultMetadata getResultMetadata() {
+        return resultMetadata;
+    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index 1727d10..fd4009f 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -277,7 +277,7 @@
             throws AlgebricksException {
         ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<>();
         deepCopyExpressionRefs(newExpressions, op.getExpressions());
-        return new DistributeResultOperator(newExpressions, op.getDataSink());
+        return new DistributeResultOperator(newExpressions, op.getDataSink(), op.getResultMetadata());
     }
 
     @Override
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
index a309d9e..138cff8 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
@@ -102,8 +102,8 @@
         IPrinterFactory[] pf =
                 JobGenHelper.mkPrinterFactories(inputSchemas[0], context.getTypeEnvironment(op), context, columns);
 
-        Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints =
-                mp.getResultHandleRuntime(resultOp.getDataSink(), columns, pf, inputDesc, true, spec);
+        Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = mp.getResultHandleRuntime(
+                resultOp.getDataSink(), columns, pf, inputDesc, resultOp.getResultMetadata(), spec);
         IOperatorDescriptor opDesc = runtimeAndConstraints.first;
         opDesc.setSourceLocation(resultOp.getSourceLocation());
         builder.contributeHyracksOperator(resultOp, opDesc);
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplication.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplication.java
index aa1021b..5cc8f69 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplication.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplication.java
@@ -21,6 +21,7 @@
 import org.apache.hyracks.api.config.IConfigManager;
 import org.apache.hyracks.api.control.IGatekeeper;
 import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.api.result.IJobResultCallback;
 
 public interface ICCApplication extends IApplication {
 
@@ -30,4 +31,10 @@
 
     IGatekeeper getGatekeeper();
 
+    /**
+     * Gets the job result callback
+     *
+     * @return the job result callback
+     */
+    IJobResultCallback getJobResultCallback();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
index 36ce18f..0ee3658 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -46,6 +46,7 @@
         GET_RESULT_DIRECTORY_ADDRESS,
         GET_RESULT_STATUS,
         GET_RESULT_LOCATIONS,
+        GET_RESULT_METADATA,
         WAIT_FOR_COMPLETION,
         GET_NODE_CONTROLLERS_INFO,
         CLI_DEPLOY_BINARY,
@@ -318,6 +319,32 @@
         }
     }
 
+    public static class GetResultMetadataFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+
+        private final ResultSetId rsId;
+
+        public GetResultMetadataFunction(JobId jobId, ResultSetId rsId) {
+            this.jobId = jobId;
+            this.rsId = rsId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.GET_RESULT_METADATA;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+
+        public ResultSetId getResultSetId() {
+            return rsId;
+        }
+    }
+
     public static class WaitForCompletionFunction extends Function {
         private static final long serialVersionUID = 1L;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IJobResultCallback.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IJobResultCallback.java
new file mode 100644
index 0000000..2d5fea1
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IJobResultCallback.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.result;
+
+import org.apache.hyracks.api.job.JobId;
+
+public interface IJobResultCallback {
+
+    /**
+     * Notifies this callback that writing the result of job {@code jobId} has been completed successfully.
+     *
+     * @param jobId
+     * @param resultJobRecord
+     */
+    void completed(JobId jobId, ResultJobRecord resultJobRecord);
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultDirectory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultDirectory.java
index 65c8d09..4798494 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultDirectory.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultDirectory.java
@@ -48,4 +48,13 @@
      */
     ResultDirectoryRecord[] getResultLocations(JobId jobId, ResultSetId rsId, ResultDirectoryRecord[] knownRecords)
             throws Exception;
+
+    /**
+     * Gets the result metadata
+     * @param jobId
+     * @param rsId
+     * @return
+     * @throws Exception
+     */
+    IResultMetadata getResultMetadata(JobId jobId, ResultSetId rsId) throws Exception;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultMetadata.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultMetadata.java
new file mode 100644
index 0000000..12696af
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultMetadata.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.result;
+
+import java.io.Serializable;
+
+public interface IResultMetadata extends Serializable {
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultPartitionManager.java
index a539d37..4909903 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultPartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultPartitionManager.java
@@ -24,11 +24,11 @@
 import org.apache.hyracks.api.job.JobId;
 
 public interface IResultPartitionManager extends IResultManager {
-    IFrameWriter createResultPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult,
+    IFrameWriter createResultPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, IResultMetadata metadata,
             boolean asyncMode, int partition, int nPartitions, long maxReads) throws HyracksException;
 
     void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, int partition, int nPartitions,
-            boolean orderedResult, boolean emptyResult) throws HyracksException;
+            IResultMetadata metadata, boolean emptyResult) throws HyracksException;
 
     void reportPartitionWriteCompletion(JobId jobId, ResultSetId resultSetId, int partition) throws HyracksException;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultSetReader.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultSetReader.java
index 1669c36..1652dd4 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultSetReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultSetReader.java
@@ -26,4 +26,11 @@
     Status getResultStatus();
 
     int read(IFrame frame) throws HyracksDataException;
+
+    /**
+     * Gets the result metadata
+     *
+     * @return the result metadata
+     */
+    IResultMetadata getResultMetadata();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
index b8ddbd2..b3b0706 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
@@ -29,6 +29,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class ResultJobRecord implements IResultStateRecord {
+
     public enum State {
         IDLE,
         RUNNING,
@@ -78,9 +79,12 @@
     private static final long serialVersionUID = 1L;
 
     private final long timestamp;
-
+    private long jobStartTime;
+    private long jobEndTime;
     private Status status;
 
+    private ResultSetMetaData resultSetMetaData;
+
     private Map<ResultSetId, ResultSetMetaData> resultSetMetadataMap = new HashMap<>();
 
     public ResultJobRecord() {
@@ -96,9 +100,18 @@
     }
 
     public void start() {
+        jobStartTime = System.nanoTime();
         updateState(State.RUNNING);
     }
 
+    public void finish() {
+        jobEndTime = System.nanoTime();
+    }
+
+    public long getJobDuration() {
+        return jobEndTime - jobStartTime;
+    }
+
     public void success() {
         updateState(State.SUCCESS);
     }
@@ -130,12 +143,14 @@
         return sb.toString();
     }
 
-    public void setResultSetMetaData(ResultSetId rsId, boolean orderedResult, int nPartitions)
+    public synchronized void setResultSetMetaData(ResultSetId rsId, IResultMetadata metadata, int nPartitions)
             throws HyracksDataException {
         ResultSetMetaData rsMd = resultSetMetadataMap.get(rsId);
         if (rsMd == null) {
-            resultSetMetadataMap.put(rsId, new ResultSetMetaData(nPartitions, orderedResult));
-        } else if (rsMd.getOrderedResult() != orderedResult || rsMd.getRecords().length != nPartitions) {
+            final ResultSetMetaData resultSetMetaData = new ResultSetMetaData(nPartitions, metadata);
+            resultSetMetadataMap.put(rsId, resultSetMetaData);
+            this.resultSetMetaData = resultSetMetaData;
+        } else if (rsMd.getRecords().length != nPartitions) {
             throw HyracksDataException.create(ErrorCode.INCONSISTENT_RESULT_METADATA, rsId.toString());
         }
         //TODO(tillw) throwing a HyracksDataException here hangs the execution tests
@@ -174,4 +189,8 @@
             success();
         }
     }
+
+    public synchronized ResultSetMetaData getResultSetMetaData() {
+        return resultSetMetaData;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultSetMetaData.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultSetMetaData.java
index b7b8f1c..60c76c5 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultSetMetaData.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultSetMetaData.java
@@ -22,15 +22,15 @@
 
 public class ResultSetMetaData {
     private final ResultDirectoryRecord[] records;
-    private final boolean ordered;
+    private final IResultMetadata metadata;
 
-    ResultSetMetaData(int len, boolean ordered) {
+    ResultSetMetaData(int len, IResultMetadata metadata) {
         this.records = new ResultDirectoryRecord[len];
-        this.ordered = ordered;
+        this.metadata = metadata;
     }
 
-    public boolean getOrderedResult() {
-        return ordered;
+    public IResultMetadata getMetadata() {
+        return metadata;
     }
 
     public ResultDirectoryRecord[] getRecords() {
@@ -40,7 +40,7 @@
     @Override
     public String toString() {
         StringBuilder sb = new StringBuilder();
-        sb.append("{ordered: ").append(ordered).append(", records: ").append(Arrays.toString(records));
+        sb.append("{metadata: ").append(metadata).append(", records: ").append(Arrays.toString(records));
         return sb.toString();
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectory.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectory.java
index d7f79fd..b335a93 100644
--- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectory.java
+++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectory.java
@@ -24,6 +24,7 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.network.ISocketChannelFactory;
 import org.apache.hyracks.api.result.IResultDirectory;
+import org.apache.hyracks.api.result.IResultMetadata;
 import org.apache.hyracks.api.result.ResultDirectoryRecord;
 import org.apache.hyracks.api.result.ResultJobRecord.Status;
 import org.apache.hyracks.api.result.ResultSetId;
@@ -58,4 +59,9 @@
             ResultDirectoryRecord[] knownRecords) throws Exception {
         return remoteResultDirectory.getResultLocations(jobId, rsId, knownRecords);
     }
+
+    @Override
+    public IResultMetadata getResultMetadata(JobId jobId, ResultSetId rsId) throws Exception {
+        return remoteResultDirectory.getResultMetadata(jobId, rsId);
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectoryRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectoryRemoteProxy.java
index a1adda3..ed628f7 100644
--- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectoryRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectoryRemoteProxy.java
@@ -21,6 +21,7 @@
 import org.apache.hyracks.api.client.HyracksClientInterfaceFunctions;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.result.IResultDirectory;
+import org.apache.hyracks.api.result.IResultMetadata;
 import org.apache.hyracks.api.result.ResultDirectoryRecord;
 import org.apache.hyracks.api.result.ResultJobRecord.Status;
 import org.apache.hyracks.api.result.ResultSetId;
@@ -52,4 +53,11 @@
                 new HyracksClientInterfaceFunctions.GetResultLocationsFunction(jobId, rsId, knownRecords);
         return (ResultDirectoryRecord[]) rpci.call(ipcHandle, gdrlf);
     }
+
+    @Override
+    public IResultMetadata getResultMetadata(JobId jobId, ResultSetId rsId) throws Exception {
+        HyracksClientInterfaceFunctions.GetResultMetadataFunction grmf =
+                new HyracksClientInterfaceFunctions.GetResultMetadataFunction(jobId, rsId);
+        return (IResultMetadata) rpci.call(ipcHandle, grmf);
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSetReader.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSetReader.java
index 752c3f5..c8f5bd9 100644
--- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSetReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSetReader.java
@@ -34,6 +34,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.result.IResultDirectory;
+import org.apache.hyracks.api.result.IResultMetadata;
 import org.apache.hyracks.api.result.IResultSetReader;
 import org.apache.hyracks.api.result.ResultDirectoryRecord;
 import org.apache.hyracks.api.result.ResultJobRecord.Status;
@@ -122,6 +123,20 @@
         return readSize;
     }
 
+    @Override
+    public IResultMetadata getResultMetadata() {
+        try {
+            return resultDirectory.getResultMetadata(jobId, resultSetId);
+        } catch (HyracksDataException e) {
+            if (e.getErrorCode() != ErrorCode.NO_RESULT_SET) {
+                LOGGER.log(Level.WARN, "Exception retrieving result set for job " + jobId, e);
+            }
+        } catch (Exception e) {
+            LOGGER.log(Level.WARN, "Exception retrieving result set for job " + jobId, e);
+        }
+        return null;
+    }
+
     private SocketAddress getSocketAddress(ResultDirectoryRecord record) throws HyracksDataException {
         try {
             final NetworkAddress netAddr = record.getNetworkAddress();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
index fbce29d..b41d5a2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
@@ -27,6 +27,7 @@
 import org.apache.hyracks.api.control.IGatekeeper;
 import org.apache.hyracks.api.job.resource.DefaultJobCapacityController;
 import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.api.result.IJobResultCallback;
 import org.apache.hyracks.api.util.HyracksConstants;
 import org.apache.hyracks.control.common.controllers.CCConfig;
 import org.apache.hyracks.control.common.controllers.ControllerConfig;
@@ -97,4 +98,11 @@
     public IGatekeeper getGatekeeper() {
         return node -> true;
     }
+
+    @Override
+    public IJobResultCallback getJobResultCallback() {
+        return (jobId, resultJobRecord) -> {
+            // no op
+        };
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
index 364a8ae..67e9599 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
@@ -35,6 +35,7 @@
 import org.apache.hyracks.control.cc.work.GetNodeControllersInfoWork;
 import org.apache.hyracks.control.cc.work.GetNodeDetailsJSONWork;
 import org.apache.hyracks.control.cc.work.GetResultDirectoryAddressWork;
+import org.apache.hyracks.control.cc.work.GetResultMetadataWork;
 import org.apache.hyracks.control.cc.work.GetResultPartitionLocationsWork;
 import org.apache.hyracks.control.cc.work.GetResultStatusWork;
 import org.apache.hyracks.control.cc.work.GetThreadDumpWork;
@@ -137,6 +138,12 @@
                 ccs.getWorkQueue().schedule(new GetResultPartitionLocationsWork(ccs, gdrlf.getJobId(),
                         gdrlf.getResultSetId(), gdrlf.getKnownRecords(), new IPCResponder<>(handle, mid)));
                 break;
+            case GET_RESULT_METADATA:
+                HyracksClientInterfaceFunctions.GetResultMetadataFunction grmf =
+                        (HyracksClientInterfaceFunctions.GetResultMetadataFunction) fn;
+                ccs.getWorkQueue().schedule(new GetResultMetadataWork(ccs, grmf.getJobId(), grmf.getResultSetId(),
+                        new IPCResponder<>(handle, mid)));
+                break;
             case WAIT_FOR_COMPLETION:
                 HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf =
                         (HyracksClientInterfaceFunctions.WaitForCompletionFunction) fn;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
index e022dfe..0e4ad41 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
@@ -119,7 +119,7 @@
                         (CCNCFunctions.RegisterResultPartitionLocationFunction) fn;
                 ccs.getWorkQueue()
                         .schedule(new RegisterResultPartitionLocationWork(ccs, rrplf.getJobId(), rrplf.getResultSetId(),
-                                rrplf.getOrderedResult(), rrplf.getEmptyResult(), rrplf.getPartition(),
+                                rrplf.getMetadata(), rrplf.getEmptyResult(), rrplf.getPartition(),
                                 rrplf.getNPartitions(), rrplf.getNetworkAddress()));
                 break;
             case REPORT_RESULT_PARTITION_WRITE_COMPLETION:
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index 419dff6..de1b18d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -230,7 +230,7 @@
         jobLog.open();
         startApplication();
 
-        resultDirectoryService.init(executor);
+        resultDirectoryService.init(executor, application.getJobResultCallback());
         workQueue.start();
         connectNCs();
         LOGGER.log(Level.INFO, "Started ClusterControllerService");
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/IResultDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/IResultDirectoryService.java
index 5451e3f..b1ecb45 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/IResultDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/IResultDirectoryService.java
@@ -25,16 +25,18 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IJobLifecycleListener;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.result.IJobResultCallback;
 import org.apache.hyracks.api.result.IResultManager;
+import org.apache.hyracks.api.result.IResultMetadata;
 import org.apache.hyracks.api.result.ResultDirectoryRecord;
 import org.apache.hyracks.api.result.ResultJobRecord.Status;
 import org.apache.hyracks.api.result.ResultSetId;
 import org.apache.hyracks.control.common.work.IResultCallback;
 
 public interface IResultDirectoryService extends IJobLifecycleListener, IResultManager {
-    public void init(ExecutorService executor);
+    public void init(ExecutorService executor, IJobResultCallback callback);
 
-    public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
+    public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, IResultMetadata metadata,
             boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress)
             throws HyracksDataException;
 
@@ -45,6 +47,8 @@
 
     public Status getResultStatus(JobId jobId, ResultSetId rsId) throws HyracksDataException;
 
+    public IResultMetadata getResultMetadata(JobId jobId, ResultSetId rsId) throws HyracksDataException;
+
     public void getResultPartitionLocations(JobId jobId, ResultSetId rsId, ResultDirectoryRecord[] knownLocations,
             IResultCallback<ResultDirectoryRecord[]> callback) throws HyracksDataException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
index cdfa4d3..a2218e2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
@@ -34,6 +34,8 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.result.IJobResultCallback;
+import org.apache.hyracks.api.result.IResultMetadata;
 import org.apache.hyracks.api.result.IResultStateRecord;
 import org.apache.hyracks.api.result.ResultDirectoryRecord;
 import org.apache.hyracks.api.result.ResultJobRecord;
@@ -61,6 +63,7 @@
     private final long resultSweepThreshold;
 
     private final Map<JobId, JobResultInfo> jobResultLocations;
+    private IJobResultCallback jobResultCallback;
 
     public ResultDirectoryService(long resultTTL, long resultSweepThreshold) {
         super(resultTTL);
@@ -69,8 +72,9 @@
     }
 
     @Override
-    public void init(ExecutorService executor) {
+    public void init(ExecutorService executor, IJobResultCallback jobResultCallback) {
         executor.execute(new ResultStateSweeper(this, resultSweepThreshold, LOGGER));
+        this.jobResultCallback = jobResultCallback;
     }
 
     @Override
@@ -91,7 +95,11 @@
 
     @Override
     public void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) throws HyracksException {
-        // Auto-generated method stub
+        if (exceptions == null || exceptions.isEmpty()) {
+            final ResultJobRecord resultJobRecord = getNonNullResultJobRecord(jobId);
+            resultJobRecord.finish();
+            jobResultCallback.completed(jobId, resultJobRecord);
+        }
     }
 
     private ResultJobRecord getResultJobRecord(JobId jobId) {
@@ -108,11 +116,11 @@
     }
 
     @Override
-    public synchronized void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
+    public synchronized void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, IResultMetadata metadata,
             boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress)
             throws HyracksDataException {
         ResultJobRecord djr = getNonNullResultJobRecord(jobId);
-        djr.setResultSetMetaData(rsId, orderedResult, nPartitions);
+        djr.setResultSetMetaData(rsId, metadata, nPartitions);
         ResultDirectoryRecord record = djr.getOrCreateDirectoryRecord(rsId, partition);
 
         record.setNetworkAddress(networkAddress);
@@ -169,6 +177,11 @@
     }
 
     @Override
+    public synchronized IResultMetadata getResultMetadata(JobId jobId, ResultSetId rsId) throws HyracksDataException {
+        return getNonNullResultJobRecord(jobId).getResultSetMetaData(rsId).getMetadata();
+    }
+
+    @Override
     public synchronized Set<JobId> getJobIds() {
         return jobResultLocations.keySet();
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultMetadataWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultMetadataWork.java
new file mode 100644
index 0000000..f2ad7d3
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultMetadataWork.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.cc.work;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.result.IResultMetadata;
+import org.apache.hyracks.api.result.ResultSetId;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.common.work.IResultCallback;
+import org.apache.hyracks.control.common.work.SynchronizableWork;
+
+public class GetResultMetadataWork extends SynchronizableWork {
+    private final ClusterControllerService ccs;
+
+    private final JobId jobId;
+
+    private final ResultSetId rsId;
+
+    private final IResultCallback<IResultMetadata> callback;
+
+    public GetResultMetadataWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId,
+            IResultCallback<IResultMetadata> callback) {
+        this.ccs = ccs;
+        this.jobId = jobId;
+        this.rsId = rsId;
+        this.callback = callback;
+    }
+
+    @Override
+    public void doRun() {
+        try {
+            IResultMetadata metadata = ccs.getResultDirectoryService().getResultMetadata(jobId, rsId);
+            callback.setValue(metadata);
+        } catch (HyracksDataException e) {
+            callback.setException(e);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return getName() + ": JobId@" + jobId + " ResultSetId@" + rsId;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
index 67645bc..b788e27 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
@@ -24,6 +24,7 @@
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.result.IResultMetadata;
 import org.apache.hyracks.api.result.ResultSetId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.job.JobRun;
@@ -43,7 +44,7 @@
 
     private final ResultSetId rsId;
 
-    private final boolean orderedResult;
+    private final IResultMetadata metadata;
 
     private final boolean emptyResult;
 
@@ -54,11 +55,12 @@
     private final NetworkAddress networkAddress;
 
     public RegisterResultPartitionLocationWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId,
-            boolean orderedResult, boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress) {
+            IResultMetadata metadata, boolean emptyResult, int partition, int nPartitions,
+            NetworkAddress networkAddress) {
         this.ccs = ccs;
         this.jobId = jobId;
         this.rsId = rsId;
-        this.orderedResult = orderedResult;
+        this.metadata = metadata;
         this.emptyResult = emptyResult;
         this.partition = partition;
         this.nPartitions = nPartitions;
@@ -68,7 +70,7 @@
     @Override
     public void run() {
         try {
-            ccs.getResultDirectoryService().registerResultPartitionLocation(jobId, rsId, orderedResult, emptyResult,
+            ccs.getResultDirectoryService().registerResultPartitionLocation(jobId, rsId, metadata, emptyResult,
                     partition, nPartitions, networkAddress);
         } catch (HyracksDataException e) {
             LOGGER.log(Level.WARN, "Failed to register partition location", e);
@@ -85,7 +87,7 @@
     @Override
     public String toString() {
         return getName() + ": JobId@" + jobId + " ResultSetId@" + rsId + " Partition@" + partition + " NPartitions@"
-                + nPartitions + " ResultPartitionLocation@" + networkAddress + " OrderedResult@" + orderedResult
-                + " EmptyResult@" + emptyResult;
+                + nPartitions + " ResultPartitionLocation@" + networkAddress + " metadata@" + metadata + " EmptyResult@"
+                + emptyResult;
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
index c811169..c8106e8 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
@@ -26,6 +26,7 @@
 import org.apache.hyracks.api.deployment.DeploymentId;
 import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.result.IResultMetadata;
 import org.apache.hyracks.api.result.ResultSetId;
 import org.apache.hyracks.control.common.controllers.NodeRegistration;
 import org.apache.hyracks.control.common.deployment.DeploymentStatus;
@@ -65,7 +66,7 @@
 
     void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception;
 
-    void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, boolean emptyResult,
+    void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, IResultMetadata metadata, boolean emptyResult,
             int partition, int nPartitions, NetworkAddress networkAddress) throws Exception;
 
     void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws Exception;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index 1616343..2ba4768 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -56,6 +56,7 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.api.partitions.PartitionId;
+import org.apache.hyracks.api.result.IResultMetadata;
 import org.apache.hyracks.api.result.ResultSetId;
 import org.apache.hyracks.control.common.controllers.NodeParameters;
 import org.apache.hyracks.control.common.controllers.NodeRegistration;
@@ -590,7 +591,7 @@
 
         private final ResultSetId rsId;
 
-        private final boolean orderedResult;
+        private final IResultMetadata metadata;
 
         private final boolean emptyResult;
 
@@ -600,11 +601,11 @@
 
         private NetworkAddress networkAddress;
 
-        public RegisterResultPartitionLocationFunction(JobId jobId, ResultSetId rsId, boolean orderedResult,
+        public RegisterResultPartitionLocationFunction(JobId jobId, ResultSetId rsId, IResultMetadata metadata,
                 boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress) {
             this.jobId = jobId;
             this.rsId = rsId;
-            this.orderedResult = orderedResult;
+            this.metadata = metadata;
             this.emptyResult = emptyResult;
             this.partition = partition;
             this.nPartitions = nPartitions;
@@ -624,8 +625,8 @@
             return rsId;
         }
 
-        public boolean getOrderedResult() {
-            return orderedResult;
+        public IResultMetadata getMetadata() {
+            return metadata;
         }
 
         public boolean getEmptyResult() {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 13a08b2..344c3fb 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -26,6 +26,7 @@
 import org.apache.hyracks.api.deployment.DeploymentId;
 import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.result.IResultMetadata;
 import org.apache.hyracks.api.result.ResultSetId;
 import org.apache.hyracks.control.common.base.IClusterController;
 import org.apache.hyracks.control.common.controllers.NodeRegistration;
@@ -132,10 +133,10 @@
     }
 
     @Override
-    public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
+    public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, IResultMetadata metadata,
             boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress) throws Exception {
-        RegisterResultPartitionLocationFunction fn = new RegisterResultPartitionLocationFunction(jobId, rsId,
-                orderedResult, emptyResult, partition, nPartitions, networkAddress);
+        RegisterResultPartitionLocationFunction fn = new RegisterResultPartitionLocationFunction(jobId, rsId, metadata,
+                emptyResult, partition, nPartitions, networkAddress);
         ipcHandle.send(-1, fn, null);
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionManager.java
index 835b59b..165cb95 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionManager.java
@@ -28,6 +28,7 @@
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.io.IWorkspaceFileFactory;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.result.IResultMetadata;
 import org.apache.hyracks.api.result.IResultPartitionManager;
 import org.apache.hyracks.api.result.ResultSetId;
 import org.apache.hyracks.control.common.result.AbstractResultManager;
@@ -70,12 +71,12 @@
     }
 
     @Override
-    public IFrameWriter createResultPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult,
+    public IFrameWriter createResultPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, IResultMetadata metadata,
             boolean asyncMode, int partition, int nPartitions, long maxReads) {
         ResultPartitionWriter dpw;
         JobId jobId = ctx.getJobletContext().getJobId();
         synchronized (this) {
-            dpw = new ResultPartitionWriter(ctx, this, jobId, rsId, asyncMode, orderedResult, partition, nPartitions,
+            dpw = new ResultPartitionWriter(ctx, this, jobId, rsId, asyncMode, metadata, partition, nPartitions,
                     resultMemoryManager, fileFactory, maxReads);
             ResultSetMap rsIdMap = partitionResultStateMap.computeIfAbsent(jobId, k -> new ResultSetMap());
             ResultState[] resultStates = rsIdMap.createOrGetResultStates(rsId, nPartitions);
@@ -87,10 +88,10 @@
 
     @Override
     public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, int partition, int nPartitions,
-            boolean orderedResult, boolean emptyResult) throws HyracksException {
+            IResultMetadata metadata, boolean emptyResult) throws HyracksException {
         try {
             // Be sure to send the *public* network address to the CC
-            ncs.getClusterController(jobId.getCcId()).registerResultPartitionLocation(jobId, rsId, orderedResult,
+            ncs.getClusterController(jobId.getCcId()).registerResultPartitionLocation(jobId, rsId, metadata,
                     emptyResult, partition, nPartitions, ncs.getResultNetworkManager().getPublicNetworkAddress());
         } catch (Exception e) {
             throw HyracksException.create(e);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionWriter.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionWriter.java
index 53f66e7..55db150 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionWriter.java
@@ -27,6 +27,7 @@
 import org.apache.hyracks.api.io.IWorkspaceFileFactory;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.partitions.ResultSetPartitionId;
+import org.apache.hyracks.api.result.IResultMetadata;
 import org.apache.hyracks.api.result.IResultPartitionManager;
 import org.apache.hyracks.api.result.ResultSetId;
 import org.apache.logging.log4j.LogManager;
@@ -41,7 +42,7 @@
 
     private final ResultSetId resultSetId;
 
-    private final boolean orderedResult;
+    private final IResultMetadata metadata;
 
     private final int partition;
 
@@ -58,12 +59,12 @@
     private boolean failed = false;
 
     public ResultPartitionWriter(IHyracksTaskContext ctx, IResultPartitionManager manager, JobId jobId,
-            ResultSetId rsId, boolean asyncMode, boolean orderedResult, int partition, int nPartitions,
+            ResultSetId rsId, boolean asyncMode, IResultMetadata metadata, int partition, int nPartitions,
             ResultMemoryManager resultMemoryManager, IWorkspaceFileFactory fileFactory, long maxReads) {
         this.manager = manager;
         this.jobId = jobId;
         this.resultSetId = rsId;
-        this.orderedResult = orderedResult;
+        this.metadata = metadata;
         this.partition = partition;
         this.nPartitions = nPartitions;
         this.resultMemoryManager = resultMemoryManager;
@@ -127,8 +128,7 @@
     void registerResultPartitionLocation(boolean empty) throws HyracksDataException {
         try {
             if (!partitionRegistered) {
-                manager.registerResultPartitionLocation(jobId, resultSetId, partition, nPartitions, orderedResult,
-                        empty);
+                manager.registerResultPartitionLocation(jobId, resultSetId, partition, nPartitions, metadata, empty);
                 partitionRegistered = true;
             }
         } catch (HyracksException e) {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
index 9fb9815..8006fc6 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
@@ -34,6 +34,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.api.result.IResultMetadata;
 import org.apache.hyracks.api.result.IResultPartitionManager;
 import org.apache.hyracks.api.result.ResultSetId;
 import org.apache.hyracks.dataflow.common.comm.io.FrameOutputStream;
@@ -46,18 +47,18 @@
 
     private final ResultSetId rsId;
 
-    private final boolean ordered;
+    private final IResultMetadata metadata;
 
     private final boolean asyncMode;
 
     private final IResultSerializerFactory resultSerializerFactory;
     private final long maxReads;
 
-    public ResultWriterOperatorDescriptor(IOperatorDescriptorRegistry spec, ResultSetId rsId, boolean ordered,
+    public ResultWriterOperatorDescriptor(IOperatorDescriptorRegistry spec, ResultSetId rsId, IResultMetadata metadata,
             boolean asyncMode, IResultSerializerFactory resultSerializerFactory, long maxReads) throws IOException {
         super(spec, 1, 0);
         this.rsId = rsId;
-        this.ordered = ordered;
+        this.metadata = metadata;
         this.asyncMode = asyncMode;
         this.resultSerializerFactory = resultSerializerFactory;
         this.maxReads = maxReads;
@@ -88,7 +89,7 @@
             @Override
             public void open() throws HyracksDataException {
                 try {
-                    resultPartitionWriter = resultPartitionManager.createResultPartitionWriter(ctx, rsId, ordered,
+                    resultPartitionWriter = resultPartitionManager.createResultPartitionWriter(ctx, rsId, metadata,
                             asyncMode, partition, nPartitions, maxReads);
                     resultPartitionWriter.open();
                     resultSerializer.init();
@@ -140,7 +141,7 @@
                 StringBuilder sb = new StringBuilder();
                 sb.append("{ ");
                 sb.append("\"rsId\": \"").append(rsId).append("\", ");
-                sb.append("\"ordered\": ").append(ordered).append(", ");
+                sb.append("\"metadata\": ").append(metadata).append(", ");
                 sb.append("\"asyncMode\": ").append(asyncMode).append(", ");
                 sb.append("\"maxReads\": ").append(maxReads).append(" }");
                 return sb.toString();
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
index da85e74..0a57232 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
@@ -98,7 +98,7 @@
             throws IOException {
 
         ResultSetId rsId = new ResultSetId(1);
-        AbstractSingleActivityOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
+        AbstractSingleActivityOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         spec.addResultSetId(rsId);
 
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java
index 367fc51d..fd236ba 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java
@@ -99,7 +99,7 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group2, NC2_ID);
 
         ResultSetId rsId = new ResultSetId(1);
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         spec.addResultSetId(rsId);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID);
@@ -168,7 +168,7 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group2, NC1_ID, NC2_ID);
 
         ResultSetId rsId = new ResultSetId(1);
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         spec.addResultSetId(rsId);
 
@@ -238,7 +238,7 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group2, NC1_ID, NC2_ID);
 
         ResultSetId rsId = new ResultSetId(1);
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         spec.addResultSetId(rsId);
 
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
index 7e239d9..2593e1e 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
@@ -156,7 +156,7 @@
         ResultSetId rsId = new ResultSetId(1);
         spec.addResultSetId(rsId);
 
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
index 8070d7d..c286e52 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
@@ -228,7 +228,7 @@
             throws IOException {
 
         ResultSetId rsId = new ResultSetId(1);
-        AbstractSingleActivityOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
+        AbstractSingleActivityOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         spec.addResultSetId(rsId);
 
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java
index bc5de11..0d4d0e8 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java
@@ -94,7 +94,7 @@
             ResultSetId rsId = new ResultSetId(i);
             spec.addResultSetId(rsId);
 
-            outputOp[i] = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
+            outputOp[i] = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
                     ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
             PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, outputOp[i], locations);
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ScanPrintTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ScanPrintTest.java
index 7838a34..66a2cbd 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ScanPrintTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ScanPrintTest.java
@@ -67,7 +67,7 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID, NC1_ID);
 
         ResultSetId rsId = new ResultSetId(1);
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         spec.addResultSetId(rsId);
 
@@ -104,7 +104,7 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);
 
         ResultSetId rsId = new ResultSetId(1);
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         spec.addResultSetId(rsId);
 
@@ -143,7 +143,7 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);
 
         ResultSetId rsId = new ResultSetId(1);
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         spec.addResultSetId(rsId);
 
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SortMergeTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SortMergeTest.java
index d85ad5d..0c2934a 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SortMergeTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SortMergeTest.java
@@ -84,7 +84,7 @@
         ResultSetId rsId = new ResultSetId(1);
         spec.addResultSetId(rsId);
 
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
@@ -135,7 +135,7 @@
         ResultSetId rsId = new ResultSetId(1);
         spec.addResultSetId(rsId);
 
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index 9ca3528..a8bb1d5 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -139,7 +139,7 @@
         ResultSetId rsId = new ResultSetId(1);
         spec.addResultSetId(rsId);
 
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
@@ -188,7 +188,7 @@
         ResultSetId rsId = new ResultSetId(1);
         spec.addResultSetId(rsId);
 
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
@@ -241,7 +241,7 @@
         ResultSetId rsId = new ResultSetId(1);
         spec.addResultSetId(rsId);
 
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
@@ -295,7 +295,7 @@
         ResultSetId rsId = new ResultSetId(1);
         spec.addResultSetId(rsId);
 
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
@@ -349,7 +349,7 @@
         ResultSetId rsId = new ResultSetId(1);
         spec.addResultSetId(rsId);
 
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
@@ -408,7 +408,7 @@
         ResultSetId rsId = new ResultSetId(1);
         spec.addResultSetId(rsId);
 
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
@@ -466,7 +466,7 @@
         ResultSetId rsId = new ResultSetId(1);
         spec.addResultSetId(rsId);
 
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
@@ -530,7 +530,7 @@
         ResultSetId rsId = new ResultSetId(1);
         spec.addResultSetId(rsId);
 
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
index c0bbfe3..63d2fdc 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
@@ -121,7 +121,7 @@
         ResultSetId rsId = new ResultSetId(1);
         spec.addResultSetId(rsId);
 
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
@@ -205,7 +205,7 @@
         ResultSetId rsId = new ResultSetId(1);
         spec.addResultSetId(rsId);
 
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
@@ -289,7 +289,7 @@
         ResultSetId rsId = new ResultSetId(1);
         spec.addResultSetId(rsId);
 
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
@@ -378,7 +378,7 @@
         ResultSetId rsId = new ResultSetId(1);
         spec.addResultSetId(rsId);
 
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java
index 48f7837..9fddc12 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java
@@ -74,7 +74,7 @@
         ResultSetId rsId = new ResultSetId(1);
         spec.addResultSetId(rsId);
 
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);