[ASTERIXDB-2249][API] Add Max Result Reads to API
- user model changes: no
- storage format changes: no
- interface changes: yes
- IRequestParameters: add ResultProperties
- IDatasetPartitionManager: add maxReads
Details:
- Add option to specify max result reads and default
it to 1.
- Fix exception handling in DatasetPartitionReader.
- Add option to specify maxResultReads in tests.
- Use new option in async-repeated test.
- Add test case for exhausted result.
Change-Id: I86f75c791f034142c5b046445870bd91378c5b3a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2292
Reviewed-by: Michael Blow <mblow@apache.org>
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java
index 8d0f20b..a1fbac6 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java
@@ -31,9 +31,11 @@
IHyracksDataset getHyracksDataset();
/**
- * @return The {@code ResultDelivery} kind required for queries in the list of statements
+ * Gets the required result properties of the request.
+ *
+ * @return the result properties
*/
- IStatementExecutor.ResultDelivery getResultDelivery();
+ ResultProperties getResultProperties();
/**
* @return a reference to write the stats of executed queries
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ResultProperties.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ResultProperties.java
new file mode 100644
index 0000000..4866c6d
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ResultProperties.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.translator;
+
+import java.io.Serializable;
+
+public class ResultProperties implements Serializable {
+
+ public static final long DEFAULT_MAX_READS = 1;
+ private final IStatementExecutor.ResultDelivery delivery;
+ private final long maxReads;
+
+ public ResultProperties(IStatementExecutor.ResultDelivery delivery) {
+ this(delivery, DEFAULT_MAX_READS);
+ }
+
+ public ResultProperties(IStatementExecutor.ResultDelivery delivery, long maxReads) {
+ this.delivery = delivery;
+ this.maxReads = maxReads;
+ }
+
+ public IStatementExecutor.ResultDelivery getDelivery() {
+ return delivery;
+ }
+
+ public long getMaxReads() {
+ return maxReads;
+ }
+
+ public ResultProperties getNcToCcResultProperties() {
+ if (delivery != IStatementExecutor.ResultDelivery.IMMEDIATE) {
+ return this;
+ }
+ // switch IMMEDIATE to DEFERRED since the result will be severed by the NC
+ return new ResultProperties(IStatementExecutor.ResultDelivery.DEFERRED, maxReads);
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
index 87c1c57..df2a2a1 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
@@ -48,6 +48,7 @@
import org.apache.asterix.translator.IRequestParameters;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutorFactory;
+import org.apache.asterix.translator.ResultProperties;
import org.apache.asterix.translator.SessionConfig;
import org.apache.asterix.translator.SessionConfig.OutputFormat;
import org.apache.asterix.translator.SessionConfig.PlanFormat;
@@ -164,7 +165,7 @@
double duration;
long startTime = System.currentTimeMillis();
final IRequestParameters requestParameters =
- new RequestParameters(hds, IStatementExecutor.ResultDelivery.IMMEDIATE,
+ new RequestParameters(hds, new ResultProperties(IStatementExecutor.ResultDelivery.IMMEDIATE),
new IStatementExecutor.Stats(), null, null, null);
translator.compileAndExecute(hcc, null, requestParameters);
long endTime = System.currentTimeMillis();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
index 83a40f0..76f489c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -41,6 +41,7 @@
import org.apache.asterix.common.messaging.api.MessageFuture;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.ResultProperties;
import org.apache.asterix.translator.SessionOutput;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.hyracks.api.application.INCServiceContext;
@@ -66,14 +67,12 @@
@Override
protected void executeStatement(String statementsText, SessionOutput sessionOutput,
- IStatementExecutor.ResultDelivery delivery, IStatementExecutor.Stats stats, RequestParameters param,
+ ResultProperties resultProperties, IStatementExecutor.Stats stats, RequestParameters param,
RequestExecutionState execution, Map<String, String> optionalParameters) throws Exception {
// Running on NC -> send 'execute' message to CC
INCServiceContext ncCtx = (INCServiceContext) serviceCtx;
INCMessageBroker ncMb = (INCMessageBroker) ncCtx.getMessageBroker();
- IStatementExecutor.ResultDelivery ccDelivery =
- delivery == IStatementExecutor.ResultDelivery.IMMEDIATE ? IStatementExecutor.ResultDelivery.DEFERRED
- : delivery;
+ final IStatementExecutor.ResultDelivery delivery = resultProperties.getDelivery();
ExecuteStatementResponseMessage responseMsg;
MessageFuture responseFuture = ncMb.registerMessageFuture();
final String handleUrl = getHandleUrl(param.host, param.path, delivery);
@@ -86,8 +85,8 @@
timeout = TimeUnit.NANOSECONDS.toMillis(Duration.parseDurationStringToNanos(param.timeout));
}
ExecuteStatementRequestMessage requestMsg = new ExecuteStatementRequestMessage(ncCtx.getNodeId(),
- responseFuture.getFutureId(), queryLanguage, statementsText, sessionOutput.config(), ccDelivery,
- param.clientContextID, handleUrl, optionalParameters);
+ responseFuture.getFutureId(), queryLanguage, statementsText, sessionOutput.config(),
+ resultProperties.getNcToCcResultProperties(), param.clientContextID, handleUrl, optionalParameters);
execution.start();
ncMb.sendMessageToCC(requestMsg);
try {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index a8f14b5..23a7ba7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -21,7 +21,6 @@
import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetAddress;
-import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;
@@ -48,6 +47,7 @@
import org.apache.asterix.translator.IStatementExecutor.Stats;
import org.apache.asterix.translator.IStatementExecutorContext;
import org.apache.asterix.translator.IStatementExecutorFactory;
+import org.apache.asterix.translator.ResultProperties;
import org.apache.asterix.translator.SessionConfig;
import org.apache.asterix.translator.SessionOutput;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -137,7 +137,8 @@
PRETTY("pretty"),
MODE("mode"),
TIMEOUT("timeout"),
- PLAN_FORMAT("plan-format");
+ PLAN_FORMAT("plan-format"),
+ MAX_RESULT_READS("max-result-reads");
private final String str;
@@ -193,6 +194,7 @@
boolean pretty;
String clientContextID;
String mode;
+ String maxResultReads;
@Override
public String toString() {
@@ -207,6 +209,7 @@
on.put("clientContextID", clientContextID);
on.put("format", format);
on.put("timeout", timeout);
+ on.put("maxResultReads", maxResultReads);
return om.writer(new MinimalPrettyPrinter()).writeValueAsString(on);
} catch (JsonProcessingException e) { // NOSONAR
return e.getMessage();
@@ -383,6 +386,7 @@
param.mode = toLower(getOptText(jsonRequest, Parameter.MODE.str()));
param.clientContextID = getOptText(jsonRequest, Parameter.CLIENT_ID.str());
param.timeout = getOptText(jsonRequest, Parameter.TIMEOUT.str());
+ param.maxResultReads = getOptText(jsonRequest, Parameter.MAX_RESULT_READS.str());
} catch (JsonParseException | JsonMappingException e) {
// if the JSON parsing fails, the statement is empty and we get an empty statement error
GlobalConfig.ASTERIX_LOGGER.log(Level.ERROR, e.getMessage(), e);
@@ -397,6 +401,7 @@
param.mode = toLower(request.getParameter(Parameter.MODE.str()));
param.clientContextID = request.getParameter(Parameter.CLIENT_ID.str());
param.timeout = request.getParameter(Parameter.TIMEOUT.str());
+ param.maxResultReads = request.getParameter(Parameter.MAX_RESULT_READS.str());
}
return param;
}
@@ -448,6 +453,10 @@
ResultDelivery delivery = parseResultDelivery(param.mode);
+ final ResultProperties resultProperties = param.maxResultReads == null ?
+ new ResultProperties(delivery) :
+ new ResultProperties(delivery, Long.parseLong(param.maxResultReads));
+
String handleUrl = getHandleUrl(param.host, param.path, delivery);
SessionOutput sessionOutput = createSessionOutput(param, handleUrl, httpWriter);
SessionConfig sessionConfig = sessionOutput.config();
@@ -478,7 +487,7 @@
"http://" + hostName + ":" + appCtx.getExternalProperties().getQueryWebInterfacePort());
response.setHeader("Access-Control-Allow-Headers", "Origin, X-Requested-With, Content-Type, Accept");
response.setStatus(execution.getHttpStatus());
- executeStatement(statementsText, sessionOutput, delivery, stats, param, execution, optionalParams);
+ executeStatement(statementsText, sessionOutput, resultProperties, stats, param, execution, optionalParams);
if (ResultDelivery.IMMEDIATE == delivery || ResultDelivery.DEFERRED == delivery) {
ResultUtil.printStatus(sessionOutput, execution.getResultStatus());
}
@@ -502,9 +511,9 @@
}
}
- protected void executeStatement(String statementsText, SessionOutput sessionOutput, ResultDelivery delivery,
- IStatementExecutor.Stats stats, RequestParameters param, RequestExecutionState execution,
- Map<String, String> optionalParameters) throws Exception {
+ protected void executeStatement(String statementsText, SessionOutput sessionOutput,
+ ResultProperties resultProperties, IStatementExecutor.Stats stats, RequestParameters param,
+ RequestExecutionState execution, Map<String, String> optionalParameters) throws Exception {
IClusterManagementWork.ClusterState clusterState =
((ICcApplicationContext) appCtx).getClusterStateManager().getState();
if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) {
@@ -518,8 +527,8 @@
sessionOutput, compilationProvider, componentProvider);
execution.start();
final IRequestParameters requestParameters =
- new org.apache.asterix.app.translator.RequestParameters(getHyracksDataset(), delivery, stats, null,
- param.clientContextID, optionalParameters);
+ new org.apache.asterix.app.translator.RequestParameters(getHyracksDataset(), resultProperties, stats,
+ null, param.clientContextID, optionalParameters);
translator.compileAndExecute(getHyracksClientConnection(), queryCtx, requestParameters);
execution.end();
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
index 3359b9f..360c522 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
@@ -42,6 +42,7 @@
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
import org.apache.asterix.translator.IStatementExecutorFactory;
+import org.apache.asterix.translator.ResultProperties;
import org.apache.asterix.translator.SessionConfig;
import org.apache.asterix.translator.SessionConfig.OutputFormat;
import org.apache.asterix.translator.SessionConfig.PlanFormat;
@@ -209,7 +210,8 @@
IStatementExecutor translator = statementExecutorFactory.create(appCtx, aqlStatements, sessionOutput,
compilationProvider, componentProvider);
final IRequestParameters requestParameters =
- new RequestParameters(hds, resultDelivery, new IStatementExecutor.Stats(), null, null, null);
+ new RequestParameters(hds, new ResultProperties(resultDelivery), new IStatementExecutor.Stats(),
+ null, null, null);
translator.compileAndExecute(hcc, null, requestParameters);
} catch (AsterixException | TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
index 58a7f09..4ecd978 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
@@ -35,6 +35,7 @@
import org.apache.asterix.translator.IRequestParameters;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutorFactory;
+import org.apache.asterix.translator.ResultProperties;
import org.apache.asterix.translator.SessionConfig;
import org.apache.asterix.translator.SessionConfig.OutputFormat;
import org.apache.asterix.translator.SessionConfig.PlanFormat;
@@ -120,8 +121,8 @@
IStatementExecutor translator = statementExecutorFactory.create(appCtx, statements, output, compilationProvider,
storageComponentProvider);
final IRequestParameters requestParameters =
- new RequestParameters(null, IStatementExecutor.ResultDelivery.IMMEDIATE, new IStatementExecutor.Stats(),
- null, null, null);
+ new RequestParameters(null, new ResultProperties(IStatementExecutor.ResultDelivery.IMMEDIATE),
+ new IStatementExecutor.Stats(), null, null, null);
translator.compileAndExecute(hcc, null, requestParameters);
writer.flush();
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
index 0b8c34c..5b0eb97 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
@@ -46,6 +46,7 @@
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutorContext;
import org.apache.asterix.translator.IStatementExecutorFactory;
+import org.apache.asterix.translator.ResultProperties;
import org.apache.asterix.translator.SessionConfig;
import org.apache.asterix.translator.SessionOutput;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -69,20 +70,20 @@
private final ILangExtension.Language lang;
private final String statementsText;
private final SessionConfig sessionConfig;
- private final IStatementExecutor.ResultDelivery delivery;
+ private final ResultProperties resultProperties;
private final String clientContextID;
private final String handleUrl;
private final Map<String, String> optionalParameters;
public ExecuteStatementRequestMessage(String requestNodeId, long requestMessageId, ILangExtension.Language lang,
- String statementsText, SessionConfig sessionConfig, IStatementExecutor.ResultDelivery delivery,
+ String statementsText, SessionConfig sessionConfig, ResultProperties resultProperties,
String clientContextID, String handleUrl, Map<String, String> optionalParameters) {
this.requestNodeId = requestNodeId;
this.requestMessageId = requestMessageId;
this.lang = lang;
this.statementsText = statementsText;
this.sessionConfig = sessionConfig;
- this.delivery = delivery;
+ this.resultProperties = resultProperties;
this.clientContextID = clientContextID;
this.handleUrl = handleUrl;
this.optionalParameters = optionalParameters;
@@ -122,7 +123,8 @@
compilationProvider, storageComponentProvider);
final IStatementExecutor.Stats stats = new IStatementExecutor.Stats();
final IRequestParameters requestParameters =
- new RequestParameters(null, delivery, stats, outMetadata, clientContextID, optionalParameters);
+ new RequestParameters(null, resultProperties, stats, outMetadata, clientContextID,
+ optionalParameters);
translator.compileAndExecute(ccApp.getHcc(), statementExecutorContext, requestParameters);
outPrinter.close();
responseMsg.setResult(outWriter.toString());
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 9b96883..4e9cb47 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -283,7 +283,8 @@
Thread.currentThread().setName(QueryTranslator.class.getSimpleName());
Map<String, String> config = new HashMap<>();
final IHyracksDataset hdc = requestParameters.getHyracksDataset();
- final ResultDelivery resultDelivery = requestParameters.getResultDelivery();
+ final ResultDelivery resultDelivery = requestParameters.getResultProperties().getDelivery();
+ final long maxResultReads = requestParameters.getResultProperties().getMaxReads();
final Stats stats = requestParameters.getStats();
final ResultMetadata outMetadata = requestParameters.getOutMetadata();
final String clientContextId = requestParameters.getClientContextId();
@@ -351,6 +352,7 @@
metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++));
metadataProvider.setResultAsyncMode(resultDelivery == ResultDelivery.ASYNC
|| resultDelivery == ResultDelivery.DEFERRED);
+ metadataProvider.setMaxResultReads(maxResultReads);
}
handleInsertUpsertStatement(metadataProvider, stmt, hcc, hdc, resultDelivery, outMetadata,
stats, false, clientContextId);
@@ -386,6 +388,7 @@
metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++));
metadataProvider.setResultAsyncMode(
resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.DEFERRED);
+ metadataProvider.setMaxResultReads(maxResultReads);
handleQuery(metadataProvider, (Query) stmt, hcc, hdc, resultDelivery, outMetadata, stats,
clientContextId, ctx);
break;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java
index 5b8da8b..9592492 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java
@@ -22,24 +22,24 @@
import org.apache.asterix.translator.IRequestParameters;
import org.apache.asterix.translator.IStatementExecutor;
-import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.asterix.translator.ResultProperties;
import org.apache.hyracks.api.dataset.IHyracksDataset;
public class RequestParameters implements IRequestParameters {
private final IHyracksDataset hdc;
- private final ResultDelivery resultDelivery;
+ private final ResultProperties resultProperties;
private final Stats stats;
private final Map<String, String> optionalParameters;
private final IStatementExecutor.ResultMetadata outMetadata;
private final String clientContextId;
- public RequestParameters(IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
+ public RequestParameters(IHyracksDataset hdc, ResultProperties resultProperties, Stats stats,
IStatementExecutor.ResultMetadata outMetadata, String clientContextId,
Map<String, String> optionalParameters) {
this.hdc = hdc;
- this.resultDelivery = resultDelivery;
+ this.resultProperties = resultProperties;
this.stats = stats;
this.outMetadata = outMetadata;
this.clientContextId = clientContextId;
@@ -52,8 +52,8 @@
}
@Override
- public IStatementExecutor.ResultDelivery getResultDelivery() {
- return resultDelivery;
+ public ResultProperties getResultProperties() {
+ return resultProperties;
}
@Override
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index 30336d1..222e098 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -57,6 +57,7 @@
import java.util.regex.Pattern;
import java.util.stream.Stream;
+import org.apache.asterix.api.http.server.QueryServiceServlet;
import org.apache.asterix.app.external.IExternalUDFLibrarian;
import org.apache.asterix.common.api.Duration;
import org.apache.asterix.common.config.GlobalConfig;
@@ -122,7 +123,7 @@
private static final Pattern HTTP_PARAM_PATTERN = Pattern.compile("param (\\w+)=(.*)", Pattern.MULTILINE);
private static final Pattern HTTP_BODY_PATTERN = Pattern.compile("body=(.*)", Pattern.MULTILINE);
private static final Pattern HTTP_STATUSCODE_PATTERN = Pattern.compile("statuscode (.*)", Pattern.MULTILINE);
-
+ private static final Pattern MAX_RESULT_READS_PATTERN = Pattern.compile("maxresultreads=(\\d+)(\\D|$)", Pattern.MULTILINE);
public static final int TRUNCATE_THRESHOLD = 16384;
public static final String DELIVERY_ASYNC = "async";
@@ -556,7 +557,12 @@
public InputStream executeQueryService(String str, OutputFormat fmt, URI uri, List<Parameter> params,
boolean jsonEncoded, Predicate<Integer> responseCodeValidator, boolean cancellable) throws Exception {
- final List<Parameter> newParams = upsertParam(params, "format", fmt.mimeType());
+ List<Parameter> newParams = upsertParam(params, "format", fmt.mimeType());
+ final Optional<String> maxReadsOptional = extractMaxResultReads(str);
+ if (maxReadsOptional.isPresent()) {
+ newParams = upsertParam(newParams, QueryServiceServlet.Parameter.MAX_RESULT_READS.str(),
+ maxReadsOptional.get());
+ }
HttpUriRequest method = jsonEncoded ? constructPostMethodJson(str, uri, "statement", newParams)
: constructPostMethodUrl(str, uri, "statement", newParams);
// Set accepted output response type
@@ -1392,6 +1398,14 @@
return tmpStmt;
}
+ protected static Optional<String> extractMaxResultReads(String statement) {
+ final Matcher m = MAX_RESULT_READS_PATTERN.matcher(statement);
+ while (m.find()) {
+ return Optional.of(m.group(1));
+ }
+ return Optional.empty();
+ }
+
protected static Optional<String> extractBody(String statement) {
final Matcher m = HTTP_BODY_PATTERN.matcher(statement);
while (m.find()) {
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml
index 9a2ad3c..fe030a8 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml
@@ -49,4 +49,10 @@
<output-dir compare="Text">async-running</output-dir>
</compilation-unit>
</test-case>
+ <test-case FilePath="async-deferred">
+ <compilation-unit name="async-exhausted-result">
+ <output-dir compare="Text">async-exhausted-result</output-dir>
+ <expected-error>Job Failed</expected-error>
+ </compilation-unit>
+ </test-case>
</test-group>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.1.async.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.1.async.sqlpp
new file mode 100644
index 0000000..f8ec2cf
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.1.async.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+-- maxresultreads=1
+-- handlevariable=status
+
+select i, i * i as i2 from range(1, 10) i;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.2.pollget.uri b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.2.pollget.uri
new file mode 100644
index 0000000..bca879b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.2.pollget.uri
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+-- polltimeoutsecs=20
+-- handlevariable=result
+
+$status
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.3.get.uri b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.3.get.uri
new file mode 100644
index 0000000..b613531
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.3.get.uri
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+$result
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.4.get.uri b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.4.get.uri
new file mode 100644
index 0000000..b613531
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.4.get.uri
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+$result
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.1.async.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.1.async.sqlpp
index 1e18f66..8055915 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.1.async.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.1.async.sqlpp
@@ -17,6 +17,7 @@
* under the License.
*/
+-- maxresultreads=2
-- handlevariable=status
select i, i * i as i2 from range(1, 10) i;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-exhausted-result/async-exhausted-result.1.ignore b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-exhausted-result/async-exhausted-result.1.ignore
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-exhausted-result/async-exhausted-result.1.ignore
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-exhausted-result/async-exhausted-result.2.regex b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-exhausted-result/async-exhausted-result.2.regex
new file mode 100644
index 0000000..4308ba2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-exhausted-result/async-exhausted-result.2.regex
@@ -0,0 +1,2 @@
+/"status": "success"/
+/"handle": ".*"/
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-exhausted-result/async-exhausted-result.3.json b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-exhausted-result/async-exhausted-result.3.json
new file mode 100644
index 0000000..09e86cc
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-exhausted-result/async-exhausted-result.3.json
@@ -0,0 +1,10 @@
+{ "i": 1, "i2": 1 }
+{ "i": 2, "i2": 4 }
+{ "i": 3, "i2": 9 }
+{ "i": 4, "i2": 16 }
+{ "i": 5, "i2": 25 }
+{ "i": 6, "i2": 36 }
+{ "i": 7, "i2": 49 }
+{ "i": 8, "i2": 64 }
+{ "i": 9, "i2": 81 }
+{ "i": 10, "i2": 100 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index b8790e5..6f58b0a 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -153,6 +153,7 @@
private IAWriterFactory writerFactory;
private FileSplit outputFile;
private boolean asyncResults;
+ private long maxResultReads;
private ResultSetId resultSetId;
private IResultSerializerFactoryProvider resultSerializerFactoryProvider;
private TxnId txnId;
@@ -238,6 +239,14 @@
this.asyncResults = asyncResults;
}
+ public void setMaxResultReads(long maxResultReads) {
+ this.maxResultReads = maxResultReads;
+ }
+
+ public long getMaxResultReads() {
+ return maxResultReads;
+ }
+
public ResultSetId getResultSetId() {
return resultSetId;
}
@@ -536,7 +545,7 @@
IResultSerializerFactory resultSerializedAppenderFactory = resultSerializerFactoryProvider
.getAqlResultSerializerFactoryProvider(printColumns, printerFactories, getWriterFactory());
resultWriter = new ResultWriterOperatorDescriptor(spec, rsId, ordered, getResultAsyncMode(),
- resultSerializedAppenderFactory);
+ resultSerializedAppenderFactory, getMaxResultReads());
} catch (IOException e) {
throw new AlgebricksException(e);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
index 008f0be..e6cf6d3 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
@@ -25,7 +25,7 @@
public interface IDatasetPartitionManager extends IDatasetManager {
IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult,
- boolean asyncMode, int partition, int nPartitions) throws HyracksException;
+ boolean asyncMode, int partition, int nPartitions, long maxReads) throws HyracksException;
void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, int partition, int nPartitions,
boolean orderedResult, boolean emptyResult) throws HyracksException;
diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
index 60e2e35..56c4576 100644
--- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
@@ -52,7 +52,8 @@
}
@Override
- public void open() throws HyracksDataException {
+ public void open() {
+ // no op
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
index bc980e1..d381a67 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -70,12 +70,12 @@
@Override
public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult,
- boolean asyncMode, int partition, int nPartitions) throws HyracksException {
+ boolean asyncMode, int partition, int nPartitions, long maxReads) {
DatasetPartitionWriter dpw;
JobId jobId = ctx.getJobletContext().getJobId();
synchronized (this) {
dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, asyncMode, orderedResult, partition, nPartitions,
- datasetMemoryManager, fileFactory);
+ datasetMemoryManager, fileFactory, maxReads);
ResultSetMap rsIdMap = partitionResultStateMap.computeIfAbsent(jobId, k -> new ResultSetMap());
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionReader.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionReader.java
index ec33b05..24edeb2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionReader.java
@@ -23,6 +23,7 @@
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.partitions.ResultSetPartitionId;
import org.apache.hyracks.comm.channels.NetworkOutputChannel;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -31,11 +32,8 @@
private static final Logger LOGGER = LogManager.getLogger();
private final DatasetPartitionManager datasetPartitionManager;
-
private final DatasetMemoryManager datasetMemoryManager;
-
private final Executor executor;
-
private final ResultState resultState;
public DatasetPartitionReader(DatasetPartitionManager datasetPartitionManager,
@@ -47,56 +45,67 @@
}
public void writeTo(final IFrameWriter writer) {
- executor.execute(new Runnable() {
- @Override
- public void run() {
- NetworkOutputChannel channel = (NetworkOutputChannel) writer;
- channel.setFrameSize(resultState.getFrameSize());
- try {
- resultState.readOpen();
- channel.open();
- try {
- long offset = 0;
- ByteBuffer buffer = ByteBuffer.allocate(resultState.getFrameSize());
- while (true) {
- buffer.clear();
- long size = read(offset, buffer);
- if (size <= 0) {
- break;
- } else if (size < buffer.limit()) {
- throw new HyracksDataException("Premature end of file - readSize: " + size
- + " buffer limit: " + buffer.limit());
- }
- offset += size;
- buffer.flip();
- channel.nextFrame(buffer);
- }
- LOGGER.info("Result Reader read + " + offset + " bytes");
- } finally {
- channel.close();
- resultState.readClose();
- // If the query is a synchronous query, remove its partition as soon as it is read.
- if (!resultState.getAsyncMode()) {
- datasetPartitionManager.removePartition(resultState.getResultSetPartitionId().getJobId(),
- resultState.getResultSetPartitionId().getResultSetId(), resultState
- .getResultSetPartitionId().getPartition());
- }
- }
- } catch (HyracksDataException e) {
- throw new RuntimeException(e);
- }
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("result reading successful(" + resultState.getResultSetPartitionId() + ")");
- }
- }
+ executor.execute(new ResultPartitionSender((NetworkOutputChannel) writer));
+ }
- private long read(long offset, ByteBuffer buffer) throws HyracksDataException {
- if (datasetMemoryManager == null) {
- return resultState.read(offset, buffer);
- } else {
- return resultState.read(datasetMemoryManager, offset, buffer);
+ private class ResultPartitionSender implements Runnable {
+
+ private final NetworkOutputChannel channel;
+
+ ResultPartitionSender(final NetworkOutputChannel channel) {
+ this.channel = channel;
+ }
+
+ @Override
+ public void run() {
+ channel.setFrameSize(resultState.getFrameSize());
+ channel.open();
+ try {
+ resultState.readOpen();
+ long offset = 0;
+ final ByteBuffer buffer = ByteBuffer.allocate(resultState.getFrameSize());
+ while (true) {
+ buffer.clear();
+ final long size = read(offset, buffer);
+ if (size <= 0) {
+ break;
+ } else if (size < buffer.limit()) {
+ throw new IllegalStateException(
+ "Premature end of file - readSize: " + size + " buffer limit: " + buffer.limit());
+ }
+ offset += size;
+ buffer.flip();
+ channel.nextFrame(buffer);
}
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("result reading successful(" + resultState.getResultSetPartitionId() + ")");
+ }
+ } catch (Exception e) {
+ LOGGER.error(() -> "failed to send result partition " + resultState.getResultSetPartitionId(), e);
+ channel.abort();
+ } finally {
+ close();
}
- });
+ }
+
+ private long read(long offset, ByteBuffer buffer) throws HyracksDataException {
+ return datasetMemoryManager != null ?
+ resultState.read(datasetMemoryManager, offset, buffer) :
+ resultState.read(offset, buffer);
+ }
+
+ private void close() {
+ try {
+ channel.close();
+ resultState.readClose();
+ if (resultState.isExhausted()) {
+ final ResultSetPartitionId partitionId = resultState.getResultSetPartitionId();
+ datasetPartitionManager.removePartition(partitionId.getJobId(), partitionId.getResultSetId(),
+ partitionId.getPartition());
+ }
+ } catch (HyracksDataException e) {
+ LOGGER.error("unexpected failure in partition reader clean up", e);
+ }
+ }
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
index 2bf5326..d49a1a65 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -59,7 +59,7 @@
public DatasetPartitionWriter(IHyracksTaskContext ctx, IDatasetPartitionManager manager, JobId jobId,
ResultSetId rsId, boolean asyncMode, boolean orderedResult, int partition, int nPartitions,
- DatasetMemoryManager datasetMemoryManager, IWorkspaceFileFactory fileFactory) {
+ DatasetMemoryManager datasetMemoryManager, IWorkspaceFileFactory fileFactory, long maxReads) {
this.manager = manager;
this.jobId = jobId;
this.resultSetId = rsId;
@@ -70,7 +70,7 @@
resultSetPartitionId = new ResultSetPartitionId(jobId, rsId, partition);
resultState = new ResultState(resultSetPartitionId, asyncMode, ctx.getIoManager(), fileFactory,
- ctx.getInitialFrameSize());
+ ctx.getInitialFrameSize(), maxReads);
}
public ResultState getResultState() {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
index 43b1e9b..3e3f06b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
@@ -68,17 +68,22 @@
private long size;
private long persistentSize;
+ private long remainingReads;
ResultState(ResultSetPartitionId resultSetPartitionId, boolean asyncMode, IIOManager ioManager,
- IWorkspaceFileFactory fileFactory, int frameSize) {
+ IWorkspaceFileFactory fileFactory, int frameSize, long maxReads) {
+ if (maxReads <= 0) {
+ throw new IllegalArgumentException("maxReads must be > 0");
+ }
this.resultSetPartitionId = resultSetPartitionId;
this.asyncMode = asyncMode;
this.ioManager = ioManager;
this.fileFactory = fileFactory;
this.frameSize = frameSize;
+ remainingReads = maxReads;
eos = new AtomicBoolean(false);
failed = new AtomicBoolean(false);
- localPageList = new ArrayList<Page>();
+ localPageList = new ArrayList<>();
fileRef = null;
writeFileHandle = null;
@@ -102,6 +107,7 @@
closeWriteFileHandle();
if (fileRef != null) {
fileRef.delete();
+ fileRef = null;
}
}
@@ -152,7 +158,10 @@
}
public synchronized void readOpen() {
- // It is a noOp for now, leaving here to keep the API stable for future usage.
+ if (isExhausted()) {
+ throw new IllegalStateException("Result reads exhausted");
+ }
+ remainingReads--;
}
public synchronized void readClose() throws HyracksDataException {
@@ -339,6 +348,7 @@
ObjectNode on = om.createObjectNode();
on.put("rspid", resultSetPartitionId.toString());
on.put("async", asyncMode);
+ on.put("remainingReads", remainingReads);
on.put("eos", eos.get());
on.put("failed", failed.get());
on.put("fileRef", String.valueOf(fileRef));
@@ -347,4 +357,8 @@
return e.getMessage();
}
}
+
+ public synchronized boolean isExhausted() {
+ return remainingReads == 0;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
index 58eee79..d081bdb 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
@@ -51,14 +51,16 @@
private final boolean asyncMode;
private final IResultSerializerFactory resultSerializerFactory;
+ private final long maxReads;
public ResultWriterOperatorDescriptor(IOperatorDescriptorRegistry spec, ResultSetId rsId, boolean ordered,
- boolean asyncMode, IResultSerializerFactory resultSerializerFactory) throws IOException {
+ boolean asyncMode, IResultSerializerFactory resultSerializerFactory, long maxReads) throws IOException {
super(spec, 1, 0);
this.rsId = rsId;
this.ordered = ordered;
this.asyncMode = asyncMode;
this.resultSerializerFactory = resultSerializerFactory;
+ this.maxReads = maxReads;
}
@Override
@@ -87,7 +89,7 @@
public void open() throws HyracksDataException {
try {
datasetPartitionWriter = dpm.createDatasetPartitionWriter(ctx, rsId, ordered, asyncMode, partition,
- nPartitions);
+ nPartitions, maxReads);
datasetPartitionWriter.open();
resultSerializer.init();
} catch (HyracksException e) {
@@ -139,7 +141,8 @@
sb.append("{ ");
sb.append("\"rsId\": \"").append(rsId).append("\", ");
sb.append("\"ordered\": ").append(ordered).append(", ");
- sb.append("\"asyncMode\": ").append(asyncMode).append(" }");
+ sb.append("\"asyncMode\": ").append(asyncMode).append(", ");
+ sb.append("\"maxReads\": ").append(maxReads).append(" }");
return sb.toString();
}
};
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
index f169054..080746c 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
@@ -100,7 +100,7 @@
ResultSetId rsId = new ResultSetId(1);
AbstractSingleActivityOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
- ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
spec.addResultSetId(rsId);
return printer;
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java
index b5c6238..c05b504 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java
@@ -101,7 +101,7 @@
ResultSetId rsId = new ResultSetId(1);
IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
- ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
spec.addResultSetId(rsId);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID);
@@ -173,7 +173,7 @@
ResultSetId rsId = new ResultSetId(1);
IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
- ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
spec.addResultSetId(rsId);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
@@ -246,7 +246,7 @@
ResultSetId rsId = new ResultSetId(1);
IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
- ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
spec.addResultSetId(rsId);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
index 56bf853..b693b09 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
@@ -159,7 +159,7 @@
spec.addResultSetId(rsId);
IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
- ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
index 67642f4..67845c0 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
@@ -229,7 +229,7 @@
ResultSetId rsId = new ResultSetId(1);
AbstractSingleActivityOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
- ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
spec.addResultSetId(rsId);
return printer;
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java
index b62c011..d7d4219 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java
@@ -95,7 +95,7 @@
spec.addResultSetId(rsId);
outputOp[i] = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
- ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, outputOp[i], locations);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ScanPrintTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ScanPrintTest.java
index dc91dd2..06d7b04 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ScanPrintTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ScanPrintTest.java
@@ -71,7 +71,7 @@
ResultSetId rsId = new ResultSetId(1);
IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
- ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
spec.addResultSetId(rsId);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
@@ -107,7 +107,7 @@
ResultSetId rsId = new ResultSetId(1);
IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
- ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
spec.addResultSetId(rsId);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID);
@@ -146,7 +146,7 @@
ResultSetId rsId = new ResultSetId(1);
IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
- ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
spec.addResultSetId(rsId);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SortMergeTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SortMergeTest.java
index 3043cba..df9c0d7 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SortMergeTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SortMergeTest.java
@@ -85,7 +85,7 @@
spec.addResultSetId(rsId);
IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
- ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
@@ -135,7 +135,7 @@
spec.addResultSetId(rsId);
IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
- ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index 7075fe9..2c055c2 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -132,7 +132,7 @@
spec.addResultSetId(rsId);
IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
- ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
IConnectorDescriptor ordJoinConn = new MToNBroadcastConnectorDescriptor(spec);
@@ -215,7 +215,7 @@
spec.addResultSetId(rsId);
IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
- ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
IConnectorDescriptor ordJoinConn = new MToNBroadcastConnectorDescriptor(spec);
@@ -300,7 +300,7 @@
spec.addResultSetId(rsId);
IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
- ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
IConnectorDescriptor ordJoinConn = new MToNBroadcastConnectorDescriptor(spec);
@@ -385,7 +385,7 @@
spec.addResultSetId(rsId);
IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
- ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
IConnectorDescriptor ordJoinConn = new MToNBroadcastConnectorDescriptor(spec);
@@ -471,7 +471,7 @@
spec.addResultSetId(rsId);
IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
- ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
@@ -563,7 +563,7 @@
spec.addResultSetId(rsId);
IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
- ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
@@ -654,7 +654,7 @@
spec.addResultSetId(rsId);
IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
- ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
@@ -750,7 +750,7 @@
spec.addResultSetId(rsId);
IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
- ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
IConnectorDescriptor ordPartConn = new MToNPartitioningConnectorDescriptor(spec,
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
index 03cd5d4..dc5d0bc 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
@@ -182,7 +182,7 @@
spec.addResultSetId(rsId);
IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
- ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
IConnectorDescriptor ordJoinConn = new MToNBroadcastConnectorDescriptor(spec);
@@ -263,7 +263,7 @@
spec.addResultSetId(rsId);
IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
- ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
@@ -344,7 +344,7 @@
spec.addResultSetId(rsId);
IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
- ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
@@ -430,7 +430,7 @@
spec.addResultSetId(rsId);
IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
- ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java
index 542f037..e4d6398 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java
@@ -79,7 +79,7 @@
spec.addResultSetId(rsId);
IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
- ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
spec.connect(new OneToOneConnectorDescriptor(spec), csvScanner01, 0, unionAll, 0);