Fix async result delivery for compilation errors
- Request submission returns after successful compilation or returns the
compilation error.
Change-Id: Ib594cdceb8ff2801f3e2af37be68c1609bef2a11
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1575
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java
index 328f714..a637e2f 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java
@@ -110,6 +110,11 @@
AlgebricksAppendable append(AlgebricksAppendable app) throws AlgebricksException;
}
+ @FunctionalInterface
+ public interface ResultAppender {
+ AlgebricksAppendable append(AlgebricksAppendable app, String str) throws AlgebricksException;
+ }
+
// Standard execution flags.
private final boolean executeQuery;
private final boolean generateJobSpec;
@@ -123,34 +128,16 @@
private final ResultDecorator preResultDecorator;
private final ResultDecorator postResultDecorator;
- private final ResultDecorator preHandleDecorator;
- private final ResultDecorator postHandleDecorator;
+ private final ResultAppender handleAppender;
+ private final ResultAppender statusAppender;
// Flags.
private final Map<String, Boolean> flags;
- /**
- * Create a SessionConfig object with all default values:
- * - All format flags set to "false".
- * - All out-of-band outputs set to "null".
- * - "Optimize" set to "true".
- * - "Execute Query" set to "true".
- * - "Generate Job Spec" set to "true".
- *
- * @param out
- * PrintWriter for execution output.
- * @param fmt
- * Output format for execution output.
- */
- public SessionConfig(PrintWriter out, OutputFormat fmt) {
- this(out, fmt, null, null, null, null, true, true, true);
- }
-
public SessionConfig(PrintWriter out, OutputFormat fmt, ResultDecorator preResultDecorator,
- ResultDecorator postResultDecorator, ResultDecorator preHandleDecorator,
- ResultDecorator postHandleDecorator) {
- this(out, fmt, preResultDecorator, postResultDecorator, preHandleDecorator, postHandleDecorator, true, true,
- true);
+ ResultDecorator postResultDecorator, ResultAppender handleAppender, ResultAppender statusAppender) {
+ this(out, fmt, preResultDecorator, postResultDecorator, handleAppender, statusAppender,
+ true, true, true);
}
public SessionConfig(PrintWriter out, OutputFormat fmt, boolean optimize, boolean executeQuery,
@@ -176,14 +163,14 @@
* false, job cannot be executed).
*/
public SessionConfig(PrintWriter out, OutputFormat fmt, ResultDecorator preResultDecorator,
- ResultDecorator postResultDecorator, ResultDecorator preHandleDecorator,
- ResultDecorator postHandleDecorator, boolean optimize, boolean executeQuery, boolean generateJobSpec) {
+ ResultDecorator postResultDecorator, ResultAppender handleAppender, ResultAppender statusAppender,
+ boolean optimize, boolean executeQuery, boolean generateJobSpec) {
this.out = out;
this.fmt = fmt;
this.preResultDecorator = preResultDecorator;
this.postResultDecorator = postResultDecorator;
- this.preHandleDecorator = preHandleDecorator;
- this.postHandleDecorator = postHandleDecorator;
+ this.handleAppender = handleAppender;
+ this.statusAppender = statusAppender;
this.optimize = optimize;
this.executeQuery = executeQuery;
this.generateJobSpec = generateJobSpec;
@@ -212,13 +199,14 @@
return this.postResultDecorator != null ? this.postResultDecorator.append(app) : app;
}
- public AlgebricksAppendable handlePrefix(AlgebricksAppendable app) throws AlgebricksException {
- return this.preHandleDecorator != null ? this.preHandleDecorator.append(app) : app;
+ public AlgebricksAppendable appendHandle(AlgebricksAppendable app, String handle) throws AlgebricksException {
+ return this.handleAppender != null ? this.handleAppender.append(app, handle) : app;
}
- public AlgebricksAppendable handlePostfix(AlgebricksAppendable app) throws AlgebricksException {
- return this.postHandleDecorator != null ? this.postHandleDecorator.append(app) : app;
+ public AlgebricksAppendable appendStatus(AlgebricksAppendable app, String status) throws AlgebricksException {
+ return this.statusAppender != null ? this.statusAppender.append(app, status) : app;
}
+
/**
* Retrieve the value of the "execute query" flag.
*/
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index 0759599..aec19e8 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -31,7 +31,7 @@
import org.apache.asterix.algebra.base.ILangExpressionToPlanTranslator;
import org.apache.asterix.algebra.base.ILangExpressionToPlanTranslatorFactory;
-import org.apache.asterix.app.result.ResultUtil;
+import org.apache.asterix.api.http.server.ResultUtil;
import org.apache.asterix.common.config.CompilerProperties;
import org.apache.asterix.common.config.ExternalProperties;
import org.apache.asterix.common.config.OptimizationConfUtil;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
index 831244c..f79a2da 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
@@ -21,28 +21,19 @@
import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_DATASET_ATTR;
-import java.io.IOException;
import java.io.PrintWriter;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
-import java.util.logging.Level;
-import java.util.logging.Logger;
import org.apache.asterix.app.result.ResultReader;
-import org.apache.asterix.app.result.ResultUtil;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.common.utils.JSONUtil;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.client.dataset.HyracksDataset;
import org.apache.hyracks.http.server.AbstractServlet;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-class AbstractQueryApiServlet extends AbstractServlet {
+public class AbstractQueryApiServlet extends AbstractServlet {
public enum ResultFields {
REQUEST_ID("requestID"),
@@ -129,84 +120,14 @@
}
}
- protected static JsonNode parseHandle(ObjectMapper om, String strHandle, Logger logger) throws IOException {
- if (strHandle == null) {
- logger.log(Level.WARNING, "No handle provided");
- } else {
- try {
- JsonNode handleObj = om.readTree(strHandle);
- return handleObj.get("handle");
- } catch (JsonProcessingException e) { // NOSONAR
- logger.log(Level.WARNING, "Invalid handle: \"" + strHandle + "\"");
- }
- }
- return null;
- }
-
protected static UUID printRequestId(PrintWriter pw) {
UUID requestId = UUID.randomUUID();
- printField(pw, ResultFields.REQUEST_ID.str(), requestId.toString());
+ ResultUtil.printField(pw, ResultFields.REQUEST_ID.str(), requestId.toString());
return requestId;
}
- protected static void printStatus(PrintWriter pw, ResultStatus rs) {
- printStatus(pw, rs, true);
- }
-
- protected static void printStatus(PrintWriter pw, ResultStatus rs, boolean comma) {
- printField(pw, ResultFields.STATUS.str(), rs.str(), comma);
- }
-
- protected static void printHandle(PrintWriter pw, String handle) {
- printField(pw, ResultFields.HANDLE.str(), handle, true);
- }
-
protected static void printHandle(PrintWriter pw, String handle, boolean comma) {
- printField(pw, ResultFields.HANDLE.str(), handle, comma);
- }
-
- protected static void printError(PrintWriter pw, Throwable e) throws JsonProcessingException {
- printError(pw, e, true);
- }
-
- protected static void printError(PrintWriter pw, Throwable e, boolean comma) throws JsonProcessingException {
- Throwable rootCause = ResultUtil.getRootCause(e);
- if (rootCause == null) {
- rootCause = e;
- }
- final boolean addStack = false;
- pw.print("\t\"");
- pw.print(ResultFields.ERRORS.str());
- pw.print("\": [{ \n");
- printField(pw, QueryServiceServlet.ErrorField.CODE.str(), "1");
- final String msg = rootCause.getMessage();
- printField(pw, QueryServiceServlet.ErrorField.MSG.str(), JSONUtil
- .escape(msg != null ? msg : rootCause.getClass().getSimpleName()),
- addStack);
- pw.print(comma ? "\t}],\n" : "\t}]\n");
- }
-
- protected static void printField(PrintWriter pw, String name, String value) {
- printField(pw, name, value, true);
- }
-
- protected static void printField(PrintWriter pw, String name, String value, boolean comma) {
- printFieldInternal(pw, name, "\"" + value + "\"", comma);
- }
-
- protected static void printField(PrintWriter pw, String name, long value, boolean comma) {
- printFieldInternal(pw, name, String.valueOf(value), comma);
- }
-
- protected static void printFieldInternal(PrintWriter pw, String name, String value, boolean comma) {
- pw.print("\t\"");
- pw.print(name);
- pw.print("\": ");
- pw.print(value);
- if (comma) {
- pw.print(',');
- }
- pw.print('\n');
+ ResultUtil.printField(pw, ResultFields.HANDLE.str(), handle, comma);
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
index d91d5fc..e435da7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
@@ -36,7 +36,6 @@
import javax.imageio.ImageIO;
import org.apache.asterix.app.result.ResultReader;
-import org.apache.asterix.app.result.ResultUtil;
import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.exceptions.AsterixException;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
index cd632cab..ebd1bdb 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
@@ -48,7 +48,6 @@
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
/**
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
index bfc67cf..57492be 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
@@ -25,22 +25,16 @@
import org.apache.asterix.app.result.ResultHandle;
import org.apache.asterix.app.result.ResultReader;
-import org.apache.asterix.app.result.ResultUtil;
import org.apache.asterix.translator.IStatementExecutor.Stats;
import org.apache.asterix.translator.SessionConfig;
import org.apache.hyracks.api.dataset.DatasetJobRecord;
import org.apache.hyracks.api.dataset.IHyracksDataset;
-import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.http.api.IServletRequest;
import org.apache.hyracks.http.api.IServletResponse;
import org.apache.hyracks.http.server.utils.HttpUtil;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
import io.netty.handler.codec.http.HttpResponseStatus;
public class QueryResultApiServlet extends AbstractQueryApiServlet {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index 67aa914..b0a9586 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -255,13 +255,14 @@
};
SessionConfig.ResultDecorator resultPostfix = app -> app.append("\t,\n");
- SessionConfig.ResultDecorator handlePrefix =
- app -> app.append("\t\"").append(ResultFields.HANDLE.str()).append("\": \"").append(handleUrl);
- SessionConfig.ResultDecorator handlePostfix = app -> app.append("\",\n");
+ SessionConfig.ResultAppender appendHandle = (app, handle) -> app.append("\t\"")
+ .append(ResultFields.HANDLE.str()).append("\": \"").append(handleUrl).append(handle).append("\",\n");
+ SessionConfig.ResultAppender appendStatus = (app, status) -> app.append("\t\"")
+ .append(ResultFields.STATUS.str()).append("\": \"").append(status).append("\",\n");
SessionConfig.OutputFormat format = getFormat(param.format);
SessionConfig sessionConfig =
- new SessionConfig(resultWriter, format, resultPrefix, resultPostfix, handlePrefix, handlePostfix);
+ new SessionConfig(resultWriter, format, resultPrefix, resultPostfix, appendHandle, appendStatus);
sessionConfig.set(SessionConfig.FORMAT_WRAPPER_ARRAY, true);
sessionConfig.set(SessionConfig.FORMAT_INDENT_JSON, param.pretty);
sessionConfig.set(SessionConfig.FORMAT_QUOTE_RECORD,
@@ -273,23 +274,23 @@
private static void printClientContextID(PrintWriter pw, RequestParameters params) {
if (params.clientContextID != null && !params.clientContextID.isEmpty()) {
- printField(pw, ResultFields.CLIENT_ID.str(), params.clientContextID);
+ ResultUtil.printField(pw, ResultFields.CLIENT_ID.str(), params.clientContextID);
}
}
private static void printSignature(PrintWriter pw) {
- printField(pw, ResultFields.SIGNATURE.str(), "*");
+ ResultUtil.printField(pw, ResultFields.SIGNATURE.str(), "*");
}
private static void printType(PrintWriter pw, SessionConfig sessionConfig) {
switch (sessionConfig.fmt()) {
case ADM:
- printField(pw, ResultFields.TYPE.str(), HttpUtil.ContentType.APPLICATION_ADM);
+ ResultUtil.printField(pw, ResultFields.TYPE.str(), HttpUtil.ContentType.APPLICATION_ADM);
break;
case CSV:
String contentType = HttpUtil.ContentType.CSV + "; header="
+ (sessionConfig.is(SessionConfig.FORMAT_CSV_HEADER) ? "present" : "absent");
- printField(pw, ResultFields.TYPE.str(), contentType);
+ ResultUtil.printField(pw, ResultFields.TYPE.str(), contentType);
break;
default:
break;
@@ -302,13 +303,13 @@
pw.print(ResultFields.METRICS.str());
pw.print("\": {\n");
pw.print("\t");
- printField(pw, Metrics.ELAPSED_TIME.str(), TimeUnit.formatNanos(elapsedTime));
+ ResultUtil.printField(pw, Metrics.ELAPSED_TIME.str(), TimeUnit.formatNanos(elapsedTime));
pw.print("\t");
- printField(pw, Metrics.EXECUTION_TIME.str(), TimeUnit.formatNanos(executionTime));
+ ResultUtil.printField(pw, Metrics.EXECUTION_TIME.str(), TimeUnit.formatNanos(executionTime));
pw.print("\t");
- printField(pw, Metrics.RESULT_COUNT.str(), resultCount, true);
+ ResultUtil.printField(pw, Metrics.RESULT_COUNT.str(), resultCount, true);
pw.print("\t");
- printField(pw, Metrics.RESULT_SIZE.str(), resultSize, false);
+ ResultUtil.printField(pw, Metrics.RESULT_SIZE.str(), resultSize, false);
pw.print("\t}\n");
}
@@ -434,16 +435,18 @@
translator.compileAndExecute(getHyracksClientConnection(), getHyracksDataset(), delivery, stats,
param.clientContextID, queryCtx);
execEnd = System.nanoTime();
- printStatus(resultWriter, ResultDelivery.ASYNC == delivery ? ResultStatus.RUNNING : ResultStatus.SUCCESS);
+ if (ResultDelivery.IMMEDIATE == delivery || ResultDelivery.DEFERRED == delivery) {
+ ResultUtil.printStatus(sessionConfig, ResultStatus.SUCCESS);
+ }
} catch (AsterixException | TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, pe.getMessage(), pe);
- printError(resultWriter, pe);
- printStatus(resultWriter, ResultStatus.FATAL);
+ ResultUtil.printError(resultWriter, pe);
+ ResultUtil.printStatus(sessionConfig, ResultStatus.FATAL);
status = HttpResponseStatus.BAD_REQUEST;
} catch (Exception e) {
GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, e.getMessage(), e);
- printError(resultWriter, e);
- printStatus(resultWriter, ResultStatus.FATAL);
+ ResultUtil.printError(resultWriter, e);
+ ResultUtil.printStatus(sessionConfig, ResultStatus.FATAL);
status = HttpResponseStatus.INTERNAL_SERVER_ERROR;
} finally {
if (execStart == -1) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
index 061ccc3..8fbb4c5 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
@@ -73,14 +73,14 @@
HttpResponseStatus httpStatus = HttpResponseStatus.OK;
resultWriter.print("{\n");
- printStatus(resultWriter, resultStatus, (ex != null) || ResultStatus.SUCCESS == resultStatus);
+ ResultUtil.printStatus(resultWriter, resultStatus, (ex != null) || ResultStatus.SUCCESS == resultStatus);
if (ResultStatus.SUCCESS == resultStatus) {
String servletPath = servletPath(request).replace("status", "result");
String resHandle = "http://" + host(request) + servletPath + strHandle;
printHandle(resultWriter, resHandle, false);
} else if (ex != null) {
- printError(resultWriter, ex, false);
+ ResultUtil.printError(resultWriter, ex, false);
}
resultWriter.print("}\n");
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
index 7a587ef..74290f3 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
@@ -29,7 +29,6 @@
import java.util.logging.Logger;
import org.apache.asterix.app.result.ResultReader;
-import org.apache.asterix.app.result.ResultUtil;
import org.apache.asterix.app.translator.QueryTranslator;
import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.context.IStorageComponentProvider;
@@ -45,7 +44,6 @@
import org.apache.asterix.translator.IStatementExecutorFactory;
import org.apache.asterix.translator.SessionConfig;
import org.apache.asterix.translator.SessionConfig.OutputFormat;
-import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.client.dataset.HyracksDataset;
@@ -113,11 +111,9 @@
format = OutputFormat.LOSSLESS_JSON;
}
- SessionConfig.ResultDecorator handlePrefix =
- (AlgebricksAppendable app) -> app.append("{ \"").append("handle").append("\": \"");
- SessionConfig.ResultDecorator handlePostfix = (AlgebricksAppendable app) -> app.append("\" }");
- SessionConfig sessionConfig =
- new SessionConfig(response.writer(), format, null, null, handlePrefix, handlePostfix);
+ SessionConfig.ResultAppender appendHandle = (app, handle) -> app.append("{ \"").append("handle")
+ .append("\":" + " \"").append(handle).append("\" }");
+ SessionConfig sessionConfig = new SessionConfig(response.writer(), format, null, null, appendHandle, null);
// If it's JSON or ADM, check for the "wrapper-array" flag. Default is
// "true" for JSON and "false" for ADM. (Not applicable for CSV.)
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java
similarity index 79%
rename from asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultUtil.java
rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java
index b730989..18bd7b6 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.app.result;
+package org.apache.asterix.api.http.server;
import java.io.BufferedReader;
import java.io.IOException;
@@ -33,6 +33,10 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import org.apache.asterix.app.result.ResultHandle;
+import org.apache.asterix.app.result.ResultPrinter;
+import org.apache.asterix.app.result.ResultReader;
+import org.apache.asterix.common.utils.JSONUtil;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.translator.IStatementExecutor.Stats;
import org.apache.asterix.translator.SessionConfig;
@@ -54,7 +58,6 @@
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
private ResultUtil() {
-
}
/**
@@ -83,17 +86,72 @@
new ResultPrinter(conf, stats, recordType).print(record);
}
- public static void printResultHandle(ResultHandle handle, SessionConfig conf) throws HyracksDataException {
+ public static void printResultHandle(SessionConfig conf, ResultHandle handle) throws HyracksDataException {
try {
final AlgebricksAppendable app = new AlgebricksAppendable(conf.out());
- conf.handlePrefix(app);
- handle.append(app);
- conf.handlePostfix(app);
+ conf.appendHandle(app, handle.toString());
} catch (AlgebricksException e) {
- throw new HyracksDataException(e);
+ LOGGER.warn("error printing handle", e);
}
}
+ public static void printStatus(SessionConfig conf, AbstractQueryApiServlet.ResultStatus rs) {
+ try {
+ final AlgebricksAppendable app = new AlgebricksAppendable(conf.out());
+ conf.appendStatus(app, rs.str());
+ } catch (AlgebricksException e) {
+ LOGGER.warn("error printing status", e);
+ }
+ }
+
+ public static void printStatus(PrintWriter pw, AbstractQueryApiServlet.ResultStatus rs, boolean comma) {
+ printField(pw, AbstractQueryApiServlet.ResultFields.STATUS.str(), rs.str(), comma);
+ }
+
+ public static void printError(PrintWriter pw, Throwable e) {
+ printError(pw, e, true);
+ }
+
+ public static void printError(PrintWriter pw, Throwable e, boolean comma) {
+ Throwable rootCause = getRootCause(e);
+ if (rootCause == null) {
+ rootCause = e;
+ }
+ final boolean addStack = false;
+ pw.print("\t\"");
+ pw.print(AbstractQueryApiServlet.ResultFields.ERRORS.str());
+ pw.print("\": [{ \n");
+ printField(pw, QueryServiceServlet.ErrorField.CODE.str(), "1");
+ final String msg = rootCause.getMessage();
+ printField(pw, QueryServiceServlet.ErrorField.MSG.str(), JSONUtil
+ .escape(msg != null ? msg : rootCause.getClass().getSimpleName()),
+ addStack);
+ pw.print(comma ? "\t}],\n" : "\t}]\n");
+ }
+
+ public static void printField(PrintWriter pw, String name, String value) {
+ printField(pw, name, value, true);
+ }
+
+ public static void printField(PrintWriter pw, String name, String value, boolean comma) {
+ printFieldInternal(pw, name, "\"" + value + "\"", comma);
+ }
+
+ public static void printField(PrintWriter pw, String name, long value, boolean comma) {
+ printFieldInternal(pw, name, String.valueOf(value), comma);
+ }
+
+ protected static void printFieldInternal(PrintWriter pw, String name, String value, boolean comma) {
+ pw.print("\t\"");
+ pw.print(name);
+ pw.print("\": ");
+ pw.print(value);
+ if (comma) {
+ pw.print(',');
+ }
+ pw.print('\n');
+ }
+
public static ObjectNode getErrorResponse(int errorCode, String errorMessage, String errorSummary,
String errorStackTrace) {
ObjectMapper om = new ObjectMapper();
@@ -259,4 +317,5 @@
}
return errorTemplate;
}
+
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java
index 9050907..3493870 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java
@@ -39,7 +39,6 @@
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
public class ShutdownApiServlet extends AbstractServlet {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultHandle.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultHandle.java
index 7809a84..bbb9b99 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultHandle.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultHandle.java
@@ -18,8 +18,6 @@
*/
package org.apache.asterix.app.result;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable;
import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.job.JobId;
@@ -65,10 +63,5 @@
@Override
public String toString() {
return Long.toString(jobId.getId()) + "-" + Long.toString(resultSetId.getId());
-
- }
-
- public AlgebricksAppendable append(AlgebricksAppendable app) throws AlgebricksException {
- return app.append(toString());
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 03bef13..e64cf14 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -45,10 +45,11 @@
import org.apache.asterix.active.IActiveEventSubscriber;
import org.apache.asterix.algebra.extension.IExtensionStatement;
import org.apache.asterix.api.common.APIFramework;
+import org.apache.asterix.api.http.server.AbstractQueryApiServlet;
import org.apache.asterix.api.http.server.ApiServlet;
+import org.apache.asterix.api.http.server.ResultUtil;
import org.apache.asterix.app.result.ResultHandle;
import org.apache.asterix.app.result.ResultReader;
-import org.apache.asterix.app.result.ResultUtil;
import org.apache.asterix.common.config.ClusterProperties;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
@@ -2400,31 +2401,13 @@
private void deliverResult(IHyracksClientConnection hcc, IHyracksDataset hdc, IStatementCompiler compiler,
MetadataProvider metadataProvider, IMetadataLocker locker, ResultDelivery resultDelivery, Stats stats,
- String clientContextId, IStatementExecutorContext ctx)
- throws Exception {
+ String clientContextId, IStatementExecutorContext ctx) throws Exception {
final ResultSetId resultSetId = metadataProvider.getResultSetId();
switch (resultDelivery) {
case ASYNC:
MutableBoolean printed = new MutableBoolean(false);
- Mutable<JobId> jobId = new MutableObject<>(JobId.INVALID);
- executorService.submit(() -> {
- try {
- createAndRunJob(hcc, jobId, compiler, locker, resultDelivery, id -> {
- final ResultHandle handle = new ResultHandle(id, resultSetId);
- ResultUtil.printResultHandle(handle, sessionConfig);
- synchronized (printed) {
- printed.setTrue();
- printed.notify();
- }
- }, clientContextId, ctx);
- } catch (Exception e) {
- synchronized (jobId) {
- GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE,
- resultDelivery.name() + " job " + "with id " + jobId.getValue() + " " + "failed",
- e);
- }
- }
- });
+ executorService.submit(() -> asyncCreateAndRunJob(hcc, compiler, locker, resultDelivery,
+ clientContextId, ctx, resultSetId, printed));
synchronized (printed) {
while (!printed.booleanValue()) {
printed.wait();
@@ -2440,7 +2423,7 @@
break;
case DEFERRED:
createAndRunJob(hcc, null, compiler, locker, resultDelivery, id -> {
- ResultUtil.printResultHandle(new ResultHandle(id, resultSetId), sessionConfig);
+ ResultUtil.printResultHandle(sessionConfig, new ResultHandle(id, resultSetId));
}, clientContextId, ctx);
break;
default:
@@ -2448,6 +2431,39 @@
}
}
+ private void asyncCreateAndRunJob(IHyracksClientConnection hcc, IStatementCompiler compiler, IMetadataLocker locker,
+ ResultDelivery resultDelivery, String clientContextId, IStatementExecutorContext ctx,
+ ResultSetId resultSetId, MutableBoolean printed) {
+ Mutable<JobId> jobId = new MutableObject<>(JobId.INVALID);
+ try {
+ createAndRunJob(hcc, jobId, compiler, locker, resultDelivery, id -> {
+ final ResultHandle handle = new ResultHandle(id, resultSetId);
+ ResultUtil.printStatus(sessionConfig, AbstractQueryApiServlet.ResultStatus.RUNNING);
+ ResultUtil.printResultHandle(sessionConfig, handle);
+ synchronized (printed) {
+ printed.setTrue();
+ printed.notify();
+ }
+ }, clientContextId, ctx);
+ } catch (Exception e) {
+ if (JobId.INVALID.equals(jobId.getValue())) {
+ // compilation failed
+ ResultUtil.printStatus(sessionConfig, AbstractQueryApiServlet.ResultStatus.FAILED);
+ ResultUtil.printError(sessionConfig.out(), e);
+ } else {
+ GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE,
+ resultDelivery.name() + " job with id " + jobId.getValue() + " " + "failed", e);
+ }
+ } finally {
+ synchronized (printed) {
+ if (printed.isFalse()) {
+ printed.setTrue();
+ printed.notify();
+ }
+ }
+ }
+ }
+
private static void createAndRunJob(IHyracksClientConnection hcc, Mutable<JobId> jId, IStatementCompiler compiler,
IMetadataLocker locker, ResultDelivery resultDelivery, IResultPrinter printer, String clientContextId,
IStatementExecutorContext ctx) throws Exception {
@@ -2462,9 +2478,7 @@
ctx.put(clientContextId, jobId); // Adds the running job into the context.
}
if (jId != null) {
- synchronized (jId) {
- jId.setValue(jobId);
- }
+ jId.setValue(jobId);
}
if (ResultDelivery.ASYNC == resultDelivery) {
printer.print(jobId);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-compilation-failed/async-compilation-failed.1.async.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-compilation-failed/async-compilation-failed.1.async.sqlpp
new file mode 100644
index 0000000..2b91cb6
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-compilation-failed/async-compilation-failed.1.async.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#handlevariable=status
+
+select count(*)
+from gargel
+where id = 1;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-compilation-failed/async-compilation-failed.1.regex b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-compilation-failed/async-compilation-failed.1.regex
new file mode 100644
index 0000000..66de2a0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-compilation-failed/async-compilation-failed.1.regex
@@ -0,0 +1,2 @@
+/"status": "failed"/
+/"errors": ".*"/
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index d26e0d5..bcb95aa 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -29,6 +29,12 @@
</compilation-unit>
</test-case>
<test-case FilePath="async-deferred">
+ <compilation-unit name="async-compilation-failed">
+ <output-dir compare="Text">async-compilation-failed</output-dir>
+ <expected-error>Cannot find dataset gargel</expected-error>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="async-deferred">
<compilation-unit name="deferred">
<output-dir compare="Text">deferred</output-dir>
</compilation-unit>