[NO ISSUE][API] Return Async Query Results As JSON
- user model changes: yes
- storage format changes: no
- interface changes: yes
Details:
- Add ResultMetadata that can hold metadata about a job's
result.
- Add required CC functions to get ResultMetadata.
- Add IJobResultCallback that is called when a job's result
is written successfully.
- Calculate a job's processed objects on the job completion.
- Calculate a job's duration on the job completion.
- Use the async query request format when serving the result
from QueryResultApiServlet.
- Return a proper JSON envelop in QueryResultApiServlet
which includes the results as well as metrics fields.
- Add test case for returning JSON response for async
query.
Change-Id: Ic4812a14925099a677f9e77a0040f881d2600724
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3401
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/base/ILangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/base/ILangExpressionToPlanTranslator.java
index 8e0214a..fea9340 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/base/ILangExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/base/ILangExpressionToPlanTranslator.java
@@ -22,6 +22,7 @@
import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import org.apache.hyracks.api.result.IResultMetadata;
/**
* The interface is in charge of translating language expressions into logical query plans.
@@ -37,11 +38,13 @@
* the output dataset name (only for insert/delete).
* @param stmt,
* the compiled dml statement (only for insert/delete).
+ * @param resultMetadata,
+ * some result metadata that can be retrieved with the result
* @return a logical query plan for the query.
* @throws AlgebricksException
*/
- public ILogicalPlan translate(Query query, String outputDatasetName, ICompiledDmlStatement stmt)
- throws AlgebricksException;
+ public ILogicalPlan translate(Query query, String outputDatasetName, ICompiledDmlStatement stmt,
+ IResultMetadata resultMetadata) throws AlgebricksException;
/**
* Translates a load statement.
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index b7e9701..b8bfffd 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -156,6 +156,7 @@
import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.api.io.FileSplit;
import org.apache.hyracks.api.io.ManagedFileSplit;
+import org.apache.hyracks.api.result.IResultMetadata;
/**
* Each visit returns a pair of an operator and a variable. The variable
@@ -299,13 +300,13 @@
}
@Override
- public ILogicalPlan translate(Query expr, String outputDatasetName, ICompiledDmlStatement stmt)
- throws AlgebricksException {
- return translate(expr, outputDatasetName, stmt, null);
+ public ILogicalPlan translate(Query expr, String outputDatasetName, ICompiledDmlStatement stmt,
+ IResultMetadata resultMetadata) throws AlgebricksException {
+ return translate(expr, outputDatasetName, stmt, null, resultMetadata);
}
public ILogicalPlan translate(Query expr, String outputDatasetName, ICompiledDmlStatement stmt,
- ILogicalOperator baseOp) throws AlgebricksException {
+ ILogicalOperator baseOp, IResultMetadata resultMetadata) throws AlgebricksException {
MutableObject<ILogicalOperator> base = new MutableObject<>(new EmptyTupleSourceOperator());
if (baseOp != null) {
base = new MutableObject<>(baseOp);
@@ -332,7 +333,7 @@
writeExprList.add(new MutableObject<>(resVarRef));
ResultSetSinkId rssId = new ResultSetSinkId(metadataProvider.getResultSetId());
ResultSetDataSink sink = new ResultSetDataSink(rssId, null);
- DistributeResultOperator newTop = new DistributeResultOperator(writeExprList, sink);
+ DistributeResultOperator newTop = new DistributeResultOperator(writeExprList, sink, resultMetadata);
newTop.setSourceLocation(sourceLoc);
newTop.getInputs().add(new MutableObject<>(topOp));
topOp = newTop;
@@ -421,12 +422,12 @@
switch (stmt.getKind()) {
case INSERT:
leafOperator = translateInsert(targetDatasource, varRef, varRefsForLoading,
- additionalFilteringExpressions, assign, stmt);
+ additionalFilteringExpressions, assign, stmt, resultMetadata);
break;
case UPSERT:
leafOperator = translateUpsert(targetDatasource, varRef, varRefsForLoading,
additionalFilteringExpressions, assign, additionalFilteringField, unnestVar, topOp, exprs,
- resVar, additionalFilteringAssign, stmt);
+ resVar, additionalFilteringAssign, stmt, resultMetadata);
break;
case DELETE:
leafOperator = translateDelete(targetDatasource, varRef, varRefsForLoading,
@@ -470,7 +471,7 @@
List<Mutable<ILogicalExpression>> additionalFilteringExpressions, ILogicalOperator assign,
List<String> additionalFilteringField, LogicalVariable unnestVar, ILogicalOperator topOp,
List<Mutable<ILogicalExpression>> exprs, LogicalVariable resVar, AssignOperator additionalFilteringAssign,
- ICompiledDmlStatement stmt) throws AlgebricksException {
+ ICompiledDmlStatement stmt, IResultMetadata resultMetadata) throws AlgebricksException {
SourceLocation sourceLoc = stmt.getSourceLocation();
if (!targetDatasource.getDataset().allow(topOp, DatasetUtil.OP_UPSERT)) {
throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc,
@@ -577,13 +578,13 @@
rootOperator = delegateOperator;
// Compiles the return expression.
- return processReturningExpression(rootOperator, upsertOp, compiledUpsert);
+ return processReturningExpression(rootOperator, upsertOp, compiledUpsert, resultMetadata);
}
private ILogicalOperator translateInsert(DatasetDataSource targetDatasource, Mutable<ILogicalExpression> varRef,
List<Mutable<ILogicalExpression>> varRefsForLoading,
List<Mutable<ILogicalExpression>> additionalFilteringExpressions, ILogicalOperator assign,
- ICompiledDmlStatement stmt) throws AlgebricksException {
+ ICompiledDmlStatement stmt, IResultMetadata resultMetadata) throws AlgebricksException {
SourceLocation sourceLoc = stmt.getSourceLocation();
if (targetDatasource.getDataset().hasMetaPart()) {
throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc,
@@ -605,13 +606,14 @@
rootOperator.setSourceLocation(sourceLoc);
// Compiles the return expression.
- return processReturningExpression(rootOperator, insertOp, compiledInsert);
+ return processReturningExpression(rootOperator, insertOp, compiledInsert, resultMetadata);
}
// Stitches the translated operators for the returning expression into the query
// plan.
private ILogicalOperator processReturningExpression(ILogicalOperator inputOperator,
- InsertDeleteUpsertOperator insertOp, CompiledInsertStatement compiledInsert) throws AlgebricksException {
+ InsertDeleteUpsertOperator insertOp, CompiledInsertStatement compiledInsert, IResultMetadata resultMetadata)
+ throws AlgebricksException {
Expression returnExpression = compiledInsert.getReturnExpression();
if (returnExpression == null) {
return inputOperator;
@@ -644,7 +646,7 @@
expressions.add(new MutableObject<>(new VariableReferenceExpression(resultVar)));
ResultSetSinkId rssId = new ResultSetSinkId(metadataProvider.getResultSetId());
ResultSetDataSink sink = new ResultSetDataSink(rssId, null);
- DistributeResultOperator distResultOperator = new DistributeResultOperator(expressions, sink);
+ DistributeResultOperator distResultOperator = new DistributeResultOperator(expressions, sink, resultMetadata);
distResultOperator.getInputs().add(new MutableObject<>(createResultAssignOperator));
distResultOperator.setSourceLocation(sourceLoc);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index 81151d0..d558e0d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -212,8 +212,9 @@
metadataProvider.setTxnId(txnId);
ILangExpressionToPlanTranslator t =
translatorFactory.createExpressionToPlanTranslator(metadataProvider, varCounter, externalVars);
-
- ILogicalPlan plan = isLoad ? t.translateLoad(statement) : t.translate(query, outputDatasetName, statement);
+ ResultMetadata resultMetadata = new ResultMetadata(output.config().fmt());
+ ILogicalPlan plan =
+ isLoad ? t.translateLoad(statement) : t.translate(query, outputDatasetName, statement, resultMetadata);
if ((isQuery || isLoad) && !conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS)
&& conf.is(SessionConfig.OOB_LOGICAL_PLAN)) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/ResultMetadata.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/ResultMetadata.java
new file mode 100644
index 0000000..69f46a4
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/ResultMetadata.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.api.common;
+
+import org.apache.asterix.translator.SessionConfig;
+import org.apache.hyracks.api.result.IResultMetadata;
+
+public class ResultMetadata implements IResultMetadata {
+
+ private final SessionConfig.OutputFormat format;
+ private long jobDuration;
+ private long processedObjects;
+
+ public ResultMetadata(SessionConfig.OutputFormat format) {
+ this.format = format;
+ }
+
+ public SessionConfig.OutputFormat getFormat() {
+ return format;
+ }
+
+ public long getProcessedObjects() {
+ return processedObjects;
+ }
+
+ public void setProcessedObjects(long processedObjects) {
+ this.processedObjects = processedObjects;
+ }
+
+ public void setJobDuration(long jobDuration) {
+ this.jobDuration = jobDuration;
+ }
+
+ public long getJobDuration() {
+ return jobDuration;
+ }
+
+ @Override
+ public String toString() {
+ return "ResultMetadata{" + "format=" + format + ", jobDuration=" + jobDuration + ", processedObjects="
+ + processedObjects + '}';
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
index 7f74c92..cda4d34 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.concurrent.ConcurrentMap;
+import org.apache.asterix.api.common.ResultMetadata;
import org.apache.asterix.app.result.ResultHandle;
import org.apache.asterix.app.result.ResultReader;
import org.apache.asterix.common.api.IApplicationContext;
@@ -49,18 +50,16 @@
@Override
protected void get(IServletRequest request, IServletResponse response) throws Exception {
+ long elapsedStart = System.nanoTime();
HttpUtil.setContentType(response, HttpUtil.ContentType.TEXT_HTML, request);
-
final String strHandle = localPath(request);
final ResultHandle handle = ResultHandle.parse(strHandle);
if (handle == null) {
response.setStatus(HttpResponseStatus.BAD_REQUEST);
return;
}
-
IResultSet resultSet = getResultSet();
ResultReader resultReader = new ResultReader(resultSet, handle.getJobId(), handle.getResultSetId());
-
try {
ResultJobRecord.Status status = resultReader.getStatus();
@@ -86,15 +85,20 @@
if (httpStatus != HttpResponseStatus.OK) {
return;
}
-
- // QQQ The output format is determined by the initial
- // query and cannot be modified here, so calling back to
- // initResponse() is really an error. We need to find a
- // way to send the same OutputFormat value here as was
- // originally determined there. Need to save this value on
- // some object that we can obtain here.
- SessionOutput sessionOutput = initResponse(request, response);
- ResultUtil.printResults(appCtx, resultReader, sessionOutput, new Stats(), null);
+ ResultMetadata metadata = (ResultMetadata) resultReader.getMetadata();
+ SessionOutput sessionOutput = initResponse(request, response, metadata.getFormat());
+ if (metadata.getFormat() == SessionConfig.OutputFormat.CLEAN_JSON
+ || metadata.getFormat() == SessionConfig.OutputFormat.LOSSLESS_JSON) {
+ final Stats stats = new Stats();
+ sessionOutput.out().print("{\n");
+ ResultUtil.printResults(appCtx, resultReader, sessionOutput, stats, null);
+ QueryServiceServlet.printMetrics(sessionOutput.out(), System.nanoTime() - elapsedStart,
+ metadata.getJobDuration(), stats.getCount(), stats.getSize(), metadata.getProcessedObjects(), 0,
+ 0, HttpUtil.getPreferredCharset(request));
+ sessionOutput.out().print("}\n");
+ } else {
+ ResultUtil.printResults(appCtx, resultReader, sessionOutput, new Stats(), null);
+ }
} catch (HyracksDataException e) {
final int errorCode = e.getErrorCode();
if (ErrorCode.NO_RESULT_SET == errorCode) {
@@ -119,37 +123,15 @@
* SessionConfig with the appropriate output writer and output-format
* based on the Accept: header and other servlet parameters.
*/
- static SessionOutput initResponse(IServletRequest request, IServletResponse response) throws IOException {
- // CLEAN_JSON output is the default; most generally useful for a
- // programmatic HTTP API
- SessionConfig.OutputFormat format = SessionConfig.OutputFormat.CLEAN_JSON;
- // First check the "output" servlet parameter.
- String output = request.getParameter("output");
- String accept = request.getHeader("Accept", "");
- if (output != null) {
- if ("CSV".equals(output)) {
- format = SessionConfig.OutputFormat.CSV;
- } else if ("ADM".equals(output)) {
- format = SessionConfig.OutputFormat.ADM;
- }
- } else {
- // Second check the Accept: HTTP header.
- if (accept.contains("application/x-adm")) {
- format = SessionConfig.OutputFormat.ADM;
- } else if (accept.contains("text/csv")) {
- format = SessionConfig.OutputFormat.CSV;
- }
+ static SessionOutput initResponse(IServletRequest request, IServletResponse response,
+ SessionConfig.OutputFormat format) throws IOException {
+ String accept = request.getHeader("Accept");
+ if (accept == null) {
+ accept = "";
}
SessionConfig.PlanFormat planFormat = SessionConfig.PlanFormat.get(request.getParameter("plan-format"),
"plan format", SessionConfig.PlanFormat.STRING, LOGGER);
- // If it's JSON, check for the "lossless" flag
-
- if (format == SessionConfig.OutputFormat.CLEAN_JSON
- && ("true".equals(request.getParameter("lossless")) || accept.contains("lossless=true"))) {
- format = SessionConfig.OutputFormat.LOSSLESS_JSON;
- }
-
SessionOutput.ResultAppender appendHandle = (app, handle) -> app.append("{ \"").append("handle")
.append("\":" + " \"").append(handle).append("\" }");
SessionConfig sessionConfig = new SessionConfig(format, planFormat);
@@ -168,6 +150,8 @@
}
sessionConfig.set(SessionConfig.FORMAT_WRAPPER_ARRAY, wrapperArray);
// Now that format is set, output the content-type
+ SessionOutput.ResultDecorator resultPrefix = null;
+ SessionOutput.ResultDecorator resultPostfix = null;
switch (format) {
case ADM:
HttpUtil.setContentType(response, "application/x-adm", request);
@@ -176,6 +160,8 @@
// No need to reflect "clean-ness" in output type; fall through
case LOSSLESS_JSON:
HttpUtil.setContentType(response, "application/json", request);
+ resultPrefix = ResultUtil.createPreResultDecorator();
+ resultPostfix = ResultUtil.createPostResultDecorator();
break;
case CSV:
// Check for header parameter or in Accept:.
@@ -189,7 +175,7 @@
default:
throw new IOException("Unknown format " + format);
}
- return new SessionOutput(sessionConfig, response.writer(), null, null, appendHandle, null);
+ return new SessionOutput(sessionConfig, response.writer(), resultPrefix, resultPostfix, appendHandle, null);
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index 26e7430..06b75e3 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -322,7 +322,7 @@
}
}
- private static void printMetrics(PrintWriter pw, long elapsedTime, long executionTime, long resultCount,
+ public static void printMetrics(PrintWriter pw, long elapsedTime, long executionTime, long resultCount,
long resultSize, long processedObjects, long errorCount, long warnCount, Charset resultCharset) {
boolean hasErrors = errorCount != 0;
boolean hasWarnings = warnCount != 0;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
new file mode 100644
index 0000000..30814c6
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.result;
+
+import java.util.Collection;
+
+import org.apache.asterix.api.common.ResultMetadata;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.result.IJobResultCallback;
+import org.apache.hyracks.api.result.ResultJobRecord;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.job.IJobManager;
+import org.apache.hyracks.control.cc.job.JobRun;
+import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
+import org.apache.hyracks.control.common.job.profiling.om.JobletProfile;
+import org.apache.hyracks.control.common.job.profiling.om.TaskProfile;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class JobResultCallback implements IJobResultCallback {
+
+ private static final Logger LOGGER = LogManager.getLogger();
+ private final ICcApplicationContext appCtx;
+
+ public JobResultCallback(ICcApplicationContext appCtx) {
+ this.appCtx = appCtx;
+ }
+
+ @Override
+ public void completed(JobId jobId, ResultJobRecord resultJobRecord) {
+ try {
+ updateResultMetadata(jobId, resultJobRecord);
+ } catch (Exception e) {
+ LOGGER.error("failed to update result metadata for {}", jobId, e);
+ }
+ }
+
+ private void updateResultMetadata(JobId jobId, ResultJobRecord resultJobRecord) {
+ final ResultMetadata metadata = (ResultMetadata) resultJobRecord.getResultSetMetaData().getMetadata();
+ metadata.setJobDuration(resultJobRecord.getJobDuration());
+ metadata.setProcessedObjects(getJobProccssedObjects(jobId));
+ }
+
+ private long getJobProccssedObjects(JobId jobId) {
+ long processedObjects = 0;
+ IJobManager jobManager =
+ ((ClusterControllerService) appCtx.getServiceContext().getControllerService()).getJobManager();
+ final JobRun run = jobManager.get(jobId);
+ if (run != null) {
+ final JobProfile jobProfile = run.getJobProfile();
+ final Collection<JobletProfile> jobletProfiles = jobProfile.getJobletProfiles().values();
+ for (JobletProfile jp : jobletProfiles) {
+ final Collection<TaskProfile> jobletTasksProfile = jp.getTaskProfiles().values();
+ for (TaskProfile tp : jobletTasksProfile) {
+ processedObjects += tp.getStatsCollector().getAggregatedStats().getTupleCounter().get();
+ }
+ }
+ }
+ return processedObjects;
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultReader.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultReader.java
index 09bb8a3..1acae87 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultReader.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultReader.java
@@ -22,6 +22,7 @@
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.result.IResultMetadata;
import org.apache.hyracks.api.result.IResultSet;
import org.apache.hyracks.api.result.IResultSetReader;
import org.apache.hyracks.api.result.ResultJobRecord.Status;
@@ -52,4 +53,8 @@
public IFrameTupleAccessor getFrameTupleAccessor() {
return frameTupleAccessor;
}
+
+ public IResultMetadata getMetadata() {
+ return reader.getResultMetadata();
+ }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 0c19303..8c0cc98 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -23,7 +23,6 @@
import java.io.InputStream;
import java.rmi.RemoteException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
@@ -195,16 +194,10 @@
import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.api.job.JobStatus;
import org.apache.hyracks.api.result.IResultSet;
import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.cc.job.IJobManager;
-import org.apache.hyracks.control.cc.job.JobRun;
import org.apache.hyracks.control.common.controllers.CCConfig;
-import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
-import org.apache.hyracks.control.common.job.profiling.om.JobletProfile;
-import org.apache.hyracks.control.common.job.profiling.om.TaskProfile;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
@@ -2512,7 +2505,7 @@
case IMMEDIATE:
createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> {
final ResultReader resultReader = new ResultReader(resultSet, id, resultSetId);
- updateJobStats(id, stats);
+ updateJobStats(id, stats, metadataProvider.getResultSetId());
// stop buffering and allow for streaming result delivery
sessionOutput.release();
ResultUtil.printResults(appCtx, resultReader, sessionOutput, stats,
@@ -2521,7 +2514,7 @@
break;
case DEFERRED:
createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> {
- updateJobStats(id, stats);
+ updateJobStats(id, stats, metadataProvider.getResultSetId());
ResultUtil.printResultHandle(sessionOutput, new ResultHandle(id, resultSetId));
if (outMetadata != null) {
outMetadata.getResultSets()
@@ -2534,23 +2527,13 @@
}
}
- private void updateJobStats(JobId jobId, Stats stats) {
- final IJobManager jobManager =
- ((ClusterControllerService) appCtx.getServiceContext().getControllerService()).getJobManager();
- final JobRun run = jobManager.get(jobId);
- if (run == null || run.getStatus() != JobStatus.TERMINATED) {
- return;
- }
- final JobProfile jobProfile = run.getJobProfile();
- final Collection<JobletProfile> jobletProfiles = jobProfile.getJobletProfiles().values();
- long processedObjects = 0;
- for (JobletProfile jp : jobletProfiles) {
- final Collection<TaskProfile> jobletTasksProfile = jp.getTaskProfiles().values();
- for (TaskProfile tp : jobletTasksProfile) {
- processedObjects += tp.getStatsCollector().getAggregatedStats().getTupleCounter().get();
- }
- }
- stats.setProcessedObjects(processedObjects);
+ private void updateJobStats(JobId jobId, Stats stats, ResultSetId rsId) throws HyracksDataException {
+ final ClusterControllerService controllerService =
+ (ClusterControllerService) appCtx.getServiceContext().getControllerService();
+ org.apache.asterix.api.common.ResultMetadata resultMetadata =
+ (org.apache.asterix.api.common.ResultMetadata) controllerService.getResultDirectoryService()
+ .getResultMetadata(jobId, rsId);
+ stats.setProcessedObjects(resultMetadata.getProcessedObjects());
}
private void asyncCreateAndRunJob(IHyracksClientConnection hcc, IStatementCompiler compiler, IMetadataLocker locker,
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 99286a2..5998599 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -53,6 +53,7 @@
import org.apache.asterix.app.config.ConfigValidator;
import org.apache.asterix.app.external.ExternalLibraryUtils;
import org.apache.asterix.app.replication.NcLifecycleCoordinator;
+import org.apache.asterix.app.result.JobResultCallback;
import org.apache.asterix.common.api.AsterixThreadFactory;
import org.apache.asterix.common.api.IConfigValidatorFactory;
import org.apache.asterix.common.api.INodeJobTracker;
@@ -91,6 +92,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
+import org.apache.hyracks.api.result.IJobResultCallback;
import org.apache.hyracks.control.cc.BaseCCApplication;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.common.controllers.CCConfig;
@@ -356,4 +358,9 @@
public IGatekeeper getGatekeeper() {
return getConfigManager().getAppConfig().getNCNames()::contains;
}
+
+ @Override
+ public IJobResultCallback getJobResultCallback() {
+ return new JobResultCallback(appCtx);
+ }
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
index b8beccc..8969f18 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
@@ -95,7 +95,7 @@
exchange.setPhysicalOperator(new OneToOneExchangePOperator());
exchange.getInputs().add(new MutableObject<>(assignOperator));
- DistributeResultOperator resultOperator = new DistributeResultOperator(null, null);
+ DistributeResultOperator resultOperator = new DistributeResultOperator(null, null, null);
resultOperator.setExecutionMode(UNPARTITIONED);
resultOperator.setPhysicalOperator(new DistributeResultPOperator());
resultOperator.getInputs().add(new MutableObject<>(exchange));
@@ -137,7 +137,7 @@
orderOperator.setPhysicalOperator(new StableSortPOperator());
orderOperator.getInputs().add(new MutableObject<>(groupByOperator));
- DistributeResultOperator resultOperator = new DistributeResultOperator(null, null);
+ DistributeResultOperator resultOperator = new DistributeResultOperator(null, null, null);
resultOperator.setExecutionMode(PARTITIONED);
resultOperator.setPhysicalOperator(new DistributeResultPOperator());
resultOperator.getInputs().add(new MutableObject<>(orderOperator));
@@ -217,7 +217,7 @@
secondJoin.getInputs().add(new MutableObject<>(exchangeOperator1));
secondJoin.getInputs().add(new MutableObject<>(exchangeOperator2));
- DistributeResultOperator resultOperator = new DistributeResultOperator(null, null);
+ DistributeResultOperator resultOperator = new DistributeResultOperator(null, null, null);
resultOperator.setExecutionMode(PARTITIONED);
resultOperator.setPhysicalOperator(new DistributeResultPOperator());
resultOperator.getInputs().add(new MutableObject<>(secondJoin));
@@ -277,7 +277,7 @@
secondJoin.getInputs().add(new MutableObject<>(order1));
secondJoin.getInputs().add(new MutableObject<>(order2));
- DistributeResultOperator resultOperator = new DistributeResultOperator(null, null);
+ DistributeResultOperator resultOperator = new DistributeResultOperator(null, null, null);
resultOperator.setExecutionMode(PARTITIONED);
resultOperator.setPhysicalOperator(new DistributeResultPOperator());
resultOperator.getInputs().add(new MutableObject<>(secondJoin));
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index ab8f172..cbe41e8 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -145,6 +145,7 @@
private static final Pattern MAX_RESULT_READS_PATTERN =
Pattern.compile("maxresultreads=(\\d+)(\\D|$)", Pattern.MULTILINE);
private static final Pattern HTTP_REQUEST_TYPE = Pattern.compile("requesttype=(.*)", Pattern.MULTILINE);
+ private static final Pattern EXTRACT_RESULT_TYPE = Pattern.compile("extractresult=(.*)", Pattern.MULTILINE);
private static final String NC_ENDPOINT_PREFIX = "nc:";
public static final int TRUNCATE_THRESHOLD = 16384;
public static final Set<String> NON_CANCELLABLE =
@@ -1224,11 +1225,15 @@
final List<Parameter> params = extractParameters(statement);
final Optional<String> body = extractBody(statement);
final Predicate<Integer> statusCodePredicate = extractStatusCodePredicate(statement);
+ final boolean extracResult = isExtracResult(statement);
InputStream resultStream;
if ("http".equals(extension)) {
resultStream = executeHttp(reqType, variablesReplaced, fmt, params, statusCodePredicate, body);
} else if ("uri".equals(extension)) {
resultStream = executeURI(reqType, URI.create(variablesReplaced), fmt, params, statusCodePredicate, body);
+ if (extracResult) {
+ resultStream = ResultExtractor.extract(resultStream, UTF_8);
+ }
} else {
throw new IllegalArgumentException("Unexpected format for method " + reqType + ": " + extension);
}
@@ -1574,6 +1579,11 @@
return m.find() ? m.group(1) : null;
}
+ private static boolean isExtracResult(String statement) {
+ Matcher m = EXTRACT_RESULT_TYPE.matcher(statement);
+ return m.find() ? Boolean.valueOf(m.group(1)) : false;
+ }
+
private static boolean isJsonEncoded(String httpRequestType) throws Exception {
if (httpRequestType == null || httpRequestType.isEmpty()) {
return true;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml
index 5324a0d..ff75720 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml
@@ -56,4 +56,9 @@
<source-location>false</source-location>
</compilation-unit>
</test-case>
+ <test-case FilePath="async-deferred">
+ <compilation-unit name="async-json">
+ <output-dir compare="Clean-JSON">async-json</output-dir>
+ </compilation-unit>
+ </test-case>
</test-group>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-json/async-json.1.async.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-json/async-json.1.async.sqlpp
new file mode 100644
index 0000000..e24253a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-json/async-json.1.async.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+-- handlevariable=status
+
+select i, i * i as i2 from range(1, 5) i;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-json/async-json.2.pollget.uri b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-json/async-json.2.pollget.uri
new file mode 100644
index 0000000..d10aed9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-json/async-json.2.pollget.uri
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+-- polltimeoutsecs=10
+-- handlevariable=result
+
+$status
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-json/async-json.3.get.uri b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-json/async-json.3.get.uri
new file mode 100644
index 0000000..6496a4b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-json/async-json.3.get.uri
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+-- extractresult=true
+$result
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-json/async-json.1.ignore b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-json/async-json.1.ignore
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-json/async-json.1.ignore
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-json/async-json.2.regex b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-json/async-json.2.regex
new file mode 100644
index 0000000..4308ba2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-json/async-json.2.regex
@@ -0,0 +1,2 @@
+/"status": "success"/
+/"handle": ".*"/
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-json/async-json.3.json b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-json/async-json.3.json
new file mode 100644
index 0000000..d4e1a2d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-json/async-json.3.json
@@ -0,0 +1 @@
+{"i":1,"i2":1}{"i":2,"i2":4}{"i":3,"i2":9}{"i":4,"i2":16}{"i":5,"i2":25}
\ No newline at end of file
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 418ef4b..0a72ceb 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -131,6 +131,7 @@
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.io.FileSplit;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.result.IResultMetadata;
import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.data.std.primitive.ShortPointable;
import org.apache.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer;
@@ -597,8 +598,8 @@
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
- int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc, boolean ordered,
- JobSpecification spec) throws AlgebricksException {
+ int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc,
+ IResultMetadata metadata, JobSpecification spec) throws AlgebricksException {
ResultSetDataSink rsds = (ResultSetDataSink) sink;
ResultSetSinkId rssId = rsds.getId();
ResultSetId rsId = rssId.getResultSetId();
@@ -606,7 +607,7 @@
try {
IResultSerializerFactory resultSerializedAppenderFactory = resultSerializerFactoryProvider
.getAqlResultSerializerFactoryProvider(printColumns, printerFactories, getWriterFactory());
- resultWriter = new ResultWriterOperatorDescriptor(spec, rsId, ordered, getResultAsyncMode(),
+ resultWriter = new ResultWriterOperatorDescriptor(spec, rsId, metadata, getResultAsyncMode(),
resultSerializedAppenderFactory, getMaxResultReads());
} catch (IOException e) {
throw new AlgebricksException(e);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 3d004a2..6d001dc 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -36,6 +36,7 @@
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.result.IResultMetadata;
import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
public interface IMetadataProvider<S, I> {
@@ -58,8 +59,8 @@
throws AlgebricksException;
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
- int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc, boolean ordered,
- JobSpecification spec) throws AlgebricksException;
+ int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc,
+ IResultMetadata metadata, JobSpecification spec) throws AlgebricksException;
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(IDataSource<S> dataSource,
IOperatorSchema propagatedSchema, List<LogicalVariable> keys, LogicalVariable payLoadVar,
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/DistributeResultOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/DistributeResultOperator.java
index c40e035..22ee1c6 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/DistributeResultOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/DistributeResultOperator.java
@@ -32,14 +32,18 @@
import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+import org.apache.hyracks.api.result.IResultMetadata;
public class DistributeResultOperator extends AbstractLogicalOperator {
private List<Mutable<ILogicalExpression>> expressions;
private IDataSink dataSink;
+ private IResultMetadata resultMetadata;
- public DistributeResultOperator(List<Mutable<ILogicalExpression>> expressions, IDataSink dataSink) {
+ public DistributeResultOperator(List<Mutable<ILogicalExpression>> expressions, IDataSink dataSink,
+ IResultMetadata resultMetadata) {
this.expressions = expressions;
this.dataSink = dataSink;
+ this.resultMetadata = resultMetadata;
}
public List<Mutable<ILogicalExpression>> getExpressions() {
@@ -93,4 +97,7 @@
return createPropagatingAllInputsTypeEnvironment(ctx);
}
+ public IResultMetadata getResultMetadata() {
+ return resultMetadata;
+ }
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index 1727d10..fd4009f 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -277,7 +277,7 @@
throws AlgebricksException {
ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<>();
deepCopyExpressionRefs(newExpressions, op.getExpressions());
- return new DistributeResultOperator(newExpressions, op.getDataSink());
+ return new DistributeResultOperator(newExpressions, op.getDataSink(), op.getResultMetadata());
}
@Override
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
index a309d9e..138cff8 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
@@ -102,8 +102,8 @@
IPrinterFactory[] pf =
JobGenHelper.mkPrinterFactories(inputSchemas[0], context.getTypeEnvironment(op), context, columns);
- Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints =
- mp.getResultHandleRuntime(resultOp.getDataSink(), columns, pf, inputDesc, true, spec);
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = mp.getResultHandleRuntime(
+ resultOp.getDataSink(), columns, pf, inputDesc, resultOp.getResultMetadata(), spec);
IOperatorDescriptor opDesc = runtimeAndConstraints.first;
opDesc.setSourceLocation(resultOp.getSourceLocation());
builder.contributeHyracksOperator(resultOp, opDesc);
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplication.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplication.java
index aa1021b..5cc8f69 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplication.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplication.java
@@ -21,6 +21,7 @@
import org.apache.hyracks.api.config.IConfigManager;
import org.apache.hyracks.api.control.IGatekeeper;
import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.api.result.IJobResultCallback;
public interface ICCApplication extends IApplication {
@@ -30,4 +31,10 @@
IGatekeeper getGatekeeper();
+ /**
+ * Gets the job result callback
+ *
+ * @return the job result callback
+ */
+ IJobResultCallback getJobResultCallback();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
index 36ce18f..0ee3658 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -46,6 +46,7 @@
GET_RESULT_DIRECTORY_ADDRESS,
GET_RESULT_STATUS,
GET_RESULT_LOCATIONS,
+ GET_RESULT_METADATA,
WAIT_FOR_COMPLETION,
GET_NODE_CONTROLLERS_INFO,
CLI_DEPLOY_BINARY,
@@ -318,6 +319,32 @@
}
}
+ public static class GetResultMetadataFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+
+ private final ResultSetId rsId;
+
+ public GetResultMetadataFunction(JobId jobId, ResultSetId rsId) {
+ this.jobId = jobId;
+ this.rsId = rsId;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.GET_RESULT_METADATA;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public ResultSetId getResultSetId() {
+ return rsId;
+ }
+ }
+
public static class WaitForCompletionFunction extends Function {
private static final long serialVersionUID = 1L;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IJobResultCallback.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IJobResultCallback.java
new file mode 100644
index 0000000..2d5fea1
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IJobResultCallback.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.result;
+
+import org.apache.hyracks.api.job.JobId;
+
+public interface IJobResultCallback {
+
+ /**
+ * Notifies this callback that writing the result of job {@code jobId} has been completed successfully.
+ *
+ * @param jobId
+ * @param resultJobRecord
+ */
+ void completed(JobId jobId, ResultJobRecord resultJobRecord);
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultDirectory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultDirectory.java
index 65c8d09..4798494 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultDirectory.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultDirectory.java
@@ -48,4 +48,13 @@
*/
ResultDirectoryRecord[] getResultLocations(JobId jobId, ResultSetId rsId, ResultDirectoryRecord[] knownRecords)
throws Exception;
+
+ /**
+ * Gets the result metadata
+ * @param jobId
+ * @param rsId
+ * @return
+ * @throws Exception
+ */
+ IResultMetadata getResultMetadata(JobId jobId, ResultSetId rsId) throws Exception;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultMetadata.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultMetadata.java
new file mode 100644
index 0000000..12696af
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultMetadata.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.result;
+
+import java.io.Serializable;
+
+public interface IResultMetadata extends Serializable {
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultPartitionManager.java
index a539d37..4909903 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultPartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultPartitionManager.java
@@ -24,11 +24,11 @@
import org.apache.hyracks.api.job.JobId;
public interface IResultPartitionManager extends IResultManager {
- IFrameWriter createResultPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult,
+ IFrameWriter createResultPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, IResultMetadata metadata,
boolean asyncMode, int partition, int nPartitions, long maxReads) throws HyracksException;
void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, int partition, int nPartitions,
- boolean orderedResult, boolean emptyResult) throws HyracksException;
+ IResultMetadata metadata, boolean emptyResult) throws HyracksException;
void reportPartitionWriteCompletion(JobId jobId, ResultSetId resultSetId, int partition) throws HyracksException;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultSetReader.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultSetReader.java
index 1669c36..1652dd4 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultSetReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultSetReader.java
@@ -26,4 +26,11 @@
Status getResultStatus();
int read(IFrame frame) throws HyracksDataException;
+
+ /**
+ * Gets the result metadata
+ *
+ * @return the result metadata
+ */
+ IResultMetadata getResultMetadata();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
index b8ddbd2..b3b0706 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
@@ -29,6 +29,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
public class ResultJobRecord implements IResultStateRecord {
+
public enum State {
IDLE,
RUNNING,
@@ -78,9 +79,12 @@
private static final long serialVersionUID = 1L;
private final long timestamp;
-
+ private long jobStartTime;
+ private long jobEndTime;
private Status status;
+ private ResultSetMetaData resultSetMetaData;
+
private Map<ResultSetId, ResultSetMetaData> resultSetMetadataMap = new HashMap<>();
public ResultJobRecord() {
@@ -96,9 +100,18 @@
}
public void start() {
+ jobStartTime = System.nanoTime();
updateState(State.RUNNING);
}
+ public void finish() {
+ jobEndTime = System.nanoTime();
+ }
+
+ public long getJobDuration() {
+ return jobEndTime - jobStartTime;
+ }
+
public void success() {
updateState(State.SUCCESS);
}
@@ -130,12 +143,14 @@
return sb.toString();
}
- public void setResultSetMetaData(ResultSetId rsId, boolean orderedResult, int nPartitions)
+ public synchronized void setResultSetMetaData(ResultSetId rsId, IResultMetadata metadata, int nPartitions)
throws HyracksDataException {
ResultSetMetaData rsMd = resultSetMetadataMap.get(rsId);
if (rsMd == null) {
- resultSetMetadataMap.put(rsId, new ResultSetMetaData(nPartitions, orderedResult));
- } else if (rsMd.getOrderedResult() != orderedResult || rsMd.getRecords().length != nPartitions) {
+ final ResultSetMetaData resultSetMetaData = new ResultSetMetaData(nPartitions, metadata);
+ resultSetMetadataMap.put(rsId, resultSetMetaData);
+ this.resultSetMetaData = resultSetMetaData;
+ } else if (rsMd.getRecords().length != nPartitions) {
throw HyracksDataException.create(ErrorCode.INCONSISTENT_RESULT_METADATA, rsId.toString());
}
//TODO(tillw) throwing a HyracksDataException here hangs the execution tests
@@ -174,4 +189,8 @@
success();
}
}
+
+ public synchronized ResultSetMetaData getResultSetMetaData() {
+ return resultSetMetaData;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultSetMetaData.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultSetMetaData.java
index b7b8f1c..60c76c5 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultSetMetaData.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultSetMetaData.java
@@ -22,15 +22,15 @@
public class ResultSetMetaData {
private final ResultDirectoryRecord[] records;
- private final boolean ordered;
+ private final IResultMetadata metadata;
- ResultSetMetaData(int len, boolean ordered) {
+ ResultSetMetaData(int len, IResultMetadata metadata) {
this.records = new ResultDirectoryRecord[len];
- this.ordered = ordered;
+ this.metadata = metadata;
}
- public boolean getOrderedResult() {
- return ordered;
+ public IResultMetadata getMetadata() {
+ return metadata;
}
public ResultDirectoryRecord[] getRecords() {
@@ -40,7 +40,7 @@
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
- sb.append("{ordered: ").append(ordered).append(", records: ").append(Arrays.toString(records));
+ sb.append("{metadata: ").append(metadata).append(", records: ").append(Arrays.toString(records));
return sb.toString();
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectory.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectory.java
index d7f79fd..b335a93 100644
--- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectory.java
+++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectory.java
@@ -24,6 +24,7 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.network.ISocketChannelFactory;
import org.apache.hyracks.api.result.IResultDirectory;
+import org.apache.hyracks.api.result.IResultMetadata;
import org.apache.hyracks.api.result.ResultDirectoryRecord;
import org.apache.hyracks.api.result.ResultJobRecord.Status;
import org.apache.hyracks.api.result.ResultSetId;
@@ -58,4 +59,9 @@
ResultDirectoryRecord[] knownRecords) throws Exception {
return remoteResultDirectory.getResultLocations(jobId, rsId, knownRecords);
}
+
+ @Override
+ public IResultMetadata getResultMetadata(JobId jobId, ResultSetId rsId) throws Exception {
+ return remoteResultDirectory.getResultMetadata(jobId, rsId);
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectoryRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectoryRemoteProxy.java
index a1adda3..ed628f7 100644
--- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectoryRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectoryRemoteProxy.java
@@ -21,6 +21,7 @@
import org.apache.hyracks.api.client.HyracksClientInterfaceFunctions;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.result.IResultDirectory;
+import org.apache.hyracks.api.result.IResultMetadata;
import org.apache.hyracks.api.result.ResultDirectoryRecord;
import org.apache.hyracks.api.result.ResultJobRecord.Status;
import org.apache.hyracks.api.result.ResultSetId;
@@ -52,4 +53,11 @@
new HyracksClientInterfaceFunctions.GetResultLocationsFunction(jobId, rsId, knownRecords);
return (ResultDirectoryRecord[]) rpci.call(ipcHandle, gdrlf);
}
+
+ @Override
+ public IResultMetadata getResultMetadata(JobId jobId, ResultSetId rsId) throws Exception {
+ HyracksClientInterfaceFunctions.GetResultMetadataFunction grmf =
+ new HyracksClientInterfaceFunctions.GetResultMetadataFunction(jobId, rsId);
+ return (IResultMetadata) rpci.call(ipcHandle, grmf);
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSetReader.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSetReader.java
index 752c3f5..c8f5bd9 100644
--- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSetReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSetReader.java
@@ -34,6 +34,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.result.IResultDirectory;
+import org.apache.hyracks.api.result.IResultMetadata;
import org.apache.hyracks.api.result.IResultSetReader;
import org.apache.hyracks.api.result.ResultDirectoryRecord;
import org.apache.hyracks.api.result.ResultJobRecord.Status;
@@ -122,6 +123,20 @@
return readSize;
}
+ @Override
+ public IResultMetadata getResultMetadata() {
+ try {
+ return resultDirectory.getResultMetadata(jobId, resultSetId);
+ } catch (HyracksDataException e) {
+ if (e.getErrorCode() != ErrorCode.NO_RESULT_SET) {
+ LOGGER.log(Level.WARN, "Exception retrieving result set for job " + jobId, e);
+ }
+ } catch (Exception e) {
+ LOGGER.log(Level.WARN, "Exception retrieving result set for job " + jobId, e);
+ }
+ return null;
+ }
+
private SocketAddress getSocketAddress(ResultDirectoryRecord record) throws HyracksDataException {
try {
final NetworkAddress netAddr = record.getNetworkAddress();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
index fbce29d..b41d5a2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
@@ -27,6 +27,7 @@
import org.apache.hyracks.api.control.IGatekeeper;
import org.apache.hyracks.api.job.resource.DefaultJobCapacityController;
import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.api.result.IJobResultCallback;
import org.apache.hyracks.api.util.HyracksConstants;
import org.apache.hyracks.control.common.controllers.CCConfig;
import org.apache.hyracks.control.common.controllers.ControllerConfig;
@@ -97,4 +98,11 @@
public IGatekeeper getGatekeeper() {
return node -> true;
}
+
+ @Override
+ public IJobResultCallback getJobResultCallback() {
+ return (jobId, resultJobRecord) -> {
+ // no op
+ };
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
index 364a8ae..67e9599 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
@@ -35,6 +35,7 @@
import org.apache.hyracks.control.cc.work.GetNodeControllersInfoWork;
import org.apache.hyracks.control.cc.work.GetNodeDetailsJSONWork;
import org.apache.hyracks.control.cc.work.GetResultDirectoryAddressWork;
+import org.apache.hyracks.control.cc.work.GetResultMetadataWork;
import org.apache.hyracks.control.cc.work.GetResultPartitionLocationsWork;
import org.apache.hyracks.control.cc.work.GetResultStatusWork;
import org.apache.hyracks.control.cc.work.GetThreadDumpWork;
@@ -137,6 +138,12 @@
ccs.getWorkQueue().schedule(new GetResultPartitionLocationsWork(ccs, gdrlf.getJobId(),
gdrlf.getResultSetId(), gdrlf.getKnownRecords(), new IPCResponder<>(handle, mid)));
break;
+ case GET_RESULT_METADATA:
+ HyracksClientInterfaceFunctions.GetResultMetadataFunction grmf =
+ (HyracksClientInterfaceFunctions.GetResultMetadataFunction) fn;
+ ccs.getWorkQueue().schedule(new GetResultMetadataWork(ccs, grmf.getJobId(), grmf.getResultSetId(),
+ new IPCResponder<>(handle, mid)));
+ break;
case WAIT_FOR_COMPLETION:
HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf =
(HyracksClientInterfaceFunctions.WaitForCompletionFunction) fn;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
index e022dfe..0e4ad41 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
@@ -119,7 +119,7 @@
(CCNCFunctions.RegisterResultPartitionLocationFunction) fn;
ccs.getWorkQueue()
.schedule(new RegisterResultPartitionLocationWork(ccs, rrplf.getJobId(), rrplf.getResultSetId(),
- rrplf.getOrderedResult(), rrplf.getEmptyResult(), rrplf.getPartition(),
+ rrplf.getMetadata(), rrplf.getEmptyResult(), rrplf.getPartition(),
rrplf.getNPartitions(), rrplf.getNetworkAddress()));
break;
case REPORT_RESULT_PARTITION_WRITE_COMPLETION:
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index 419dff6..de1b18d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -230,7 +230,7 @@
jobLog.open();
startApplication();
- resultDirectoryService.init(executor);
+ resultDirectoryService.init(executor, application.getJobResultCallback());
workQueue.start();
connectNCs();
LOGGER.log(Level.INFO, "Started ClusterControllerService");
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/IResultDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/IResultDirectoryService.java
index 5451e3f..b1ecb45 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/IResultDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/IResultDirectoryService.java
@@ -25,16 +25,18 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IJobLifecycleListener;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.result.IJobResultCallback;
import org.apache.hyracks.api.result.IResultManager;
+import org.apache.hyracks.api.result.IResultMetadata;
import org.apache.hyracks.api.result.ResultDirectoryRecord;
import org.apache.hyracks.api.result.ResultJobRecord.Status;
import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.control.common.work.IResultCallback;
public interface IResultDirectoryService extends IJobLifecycleListener, IResultManager {
- public void init(ExecutorService executor);
+ public void init(ExecutorService executor, IJobResultCallback callback);
- public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
+ public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, IResultMetadata metadata,
boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress)
throws HyracksDataException;
@@ -45,6 +47,8 @@
public Status getResultStatus(JobId jobId, ResultSetId rsId) throws HyracksDataException;
+ public IResultMetadata getResultMetadata(JobId jobId, ResultSetId rsId) throws HyracksDataException;
+
public void getResultPartitionLocations(JobId jobId, ResultSetId rsId, ResultDirectoryRecord[] knownLocations,
IResultCallback<ResultDirectoryRecord[]> callback) throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
index cdfa4d3..a2218e2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
@@ -34,6 +34,8 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.result.IJobResultCallback;
+import org.apache.hyracks.api.result.IResultMetadata;
import org.apache.hyracks.api.result.IResultStateRecord;
import org.apache.hyracks.api.result.ResultDirectoryRecord;
import org.apache.hyracks.api.result.ResultJobRecord;
@@ -61,6 +63,7 @@
private final long resultSweepThreshold;
private final Map<JobId, JobResultInfo> jobResultLocations;
+ private IJobResultCallback jobResultCallback;
public ResultDirectoryService(long resultTTL, long resultSweepThreshold) {
super(resultTTL);
@@ -69,8 +72,9 @@
}
@Override
- public void init(ExecutorService executor) {
+ public void init(ExecutorService executor, IJobResultCallback jobResultCallback) {
executor.execute(new ResultStateSweeper(this, resultSweepThreshold, LOGGER));
+ this.jobResultCallback = jobResultCallback;
}
@Override
@@ -91,7 +95,11 @@
@Override
public void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) throws HyracksException {
- // Auto-generated method stub
+ if (exceptions == null || exceptions.isEmpty()) {
+ final ResultJobRecord resultJobRecord = getNonNullResultJobRecord(jobId);
+ resultJobRecord.finish();
+ jobResultCallback.completed(jobId, resultJobRecord);
+ }
}
private ResultJobRecord getResultJobRecord(JobId jobId) {
@@ -108,11 +116,11 @@
}
@Override
- public synchronized void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
+ public synchronized void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, IResultMetadata metadata,
boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress)
throws HyracksDataException {
ResultJobRecord djr = getNonNullResultJobRecord(jobId);
- djr.setResultSetMetaData(rsId, orderedResult, nPartitions);
+ djr.setResultSetMetaData(rsId, metadata, nPartitions);
ResultDirectoryRecord record = djr.getOrCreateDirectoryRecord(rsId, partition);
record.setNetworkAddress(networkAddress);
@@ -169,6 +177,11 @@
}
@Override
+ public synchronized IResultMetadata getResultMetadata(JobId jobId, ResultSetId rsId) throws HyracksDataException {
+ return getNonNullResultJobRecord(jobId).getResultSetMetaData(rsId).getMetadata();
+ }
+
+ @Override
public synchronized Set<JobId> getJobIds() {
return jobResultLocations.keySet();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultMetadataWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultMetadataWork.java
new file mode 100644
index 0000000..f2ad7d3
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultMetadataWork.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.cc.work;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.result.IResultMetadata;
+import org.apache.hyracks.api.result.ResultSetId;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.common.work.IResultCallback;
+import org.apache.hyracks.control.common.work.SynchronizableWork;
+
+public class GetResultMetadataWork extends SynchronizableWork {
+ private final ClusterControllerService ccs;
+
+ private final JobId jobId;
+
+ private final ResultSetId rsId;
+
+ private final IResultCallback<IResultMetadata> callback;
+
+ public GetResultMetadataWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId,
+ IResultCallback<IResultMetadata> callback) {
+ this.ccs = ccs;
+ this.jobId = jobId;
+ this.rsId = rsId;
+ this.callback = callback;
+ }
+
+ @Override
+ public void doRun() {
+ try {
+ IResultMetadata metadata = ccs.getResultDirectoryService().getResultMetadata(jobId, rsId);
+ callback.setValue(metadata);
+ } catch (HyracksDataException e) {
+ callback.setException(e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return getName() + ": JobId@" + jobId + " ResultSetId@" + rsId;
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
index 67645bc..b788e27 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
@@ -24,6 +24,7 @@
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.result.IResultMetadata;
import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.job.JobRun;
@@ -43,7 +44,7 @@
private final ResultSetId rsId;
- private final boolean orderedResult;
+ private final IResultMetadata metadata;
private final boolean emptyResult;
@@ -54,11 +55,12 @@
private final NetworkAddress networkAddress;
public RegisterResultPartitionLocationWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId,
- boolean orderedResult, boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress) {
+ IResultMetadata metadata, boolean emptyResult, int partition, int nPartitions,
+ NetworkAddress networkAddress) {
this.ccs = ccs;
this.jobId = jobId;
this.rsId = rsId;
- this.orderedResult = orderedResult;
+ this.metadata = metadata;
this.emptyResult = emptyResult;
this.partition = partition;
this.nPartitions = nPartitions;
@@ -68,7 +70,7 @@
@Override
public void run() {
try {
- ccs.getResultDirectoryService().registerResultPartitionLocation(jobId, rsId, orderedResult, emptyResult,
+ ccs.getResultDirectoryService().registerResultPartitionLocation(jobId, rsId, metadata, emptyResult,
partition, nPartitions, networkAddress);
} catch (HyracksDataException e) {
LOGGER.log(Level.WARN, "Failed to register partition location", e);
@@ -85,7 +87,7 @@
@Override
public String toString() {
return getName() + ": JobId@" + jobId + " ResultSetId@" + rsId + " Partition@" + partition + " NPartitions@"
- + nPartitions + " ResultPartitionLocation@" + networkAddress + " OrderedResult@" + orderedResult
- + " EmptyResult@" + emptyResult;
+ + nPartitions + " ResultPartitionLocation@" + networkAddress + " metadata@" + metadata + " EmptyResult@"
+ + emptyResult;
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
index c811169..c8106e8 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
@@ -26,6 +26,7 @@
import org.apache.hyracks.api.deployment.DeploymentId;
import org.apache.hyracks.api.job.DeployedJobSpecId;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.result.IResultMetadata;
import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.control.common.controllers.NodeRegistration;
import org.apache.hyracks.control.common.deployment.DeploymentStatus;
@@ -65,7 +66,7 @@
void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception;
- void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, boolean emptyResult,
+ void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, IResultMetadata metadata, boolean emptyResult,
int partition, int nPartitions, NetworkAddress networkAddress) throws Exception;
void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws Exception;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index 1616343..2ba4768 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -56,6 +56,7 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobStatus;
import org.apache.hyracks.api.partitions.PartitionId;
+import org.apache.hyracks.api.result.IResultMetadata;
import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.control.common.controllers.NodeParameters;
import org.apache.hyracks.control.common.controllers.NodeRegistration;
@@ -590,7 +591,7 @@
private final ResultSetId rsId;
- private final boolean orderedResult;
+ private final IResultMetadata metadata;
private final boolean emptyResult;
@@ -600,11 +601,11 @@
private NetworkAddress networkAddress;
- public RegisterResultPartitionLocationFunction(JobId jobId, ResultSetId rsId, boolean orderedResult,
+ public RegisterResultPartitionLocationFunction(JobId jobId, ResultSetId rsId, IResultMetadata metadata,
boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress) {
this.jobId = jobId;
this.rsId = rsId;
- this.orderedResult = orderedResult;
+ this.metadata = metadata;
this.emptyResult = emptyResult;
this.partition = partition;
this.nPartitions = nPartitions;
@@ -624,8 +625,8 @@
return rsId;
}
- public boolean getOrderedResult() {
- return orderedResult;
+ public IResultMetadata getMetadata() {
+ return metadata;
}
public boolean getEmptyResult() {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 13a08b2..344c3fb 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -26,6 +26,7 @@
import org.apache.hyracks.api.deployment.DeploymentId;
import org.apache.hyracks.api.job.DeployedJobSpecId;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.result.IResultMetadata;
import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.control.common.base.IClusterController;
import org.apache.hyracks.control.common.controllers.NodeRegistration;
@@ -132,10 +133,10 @@
}
@Override
- public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
+ public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, IResultMetadata metadata,
boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress) throws Exception {
- RegisterResultPartitionLocationFunction fn = new RegisterResultPartitionLocationFunction(jobId, rsId,
- orderedResult, emptyResult, partition, nPartitions, networkAddress);
+ RegisterResultPartitionLocationFunction fn = new RegisterResultPartitionLocationFunction(jobId, rsId, metadata,
+ emptyResult, partition, nPartitions, networkAddress);
ipcHandle.send(-1, fn, null);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionManager.java
index 835b59b..165cb95 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionManager.java
@@ -28,6 +28,7 @@
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.io.IWorkspaceFileFactory;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.result.IResultMetadata;
import org.apache.hyracks.api.result.IResultPartitionManager;
import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.control.common.result.AbstractResultManager;
@@ -70,12 +71,12 @@
}
@Override
- public IFrameWriter createResultPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult,
+ public IFrameWriter createResultPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, IResultMetadata metadata,
boolean asyncMode, int partition, int nPartitions, long maxReads) {
ResultPartitionWriter dpw;
JobId jobId = ctx.getJobletContext().getJobId();
synchronized (this) {
- dpw = new ResultPartitionWriter(ctx, this, jobId, rsId, asyncMode, orderedResult, partition, nPartitions,
+ dpw = new ResultPartitionWriter(ctx, this, jobId, rsId, asyncMode, metadata, partition, nPartitions,
resultMemoryManager, fileFactory, maxReads);
ResultSetMap rsIdMap = partitionResultStateMap.computeIfAbsent(jobId, k -> new ResultSetMap());
ResultState[] resultStates = rsIdMap.createOrGetResultStates(rsId, nPartitions);
@@ -87,10 +88,10 @@
@Override
public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, int partition, int nPartitions,
- boolean orderedResult, boolean emptyResult) throws HyracksException {
+ IResultMetadata metadata, boolean emptyResult) throws HyracksException {
try {
// Be sure to send the *public* network address to the CC
- ncs.getClusterController(jobId.getCcId()).registerResultPartitionLocation(jobId, rsId, orderedResult,
+ ncs.getClusterController(jobId.getCcId()).registerResultPartitionLocation(jobId, rsId, metadata,
emptyResult, partition, nPartitions, ncs.getResultNetworkManager().getPublicNetworkAddress());
} catch (Exception e) {
throw HyracksException.create(e);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionWriter.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionWriter.java
index 53f66e7..55db150 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionWriter.java
@@ -27,6 +27,7 @@
import org.apache.hyracks.api.io.IWorkspaceFileFactory;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.partitions.ResultSetPartitionId;
+import org.apache.hyracks.api.result.IResultMetadata;
import org.apache.hyracks.api.result.IResultPartitionManager;
import org.apache.hyracks.api.result.ResultSetId;
import org.apache.logging.log4j.LogManager;
@@ -41,7 +42,7 @@
private final ResultSetId resultSetId;
- private final boolean orderedResult;
+ private final IResultMetadata metadata;
private final int partition;
@@ -58,12 +59,12 @@
private boolean failed = false;
public ResultPartitionWriter(IHyracksTaskContext ctx, IResultPartitionManager manager, JobId jobId,
- ResultSetId rsId, boolean asyncMode, boolean orderedResult, int partition, int nPartitions,
+ ResultSetId rsId, boolean asyncMode, IResultMetadata metadata, int partition, int nPartitions,
ResultMemoryManager resultMemoryManager, IWorkspaceFileFactory fileFactory, long maxReads) {
this.manager = manager;
this.jobId = jobId;
this.resultSetId = rsId;
- this.orderedResult = orderedResult;
+ this.metadata = metadata;
this.partition = partition;
this.nPartitions = nPartitions;
this.resultMemoryManager = resultMemoryManager;
@@ -127,8 +128,7 @@
void registerResultPartitionLocation(boolean empty) throws HyracksDataException {
try {
if (!partitionRegistered) {
- manager.registerResultPartitionLocation(jobId, resultSetId, partition, nPartitions, orderedResult,
- empty);
+ manager.registerResultPartitionLocation(jobId, resultSetId, partition, nPartitions, metadata, empty);
partitionRegistered = true;
}
} catch (HyracksException e) {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
index 9fb9815..8006fc6 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
@@ -34,6 +34,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.api.result.IResultMetadata;
import org.apache.hyracks.api.result.IResultPartitionManager;
import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.dataflow.common.comm.io.FrameOutputStream;
@@ -46,18 +47,18 @@
private final ResultSetId rsId;
- private final boolean ordered;
+ private final IResultMetadata metadata;
private final boolean asyncMode;
private final IResultSerializerFactory resultSerializerFactory;
private final long maxReads;
- public ResultWriterOperatorDescriptor(IOperatorDescriptorRegistry spec, ResultSetId rsId, boolean ordered,
+ public ResultWriterOperatorDescriptor(IOperatorDescriptorRegistry spec, ResultSetId rsId, IResultMetadata metadata,
boolean asyncMode, IResultSerializerFactory resultSerializerFactory, long maxReads) throws IOException {
super(spec, 1, 0);
this.rsId = rsId;
- this.ordered = ordered;
+ this.metadata = metadata;
this.asyncMode = asyncMode;
this.resultSerializerFactory = resultSerializerFactory;
this.maxReads = maxReads;
@@ -88,7 +89,7 @@
@Override
public void open() throws HyracksDataException {
try {
- resultPartitionWriter = resultPartitionManager.createResultPartitionWriter(ctx, rsId, ordered,
+ resultPartitionWriter = resultPartitionManager.createResultPartitionWriter(ctx, rsId, metadata,
asyncMode, partition, nPartitions, maxReads);
resultPartitionWriter.open();
resultSerializer.init();
@@ -140,7 +141,7 @@
StringBuilder sb = new StringBuilder();
sb.append("{ ");
sb.append("\"rsId\": \"").append(rsId).append("\", ");
- sb.append("\"ordered\": ").append(ordered).append(", ");
+ sb.append("\"metadata\": ").append(metadata).append(", ");
sb.append("\"asyncMode\": ").append(asyncMode).append(", ");
sb.append("\"maxReads\": ").append(maxReads).append(" }");
return sb.toString();
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
index da85e74..0a57232 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
@@ -98,7 +98,7 @@
throws IOException {
ResultSetId rsId = new ResultSetId(1);
- AbstractSingleActivityOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
+ AbstractSingleActivityOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
spec.addResultSetId(rsId);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java
index 367fc51d..fd236ba 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java
@@ -99,7 +99,7 @@
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group2, NC2_ID);
ResultSetId rsId = new ResultSetId(1);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
spec.addResultSetId(rsId);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID);
@@ -168,7 +168,7 @@
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group2, NC1_ID, NC2_ID);
ResultSetId rsId = new ResultSetId(1);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
spec.addResultSetId(rsId);
@@ -238,7 +238,7 @@
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group2, NC1_ID, NC2_ID);
ResultSetId rsId = new ResultSetId(1);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
spec.addResultSetId(rsId);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
index 7e239d9..2593e1e 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
@@ -156,7 +156,7 @@
ResultSetId rsId = new ResultSetId(1);
spec.addResultSetId(rsId);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
index 8070d7d..c286e52 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
@@ -228,7 +228,7 @@
throws IOException {
ResultSetId rsId = new ResultSetId(1);
- AbstractSingleActivityOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
+ AbstractSingleActivityOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
spec.addResultSetId(rsId);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java
index bc5de11..0d4d0e8 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java
@@ -94,7 +94,7 @@
ResultSetId rsId = new ResultSetId(i);
spec.addResultSetId(rsId);
- outputOp[i] = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
+ outputOp[i] = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, outputOp[i], locations);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ScanPrintTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ScanPrintTest.java
index 7838a34..66a2cbd 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ScanPrintTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ScanPrintTest.java
@@ -67,7 +67,7 @@
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID, NC1_ID);
ResultSetId rsId = new ResultSetId(1);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
spec.addResultSetId(rsId);
@@ -104,7 +104,7 @@
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);
ResultSetId rsId = new ResultSetId(1);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
spec.addResultSetId(rsId);
@@ -143,7 +143,7 @@
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);
ResultSetId rsId = new ResultSetId(1);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
spec.addResultSetId(rsId);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SortMergeTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SortMergeTest.java
index d85ad5d..0c2934a 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SortMergeTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SortMergeTest.java
@@ -84,7 +84,7 @@
ResultSetId rsId = new ResultSetId(1);
spec.addResultSetId(rsId);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
@@ -135,7 +135,7 @@
ResultSetId rsId = new ResultSetId(1);
spec.addResultSetId(rsId);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index 9ca3528..a8bb1d5 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -139,7 +139,7 @@
ResultSetId rsId = new ResultSetId(1);
spec.addResultSetId(rsId);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
@@ -188,7 +188,7 @@
ResultSetId rsId = new ResultSetId(1);
spec.addResultSetId(rsId);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
@@ -241,7 +241,7 @@
ResultSetId rsId = new ResultSetId(1);
spec.addResultSetId(rsId);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
@@ -295,7 +295,7 @@
ResultSetId rsId = new ResultSetId(1);
spec.addResultSetId(rsId);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
@@ -349,7 +349,7 @@
ResultSetId rsId = new ResultSetId(1);
spec.addResultSetId(rsId);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
@@ -408,7 +408,7 @@
ResultSetId rsId = new ResultSetId(1);
spec.addResultSetId(rsId);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
@@ -466,7 +466,7 @@
ResultSetId rsId = new ResultSetId(1);
spec.addResultSetId(rsId);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
@@ -530,7 +530,7 @@
ResultSetId rsId = new ResultSetId(1);
spec.addResultSetId(rsId);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
index c0bbfe3..63d2fdc 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
@@ -121,7 +121,7 @@
ResultSetId rsId = new ResultSetId(1);
spec.addResultSetId(rsId);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
@@ -205,7 +205,7 @@
ResultSetId rsId = new ResultSetId(1);
spec.addResultSetId(rsId);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
@@ -289,7 +289,7 @@
ResultSetId rsId = new ResultSetId(1);
spec.addResultSetId(rsId);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
@@ -378,7 +378,7 @@
ResultSetId rsId = new ResultSetId(1);
spec.addResultSetId(rsId);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java
index 48f7837..9fddc12 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java
@@ -74,7 +74,7 @@
ResultSetId rsId = new ResultSetId(1);
spec.addResultSetId(rsId);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);