[ASTERIXDB-2058][HYR] Only Complete job cancellation after cleanup
- user model changes: no
- storage format changes: no
- interface changes: yes
--IJobManager.cancel now takes a callback
details:
- Job cancellation now completes only after the job cleanup work
has completed and not merely when the abort tasks are executed.
- The NCQueryServiceServlet actively cancels requests that passes
5 minutes.
- Cancellation of timedout jobs is not done through the Http API
but through message broker.
- Typically, requests might timeout when the servers are
overloaded. When that is the case, there is a high chance http
requests are to be rejected including requests to cancel
previously submitted queries. This is the reason for using
Message broker for this task.
- ExecuteStatementRequest used to execute the statement in
a different executor thread even though it is itself is being
executed in an executor thread and is not blocking anyone.
This was fixed as well.
Change-Id: I14b4bbd512cc88e489254d8bf82edba0fd3a3db5
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1961
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
index a4e72f7..c38b3fc 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
@@ -18,8 +18,8 @@
*/
package org.apache.asterix.api.http.server;
-import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
-import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_DATASET_ATTR;
+import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
+import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_DATASET_ATTR;
import java.io.PrintWriter;
import java.util.UUID;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
index 7874aa3..58c282f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
@@ -18,8 +18,8 @@
*/
package org.apache.asterix.api.http.server;
-import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
-import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_DATASET_ATTR;
+import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
+import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_DATASET_ATTR;
import java.awt.image.BufferedImage;
import java.io.BufferedReader;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
index b24a9a1..4faab1e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
@@ -18,7 +18,7 @@
*/
package org.apache.asterix.api.http.server;
-import static org.apache.asterix.api.http.servlet.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR;
+import static org.apache.asterix.api.http.server.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR;
import java.io.IOException;
import java.io.PrintWriter;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java
index 52d4d67..6dea30c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java
@@ -18,7 +18,7 @@
*/
package org.apache.asterix.api.http.server;
-import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
+import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
import java.io.IOException;
import java.io.PrintWriter;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
index d9a63f7..52af643 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
@@ -18,7 +18,7 @@
*/
package org.apache.asterix.api.http.server;
-import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
+import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
import java.io.IOException;
import java.io.PrintWriter;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
index dcd0e70..9d2415d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
@@ -18,7 +18,7 @@
*/
package org.apache.asterix.api.http.server;
-import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
+import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
import java.io.IOException;
import java.io.PrintWriter;
@@ -34,7 +34,6 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.asterix.api.http.servlet.ServletConstants;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.http.api.IServletRequest;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/Duration.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/Duration.java
new file mode 100644
index 0000000..bdda750
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/Duration.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.api.http.server;
+
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.tuple.Triple;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public enum Duration {
+ SEC("s", 9),
+ MILLI("ms", 6),
+ MICRO("µs", 3),
+ NANO("ns", 0);
+
+ static final long NANOSECONDS = 1;
+ static final long MICROSECONDS = 1000 * NANOSECONDS;
+ static final long MILLISECONDS = 1000 * MICROSECONDS;
+ static final long SECONDS = 1000 * MILLISECONDS;
+ static final long MINUTES = 60 * SECONDS;
+ static final long HOURS = 60 * MINUTES;
+
+ String unit;
+ int nanoDigits;
+
+ Duration(String unit, int nanoDigits) {
+ this.unit = unit;
+ this.nanoDigits = nanoDigits;
+ }
+
+ public static String formatNanos(long nanoTime) {
+ final String strTime = String.valueOf(nanoTime);
+ final int len = strTime.length();
+ for (Duration tu : Duration.values()) {
+ if (len > tu.nanoDigits) {
+ final String integer = strTime.substring(0, len - tu.nanoDigits);
+ final String fractional = strTime.substring(len - tu.nanoDigits);
+ return integer + (fractional.length() > 0 ? "." + fractional : "") + tu.unit;
+ }
+ }
+ return "illegal string value: " + strTime;
+ }
+
+ // ParseDuration parses a duration string.
+ // A duration string is a possibly signed sequence of
+ // decimal numbers, each with optional fraction and a unit suffix,
+ // such as "300ms", "-1.5h" or "2h45m".
+ // Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
+ // returns the duration in nano seconds
+ public static long parseDurationStringToNanos(String orig) throws HyracksDataException {
+ // [-+]?([0-9]*(\.[0-9]*)?[a-z]+)+
+ String s = orig;
+ long d = 0;
+ boolean neg = false;
+ char c;
+ // Consume [-+]?
+ if (!s.isEmpty()) {
+ c = s.charAt(0);
+ if (c == '-' || c == '+') {
+ neg = c == '-';
+ s = s.substring(1);
+ }
+ }
+
+ // Special case: if all that is left is "0", this is zero.
+ if ("0".equals(s)) {
+ return 0L;
+ }
+
+ if (s.isEmpty()) {
+ throw new RuntimeDataException(ErrorCode.INVALID_DURATION, orig);
+ }
+
+ while (!s.isEmpty()) {
+ long v = 0L; // integers before decimal
+ long f = 0L; // integers after decimal
+ double scale = 1.0; // value = v + f/scale
+ // The next character must be [0-9.]
+ if (!(s.charAt(0) == '.' || '0' <= s.charAt(0) && s.charAt(0) <= '9')) {
+ throw new RuntimeDataException(ErrorCode.INVALID_DURATION, orig);
+ }
+ // Consume [0-9]*
+ int pl = s.length();
+ Pair<Long, String> pair = leadingInt(s);
+ v = pair.getLeft();
+ s = pair.getRight();
+ boolean pre = pl != s.length(); // whether we consumed anything before a period
+
+ // Consume (\.[0-9]*)?
+ boolean post = false;
+ if (!s.isEmpty() && s.charAt(0) == '.') {
+ s = s.substring(1);
+ pl = s.length();
+ Triple<Long, Double, String> triple = leadingFraction(s);
+ f = triple.getLeft();
+ scale = triple.getMiddle();
+ s = triple.getRight();
+ post = pl != s.length();
+ }
+ if (!pre && !post) {
+ // no digits (e.g. ".s" or "-.s")
+ throw new RuntimeDataException(ErrorCode.INVALID_DURATION, orig);
+ }
+
+ // Consume unit.
+ int i = 0;
+ for (; i < s.length(); i++) {
+ c = s.charAt(i);
+ if (c == '.' || '0' <= c && c <= '9') {
+ break;
+ }
+ }
+ if (i == 0) {
+ throw new RuntimeDataException(ErrorCode.INVALID_DURATION, orig);
+ }
+ String u = s.substring(0, i);
+ s = s.substring(i);
+ long unit = getUnit(u);
+ if (v > Long.MAX_VALUE / unit) {
+ // overflow
+ throw new RuntimeDataException(ErrorCode.INVALID_DURATION, orig);
+ }
+ v *= unit;
+ if (f > 0) {
+ // float64 is needed to be nanosecond accurate for fractions of hours.
+ // v >= 0 && (f*unit/scale) <= 3.6e+12 (ns/h, h is the largest unit)
+ v += (long) (((double) f * (double) unit) / scale);
+ if (v < 0) {
+ // overflow
+ throw new RuntimeDataException(ErrorCode.INVALID_DURATION, orig);
+ }
+ }
+ d += v;
+ if (d < 0) {
+ // overflow
+ throw new RuntimeDataException(ErrorCode.INVALID_DURATION, orig);
+ }
+ }
+
+ if (neg) {
+ d = -d;
+ }
+ return d;
+ }
+
+ private static final long getUnit(String unit) throws HyracksDataException {
+ switch (unit) {
+ case "ns":
+ return NANOSECONDS;
+ case "us":
+ case "µs":// U+00B5 = micro symbol
+ case "μs":// U+03BC = Greek letter mu
+ return MICROSECONDS;
+ case "ms":
+ return MILLISECONDS;
+ case "s":
+ return SECONDS;
+ case "m":
+ return MINUTES;
+ case "h":
+ return HOURS;
+ default:
+ throw new RuntimeDataException(ErrorCode.UNKNOWN_DURATION_UNIT, unit);
+ }
+ }
+
+ // leadingInt consumes the leading [0-9]* from s.
+ static Pair<Long, String> leadingInt(String origin) throws HyracksDataException {
+ String s = origin;
+ long x = 0L;
+ int i = 0;
+ for (; i < s.length(); i++) {
+ char c = s.charAt(i);
+ if (c < '0' || c > '9') {
+ break;
+ }
+ if (x > Long.MAX_VALUE / 10) {
+ throw new RuntimeDataException(ErrorCode.INVALID_DURATION, origin);
+ }
+ x = x * 10 + Character.getNumericValue(c);
+ if (x < 0) {
+ throw new RuntimeDataException(ErrorCode.INVALID_DURATION, origin);
+ }
+ }
+ return Pair.of(x, s.substring(i));
+ }
+
+ // leadingFraction consumes the leading [0-9]* from s.
+ // It is used only for fractions, so does not return an error on overflow,
+ // it just stops accumulating precision.
+ static Triple<Long, Double, String> leadingFraction(String s) {
+ int i = 0;
+ long x = 0L;
+ double scale = 1.0;
+ boolean overflow = false;
+ for (; i < s.length(); i++) {
+ char c = s.charAt(i);
+ if (c < '0' || c > '9') {
+ break;
+ }
+ if (overflow) {
+ continue;
+ }
+ if (x > (1 << 63 - 1) / 10) {
+ // It's possible for overflow to give a positive number, so take care.
+ overflow = true;
+ continue;
+ }
+ long y = x * 10 + Character.getNumericValue(c);
+ if (y < 0) {
+ overflow = true;
+ continue;
+ }
+ x = y;
+ scale *= 10;
+ }
+ return Triple.of(x, scale, s.substring(i));
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
index 9547514..ef49c35 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -19,17 +19,20 @@
package org.apache.asterix.api.http.server;
+import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
-import io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.asterix.algebra.base.ILangExtension;
+import org.apache.asterix.app.message.CancelQueryRequest;
import org.apache.asterix.app.message.ExecuteStatementRequestMessage;
import org.apache.asterix.app.message.ExecuteStatementResponseMessage;
import org.apache.asterix.app.result.ResultReader;
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.messaging.api.INCMessageBroker;
import org.apache.asterix.common.messaging.api.MessageFuture;
import org.apache.asterix.om.types.ARecordType;
@@ -41,11 +44,14 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.ipc.exceptions.IPCException;
+import io.netty.handler.codec.http.HttpResponseStatus;
+
/**
* Query service servlet that can run on NC nodes.
* Delegates query execution to CC, then serves the result.
*/
public class NCQueryServiceServlet extends QueryServiceServlet {
+
public NCQueryServiceServlet(ConcurrentMap<String, Object> ctx, String[] paths, IApplicationContext appCtx,
ILangExtension.Language queryLanguage) {
super(ctx, paths, appCtx, queryLanguage, null, null, null);
@@ -63,13 +69,28 @@
ExecuteStatementResponseMessage responseMsg;
MessageFuture responseFuture = ncMb.registerMessageFuture();
try {
+ if (param.clientContextID == null) {
+ param.clientContextID = UUID.randomUUID().toString();
+ }
+ long timeout = ExecuteStatementRequestMessage.DEFAULT_NC_TIMEOUT_MILLIS;
+ if (param.timeout != null) {
+ timeout = java.util.concurrent.TimeUnit.NANOSECONDS
+ .toMillis(Duration.parseDurationStringToNanos(param.timeout));
+ }
ExecuteStatementRequestMessage requestMsg =
new ExecuteStatementRequestMessage(ncCtx.getNodeId(), responseFuture.getFutureId(), queryLanguage,
statementsText, sessionOutput.config(), ccDelivery, param.clientContextID, handleUrl);
outExecStartEnd[0] = System.nanoTime();
ncMb.sendMessageToCC(requestMsg);
- responseMsg = (ExecuteStatementResponseMessage) responseFuture.get(
- ExecuteStatementResponseMessage.DEFAULT_TIMEOUT_MILLIS, java.util.concurrent.TimeUnit.MILLISECONDS);
+ try {
+ responseMsg = (ExecuteStatementResponseMessage) responseFuture.get(timeout,
+ java.util.concurrent.TimeUnit.MILLISECONDS);
+ } catch (TimeoutException exception) {
+ RuntimeDataException hde = new RuntimeDataException(ErrorCode.QUERY_TIMEOUT, exception);
+ // cancel query
+ cancelQuery(ncMb, ncCtx.getNodeId(), param.clientContextID, hde);
+ throw hde;
+ }
outExecStartEnd[1] = System.nanoTime();
} finally {
ncMb.deregisterMessageFuture(responseFuture.getFutureId());
@@ -97,6 +118,22 @@
}
}
+ private void cancelQuery(INCMessageBroker messageBroker, String nodeId, String clientContextID,
+ Exception exception) {
+ MessageFuture cancelQueryFuture = messageBroker.registerMessageFuture();
+ try {
+ CancelQueryRequest cancelQueryMessage =
+ new CancelQueryRequest(nodeId, cancelQueryFuture.getFutureId(), clientContextID);
+ messageBroker.sendMessageToCC(cancelQueryMessage);
+ cancelQueryFuture.get(ExecuteStatementRequestMessage.DEFAULT_QUERY_CANCELLATION_TIMEOUT_MILLIS,
+ java.util.concurrent.TimeUnit.MILLISECONDS);
+ } catch (Exception e) {
+ exception.addSuppressed(e);
+ } finally {
+ messageBroker.deregisterMessageFuture(cancelQueryFuture.getFutureId());
+ }
+ }
+
@Override
protected HttpResponseStatus handleExecuteStatementException(Throwable t) {
if (t instanceof IPCException || t instanceof TimeoutException) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
index d9757c7..6291869 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
@@ -18,7 +18,7 @@
*/
package org.apache.asterix.api.http.server;
-import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
+import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
import java.io.IOException;
import java.io.PrintWriter;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryCancellationServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryCancellationServlet.java
index bfec146..3f07151 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryCancellationServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryCancellationServlet.java
@@ -23,7 +23,6 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.asterix.api.http.servlet.ServletConstants;
import org.apache.asterix.translator.IStatementExecutorContext;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.job.JobId;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index 1f1d282..c630636 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -27,7 +27,6 @@
import java.util.logging.Logger;
import org.apache.asterix.algebra.base.ILangExtension;
-import org.apache.asterix.api.http.servlet.ServletConstants;
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.api.IClusterManagementWork;
import org.apache.asterix.common.config.GlobalConfig;
@@ -108,7 +107,8 @@
FORMAT("format"),
CLIENT_ID("client_context_id"),
PRETTY("pretty"),
- MODE("mode");
+ MODE("mode"),
+ TIMEOUT("timeout");
private final String str;
@@ -154,39 +154,12 @@
}
}
- public enum TimeUnit {
- SEC("s", 9),
- MILLI("ms", 6),
- MICRO("µs", 3),
- NANO("ns", 0);
-
- String unit;
- int nanoDigits;
-
- TimeUnit(String unit, int nanoDigits) {
- this.unit = unit;
- this.nanoDigits = nanoDigits;
- }
-
- public static String formatNanos(long nanoTime) {
- final String strTime = String.valueOf(nanoTime);
- final int len = strTime.length();
- for (TimeUnit tu : TimeUnit.values()) {
- if (len > tu.nanoDigits) {
- final String integer = strTime.substring(0, len - tu.nanoDigits);
- final String fractional = strTime.substring(len - tu.nanoDigits);
- return integer + (fractional.length() > 0 ? "." + fractional : "") + tu.unit;
- }
- }
- return "illegal string value: " + strTime;
- }
- }
-
static class RequestParameters {
String host;
String path;
String statement;
String format;
+ String timeout;
boolean pretty;
String clientContextID;
String mode;
@@ -202,6 +175,7 @@
on.put("pretty", pretty);
on.put("mode", mode);
on.put("clientContextID", clientContextID);
+ on.put("format", format);
return om.writer(new MinimalPrettyPrinter()).writeValueAsString(on);
} catch (JsonProcessingException e) { // NOSONAR
return e.getMessage();
@@ -297,9 +271,9 @@
pw.print(ResultFields.METRICS.str());
pw.print("\": {\n");
pw.print("\t");
- ResultUtil.printField(pw, Metrics.ELAPSED_TIME.str(), TimeUnit.formatNanos(elapsedTime));
+ ResultUtil.printField(pw, Metrics.ELAPSED_TIME.str(), Duration.formatNanos(elapsedTime));
pw.print("\t");
- ResultUtil.printField(pw, Metrics.EXECUTION_TIME.str(), TimeUnit.formatNanos(executionTime));
+ ResultUtil.printField(pw, Metrics.EXECUTION_TIME.str(), Duration.formatNanos(executionTime));
pw.print("\t");
ResultUtil.printField(pw, Metrics.RESULT_COUNT.str(), resultCount, true);
pw.print("\t");
@@ -334,6 +308,7 @@
param.pretty = getOptBoolean(jsonRequest, Parameter.PRETTY.str(), false);
param.mode = toLower(getOptText(jsonRequest, Parameter.MODE.str()));
param.clientContextID = getOptText(jsonRequest, Parameter.CLIENT_ID.str());
+ param.timeout = getOptText(jsonRequest, Parameter.TIMEOUT.str());
} catch (JsonParseException | JsonMappingException e) {
// if the JSON parsing fails, the statement is empty and we get an empty statement error
GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, e.getMessage(), e);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
index 27e2806..8536571 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
@@ -18,7 +18,7 @@
*/
package org.apache.asterix.api.http.server;
-import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
+import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
import java.io.PrintWriter;
import java.util.ArrayDeque;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
index 18aae8e..1a7918c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
@@ -18,8 +18,8 @@
*/
package org.apache.asterix.api.http.server;
-import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
-import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_DATASET_ATTR;
+import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
+import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_DATASET_ATTR;
import java.io.IOException;
import java.util.List;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ServletConstants.java
similarity index 96%
rename from asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java
rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ServletConstants.java
index b815d76..2fe37c3 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ServletConstants.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.api.http.servlet;
+package org.apache.asterix.api.http.server;
public class ServletConstants {
public static final String HYRACKS_CONNECTION_ATTR = "org.apache.asterix.HYRACKS_CONNECTION";
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java
index fdd106d..06e2383 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java
@@ -18,7 +18,7 @@
*/
package org.apache.asterix.api.http.server;
-import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
+import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
import java.io.IOException;
import java.io.PrintWriter;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/VersionApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/VersionApiServlet.java
index 5acba381..eeef8e8 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/VersionApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/VersionApiServlet.java
@@ -18,7 +18,7 @@
*/
package org.apache.asterix.api.http.server;
-import static org.apache.asterix.api.http.servlet.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR;
+import static org.apache.asterix.api.http.server.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR;
import java.io.IOException;
import java.io.PrintWriter;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index a16f678..3b4b974 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -34,14 +34,11 @@
import org.apache.asterix.active.ActivityState;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.active.IActiveEntityEventSubscriber;
-import org.apache.asterix.active.IRetryPolicy;
import org.apache.asterix.active.IRetryPolicyFactory;
import org.apache.asterix.active.NoRetryPolicyFactory;
import org.apache.asterix.active.message.ActivePartitionMessage;
import org.apache.asterix.active.message.ActivePartitionMessage.Event;
import org.apache.asterix.active.message.StatsRequestMessage;
-import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
-import org.apache.asterix.common.api.IMetadataLockManager;
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.ErrorCode;
@@ -53,8 +50,6 @@
import org.apache.asterix.metadata.api.IActiveEntityController;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.metadata.utils.DatasetUtil;
-import org.apache.asterix.metadata.utils.MetadataLockUtil;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -94,8 +89,7 @@
protected boolean isFetchingStats;
protected int numRegistered;
protected int numDeRegistered;
- protected volatile Future<Void> recoveryTask;
- protected volatile boolean cancelRecovery;
+ protected volatile RecoveryTask rt;
protected volatile boolean suspended = false;
// failures
protected Exception jobFailure;
@@ -199,7 +193,8 @@
jobFailure = exceptions.isEmpty() ? new RuntimeDataException(ErrorCode.UNREPORTED_TASK_FAILURE_EXCEPTION)
: exceptions.get(0);
setState(ActivityState.TEMPORARILY_FAILED);
- if (prevState != ActivityState.SUSPENDING && prevState != ActivityState.RECOVERING) {
+ if (prevState != ActivityState.SUSPENDING && prevState != ActivityState.RECOVERING
+ && prevState != ActivityState.RESUMING) {
recover();
}
} else {
@@ -356,114 +351,18 @@
@Override
public synchronized void recover() throws HyracksDataException {
LOGGER.log(level, "Recover is called on " + entityId);
- if (recoveryTask != null) {
- LOGGER.log(level, "But recovery task for " + entityId + " is already there!! throwing an exception");
- throw new RuntimeDataException(ErrorCode.DOUBLE_RECOVERY_ATTEMPTS);
- }
if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) {
LOGGER.log(level, "But it has no recovery policy, so it is set to permanent failure");
setState(ActivityState.PERMANENTLY_FAILED);
} else {
ExecutorService executor = appCtx.getServiceContext().getControllerService().getExecutor();
- IRetryPolicy policy = retryPolicyFactory.create(this);
- cancelRecovery = false;
setState(ActivityState.TEMPORARILY_FAILED);
LOGGER.log(level, "Recovery task has been submitted");
- recoveryTask = executor.submit(() -> {
- String nameBefore = Thread.currentThread().getName();
- try {
- Thread.currentThread().setName("RecoveryTask (" + entityId + ")");
- doRecover(policy);
- } finally {
- Thread.currentThread().setName(nameBefore);
- }
- return null;
- });
+ rt = new RecoveryTask(appCtx, this, retryPolicyFactory);
+ executor.submit(rt.recover());
}
}
- protected Void doRecover(IRetryPolicy policy)
- throws AlgebricksException, HyracksDataException, InterruptedException {
- LOGGER.log(level, "Actual Recovery task has started");
- if (getState() != ActivityState.TEMPORARILY_FAILED) {
- LOGGER.log(level, "but its state is not temp failure and so we're just returning");
- return null;
- }
- LOGGER.log(level, "calling the policy");
- while (policy.retry()) {
- synchronized (this) {
- if (cancelRecovery) {
- recoveryTask = null;
- notifyAll();
- return null;
- }
- while (clusterStateManager.getState() != ClusterState.ACTIVE) {
- if (cancelRecovery) {
- recoveryTask = null;
- notifyAll();
- return null;
- }
- wait();
- }
- }
- waitForNonTransitionState();
- IMetadataLockManager lockManager = metadataProvider.getApplicationContext().getMetadataLockManager();
- lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(),
- entityId.getDataverse() + '.' + entityId.getEntityName());
- for (Dataset dataset : getDatasets()) {
- MetadataLockUtil.modifyDatasetBegin(lockManager, metadataProvider.getLocks(),
- dataset.getDataverseName(), DatasetUtil.getFullyQualifiedName(dataset));
- }
- synchronized (this) {
- try {
- if (cancelRecovery) {
- recoveryTask = null;
- notifyAll();
- return null;
- }
- setState(ActivityState.RECOVERING);
- doStart(metadataProvider);
- recoveryTask = null;
- notifyAll();
- return null;
- } catch (Exception e) {
- LOGGER.log(level, "Attempt to revive " + entityId + " failed", e);
- setState(ActivityState.TEMPORARILY_FAILED);
- recoverFailure = e;
- } finally {
- metadataProvider.getLocks().reset();
- }
- notifyAll();
- }
- }
- // Recovery task is essntially over now either through failure or through cancellation(stop)
- synchronized (this) {
- recoveryTask = null;
- notifyAll();
- if (state != ActivityState.TEMPORARILY_FAILED) {
- return null;
- }
- }
- IMetadataLockManager lockManager = metadataProvider.getApplicationContext().getMetadataLockManager();
- try {
- lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(),
- entityId.getDataverse() + '.' + entityId.getEntityName());
- for (Dataset dataset : getDatasets()) {
- MetadataLockUtil.modifyDatasetBegin(lockManager, metadataProvider.getLocks(), dataset.getDatasetName(),
- DatasetUtil.getFullyQualifiedName(dataset));
- }
- synchronized (this) {
- if (state == ActivityState.TEMPORARILY_FAILED) {
- setState(ActivityState.PERMANENTLY_FAILED);
- }
- notifyAll();
- }
- } finally {
- metadataProvider.getLocks().reset();
- }
- return null;
- }
-
@Override
public synchronized void start(MetadataProvider metadataProvider)
throws HyracksDataException, InterruptedException {
@@ -503,13 +402,10 @@
throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_CANNOT_BE_STOPPED, entityId, state);
}
if (state == ActivityState.TEMPORARILY_FAILED || state == ActivityState.PERMANENTLY_FAILED) {
- if (recoveryTask != null) {
+ if (rt != null) {
setState(ActivityState.STOPPING);
- cancelRecovery = true;
- recoveryTask.cancel(true);
- while (recoveryTask != null) {
- wait();
- }
+ rt.cancel();
+ rt = null;
}
setState(ActivityState.STOPPED);
try {
@@ -533,6 +429,10 @@
}
}
+ public RecoveryTask getRecoveryTask() {
+ return rt;
+ }
+
@Override
public void suspend(MetadataProvider metadataProvider) throws HyracksDataException, InterruptedException {
WaitForStateSubscriber subscriber;
@@ -602,8 +502,9 @@
setState(ActivityState.RESUMING);
WaitForStateSubscriber subscriber = new WaitForStateSubscriber(this,
EnumSet.of(ActivityState.RUNNING, ActivityState.TEMPORARILY_FAILED));
- recoveryTask = metadataProvider.getApplicationContext().getServiceContext().getControllerService()
- .getExecutor().submit(() -> resumeOrRecover(metadataProvider));
+ rt = new RecoveryTask(appCtx, this, retryPolicyFactory);
+ metadataProvider.getApplicationContext().getServiceContext().getControllerService().getExecutor()
+ .submit(() -> rt.resumeOrRecover(metadataProvider));
try {
subscriber.sync();
} catch (Exception e) {
@@ -616,28 +517,6 @@
}
}
- protected Void resumeOrRecover(MetadataProvider metadataProvider)
- throws HyracksDataException, AlgebricksException, InterruptedException {
- try {
- doResume(metadataProvider);
- synchronized (this) {
- setState(ActivityState.RUNNING);
- recoveryTask = null;
- }
- } catch (Exception e) {
- LOGGER.log(Level.WARNING, "First attempt to resume " + entityId + " Failed", e);
- setState(ActivityState.TEMPORARILY_FAILED);
- if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) {
- setState(ActivityState.PERMANENTLY_FAILED);
- } else {
- IRetryPolicy policy = retryPolicyFactory.create(this);
- cancelRecovery = false;
- doRecover(policy);
- }
- }
- return null;
- }
-
@Override
public boolean isActive() {
return state != ActivityState.STOPPED && state != ActivityState.PERMANENTLY_FAILED;
@@ -652,15 +531,6 @@
this.locations = locations;
}
- public Future<Void> getRecoveryTask() {
- return recoveryTask;
- }
-
- public synchronized void cancelRecovery() {
- cancelRecovery = true;
- notifyAll();
- }
-
@Override
public Exception getJobFailure() {
return jobFailure;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
new file mode 100644
index 0000000..7b7de93
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.active;
+
+import java.util.concurrent.Callable;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.active.ActivityState;
+import org.apache.asterix.active.IRetryPolicy;
+import org.apache.asterix.active.IRetryPolicyFactory;
+import org.apache.asterix.active.NoRetryPolicyFactory;
+import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
+import org.apache.asterix.common.api.IMetadataLockManager;
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.utils.DatasetUtil;
+import org.apache.asterix.metadata.utils.MetadataLockUtil;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class RecoveryTask {
+
+ private static final Logger LOGGER = Logger.getLogger(RecoveryTask.class.getName());
+ private static final Level level = Level.INFO;
+ private final ActiveEntityEventsListener listener;
+ private volatile boolean cancelRecovery = false;
+ private final IRetryPolicyFactory retryPolicyFactory;
+ private final MetadataProvider metadataProvider;
+ private final IClusterStateManager clusterStateManager;
+ private Exception failure;
+
+ public RecoveryTask(ICcApplicationContext appCtx, ActiveEntityEventsListener listener,
+ IRetryPolicyFactory retryPolicyFactory) {
+ this.listener = listener;
+ this.retryPolicyFactory = retryPolicyFactory;
+ this.metadataProvider = new MetadataProvider(appCtx, null);
+ this.clusterStateManager = appCtx.getClusterStateManager();
+ }
+
+ public Callable<Void> recover() {
+ if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) {
+ return () -> {
+ return null;
+ };
+ }
+ IRetryPolicy policy = retryPolicyFactory.create(listener);
+ return () -> {
+ String nameBefore = Thread.currentThread().getName();
+ try {
+ Thread.currentThread().setName("RecoveryTask (" + listener.getEntityId() + ")");
+ doRecover(policy);
+ } finally {
+ Thread.currentThread().setName(nameBefore);
+ }
+ return null;
+ };
+ }
+
+ public void cancel() {
+ cancelRecovery = true;
+ }
+
+ protected Void resumeOrRecover(MetadataProvider metadataProvider)
+ throws HyracksDataException, AlgebricksException, InterruptedException {
+ try {
+ synchronized (listener) {
+ listener.doResume(metadataProvider);
+ listener.setState(ActivityState.RUNNING);
+ }
+ } catch (Exception e) {
+ LOGGER.log(Level.WARNING, "First attempt to resume " + listener.getEntityId() + " Failed", e);
+ synchronized (listener) {
+ if (listener.getState() == ActivityState.RESUMING) {
+ // This will be the case if compilation failure
+ // If the failure is a runtime failure, then the state
+ // would've been set to temporarily failed already
+ listener.setState(ActivityState.TEMPORARILY_FAILED);
+ }
+ }
+ if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) {
+ synchronized (listener) {
+ if (!cancelRecovery) {
+ listener.setState(ActivityState.PERMANENTLY_FAILED);
+ }
+ }
+ } else {
+ IRetryPolicy policy = retryPolicyFactory.create(listener);
+ doRecover(policy);
+ }
+ }
+ return null;
+ }
+
+ protected Void doRecover(IRetryPolicy policy)
+ throws AlgebricksException, HyracksDataException, InterruptedException {
+ LOGGER.log(level, "Actual Recovery task has started");
+ if (listener.getState() != ActivityState.TEMPORARILY_FAILED) {
+ LOGGER.log(level, "but its state is not temp failure and so we're just returning");
+ return null;
+ }
+ LOGGER.log(level, "calling the policy");
+ while (policy.retry()) {
+ synchronized (listener) {
+ if (cancelRecovery) {
+ return null;
+ }
+ while (clusterStateManager.getState() != ClusterState.ACTIVE) {
+ if (cancelRecovery) {
+ return null;
+ }
+ wait();
+ }
+ }
+ IMetadataLockManager lockManager = metadataProvider.getApplicationContext().getMetadataLockManager();
+ lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(),
+ listener.getEntityId().getDataverse() + '.' + listener.getEntityId().getEntityName());
+ for (Dataset dataset : listener.getDatasets()) {
+ MetadataLockUtil.modifyDatasetBegin(lockManager, metadataProvider.getLocks(),
+ dataset.getDataverseName(), DatasetUtil.getFullyQualifiedName(dataset));
+ }
+ synchronized (listener) {
+ try {
+ if (cancelRecovery) {
+ return null;
+ }
+ listener.setState(ActivityState.RECOVERING);
+ listener.doStart(metadataProvider);
+ return null;
+ } catch (Exception e) {
+ LOGGER.log(level, "Attempt to revive " + listener.getEntityId() + " failed", e);
+ listener.setState(ActivityState.TEMPORARILY_FAILED);
+ failure = e;
+ } finally {
+ metadataProvider.getLocks().reset();
+ }
+ listener.notifyAll();
+ }
+ }
+ // Recovery task is essntially over now either through failure or through cancellation(stop)
+ synchronized (listener) {
+ listener.notifyAll();
+ if (listener.getState() != ActivityState.TEMPORARILY_FAILED) {
+ return null;
+ }
+ }
+ IMetadataLockManager lockManager = metadataProvider.getApplicationContext().getMetadataLockManager();
+ try {
+ lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(),
+ listener.getEntityId().getDataverse() + '.' + listener.getEntityId().getEntityName());
+ for (Dataset dataset : listener.getDatasets()) {
+ MetadataLockUtil.modifyDatasetBegin(lockManager, metadataProvider.getLocks(), dataset.getDatasetName(),
+ DatasetUtil.getFullyQualifiedName(dataset));
+ }
+ synchronized (listener) {
+ if (listener.getState() == ActivityState.TEMPORARILY_FAILED) {
+ listener.setState(ActivityState.PERMANENTLY_FAILED);
+ }
+ listener.notifyAll();
+ }
+ } finally {
+ metadataProvider.getLocks().reset();
+ }
+ return null;
+ }
+
+ public Exception getFailure() {
+ return failure;
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
new file mode 100644
index 0000000..fb6ec37
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.asterix.hyracks.bootstrap.CCApplication;
+import org.apache.asterix.messaging.CCMessageBroker;
+import org.apache.asterix.translator.IStatementExecutorContext;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+public class CancelQueryRequest implements ICcAddressedMessage {
+
+ private static final Logger LOGGER = Logger.getLogger(CancelQueryRequest.class.getName());
+ private static final long serialVersionUID = 1L;
+ private final String nodeId;
+ private final long reqId;
+ private final String contextId;
+
+ public CancelQueryRequest(String nodeId, long reqId, String contextId) {
+ this.nodeId = nodeId;
+ this.reqId = reqId;
+ this.contextId = contextId;
+ }
+
+ @Override
+ public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+ ClusterControllerService ccs = (ClusterControllerService) appCtx.getServiceContext().getControllerService();
+ CCApplication application = (CCApplication) ccs.getApplication();
+ IStatementExecutorContext executorsCtx = application.getStatementExecutorContext();
+ JobId jobId = executorsCtx.getJobIdFromClientContextId(contextId);
+
+ if (jobId == null) {
+ LOGGER.log(Level.WARN, "No job found for context id " + contextId);
+ } else {
+ try {
+ IHyracksClientConnection hcc = application.getHcc();
+ hcc.cancelJob(jobId);
+ executorsCtx.removeJobIdFromClientContextId(contextId);
+ } catch (Exception e) {
+ LOGGER.log(Level.WARN, "unexpected exception thrown from cancel", e);
+ }
+ }
+ CancelQueryResponse response = new CancelQueryResponse(reqId);
+ CCMessageBroker messageBroker = (CCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+ try {
+ messageBroker.sendApplicationMessageToNC(response, nodeId);
+ } catch (Exception e) {
+ LOGGER.log(Level.WARN, "Failure sending response to nc", e);
+ }
+ }
+
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java
new file mode 100644
index 0000000..4fbcf22
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.asterix.common.messaging.api.MessageFuture;
+import org.apache.asterix.messaging.NCMessageBroker;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class CancelQueryResponse implements INcAddressedMessage {
+
+ private static final long serialVersionUID = 1L;
+ private final long reqId;
+
+ public CancelQueryResponse(long reqId) {
+ this.reqId = reqId;
+ }
+
+ @Override
+ public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+ NCMessageBroker mb = (NCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+ MessageFuture future = mb.deregisterMessageFuture(reqId);
+ if (future != null) {
+ future.complete(this);
+ }
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
index e7919fa..5cee3d9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
@@ -22,6 +22,7 @@
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -54,23 +55,18 @@
public final class ExecuteStatementRequestMessage implements ICcAddressedMessage {
private static final long serialVersionUID = 1L;
-
private static final Logger LOGGER = Logger.getLogger(ExecuteStatementRequestMessage.class.getName());
-
+ //TODO: Make configurable: https://issues.apache.org/jira/browse/ASTERIXDB-2062
+ public static final long DEFAULT_NC_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(5);
+ //TODO: Make configurable: https://issues.apache.org/jira/browse/ASTERIXDB-2063
+ public static final long DEFAULT_QUERY_CANCELLATION_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(1);
private final String requestNodeId;
-
private final long requestMessageId;
-
private final ILangExtension.Language lang;
-
private final String statementsText;
-
private final SessionConfig sessionConfig;
-
private final IStatementExecutor.ResultDelivery delivery;
-
private final String clientContextID;
-
private final String handleUrl;
public ExecuteStatementRequestMessage(String requestNodeId, long requestMessageId, ILangExtension.Language lang,
@@ -102,47 +98,41 @@
IStorageComponentProvider storageComponentProvider = ccAppCtx.getStorageComponentProvider();
IStatementExecutorFactory statementExecutorFactory = ccApp.getStatementExecutorFactory();
IStatementExecutorContext statementExecutorContext = ccApp.getStatementExecutorContext();
-
- ccSrv.getExecutor().submit(() -> {
- ExecuteStatementResponseMessage responseMsg = new ExecuteStatementResponseMessage(requestMessageId);
- try {
- IParser parser = compilationProvider.getParserFactory().createParser(statementsText);
- List<Statement> statements = parser.parse();
- StringWriter outWriter = new StringWriter(256);
- PrintWriter outPrinter = new PrintWriter(outWriter);
- SessionOutput.ResultDecorator resultPrefix = ResultUtil.createPreResultDecorator();
- SessionOutput.ResultDecorator resultPostfix = ResultUtil.createPostResultDecorator();
- SessionOutput.ResultAppender appendHandle = ResultUtil.createResultHandleAppender(handleUrl);
- SessionOutput.ResultAppender appendStatus = ResultUtil.createResultStatusAppender();
- SessionOutput sessionOutput = new SessionOutput(sessionConfig, outPrinter, resultPrefix, resultPostfix,
- appendHandle, appendStatus);
-
- IStatementExecutor.ResultMetadata outMetadata = new IStatementExecutor.ResultMetadata();
-
- MetadataManager.INSTANCE.init();
- IStatementExecutor translator = statementExecutorFactory.create(ccAppCtx, statements, sessionOutput,
- compilationProvider, storageComponentProvider);
- translator.compileAndExecute(ccAppCtx.getHcc(), null, delivery, outMetadata,
- new IStatementExecutor.Stats(), clientContextID, statementExecutorContext);
-
- outPrinter.close();
- responseMsg.setResult(outWriter.toString());
- responseMsg.setMetadata(outMetadata);
- } catch (AlgebricksException | HyracksException | TokenMgrError
- | org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
- // we trust that "our" exceptions are serializable and have a comprehensible error message
- GlobalConfig.ASTERIX_LOGGER.log(Level.WARNING, pe.getMessage(), pe);
- responseMsg.setError(pe);
- } catch (Exception e) {
- GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, "Unexpected exception", e);
- responseMsg.setError(new Exception(e.toString()));
- }
- try {
- messageBroker.sendApplicationMessageToNC(responseMsg, requestNodeId);
- } catch (Exception e) {
- LOGGER.log(Level.WARNING, e.toString(), e);
- }
- });
+ ExecuteStatementResponseMessage responseMsg = new ExecuteStatementResponseMessage(requestMessageId);
+ try {
+ IParser parser = compilationProvider.getParserFactory().createParser(statementsText);
+ List<Statement> statements = parser.parse();
+ StringWriter outWriter = new StringWriter(256);
+ PrintWriter outPrinter = new PrintWriter(outWriter);
+ SessionOutput.ResultDecorator resultPrefix = ResultUtil.createPreResultDecorator();
+ SessionOutput.ResultDecorator resultPostfix = ResultUtil.createPostResultDecorator();
+ SessionOutput.ResultAppender appendHandle = ResultUtil.createResultHandleAppender(handleUrl);
+ SessionOutput.ResultAppender appendStatus = ResultUtil.createResultStatusAppender();
+ SessionOutput sessionOutput = new SessionOutput(sessionConfig, outPrinter, resultPrefix, resultPostfix,
+ appendHandle, appendStatus);
+ IStatementExecutor.ResultMetadata outMetadata = new IStatementExecutor.ResultMetadata();
+ MetadataManager.INSTANCE.init();
+ IStatementExecutor translator = statementExecutorFactory.create(ccAppCtx, statements, sessionOutput,
+ compilationProvider, storageComponentProvider);
+ translator.compileAndExecute(ccAppCtx.getHcc(), null, delivery, outMetadata, new IStatementExecutor.Stats(),
+ clientContextID, statementExecutorContext);
+ outPrinter.close();
+ responseMsg.setResult(outWriter.toString());
+ responseMsg.setMetadata(outMetadata);
+ } catch (AlgebricksException | HyracksException | TokenMgrError
+ | org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
+ // we trust that "our" exceptions are serializable and have a comprehensible error message
+ GlobalConfig.ASTERIX_LOGGER.log(Level.WARNING, pe.getMessage(), pe);
+ responseMsg.setError(pe);
+ } catch (Exception e) {
+ GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, "Unexpected exception", e);
+ responseMsg.setError(new Exception(e.toString()));
+ }
+ try {
+ messageBroker.sendApplicationMessageToNC(responseMsg, requestNodeId);
+ } catch (Exception e) {
+ LOGGER.log(Level.WARNING, e.toString(), e);
+ }
}
private String getRejectionReason(ClusterControllerService ccSrv) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java
index 4f9aa0c..54f0a4e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java
@@ -19,8 +19,6 @@
package org.apache.asterix.app.message;
-import java.util.concurrent.TimeUnit;
-
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.messaging.api.INcAddressedMessage;
import org.apache.asterix.common.messaging.api.MessageFuture;
@@ -31,8 +29,6 @@
public final class ExecuteStatementResponseMessage implements INcAddressedMessage {
private static final long serialVersionUID = 1L;
- public static final long DEFAULT_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(5);
-
private final long requestMessageId;
private String result;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index e8636c8..9040ad1 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -21,8 +21,8 @@
import static org.apache.asterix.algebra.base.ILangExtension.Language.AQL;
import static org.apache.asterix.algebra.base.ILangExtension.Language.SQLPP;
-import static org.apache.asterix.api.http.servlet.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR;
-import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
+import static org.apache.asterix.api.http.server.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR;
+import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
import java.util.Arrays;
import java.util.List;
@@ -47,10 +47,10 @@
import org.apache.asterix.api.http.server.QueryStatusApiServlet;
import org.apache.asterix.api.http.server.QueryWebInterfaceServlet;
import org.apache.asterix.api.http.server.RebalanceApiServlet;
+import org.apache.asterix.api.http.server.ServletConstants;
import org.apache.asterix.api.http.server.ShutdownApiServlet;
import org.apache.asterix.api.http.server.UpdateApiServlet;
import org.apache.asterix.api.http.server.VersionApiServlet;
-import org.apache.asterix.api.http.servlet.ServletConstants;
import org.apache.asterix.app.active.ActiveNotificationHandler;
import org.apache.asterix.app.cc.CCExtensionManager;
import org.apache.asterix.app.cc.ResourceIdManager;
@@ -106,6 +106,7 @@
protected WebManager webManager;
protected CcApplicationContext appCtx;
private IJobCapacityController jobCapacityController;
+ private IHyracksClientConnection hcc;
@Override
public void start(IServiceContext serviceCtx, String[] args) throws Exception {
@@ -124,6 +125,9 @@
ccServiceCtx.setThreadFactory(
new AsterixThreadFactory(ccServiceCtx.getThreadFactory(), new LifeCycleComponentManager()));
+ String strIP = ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetAddress();
+ int port = ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetPort();
+ hcc = new HyracksConnection(strIP, port);
ILibraryManager libraryManager = new ExternalLibraryManager();
ResourceIdManager resourceIdManager = new ResourceIdManager();
IReplicationStrategy repStrategy = ClusterProperties.INSTANCE.getReplicationStrategy();
@@ -196,7 +200,6 @@
protected HttpServer setupWebServer(ExternalProperties externalProperties) throws Exception {
HttpServer webServer = new HttpServer(webManager.getBosses(), webManager.getWorkers(),
externalProperties.getWebInterfacePort());
- IHyracksClientConnection hcc = getHcc();
webServer.setAttribute(HYRACKS_CONNECTION_ATTR, hcc);
webServer.addServlet(new ApiServlet(webServer.ctx(), new String[] { "/*" }, appCtx,
ccExtensionManager.getCompilationProvider(AQL), ccExtensionManager.getCompilationProvider(SQLPP),
@@ -207,7 +210,6 @@
protected HttpServer setupJSONAPIServer(ExternalProperties externalProperties) throws Exception {
HttpServer jsonAPIServer =
new HttpServer(webManager.getBosses(), webManager.getWorkers(), externalProperties.getAPIServerPort());
- IHyracksClientConnection hcc = getHcc();
jsonAPIServer.setAttribute(HYRACKS_CONNECTION_ATTR, hcc);
jsonAPIServer.setAttribute(ASTERIX_APP_CONTEXT_INFO_ATTR, appCtx);
jsonAPIServer.setAttribute(ServletConstants.EXECUTOR_SERVICE_ATTR,
@@ -252,7 +254,6 @@
protected HttpServer setupQueryWebServer(ExternalProperties externalProperties) throws Exception {
HttpServer queryWebServer = new HttpServer(webManager.getBosses(), webManager.getWorkers(),
externalProperties.getQueryWebInterfacePort());
- IHyracksClientConnection hcc = getHcc();
queryWebServer.setAttribute(HYRACKS_CONNECTION_ATTR, hcc);
queryWebServer.addServlet(new QueryWebInterfaceServlet(appCtx, queryWebServer.ctx(), new String[] { "/*" }));
return queryWebServer;
@@ -357,9 +358,7 @@
return appCtx;
}
- protected IHyracksClientConnection getHcc() throws Exception {
- String strIP = ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetAddress();
- int port = ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetPort();
- return new HyracksConnection(strIP, port);
+ public IHyracksClientConnection getHcc() {
+ return hcc;
}
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java
index 5abbe40..cd58d8f 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java
@@ -29,6 +29,7 @@
import java.util.concurrent.ConcurrentHashMap;
import org.apache.asterix.api.http.server.ConnectorApiServlet;
+import org.apache.asterix.api.http.server.ServletConstants;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
index 5f40a85..3cb46fe 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
@@ -29,6 +29,7 @@
import org.apache.asterix.api.http.ctx.StatementExecutorContext;
import org.apache.asterix.api.http.server.QueryCancellationServlet;
+import org.apache.asterix.api.http.server.ServletConstants;
import org.apache.asterix.translator.IStatementExecutorContext;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.job.JobId;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryServiceServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryServiceServletTest.java
deleted file mode 100644
index e0539ac..0000000
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryServiceServletTest.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.api.http.servlet;
-
-import org.apache.asterix.api.http.server.QueryServiceServlet;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class QueryServiceServletTest {
-
- @Test
- public void testTimeUnitFormatNanos() throws Exception {
- Assert.assertEquals("123.456789012s", QueryServiceServlet.TimeUnit.formatNanos(123456789012l));
- Assert.assertEquals("12.345678901s", QueryServiceServlet.TimeUnit.formatNanos(12345678901l));
- Assert.assertEquals("1.234567890s", QueryServiceServlet.TimeUnit.formatNanos(1234567890l));
- Assert.assertEquals("123.456789ms", QueryServiceServlet.TimeUnit.formatNanos(123456789l));
- Assert.assertEquals("12.345678ms", QueryServiceServlet.TimeUnit.formatNanos(12345678l));
- Assert.assertEquals("1.234567ms", QueryServiceServlet.TimeUnit.formatNanos(1234567l));
- Assert.assertEquals("123.456µs", QueryServiceServlet.TimeUnit.formatNanos(123456l));
- Assert.assertEquals("12.345µs", QueryServiceServlet.TimeUnit.formatNanos(12345l));
- Assert.assertEquals("1.234µs", QueryServiceServlet.TimeUnit.formatNanos(1234l));
- Assert.assertEquals("123ns", QueryServiceServlet.TimeUnit.formatNanos(123l));
- Assert.assertEquals("12ns", QueryServiceServlet.TimeUnit.formatNanos(12l));
- Assert.assertEquals("1ns", QueryServiceServlet.TimeUnit.formatNanos(1l));
- Assert.assertEquals("-123.456789012s", QueryServiceServlet.TimeUnit.formatNanos(-123456789012l));
- Assert.assertEquals("120.000000000s", QueryServiceServlet.TimeUnit.formatNanos(120000000000l));
- Assert.assertEquals("-12ns", QueryServiceServlet.TimeUnit.formatNanos(-12l));
- }
-}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiServletTest.java
index f994e98..e583c75 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiServletTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiServletTest.java
@@ -19,8 +19,8 @@
package org.apache.asterix.api.http.servlet;
-import static org.apache.asterix.api.http.servlet.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR;
-import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
+import static org.apache.asterix.api.http.server.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR;
+import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ParseDurationTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ParseDurationTest.java
new file mode 100644
index 0000000..12c61d6
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ParseDurationTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime;
+
+import org.apache.asterix.api.http.server.Duration;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ParseDurationTest {
+
+ @Test
+ public void test() throws Exception {
+ // simple
+ Assert.assertEquals(0, Duration.parseDurationStringToNanos("0"));
+ Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(5),
+ Duration.parseDurationStringToNanos("5s"));
+ Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(30),
+ Duration.parseDurationStringToNanos("30s"));
+ Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(1478),
+ Duration.parseDurationStringToNanos("1478s"));
+ Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(-5),
+ Duration.parseDurationStringToNanos("-5s"));
+ Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(5),
+ Duration.parseDurationStringToNanos("+5s"));
+ Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(0),
+ Duration.parseDurationStringToNanos("-0"));
+ Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(0),
+ Duration.parseDurationStringToNanos("+0"));
+ Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(5),
+ Duration.parseDurationStringToNanos("5.0s"));
+ Assert.assertEquals(
+ java.util.concurrent.TimeUnit.SECONDS.toNanos(5)
+ + java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(600),
+ Duration.parseDurationStringToNanos("5.6s"));
+ Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(5),
+ Duration.parseDurationStringToNanos("5.s"));
+ Assert.assertEquals(java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(500),
+ Duration.parseDurationStringToNanos(".5s"));
+ Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(1),
+ Duration.parseDurationStringToNanos("1.0s"));
+ Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(1),
+ Duration.parseDurationStringToNanos("1.00s"));
+ Assert.assertEquals(
+ java.util.concurrent.TimeUnit.SECONDS.toNanos(1)
+ + java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(4),
+ Duration.parseDurationStringToNanos("1.004s"));
+ Assert.assertEquals(
+ java.util.concurrent.TimeUnit.SECONDS.toNanos(1)
+ + java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(4),
+ Duration.parseDurationStringToNanos("1.0040s"));
+ Assert.assertEquals(
+ java.util.concurrent.TimeUnit.SECONDS.toNanos(100)
+ + java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(1),
+ Duration.parseDurationStringToNanos("100.00100s"));
+ Assert.assertEquals(java.util.concurrent.TimeUnit.NANOSECONDS.toNanos(10),
+ Duration.parseDurationStringToNanos("10ns"));
+ Assert.assertEquals(java.util.concurrent.TimeUnit.MICROSECONDS.toNanos(11),
+ Duration.parseDurationStringToNanos("11us"));
+ Assert.assertEquals(java.util.concurrent.TimeUnit.MICROSECONDS.toNanos(12),
+ Duration.parseDurationStringToNanos("12µs"));
+ Assert.assertEquals(java.util.concurrent.TimeUnit.MICROSECONDS.toNanos(12),
+ Duration.parseDurationStringToNanos("12μs"));
+ Assert.assertEquals(java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(13),
+ Duration.parseDurationStringToNanos("13ms"));
+ Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(14),
+ Duration.parseDurationStringToNanos("14s"));
+ Assert.assertEquals(java.util.concurrent.TimeUnit.MINUTES.toNanos(15),
+ Duration.parseDurationStringToNanos("15m"));
+ Assert.assertEquals(java.util.concurrent.TimeUnit.HOURS.toNanos(16),
+ Duration.parseDurationStringToNanos("16h"));
+ Assert.assertEquals(
+ java.util.concurrent.TimeUnit.HOURS.toNanos(3) + java.util.concurrent.TimeUnit.MINUTES.toNanos(30),
+ Duration.parseDurationStringToNanos("3h30m"));
+ Assert.assertEquals(
+ java.util.concurrent.TimeUnit.SECONDS.toNanos(10)
+ + java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(500)
+ + java.util.concurrent.TimeUnit.MINUTES.toNanos(4),
+ Duration.parseDurationStringToNanos("10.5s4m"));
+ Assert.assertEquals(
+ java.util.concurrent.TimeUnit.MINUTES.toNanos(-2) + java.util.concurrent.TimeUnit.SECONDS.toNanos(-3)
+ + java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(-400),
+ Duration.parseDurationStringToNanos("-2m3.4s"));
+ Assert.assertEquals(
+ java.util.concurrent.TimeUnit.HOURS.toNanos(1) + java.util.concurrent.TimeUnit.MINUTES.toNanos(2)
+ + java.util.concurrent.TimeUnit.SECONDS.toNanos(3)
+ + java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(4)
+ + java.util.concurrent.TimeUnit.MICROSECONDS.toNanos(5)
+ + java.util.concurrent.TimeUnit.NANOSECONDS.toNanos(6),
+ Duration.parseDurationStringToNanos("1h2m3s4ms5us6ns"));
+ Assert.assertEquals(
+ java.util.concurrent.TimeUnit.HOURS.toNanos(39) + java.util.concurrent.TimeUnit.MINUTES.toNanos(9)
+ + java.util.concurrent.TimeUnit.SECONDS.toNanos(14)
+ + java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(425),
+ Duration.parseDurationStringToNanos("39h9m14.425s"));
+ Assert.assertEquals(java.util.concurrent.TimeUnit.NANOSECONDS.toNanos(52763797000L),
+ Duration.parseDurationStringToNanos("52763797000ns"));
+ Assert.assertEquals(1199999998800L, Duration.parseDurationStringToNanos("0.3333333333333333333333h"));
+ Assert.assertEquals(9007199254740993L, Duration.parseDurationStringToNanos("9007199254740993ns"));
+ Assert.assertEquals(9223372036854775807L, Duration.parseDurationStringToNanos("9223372036854775807ns"));
+ Assert.assertEquals(9223372036854775807L, Duration.parseDurationStringToNanos("9223372036854775.807us"));
+ Assert.assertEquals(9223372036854775807L, Duration.parseDurationStringToNanos("9223372036s854ms775us807ns"));
+ Assert.assertEquals(-9223372036854775807L, Duration.parseDurationStringToNanos("-9223372036854775807ns"));
+ assertFail("");
+ assertFail("3");
+ assertFail("-");
+ assertFail("s");
+ assertFail(".");
+ assertFail("-.");
+ assertFail(".s");
+ assertFail("+.s");
+ assertFail("3000000h");
+ assertFail("9223372036854775808ns");
+ assertFail("9223372036854775.808us");
+ assertFail("9223372036854ms775us808ns");
+ assertFail("-9223372036854775808ns");
+ }
+
+ @Test
+ public void testDurationFormatNanos() throws Exception {
+ Assert.assertEquals("123.456789012s", Duration.formatNanos(123456789012l));
+ Assert.assertEquals("12.345678901s", Duration.formatNanos(12345678901l));
+ Assert.assertEquals("1.234567890s", Duration.formatNanos(1234567890l));
+ Assert.assertEquals("123.456789ms", Duration.formatNanos(123456789l));
+ Assert.assertEquals("12.345678ms", Duration.formatNanos(12345678l));
+ Assert.assertEquals("1.234567ms", Duration.formatNanos(1234567l));
+ Assert.assertEquals("123.456µs", Duration.formatNanos(123456l));
+ Assert.assertEquals("12.345µs", Duration.formatNanos(12345l));
+ Assert.assertEquals("1.234µs", Duration.formatNanos(1234l));
+ Assert.assertEquals("123ns", Duration.formatNanos(123l));
+ Assert.assertEquals("12ns", Duration.formatNanos(12l));
+ Assert.assertEquals("1ns", Duration.formatNanos(1l));
+ Assert.assertEquals("-123.456789012s", Duration.formatNanos(-123456789012l));
+ Assert.assertEquals("120.000000000s", Duration.formatNanos(120000000000l));
+ Assert.assertEquals("-12ns", Duration.formatNanos(-12l));
+ }
+
+ private void assertFail(String duration) {
+ try {
+ Duration.parseDurationStringToNanos(duration);
+ Assert.fail("Expected parseDuration(" + duration + ") to fail but it didn't");
+ } catch (HyracksDataException hde) {
+ }
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 143f7d1..fcfb428 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -68,6 +68,9 @@
public static final int POLYGON_3_POINTS = 25;
public static final int POLYGON_INVALID = 26;
public static final int OPERATION_NOT_SUPPORTED = 27;
+ public static final int INVALID_DURATION = 28;
+ public static final int UNKNOWN_DURATION_UNIT = 29;
+ public static final int QUERY_TIMEOUT = 30;
public static final int INSTANTIATION_ERROR = 100;
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 715c27d..5bd5482 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -61,6 +61,9 @@
25 = Polygon must have at least 3 points
26 = %1$s can not be an instance of polygon
27 = Operation not supported
+28 = Invalid duration %1$s
+29 = Unknown duration unit %1$s
+30 = Query timed out
100 = Unable to instantiate class %1$s
diff --git a/asterixdb/asterix-metadata/pom.xml b/asterixdb/asterix-metadata/pom.xml
index 5497cf4..458bb67 100644
--- a/asterixdb/asterix-metadata/pom.xml
+++ b/asterixdb/asterix-metadata/pom.xml
@@ -16,7 +16,8 @@
! specific language governing permissions and limitations
! under the License.
!-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>apache-asterixdb</artifactId>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
index e2868ae..95479c1 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -130,6 +130,9 @@
public CancelJobFunction(JobId jobId) {
this.jobId = jobId;
+ if (jobId == null) {
+ throw new IllegalArgumentException("jobId");
+ }
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
index dbbaf9f..a3078b6 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
@@ -65,6 +65,8 @@
import org.apache.hyracks.control.cc.work.JobCleanupWork;
import org.apache.hyracks.control.common.job.PartitionState;
import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
+import org.apache.hyracks.control.common.work.NoOpCallback;
+import org.apache.hyracks.control.common.work.IResultCallback;
public class JobExecutor {
private static final Logger LOGGER = Logger.getLogger(JobExecutor.class.getName());
@@ -114,9 +116,10 @@
ccs.getContext().notifyJobStart(jobRun.getJobId());
}
- public void cancelJob() throws HyracksException {
+ public void cancelJob(IResultCallback<Void> callback) throws HyracksException {
// If the job is already terminated or failed, do nothing here.
if (jobRun.getPendingStatus() != null) {
+ callback.setValue(null);
return;
}
// Sets the cancelled flag.
@@ -124,7 +127,8 @@
// Aborts on-ongoing task clusters.
abortOngoingTaskClusters(ta -> false, ta -> null);
// Aborts the whole job.
- abortJob(Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, jobRun.getJobId())));
+ abortJob(Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, jobRun.getJobId())),
+ callback);
}
private void findRunnableTaskClusterRoots(Set<TaskCluster> frontier, Collection<ActivityCluster> roots)
@@ -196,8 +200,8 @@
"Runnable TC roots: " + taskClusterRoots + ", inProgressTaskClusters: " + inProgressTaskClusters);
}
if (taskClusterRoots.isEmpty() && inProgressTaskClusters.isEmpty()) {
- ccs.getWorkQueue()
- .schedule(new JobCleanupWork(ccs.getJobManager(), jobRun.getJobId(), JobStatus.TERMINATED, null));
+ ccs.getWorkQueue().schedule(new JobCleanupWork(ccs.getJobManager(), jobRun.getJobId(), JobStatus.TERMINATED,
+ null, NoOpCallback.INSTANCE));
return;
}
startRunnableTaskClusters(taskClusterRoots);
@@ -520,14 +524,14 @@
}
}
- public void abortJob(List<Exception> exceptions) {
+ public void abortJob(List<Exception> exceptions, IResultCallback<Void> callback) {
Set<TaskCluster> inProgressTaskClustersCopy = new HashSet<>(inProgressTaskClusters);
for (TaskCluster tc : inProgressTaskClustersCopy) {
abortTaskCluster(findLastTaskClusterAttempt(tc), TaskClusterAttempt.TaskClusterStatus.ABORTED);
}
assert inProgressTaskClusters.isEmpty();
- ccs.getWorkQueue()
- .schedule(new JobCleanupWork(ccs.getJobManager(), jobRun.getJobId(), JobStatus.FAILURE, exceptions));
+ ccs.getWorkQueue().schedule(
+ new JobCleanupWork(ccs.getJobManager(), jobRun.getJobId(), JobStatus.FAILURE, exceptions, callback));
}
private void abortTaskCluster(TaskClusterAttempt tcAttempt,
@@ -686,7 +690,7 @@
+ " as failed and the number of max re-attempts = " + maxReattempts);
if (lastAttempt.getAttempt() >= maxReattempts || isCancelled()) {
LOGGER.log(Level.INFO, "Aborting the job of " + ta.getTaskAttemptId());
- abortJob(exceptions);
+ abortJob(exceptions, NoOpCallback.INSTANCE);
return;
}
LOGGER.log(Level.INFO, "We will try to start runnable activity clusters of " + ta.getTaskAttemptId());
@@ -696,7 +700,7 @@
"Ignoring task failure notification: " + taId + " -- Current last attempt = " + lastAttempt);
}
} catch (Exception e) {
- abortJob(Collections.singletonList(e));
+ abortJob(Collections.singletonList(e), NoOpCallback.INSTANCE);
}
}
@@ -720,7 +724,7 @@
ta -> HyracksException.create(ErrorCode.NODE_FAILED, ta.getNodeId()));
startRunnableActivityClusters();
} catch (Exception e) {
- abortJob(Collections.singletonList(e));
+ abortJob(Collections.singletonList(e), NoOpCallback.INSTANCE);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
index 8fe542f..a9ddee3 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
@@ -25,6 +25,7 @@
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.control.common.work.IResultCallback;
/**
* This interface abstracts the job lifecycle management and job scheduling for a cluster.
@@ -47,10 +48,12 @@
/**
* Cancel a job with a given job id.
*
+ * @param callback
+ *
* @param jobId,
* the id of the job.
*/
- void cancel(JobId jobId) throws HyracksException;
+ void cancel(JobId jobId, IResultCallback<Void> callback) throws HyracksException;
/**
* This method is called when the master process decides to complete job.
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index abf1d57..4ba847d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -46,6 +46,8 @@
import org.apache.hyracks.control.cc.scheduler.FIFOJobQueue;
import org.apache.hyracks.control.cc.scheduler.IJobQueue;
import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.work.NoOpCallback;
+import org.apache.hyracks.control.common.work.IResultCallback;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -115,17 +117,14 @@
}
@Override
- public void cancel(JobId jobId) throws HyracksException {
- if (jobId == null) {
- return;
- }
+ public void cancel(JobId jobId, IResultCallback<Void> callback) throws HyracksException {
// Cancels a running job.
if (activeRunMap.containsKey(jobId)) {
JobRun jobRun = activeRunMap.get(jobId);
// The following call will abort all ongoing tasks and then consequently
// trigger JobCleanupWork and JobCleanupNotificationWork which will update the lifecyle of the job.
// Therefore, we do not remove the job out of activeRunMap here.
- jobRun.getExecutor().cancelJob();
+ jobRun.getExecutor().cancelJob(callback);
return;
}
// Removes a pending job.
@@ -138,6 +137,7 @@
runMapArchive.put(jobId, jobRun);
runMapHistory.put(jobId, exceptions);
}
+ callback.setValue(null);
}
@Override
@@ -322,7 +322,7 @@
// fail the job then abort it
run.setStatus(JobStatus.FAILURE, exceptions);
// abort job will trigger JobCleanupWork
- run.getExecutor().abortJob(exceptions);
+ run.getExecutor().abortJob(exceptions, NoOpCallback.INSTANCE);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java
index f3b67c9..e3135df 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java
@@ -42,10 +42,7 @@
@Override
protected void doRun() throws Exception {
try {
- if (jobId != null) {
- jobManager.cancel(jobId);
- }
- callback.setValue(null);
+ jobManager.cancel(jobId, callback);
} catch (Exception e) {
callback.setException(e);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
index 502ac50..bb85c13 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
@@ -29,6 +29,7 @@
import org.apache.hyracks.control.cc.job.IJobManager;
import org.apache.hyracks.control.cc.job.JobRun;
import org.apache.hyracks.control.common.work.AbstractWork;
+import org.apache.hyracks.control.common.work.IResultCallback;
public class JobCleanupWork extends AbstractWork {
private static final Logger LOGGER = Logger.getLogger(JobCleanupWork.class.getName());
@@ -37,12 +38,15 @@
private JobId jobId;
private JobStatus status;
private List<Exception> exceptions;
+ private IResultCallback<Void> callback;
- public JobCleanupWork(IJobManager jobManager, JobId jobId, JobStatus status, List<Exception> exceptions) {
+ public JobCleanupWork(IJobManager jobManager, JobId jobId, JobStatus status, List<Exception> exceptions,
+ IResultCallback<Void> callback) {
this.jobManager = jobManager;
this.jobId = jobId;
this.status = status;
this.exceptions = exceptions;
+ this.callback = callback;
}
@Override
@@ -53,6 +57,7 @@
try {
JobRun jobRun = jobManager.get(jobId);
jobManager.prepareComplete(jobRun, status, exceptions);
+ callback.setValue(null);
} catch (HyracksException e) {
// Fail the job with the caught exception during final completion.
JobRun run = jobManager.get(jobId);
@@ -62,6 +67,7 @@
}
completionException.add(0, e);
run.setStatus(JobStatus.FAILURE, completionException);
+ callback.setException(e);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
index ed2a740..0d46d64 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
@@ -48,6 +48,7 @@
import org.apache.hyracks.control.common.base.INodeController;
import org.apache.hyracks.control.common.controllers.CCConfig;
import org.apache.hyracks.control.common.logs.LogFile;
+import org.apache.hyracks.control.common.work.NoOpCallback;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -207,7 +208,7 @@
}
@Test
- public void testCancel() throws HyracksException {
+ public void testCancel() throws Exception {
CCConfig ccConfig = new CCConfig();
IJobCapacityController jobCapacityController = mock(IJobCapacityController.class);
IJobManager jobManager = spy(new JobManager(ccConfig, mockClusterControllerService(), jobCapacityController));
@@ -247,12 +248,12 @@
// Cancels deferred jobs.
for (JobRun run : deferredRuns) {
- jobManager.cancel(run.getJobId());
+ jobManager.cancel(run.getJobId(), NoOpCallback.INSTANCE);
}
// Cancels runnable jobs.
for (JobRun run : acceptedRuns) {
- jobManager.cancel(run.getJobId());
+ jobManager.cancel(run.getJobId(), NoOpCallback.INSTANCE);
}
Assert.assertTrue(jobManager.getPendingJobs().isEmpty());
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/FutureValue.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/FutureValue.java
index 150e0e8..ca0c7c4 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/FutureValue.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/FutureValue.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.control.common.work;
public class FutureValue<T> implements IResultCallback<T> {
+
private boolean done;
private T value;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/NoOpCallback.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/NoOpCallback.java
new file mode 100644
index 0000000..041cee0
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/NoOpCallback.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.work;
+
+public class NoOpCallback implements IResultCallback<Void> {
+
+ public static final NoOpCallback INSTANCE = new NoOpCallback();
+
+ private NoOpCallback() {
+ }
+
+ @Override
+ public void setValue(Void result) {
+ // Dummy is used when no callback is provided
+ }
+
+ @Override
+ public void setException(Exception e) {
+ // Dummy is used when no callback is provided
+ }
+
+}