Deferred result retrieval for the QueryService
And some cleanup:
- fewer JSONExceptions on interfaces
- rename ResultDelivery options (existing HTTP APIs still work)
SYNC -> IMMEDIATE
ASYNC_DEFERRED -> DEFERRED
ASYNC -> ASYNC
- shorten variables
queryMetadataProvider -> metadataProvider
aqlStatements -> statements
compiled -> jobSpec
- rename ResultUtil.displayResults to printResults
Change-Id: I72d53be824d8dbf1d9f547b01f19097d0dc18add
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1373
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ian Maxon <imaxon@apache.org>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
index 7783121..a1f3055 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
@@ -29,7 +29,6 @@
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.api.job.JobSpecification;
-import org.json.JSONException;
/**
* An interface that takes care of executing a list of statements that are submitted through an Asterix API
@@ -39,19 +38,19 @@
/**
* Specifies result delivery of executed statements
*/
- public enum ResultDelivery {
+ enum ResultDelivery {
/**
- * Wait for results to be read
+ * Results are returned with the first response
*/
- SYNC,
+ IMMEDIATE,
/**
- * Flush out result handle beofre waiting for the result
+ * Results are produced completely, but only a result handle is returned
*/
- ASYNC,
+ DEFERRED,
/**
- * Return result handle and don't wait for the result
+ * A result handle is returned before the resutlts are complete
*/
- ASYNC_DEFERRED
+ ASYNC
}
public static class Stats {
@@ -116,17 +115,14 @@
* @param dmlStatement
* The data modification statement when the query results in a modification to a dataset
* @return the compiled {@code JobSpecification}
- * @param returnQuery
- * In the case of dml, the user may run a query on affected data
* @throws AsterixException
* @throws RemoteException
* @throws AlgebricksException
- * @throws JSONException
* @throws ACIDException
*/
JobSpecification rewriteCompileQuery(MetadataProvider metadataProvider, Query query,
ICompiledDmlStatement dmlStatement)
- throws AsterixException, RemoteException, AlgebricksException, JSONException, ACIDException;
+ throws AsterixException, RemoteException, AlgebricksException, ACIDException;
/**
* returns the active dataverse for an entity or a statement
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java
index fc4f655..328f714 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java
@@ -123,6 +123,8 @@
private final ResultDecorator preResultDecorator;
private final ResultDecorator postResultDecorator;
+ private final ResultDecorator preHandleDecorator;
+ private final ResultDecorator postHandleDecorator;
// Flags.
private final Map<String, Boolean> flags;
@@ -141,17 +143,19 @@
* Output format for execution output.
*/
public SessionConfig(PrintWriter out, OutputFormat fmt) {
- this(out, fmt, null, null, true, true, true);
+ this(out, fmt, null, null, null, null, true, true, true);
}
public SessionConfig(PrintWriter out, OutputFormat fmt, ResultDecorator preResultDecorator,
- ResultDecorator postResultDecorator) {
- this(out, fmt, preResultDecorator, postResultDecorator, true, true, true);
+ ResultDecorator postResultDecorator, ResultDecorator preHandleDecorator,
+ ResultDecorator postHandleDecorator) {
+ this(out, fmt, preResultDecorator, postResultDecorator, preHandleDecorator, postHandleDecorator, true, true,
+ true);
}
public SessionConfig(PrintWriter out, OutputFormat fmt, boolean optimize, boolean executeQuery,
boolean generateJobSpec) {
- this(out, fmt, null, null, optimize, executeQuery, generateJobSpec);
+ this(out, fmt, null, null, null, null, optimize, executeQuery, generateJobSpec);
}
/**
@@ -172,11 +176,14 @@
* false, job cannot be executed).
*/
public SessionConfig(PrintWriter out, OutputFormat fmt, ResultDecorator preResultDecorator,
- ResultDecorator postResultDecorator, boolean optimize, boolean executeQuery, boolean generateJobSpec) {
+ ResultDecorator postResultDecorator, ResultDecorator preHandleDecorator,
+ ResultDecorator postHandleDecorator, boolean optimize, boolean executeQuery, boolean generateJobSpec) {
this.out = out;
this.fmt = fmt;
this.preResultDecorator = preResultDecorator;
this.postResultDecorator = postResultDecorator;
+ this.preHandleDecorator = preHandleDecorator;
+ this.postHandleDecorator = postHandleDecorator;
this.optimize = optimize;
this.executeQuery = executeQuery;
this.generateJobSpec = generateJobSpec;
@@ -199,12 +206,19 @@
public AlgebricksAppendable resultPrefix(AlgebricksAppendable app) throws AlgebricksException {
return this.preResultDecorator != null ? this.preResultDecorator.append(app) : app;
- };
+ }
public AlgebricksAppendable resultPostfix(AlgebricksAppendable app) throws AlgebricksException {
return this.postResultDecorator != null ? this.postResultDecorator.append(app) : app;
- };
+ }
+ public AlgebricksAppendable handlePrefix(AlgebricksAppendable app) throws AlgebricksException {
+ return this.preHandleDecorator != null ? this.preHandleDecorator.append(app) : app;
+ }
+
+ public AlgebricksAppendable handlePostfix(AlgebricksAppendable app) throws AlgebricksException {
+ return this.postHandleDecorator != null ? this.postHandleDecorator.append(app) : app;
+ }
/**
* Retrieve the value of the "execute query" flag.
*/
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index e08b3db..0d8df9e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -34,11 +34,11 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.compiler.provider.ILangCompilationProvider;
import org.apache.asterix.compiler.provider.IRuleSetFactory;
-import org.apache.asterix.dataflow.data.common.ExpressionTypeComputer;
import org.apache.asterix.dataflow.data.common.AqlMergeAggregationExpressionFactory;
import org.apache.asterix.dataflow.data.common.AqlMissableTypeComputer;
import org.apache.asterix.dataflow.data.common.AqlPartialAggregationTypeComputer;
import org.apache.asterix.dataflow.data.common.ConflictingTypeResolver;
+import org.apache.asterix.dataflow.data.common.ExpressionTypeComputer;
import org.apache.asterix.formats.base.IDataFormat;
import org.apache.asterix.jobgen.QueryLogicalExpressionJobGen;
import org.apache.asterix.lang.common.base.IAstPrintVisitorFactory;
@@ -151,9 +151,9 @@
}
public JobSpecification compileQuery(List<FunctionDecl> declaredFunctions,
- MetadataProvider queryMetadataProvider, Query rwQ, int varCounter, String outputDatasetName,
+ MetadataProvider metadataProvider, Query rwQ, int varCounter, String outputDatasetName,
SessionConfig conf, ICompiledDmlStatement statement)
- throws AlgebricksException, JSONException, RemoteException, ACIDException {
+ throws AlgebricksException, RemoteException, ACIDException {
if (!conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS) && conf.is(SessionConfig.OOB_REWRITTEN_EXPR_TREE)) {
conf.out().println();
@@ -166,9 +166,9 @@
}
org.apache.asterix.common.transactions.JobId asterixJobId = JobIdFactory.generateJobId();
- queryMetadataProvider.setJobId(asterixJobId);
+ metadataProvider.setJobId(asterixJobId);
ILangExpressionToPlanTranslator t =
- translatorFactory.createExpressionToPlanTranslator(queryMetadataProvider, varCounter);
+ translatorFactory.createExpressionToPlanTranslator(metadataProvider, varCounter);
ILogicalPlan plan;
// statement = null when it's a query
@@ -211,7 +211,7 @@
builder.setPhysicalOptimizationConfig(OptimizationConfUtil.getPhysicalOptimizationConfig());
builder.setLogicalRewrites(ruleSetFactory.getLogicalRewrites());
builder.setPhysicalRewrites(ruleSetFactory.getPhysicalRewrites());
- IDataFormat format = queryMetadataProvider.getFormat();
+ IDataFormat format = metadataProvider.getFormat();
ICompilerFactory compilerFactory = builder.create();
builder.setExpressionEvalSizeComputer(format.getExpressionEvalSizeComputer());
builder.setIMergeAggregationExpressionFactory(new AqlMergeAggregationExpressionFactory());
@@ -219,9 +219,9 @@
builder.setExpressionTypeComputer(ExpressionTypeComputer.INSTANCE);
builder.setMissableTypeComputer(AqlMissableTypeComputer.INSTANCE);
builder.setConflictingTypeResolver(ConflictingTypeResolver.INSTANCE);
- builder.setClusterLocations(queryMetadataProvider.getClusterLocations());
+ builder.setClusterLocations(metadataProvider.getClusterLocations());
- ICompiler compiler = compilerFactory.createCompiler(plan, queryMetadataProvider, t.getVarCounter());
+ ICompiler compiler = compilerFactory.createCompiler(plan, metadataProvider, t.getVarCounter());
if (conf.isOptimize()) {
compiler.optimize();
//plot optimized logical plan
@@ -247,7 +247,7 @@
try {
LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor();
PlanPrettyPrinter.printPlan(plan, pvisitor, 0);
- ResultUtil.displayResults(pvisitor.get().toString(), conf, new Stats(), null);
+ ResultUtil.printResults(pvisitor.get().toString(), conf, new Stats(), null);
return null;
} catch (IOException e) {
throw new AlgebricksException(e);
@@ -291,13 +291,17 @@
builder.setNormalizedKeyComputerFactoryProvider(format.getNormalizedKeyComputerFactoryProvider());
JobEventListenerFactory jobEventListenerFactory =
- new JobEventListenerFactory(asterixJobId, queryMetadataProvider.isWriteTransaction());
+ new JobEventListenerFactory(asterixJobId, metadataProvider.isWriteTransaction());
JobSpecification spec = compiler.createJob(AsterixAppContextInfo.INSTANCE, jobEventListenerFactory);
if (conf.is(SessionConfig.OOB_HYRACKS_JOB)) {
printPlanPrefix(conf, "Hyracks job");
if (rwQ != null) {
- conf.out().println(spec.toJSON().toString(1));
+ try {
+ conf.out().println(spec.toJSON().toString(1));
+ } catch (JSONException e) {
+ throw new AlgebricksException(e);
+ }
conf.out().println(spec.getUserConstraints());
}
printPlanPostfix(conf);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/APIServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/APIServlet.java
index 4e9bb25..b693407 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/APIServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/APIServlet.java
@@ -129,7 +129,7 @@
statementExectorFactory.create(aqlStatements, sessionConfig, compilationProvider);
double duration = 0;
long startTime = System.currentTimeMillis();
- translator.compileAndExecute(hcc, hds, IStatementExecutor.ResultDelivery.SYNC);
+ translator.compileAndExecute(hcc, hds, IStatementExecutor.ResultDelivery.IMMEDIATE);
long endTime = System.currentTimeMillis();
duration = (endTime - startTime) / 1000.00;
out.println(HTML_STATEMENT_SEPARATOR);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryResultAPIServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryResultAPIServlet.java
index a572500..9994bc7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryResultAPIServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryResultAPIServlet.java
@@ -23,6 +23,8 @@
import java.io.IOException;
import java.io.PrintWriter;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServlet;
@@ -31,6 +33,7 @@
import org.apache.asterix.app.result.ResultReader;
import org.apache.asterix.app.result.ResultUtil;
+import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.translator.IStatementExecutor.Stats;
import org.apache.asterix.translator.SessionConfig;
import org.apache.hyracks.api.client.HyracksConnection;
@@ -45,9 +48,12 @@
public class QueryResultAPIServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
+ private static final Logger LOGGER = Logger.getLogger(QueryResultAPIServlet.class.getName());
+
@Override
public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
- response.setContentType("text/html");
+ int respCode = HttpServletResponse.SC_OK;
+ response.setContentType("text/html"); // TODO this seems wrong ...
response.setCharacterEncoding("utf-8");
String strHandle = request.getParameter("handle");
PrintWriter out = response.getWriter();
@@ -56,6 +62,10 @@
IHyracksDataset hds;
try {
+ if (strHandle == null || strHandle.isEmpty()) {
+ throw new AsterixException("Empty request, no handle provided");
+ }
+
HyracksProperties hp = new HyracksProperties();
String strIP = hp.getHyracksIPAddress();
int port = hp.getHyracksPort();
@@ -88,11 +98,16 @@
// originally determined there. Need to save this value on
// some object that we can obtain here.
SessionConfig sessionConfig = RESTAPIServlet.initResponse(request, response);
- ResultUtil.displayResults(resultReader, sessionConfig, new Stats(), null);
+ ResultUtil.printResults(resultReader, sessionConfig, new Stats(), null);
} catch (Exception e) {
+ respCode = HttpServletResponse.SC_BAD_REQUEST;
out.println(e.getMessage());
- e.printStackTrace(out);
+ LOGGER.log(Level.WARNING, "Error retrieving result", e);
+ }
+ response.setStatus(respCode);
+ if (out.checkError()) {
+ LOGGER.warning("Error flushing output writer");
}
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryServiceServlet.java
index 941c7f7..9da518a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryServiceServlet.java
@@ -81,7 +81,8 @@
STATEMENT("statement"),
FORMAT("format"),
CLIENT_ID("client_context_id"),
- PRETTY("pretty");
+ PRETTY("pretty"),
+ MODE("mode");
private final String str;
@@ -132,6 +133,7 @@
TYPE("type"),
STATUS("status"),
RESULTS("results"),
+ HANDLE("handle"),
ERRORS("errors"),
METRICS("metrics");
@@ -229,6 +231,7 @@
String format;
boolean pretty;
String clientContextID;
+ String mode;
@Override
public String toString() {
@@ -242,6 +245,7 @@
sb.append("\", ");
sb.append("\"format\": \"").append(format).append("\", ");
sb.append("\"pretty\": ").append(pretty).append(", ");
+ sb.append("\"mode\": ").append(mode).append(", ");
sb.append("\"clientContextID\": \"").append(clientContextID).append("\" ");
sb.append('}');
return sb;
@@ -279,7 +283,7 @@
return SessionConfig.OutputFormat.ADM;
}
if (format.startsWith(MediaType.JSON.str())) {
- return Boolean.parseBoolean(getParameterValue(format, "lossless"))
+ return Boolean.parseBoolean(getParameterValue(format, Attribute.LOSSLESS.str()))
? SessionConfig.OutputFormat.LOSSLESS_JSON : SessionConfig.OutputFormat.CLEAN_JSON;
}
}
@@ -302,19 +306,22 @@
}
};
- SessionConfig.ResultDecorator resultPostfix = (AlgebricksAppendable app) -> {
- app.append("\t,\n");
- return app;
- };
+ SessionConfig.ResultDecorator resultPostfix = (AlgebricksAppendable app) -> app.append("\t,\n");
+
+ SessionConfig.ResultDecorator handlePrefix = (AlgebricksAppendable app) -> app.append("\t\"").append
+ (ResultFields.HANDLE.str()).append("\": ");
+
+ SessionConfig.ResultDecorator handlePostfix = (AlgebricksAppendable app) -> app.append(",\n");
SessionConfig.OutputFormat format = getFormat(param.format);
- SessionConfig sessionConfig = new SessionConfig(resultWriter, format, resultPrefix, resultPostfix);
+ SessionConfig sessionConfig = new SessionConfig(resultWriter, format, resultPrefix, resultPostfix,
+ handlePrefix, handlePostfix);
sessionConfig.set(SessionConfig.FORMAT_WRAPPER_ARRAY, true);
sessionConfig.set(SessionConfig.FORMAT_INDENT_JSON, param.pretty);
sessionConfig.set(SessionConfig.FORMAT_QUOTE_RECORD,
format != SessionConfig.OutputFormat.CLEAN_JSON && format != SessionConfig.OutputFormat.LOSSLESS_JSON);
sessionConfig.set(SessionConfig.FORMAT_CSV_HEADER, format == SessionConfig.OutputFormat.CSV
- && "present".equals(getParameterValue(param.format, "header")));
+ && "present".equals(getParameterValue(param.format, Attribute.HEADER.str())));
return sessionConfig;
}
@@ -440,6 +447,7 @@
param.statement = jsonRequest.get(Parameter.STATEMENT.str()).asText();
param.format = toLower(getOptText(jsonRequest, Parameter.FORMAT.str()));
param.pretty = getOptBoolean(jsonRequest, Parameter.PRETTY.str(), false);
+ param.mode = toLower(getOptText(jsonRequest, Parameter.MODE.str()));
param.clientContextID = getOptText(jsonRequest, Parameter.CLIENT_ID.str());
} catch (JsonParseException | JsonMappingException e) {
// if the JSON parsing fails, the statement is empty and we get an empty statement error
@@ -452,6 +460,7 @@
}
param.format = toLower(request.getParameter(Parameter.FORMAT.str()));
param.pretty = Boolean.parseBoolean(request.getParameter(Parameter.PRETTY.str()));
+ param.mode = toLower(request.getParameter(Parameter.MODE.str()));
param.clientContextID = request.getParameter(Parameter.CLIENT_ID.str());
}
return param;
@@ -463,12 +472,24 @@
return sw.toString();
}
+ private static QueryTranslator.ResultDelivery parseResultDelivery(String mode) {
+ if ("async".equals(mode)) {
+ return QueryTranslator.ResultDelivery.ASYNC;
+ } else if ("deferred".equals(mode)) {
+ return QueryTranslator.ResultDelivery.DEFERRED;
+ } else {
+ return QueryTranslator.ResultDelivery.IMMEDIATE;
+ }
+ }
+
private void handleRequest(RequestParameters param, HttpServletResponse response) throws IOException {
LOGGER.info(param.toString());
long elapsedStart = System.nanoTime();
final StringWriter stringWriter = new StringWriter();
final PrintWriter resultWriter = new PrintWriter(stringWriter);
+ QueryTranslator.ResultDelivery delivery = parseResultDelivery(param.mode);
+
SessionConfig sessionConfig = createSessionConfig(param, resultWriter);
response.setCharacterEncoding("utf-8");
response.setContentType(MediaType.JSON.str());
@@ -504,12 +525,12 @@
}
}
IParser parser = compilationProvider.getParserFactory().createParser(param.statement);
- List<Statement> aqlStatements = parser.parse();
+ List<Statement> statements = parser.parse();
MetadataManager.INSTANCE.init();
- IStatementExecutor translator = statementExecutorFactory.create(aqlStatements, sessionConfig,
+ IStatementExecutor translator = statementExecutorFactory.create(statements, sessionConfig,
compilationProvider);
execStart = System.nanoTime();
- translator.compileAndExecute(hcc, hds, QueryTranslator.ResultDelivery.SYNC, stats);
+ translator.compileAndExecute(hcc, hds, delivery, stats);
execEnd = System.nanoTime();
printStatus(resultWriter, ResultStatus.SUCCESS);
} catch (AsterixException | TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/RESTAPIServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/RESTAPIServlet.java
index 558be35..4a06590 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/RESTAPIServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/RESTAPIServlet.java
@@ -49,6 +49,7 @@
import org.apache.asterix.translator.SessionConfig;
import org.apache.asterix.translator.SessionConfig.OutputFormat;
import org.apache.commons.io.IOUtils;
+import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.client.dataset.HyracksDataset;
@@ -108,7 +109,12 @@
format = OutputFormat.LOSSLESS_JSON;
}
- SessionConfig sessionConfig = new SessionConfig(response.getWriter(), format);
+ SessionConfig.ResultDecorator handlePrefix = (AlgebricksAppendable app) -> app.append("{ \"").append("handle")
+ .append("\": ");
+ SessionConfig.ResultDecorator handlePostfix = (AlgebricksAppendable app) -> app.append(" }");
+
+ SessionConfig sessionConfig = new SessionConfig(response.getWriter(), format, null, null, handlePrefix,
+ handlePostfix);
// If it's JSON or ADM, check for the "wrapper-array" flag. Default is
// "true" for JSON and "false" for ADM. (Not applicable for CSV.)
@@ -228,13 +234,13 @@
protected QueryTranslator.ResultDelivery whichResultDelivery(HttpServletRequest request) {
String mode = request.getParameter("mode");
if (mode != null) {
- if (mode.equals("asynchronous")) {
+ if ("asynchronous".equals(mode) || "async".equals(mode)) {
return QueryTranslator.ResultDelivery.ASYNC;
- } else if (mode.equals("asynchronous-deferred")) {
- return QueryTranslator.ResultDelivery.ASYNC_DEFERRED;
+ } else if ("asynchronous-deferred".equals(mode) || "deferred".equals(mode)) {
+ return QueryTranslator.ResultDelivery.DEFERRED;
}
}
- return QueryTranslator.ResultDelivery.SYNC;
+ return QueryTranslator.ResultDelivery.IMMEDIATE;
}
protected abstract String getQueryParameter(HttpServletRequest request);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
index fd9c6cd..3d240f8 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
@@ -95,7 +95,7 @@
}
IStatementExecutor translator = statementExecutorFactory.create(statements, conf, compilationProvider);
- translator.compileAndExecute(hcc, null, QueryTranslator.ResultDelivery.SYNC);
+ translator.compileAndExecute(hcc, null, QueryTranslator.ResultDelivery.IMMEDIATE);
writer.flush();
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java
index 414585a..3bdf353 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java
@@ -93,7 +93,7 @@
statements.add(subscribeStmt);
IStatementExecutor translator = qtFactory.create(statements, pc, compilationProvider);
translator.compileAndExecute(AsterixAppContextInfo.INSTANCE.getHcc(), null,
- QueryTranslator.ResultDelivery.SYNC);
+ QueryTranslator.ResultDelivery.IMMEDIATE);
if (LOGGER.isEnabledFor(Level.INFO)) {
LOGGER.info("Submitted connection requests for execution: " + request);
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultHandle.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultHandle.java
new file mode 100644
index 0000000..05eb967
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultHandle.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.result;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable;
+import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.job.JobId;
+
+public class ResultHandle {
+ private long jobId;
+ private long resultSetId;
+
+ public ResultHandle(JobId jobId, ResultSetId resultSetId) {
+ this.jobId = jobId.getId();
+ this.resultSetId = resultSetId.getId();
+ }
+
+ public AlgebricksAppendable append(AlgebricksAppendable app) throws AlgebricksException {
+ return app.append("[").append(String.valueOf(jobId)).append(", ").append(String.valueOf(resultSetId))
+ .append("]");
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java
index 22d2c23..22034c3 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java
@@ -128,7 +128,7 @@
}
}
- private void displayRecord(String result) {
+ private void printRecord(String result) {
String record = result;
if (indentJSON) {
// TODO(tillw): this is inefficient - do this during record generation
@@ -152,7 +152,7 @@
printPrefix();
// TODO(tillw) evil hack
quoteRecord = true;
- displayRecord(record);
+ printRecord(record);
printPostfix();
}
@@ -179,7 +179,7 @@
conf.out().print(", ");
}
notFirst = true;
- displayRecord(result);
+ printRecord(result);
}
frameBuffer.clear();
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultUtil.java
index f68f458..595c6ab 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultUtil.java
@@ -39,6 +39,7 @@
import org.apache.asterix.translator.SessionConfig;
import org.apache.http.ParseException;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.log4j.Logger;
import org.json.JSONArray;
@@ -72,16 +73,27 @@
return escaped;
}
- public static void displayResults(ResultReader resultReader, SessionConfig conf, Stats stats,
+ public static void printResults(ResultReader resultReader, SessionConfig conf, Stats stats,
ARecordType recordType) throws HyracksDataException {
new ResultPrinter(conf, stats, recordType).print(resultReader);
}
- public static void displayResults(String record, SessionConfig conf, Stats stats, ARecordType recordType)
+ public static void printResults(String record, SessionConfig conf, Stats stats, ARecordType recordType)
throws HyracksDataException {
new ResultPrinter(conf, stats, recordType).print(record);
}
+ public static void printResultHandle(ResultHandle handle, SessionConfig conf) throws HyracksDataException {
+ try {
+ final AlgebricksAppendable app = new AlgebricksAppendable(conf.out());
+ conf.handlePrefix(app);
+ handle.append(app);
+ conf.handlePostfix(app);
+ } catch (AlgebricksException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
public static JSONObject getErrorResponse(int errorCode, String errorMessage, String errorSummary,
String errorStackTrace) {
JSONObject errorResp = new JSONObject();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index cf6e49d..d5d5e53 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -50,6 +50,7 @@
import org.apache.asterix.app.external.ExternalIndexingOperations;
import org.apache.asterix.app.external.FeedJoint;
import org.apache.asterix.app.external.FeedOperations;
+import org.apache.asterix.app.result.ResultHandle;
import org.apache.asterix.app.result.ResultReader;
import org.apache.asterix.app.result.ResultUtil;
import org.apache.asterix.common.config.AsterixExternalProperties;
@@ -202,9 +203,6 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
import com.google.common.collect.Lists;
@@ -344,7 +342,7 @@
if (((InsertStatement) stmt).getReturnQuery() != null) {
metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++));
metadataProvider.setResultAsyncMode(resultDelivery == ResultDelivery.ASYNC
- || resultDelivery == ResultDelivery.ASYNC_DEFERRED);
+ || resultDelivery == ResultDelivery.DEFERRED);
}
handleInsertUpsertStatement(metadataProvider, stmt, hcc, hdc, resultDelivery, stats, false);
break;
@@ -376,7 +374,7 @@
case Statement.Kind.QUERY:
metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++));
metadataProvider.setResultAsyncMode(resultDelivery == ResultDelivery.ASYNC
- || resultDelivery == ResultDelivery.ASYNC_DEFERRED);
+ || resultDelivery == ResultDelivery.DEFERRED);
handleQuery(metadataProvider, (Query) stmt, hcc, hdc, resultDelivery, stats);
break;
case Statement.Kind.COMPACT:
@@ -1881,7 +1879,6 @@
MetadataLockManager.INSTANCE.insertDeleteUpsertBegin(dataverseName,
dataverseName + "." + stmtInsertUpsert.getDatasetName(), query.getDataverses(), query.getDatasets());
- JobSpecification compiled = null;
try {
metadataProvider.setWriteTransaction(true);
CompiledInsertStatement clfrqs = null;
@@ -1899,19 +1896,19 @@
default:
throw new AlgebricksException("Unsupported statement type " + stmtInsertUpsert.getKind());
}
- compiled = rewriteCompileQuery(metadataProvider, query, clfrqs);
+ JobSpecification jobSpec = rewriteCompileQuery(metadataProvider, query, clfrqs);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
- if (compiled != null && !compileOnly) {
+ if (jobSpec != null && !compileOnly) {
if (stmtInsertUpsert.getReturnQuery() != null) {
- handleQueryResult(metadataProvider, hcc, hdc, compiled, resultDelivery, stats);
+ handleQueryResult(metadataProvider, hcc, hdc, jobSpec, resultDelivery, stats);
} else {
- JobUtils.runJob(hcc, compiled, true);
+ JobUtils.runJob(hcc, jobSpec, true);
}
}
-
+ return jobSpec;
} catch (Exception e) {
if (bActiveTxn) {
abort(e, e, mdTxnCtx);
@@ -1922,7 +1919,6 @@
dataverseName + "." + stmtInsertUpsert.getDatasetName(), query.getDataverses(),
query.getDatasets());
}
- return compiled;
}
public void handleDeleteStatement(MetadataProvider metadataProvider, Statement stmt,
@@ -1942,13 +1938,13 @@
CompiledDeleteStatement clfrqs = new CompiledDeleteStatement(stmtDelete.getVariableExpr(), dataverseName,
stmtDelete.getDatasetName().getValue(), stmtDelete.getCondition(), stmtDelete.getVarCounter(),
stmtDelete.getQuery());
- JobSpecification compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
+ JobSpecification jobSpec = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
- if (compiled != null) {
- JobUtils.runJob(hcc, compiled, true);
+ if (jobSpec != null) {
+ JobUtils.runJob(hcc, jobSpec, true);
}
} catch (Exception e) {
@@ -1966,7 +1962,7 @@
@Override
public JobSpecification rewriteCompileQuery(MetadataProvider metadataProvider, Query query,
ICompiledDmlStatement stmt)
- throws AsterixException, RemoteException, AlgebricksException, JSONException, ACIDException {
+ throws AsterixException, RemoteException, AlgebricksException, ACIDException {
// Query Rewriting (happens under the same ongoing metadata transaction)
Pair<Query, Integer> reWrittenQuery = apiFramework.reWriteQuery(declaredFunctions, metadataProvider, query,
@@ -2421,7 +2417,7 @@
metadataProvider.getConfig().put(FeedActivityDetails.COLLECT_LOCATIONS,
StringUtils.join(bfs.getLocations(), ','));
- JobSpecification compiled = rewriteCompileQuery(metadataProvider, bfs.getQuery(), csfs);
+ JobSpecification jobSpec = rewriteCompileQuery(metadataProvider, bfs.getQuery(), csfs);
FeedConnectionId feedConnectionId = new FeedConnectionId(bfs.getSubscriptionRequest().getReceivingFeedId(),
bfs.getSubscriptionRequest().getTargetDataset());
String dataverse = feedConnectionId.getFeedId().getDataverse();
@@ -2429,7 +2425,7 @@
MetadataLockManager.INSTANCE.subscribeFeedBegin(dataverse, dataverse + "." + dataset,
dataverse + "." + feedConnectionId.getFeedId().getEntityName());
try {
- JobSpecification alteredJobSpec = FeedMetadataUtil.alterJobSpecificationForFeed(compiled, feedConnectionId,
+ JobSpecification alteredJobSpec = FeedMetadataUtil.alterJobSpecificationForFeed(jobSpec, feedConnectionId,
bfs.getSubscriptionRequest().getPolicyParameters());
FeedPolicyEntity policy = metadataProvider.findFeedPolicy(dataverse, bfs.getPolicy());
if (policy == null) {
@@ -2441,7 +2437,7 @@
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
- if (compiled != null) {
+ if (jobSpec != null) {
FeedEventsListener listener = (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE
.getActiveEntityListener(bfs.getSubscriptionRequest().getReceivingFeedId());
FeedConnectJobInfo activeJob = new FeedConnectJobInfo(bfs.getSubscriptionRequest().getReceivingFeedId(),
@@ -2563,19 +2559,19 @@
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
MetadataLockManager.INSTANCE.queryBegin(activeDefaultDataverse, query.getDataverses(), query.getDatasets());
- JobSpecification compiled;
try {
- compiled = rewriteCompileQuery(metadataProvider, query, null);
+ JobSpecification jobSpec = rewriteCompileQuery(metadataProvider, query, null);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
if (query.isExplain()) {
sessionConfig.out().flush();
- return compiled;
- } else if (sessionConfig.isExecuteQuery() && compiled != null) {
- handleQueryResult(metadataProvider, hcc, hdc, compiled, resultDelivery, stats);
+ return jobSpec;
+ } else if (sessionConfig.isExecuteQuery() && jobSpec != null) {
+ handleQueryResult(metadataProvider, hcc, hdc, jobSpec, resultDelivery, stats);
}
+ return jobSpec;
} catch (Exception e) {
LOGGER.log(Level.INFO, e.getMessage(), e);
if (bActiveTxn) {
@@ -2587,46 +2583,33 @@
// release external datasets' locks acquired during compilation of the query
ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
}
- return compiled;
}
private void handleQueryResult(MetadataProvider metadataProvider, IHyracksClientConnection hcc,
- IHyracksDataset hdc, JobSpecification compiled, ResultDelivery resultDelivery, Stats stats)
+ IHyracksDataset hdc, JobSpecification jobSpec, ResultDelivery resultDelivery, Stats stats)
throws Exception {
if (GlobalConfig.ASTERIX_LOGGER.isLoggable(Level.FINE)) {
- GlobalConfig.ASTERIX_LOGGER.fine(compiled.toJSON().toString(1));
+ GlobalConfig.ASTERIX_LOGGER.fine(jobSpec.toJSON().toString(1));
}
- JobId jobId = JobUtils.runJob(hcc, compiled, false);
+ JobId jobId = JobUtils.runJob(hcc, jobSpec, false);
- JSONObject response = new JSONObject();
switch (resultDelivery) {
case ASYNC:
- JSONArray handle = new JSONArray();
- handle.put(jobId.getId());
- handle.put(metadataProvider.getResultSetId().getId());
- response.put("handle", handle);
- sessionConfig.out().print(response);
- sessionConfig.out().flush();
+ ResultUtil.printResultHandle(new ResultHandle(jobId, metadataProvider.getResultSetId()), sessionConfig);
hcc.waitForCompletion(jobId);
break;
- case SYNC:
+ case IMMEDIATE:
hcc.waitForCompletion(jobId);
ResultReader resultReader = new ResultReader(hdc);
resultReader.open(jobId, metadataProvider.getResultSetId());
- ResultUtil.displayResults(resultReader, sessionConfig, stats, metadataProvider.findOutputRecordType());
+ ResultUtil.printResults(resultReader, sessionConfig, stats, metadataProvider.findOutputRecordType());
break;
- case ASYNC_DEFERRED:
- handle = new JSONArray();
- handle.put(jobId.getId());
- handle.put(metadataProvider.getResultSetId().getId());
- response.put("handle", handle);
+ case DEFERRED:
hcc.waitForCompletion(jobId);
- sessionConfig.out().print(response);
- sessionConfig.out().flush();
+ ResultUtil.printResultHandle(new ResultHandle(jobId, metadataProvider.getResultSetId()), sessionConfig);
break;
default:
break;
-
}
}
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/ResultExtractor.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/ResultExtractor.java
index d99602f..662888e 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/ResultExtractor.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/ResultExtractor.java
@@ -39,7 +39,8 @@
private static final Logger LOGGER = Logger.getLogger(ResultExtractor.class.getName());
public static InputStream extract(InputStream resultStream) throws Exception {
- String result = IOUtils.toString(resultStream, Charset.forName("UTF-8"));
+ final Charset utf8 = Charset.forName("UTF-8");
+ String result = IOUtils.toString(resultStream, utf8);
LOGGER.fine("+++++++\n" + result + "\n+++++++\n");
@@ -78,7 +79,17 @@
if (! "success".equals(status)) {
throw new Exception("Unexpected status: '" + status + "'");
}
- return IOUtils.toInputStream(results);
+ return IOUtils.toInputStream(results, utf8);
+ }
+
+ public static String extractHandle(InputStream resultStream) throws Exception {
+ final Charset utf8 = Charset.forName("UTF-8");
+ String result = IOUtils.toString(resultStream, utf8);
+ JSONObject parsed = new JSONObject(result);
+ JSONArray handle = parsed.getJSONArray("handle");
+ JSONObject res = new JSONObject();
+ res.put("handle", handle);
+ return res.toString();
}
private static String getFieldName(JSONTokener tokener) throws JSONException {
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
index 2da57e3..bfe0e33 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
@@ -430,7 +430,7 @@
public InputStream executeQueryService(String str, OutputFormat fmt, String url,
List<CompilationUnit.Parameter> params, boolean jsonEncoded) throws Exception {
- setFormatParam(params, fmt);
+ setParam(params, "format", fmt.mimeType());
HttpUriRequest method = jsonEncoded ? constructPostMethodJson(str, url, "statement", params)
: constructPostMethodUrl(str, url, "statement", params);
// Set accepted output response type
@@ -439,16 +439,24 @@
return response.getEntity().getContent();
}
- protected void setFormatParam(List<CompilationUnit.Parameter> params, OutputFormat fmt) {
+ public InputStream executeQueryService(String statement, OutputFormat fmt, String url,
+ List<CompilationUnit.Parameter> params, boolean jsonEncoded, String deferred) throws Exception {
+ setParam(params, "mode", deferred);
+ InputStream resultStream = executeQueryService(statement, fmt, url, params, jsonEncoded);
+ String handle = ResultExtractor.extractHandle(resultStream);
+ return getHandleResult(handle, fmt);
+ }
+
+ protected void setParam(List<CompilationUnit.Parameter> params, String name, String value) {
for (CompilationUnit.Parameter param : params) {
- if ("format".equals(param.getName())) {
- param.setValue(fmt.mimeType());
+ if (name.equals(param.getName())) {
+ param.setValue(value);
return;
}
}
CompilationUnit.Parameter formatParam = new CompilationUnit.Parameter();
- formatParam.setName("format");
- formatParam.setValue(fmt.mimeType());
+ formatParam.setName(name);
+ formatParam.setValue(value);
params.add(formatParam);
}
@@ -781,14 +789,16 @@
resultStream = executeAnyAQLAsync(statement, true, fmt, getEndpoint(Servlets.AQL));
}
} else {
- if (ctx.getType().equalsIgnoreCase("query")) {
- resultStream = executeQueryService(statement, fmt, getEndpoint(Servlets.QUERY_SERVICE),
- cUnit.getParameter(), true);
+ final String reqType = ctx.getType();
+ final String url = getEndpoint(Servlets.QUERY_SERVICE);
+ final List<CompilationUnit.Parameter> params = cUnit.getParameter();
+ if (reqType.equalsIgnoreCase("query")) {
+ resultStream = executeQueryService(statement, fmt, url, params, true);
resultStream = ResultExtractor.extract(resultStream);
- } else if (ctx.getType().equalsIgnoreCase("async")) {
- resultStream = executeAnyAQLAsync(statement, false, fmt, getEndpoint(Servlets.SQLPP));
- } else if (ctx.getType().equalsIgnoreCase("asyncdefer")) {
- resultStream = executeAnyAQLAsync(statement, true, fmt, getEndpoint(Servlets.SQLPP));
+ } else if (reqType.equalsIgnoreCase("async")) {
+ resultStream = executeQueryService(statement, fmt, url, params, true, "async");
+ } else if (reqType.equalsIgnoreCase("asyncdefer")) {
+ resultStream = executeQueryService(statement, fmt, url, params, true, "deferred");
}
}
if (queryCount.intValue() >= expectedResultFileCtxs.size()) {