Enable HTTP API processing on NCs
- Query/Status/Result are answered by NC nodes
- other HTTP requests are proxied to the CC node
- SessionConfig refactoring – split into config and output (SessionOutput)
- TestExecutor now can send http requests do multiple nodes (round robin)
Change-Id: I19414a23e163fc4deef9805c8f9089609f1ebe07
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1709
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
index 92c487b..19f0dcc 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
@@ -18,17 +18,24 @@
*/
package org.apache.asterix.translator;
+import java.io.Serializable;
import java.rmi.RemoteException;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.lang.common.statement.Query;
import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
+import org.apache.commons.lang3.tuple.Triple;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.client.IClusterInfoCollector;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
/**
@@ -54,6 +61,16 @@
ASYNC
}
+ class ResultMetadata implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final List<Triple<JobId, ResultSetId, ARecordType>> resultSets = new ArrayList<>();
+
+ public List<Triple<JobId, ResultSetId, ARecordType>> getResultSets() {
+ return resultSets;
+ }
+ }
+
public static class Stats {
private long count;
private long size;
@@ -85,12 +102,14 @@
* A Hyracks dataset client object that is used to read the results.
* @param resultDelivery
* The {@code ResultDelivery} kind required for queries in the list of statements
+ * @param outMetadata
+ * a reference to write the metadata of executed queries
* @param stats
* a reference to write the stats of executed queries
* @throws Exception
*/
void compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery,
- Stats stats) throws Exception;
+ ResultMetadata outMetadata, Stats stats) throws Exception;
/**
* Compiles and execute a list of statements, with passing in client context id and context.
@@ -101,6 +120,8 @@
* A Hyracks dataset client object that is used to read the results.
* @param resultDelivery
* The {@code ResultDelivery} kind required for queries in the list of statements
+ * @param outMetadata
+ * a reference to write the metadata of executed queries
* @param stats
* a reference to write the stats of executed queries
* @param clientContextId
@@ -110,7 +131,8 @@
* @throws Exception
*/
void compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery,
- Stats stats, String clientContextId, IStatementExecutorContext ctx) throws Exception;
+ ResultMetadata outMetadata, Stats stats, String clientContextId, IStatementExecutorContext ctx)
+ throws Exception;
/**
* rewrites and compiles query into a hyracks job specifications
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorFactory.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorFactory.java
index 23365de..b244c0c 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorFactory.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorFactory.java
@@ -37,14 +37,14 @@
*
* @param statements
* Statements to execute
- * @param conf
- * request configuration
+ * @param output
+ * output and request configuration
* @param compilationProvider
* provides query language related components
* @param storageComponentProvider
* provides storage related components
* @return an implementation of {@code IStatementExecutor} thaxt is used to execute the passed list of statements
*/
- IStatementExecutor create(ICcApplicationContext appCtx, List<Statement> statements, SessionConfig conf,
+ IStatementExecutor create(ICcApplicationContext appCtx, List<Statement> statements, SessionOutput output,
ILangCompilationProvider compilationProvider, IStorageComponentProvider storageComponentProvider);
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java
index a637e2f..cfd4e87 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java
@@ -18,13 +18,10 @@
*/
package org.apache.asterix.translator;
-import java.io.PrintWriter;
+import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable;
-
/**
* SessionConfig captures several different parameters for controlling
* the execution of an APIFramework call.
@@ -38,8 +35,10 @@
* execution output - LOSSLESS_JSON, CSV, etc.
* <li>It allows you to specify output format-specific parameters.
*/
+public class SessionConfig implements Serializable {
-public class SessionConfig {
+ private static final long serialVersionUID = 1L;
+
/**
* Used to specify the output format for the primary execution.
*/
@@ -105,53 +104,25 @@
*/
public static final String FORMAT_QUOTE_RECORD = "quote-record";
- @FunctionalInterface
- public interface ResultDecorator {
- AlgebricksAppendable append(AlgebricksAppendable app) throws AlgebricksException;
- }
-
- @FunctionalInterface
- public interface ResultAppender {
- AlgebricksAppendable append(AlgebricksAppendable app, String str) throws AlgebricksException;
- }
+ // Output format.
+ private final OutputFormat fmt;
// Standard execution flags.
private final boolean executeQuery;
private final boolean generateJobSpec;
private final boolean optimize;
- // Output path for primary execution.
- private final PrintWriter out;
-
- // Output format.
- private final OutputFormat fmt;
-
- private final ResultDecorator preResultDecorator;
- private final ResultDecorator postResultDecorator;
- private final ResultAppender handleAppender;
- private final ResultAppender statusAppender;
-
// Flags.
private final Map<String, Boolean> flags;
- public SessionConfig(PrintWriter out, OutputFormat fmt, ResultDecorator preResultDecorator,
- ResultDecorator postResultDecorator, ResultAppender handleAppender, ResultAppender statusAppender) {
- this(out, fmt, preResultDecorator, postResultDecorator, handleAppender, statusAppender,
- true, true, true);
- }
-
- public SessionConfig(PrintWriter out, OutputFormat fmt, boolean optimize, boolean executeQuery,
- boolean generateJobSpec) {
- this(out, fmt, null, null, null, null, optimize, executeQuery, generateJobSpec);
+ public SessionConfig(OutputFormat fmt) {
+ this(fmt, true, true, true);
}
/**
* Create a SessionConfig object with all optional values set to defaults:
* - All format flags set to "false".
* - All out-of-band outputs set to "false".
- *
- * @param out
- * PrintWriter for execution output.
* @param fmt
* Output format for execution output.
* @param optimize
@@ -160,17 +131,9 @@
* Whether to execute the query or not.
* @param generateJobSpec
* Whether to generate the Hyracks job specification (if
- * false, job cannot be executed).
*/
- public SessionConfig(PrintWriter out, OutputFormat fmt, ResultDecorator preResultDecorator,
- ResultDecorator postResultDecorator, ResultAppender handleAppender, ResultAppender statusAppender,
- boolean optimize, boolean executeQuery, boolean generateJobSpec) {
- this.out = out;
+ public SessionConfig(OutputFormat fmt, boolean optimize, boolean executeQuery, boolean generateJobSpec) {
this.fmt = fmt;
- this.preResultDecorator = preResultDecorator;
- this.postResultDecorator = postResultDecorator;
- this.handleAppender = handleAppender;
- this.statusAppender = statusAppender;
this.optimize = optimize;
this.executeQuery = executeQuery;
this.generateJobSpec = generateJobSpec;
@@ -178,35 +141,12 @@
}
/**
- * Retrieve the PrintWriter to produce output to.
- */
- public PrintWriter out() {
- return this.out;
- }
-
- /**
* Retrieve the OutputFormat for this execution.
*/
public OutputFormat fmt() {
return this.fmt;
}
- public AlgebricksAppendable resultPrefix(AlgebricksAppendable app) throws AlgebricksException {
- return this.preResultDecorator != null ? this.preResultDecorator.append(app) : app;
- }
-
- public AlgebricksAppendable resultPostfix(AlgebricksAppendable app) throws AlgebricksException {
- return this.postResultDecorator != null ? this.postResultDecorator.append(app) : app;
- }
-
- public AlgebricksAppendable appendHandle(AlgebricksAppendable app, String handle) throws AlgebricksException {
- return this.handleAppender != null ? this.handleAppender.append(app, handle) : app;
- }
-
- public AlgebricksAppendable appendStatus(AlgebricksAppendable app, String status) throws AlgebricksException {
- return this.statusAppender != null ? this.statusAppender.append(app, status) : app;
- }
-
/**
* Retrieve the value of the "execute query" flag.
*/
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionOutput.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionOutput.java
new file mode 100644
index 0000000..b559df8
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionOutput.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.translator;
+
+import java.io.PrintWriter;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable;
+
+public class SessionOutput {
+ private final SessionConfig config;
+
+ // Output path for primary execution.
+ private final PrintWriter out;
+
+ private final SessionOutput.ResultDecorator preResultDecorator;
+ private final SessionOutput.ResultDecorator postResultDecorator;
+ private final SessionOutput.ResultAppender handleAppender;
+ private final SessionOutput.ResultAppender statusAppender;
+
+ public SessionOutput(SessionConfig config, PrintWriter out) {
+ this(config, out, null, null, null, null);
+ }
+
+ public SessionOutput(SessionConfig config, PrintWriter out, ResultDecorator preResultDecorator,
+ ResultDecorator postResultDecorator, ResultAppender handleAppender, ResultAppender statusAppender) {
+ this.config = config;
+ this.out = out;
+ this.preResultDecorator = preResultDecorator;
+ this.postResultDecorator = postResultDecorator;
+ this.handleAppender = handleAppender;
+ this.statusAppender = statusAppender;
+ }
+
+ /**
+ * Retrieve the PrintWriter to produce output to.
+ */
+ public PrintWriter out() {
+ return this.out;
+ }
+
+ public AlgebricksAppendable resultPrefix(AlgebricksAppendable app) throws AlgebricksException {
+ return this.preResultDecorator != null ? this.preResultDecorator.append(app) : app;
+ }
+
+ public AlgebricksAppendable resultPostfix(AlgebricksAppendable app) throws AlgebricksException {
+ return this.postResultDecorator != null ? this.postResultDecorator.append(app) : app;
+ }
+
+ public AlgebricksAppendable appendHandle(AlgebricksAppendable app, String handle) throws AlgebricksException {
+ return this.handleAppender != null ? this.handleAppender.append(app, handle) : app;
+ }
+
+ public AlgebricksAppendable appendStatus(AlgebricksAppendable app, String status) throws AlgebricksException {
+ return this.statusAppender != null ? this.statusAppender.append(app, status) : app;
+ }
+
+ public SessionConfig config() {
+ return config;
+ }
+
+ @FunctionalInterface
+ public interface ResultDecorator {
+ AlgebricksAppendable append(AlgebricksAppendable app) throws AlgebricksException;
+ }
+
+ @FunctionalInterface
+ public interface ResultAppender {
+ AlgebricksAppendable append(AlgebricksAppendable app, String str) throws AlgebricksException;
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index b12935d..583302b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -67,6 +67,7 @@
import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
import org.apache.asterix.translator.IStatementExecutor.Stats;
import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
import org.apache.asterix.utils.ResourceUtils;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -152,31 +153,33 @@
}
}
- private void printPlanPrefix(SessionConfig conf, String planName) {
- if (conf.is(SessionConfig.FORMAT_HTML)) {
- conf.out().println("<h4>" + planName + ":</h4>");
- conf.out().println("<pre>");
+ private void printPlanPrefix(SessionOutput output, String planName) {
+ if (output.config().is(SessionConfig.FORMAT_HTML)) {
+ output.out().println("<h4>" + planName + ":</h4>");
+ output.out().println("<pre>");
} else {
- conf.out().println("----------" + planName + ":");
+ output.out().println("----------" + planName + ":");
}
}
- private void printPlanPostfix(SessionConfig conf) {
- if (conf.is(SessionConfig.FORMAT_HTML)) {
- conf.out().println("</pre>");
+ private void printPlanPostfix(SessionOutput output) {
+ if (output.config().is(SessionConfig.FORMAT_HTML)) {
+ output.out().println("</pre>");
}
}
public Pair<IReturningStatement, Integer> reWriteQuery(List<FunctionDecl> declaredFunctions,
- MetadataProvider metadataProvider, IReturningStatement q, SessionConfig conf) throws CompilationException {
+ MetadataProvider metadataProvider, IReturningStatement q, SessionOutput output)
+ throws CompilationException {
if (q == null) {
return null;
}
+ SessionConfig conf = output.config();
if (!conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS) && conf.is(SessionConfig.OOB_EXPR_TREE)) {
- conf.out().println();
- printPlanPrefix(conf, "Expression tree");
- q.accept(astPrintVisitorFactory.createLangVisitor(conf.out()), 0);
- printPlanPostfix(conf);
+ output.out().println();
+ printPlanPrefix(output, "Expression tree");
+ q.accept(astPrintVisitorFactory.createLangVisitor(output.out()), 0);
+ printPlanPostfix(output);
}
IQueryRewriter rw = rewriterFactory.createQueryRewriter();
rw.rewrite(declaredFunctions, q, metadataProvider, new LangRewritingContext(q.getVarCounter()));
@@ -184,17 +187,18 @@
}
public JobSpecification compileQuery(IClusterInfoCollector clusterInfoCollector, MetadataProvider metadataProvider,
- Query rwQ, int varCounter, String outputDatasetName, SessionConfig conf, ICompiledDmlStatement statement)
+ Query rwQ, int varCounter, String outputDatasetName, SessionOutput output, ICompiledDmlStatement statement)
throws AlgebricksException, RemoteException, ACIDException {
+ SessionConfig conf = output.config();
if (!conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS) && conf.is(SessionConfig.OOB_REWRITTEN_EXPR_TREE)) {
- conf.out().println();
+ output.out().println();
- printPlanPrefix(conf, "Rewritten expression tree");
+ printPlanPrefix(output, "Rewritten expression tree");
if (rwQ != null) {
- rwQ.accept(astPrintVisitorFactory.createLangVisitor(conf.out()), 0);
+ rwQ.accept(astPrintVisitorFactory.createLangVisitor(output.out()), 0);
}
- printPlanPostfix(conf);
+ printPlanPostfix(output);
}
org.apache.asterix.common.transactions.JobId asterixJobId = JobIdFactory.generateJobId();
@@ -211,14 +215,14 @@
}
if (!conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS) && conf.is(SessionConfig.OOB_LOGICAL_PLAN)) {
- conf.out().println();
+ output.out().println();
- printPlanPrefix(conf, "Logical plan");
+ printPlanPrefix(output, "Logical plan");
if (rwQ != null || (statement != null && statement.getKind() == Statement.Kind.LOAD)) {
- LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor(conf.out());
+ LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor(output.out());
PlanPrettyPrinter.printPlan(plan, pvisitor, 0);
}
- printPlanPostfix(conf);
+ printPlanPostfix(output);
}
CompilerProperties compilerProperties = metadataProvider.getApplicationContext().getCompilerProperties();
int frameSize = compilerProperties.getFrameSize();
@@ -264,15 +268,16 @@
if (conf.is(SessionConfig.OOB_OPTIMIZED_LOGICAL_PLAN)) {
if (conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS)) {
// For Optimizer tests.
- AlgebricksAppendable buffer = new AlgebricksAppendable(conf.out());
+ AlgebricksAppendable buffer = new AlgebricksAppendable(output.out());
PlanPrettyPrinter.printPhysicalOps(plan, buffer, 0);
} else {
- printPlanPrefix(conf, "Optimized logical plan");
+ printPlanPrefix(output, "Optimized logical plan");
if (rwQ != null || (statement != null && statement.getKind() == Statement.Kind.LOAD)) {
- LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor(conf.out());
+ LogicalOperatorPrettyPrintVisitor pvisitor =
+ new LogicalOperatorPrettyPrintVisitor(output.out());
PlanPrettyPrinter.printPlan(plan, pvisitor, 0);
}
- printPlanPostfix(conf);
+ printPlanPostfix(output);
}
}
}
@@ -280,7 +285,7 @@
try {
LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor();
PlanPrettyPrinter.printPlan(plan, pvisitor, 0);
- ResultUtil.printResults(metadataProvider.getApplicationContext(), pvisitor.get().toString(), conf,
+ ResultUtil.printResults(metadataProvider.getApplicationContext(), pvisitor.get().toString(), output,
new Stats(), null);
return null;
} catch (IOException e) {
@@ -336,17 +341,17 @@
}
if (conf.is(SessionConfig.OOB_HYRACKS_JOB)) {
- printPlanPrefix(conf, "Hyracks job");
+ printPlanPrefix(output, "Hyracks job");
if (rwQ != null) {
try {
- conf.out().println(
+ output.out().println(
new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(spec.toJSON()));
} catch (IOException e) {
throw new AlgebricksException(e);
}
- conf.out().println(spec.getUserConstraints());
+ output.out().println(spec.getUserConstraints());
}
- printPlanPostfix(conf);
+ printPlanPostfix(output);
}
return spec;
}
@@ -459,9 +464,7 @@
// Gets the frame limit.
private static int getFrameLimit(String parameterName, String parameter, long memBudgetInConfiguration,
- int frameSize,
- int minFrameLimit)
- throws AlgebricksException {
+ int frameSize, int minFrameLimit) throws AlgebricksException {
IOptionType<Long> longBytePropertyInterpreter = OptionTypes.LONG_BYTE_UNIT;
long memBudget = parameter == null ? memBudgetInConfiguration : longBytePropertyInterpreter.parse(parameter);
int frameLimit = (int) (memBudget / frameSize);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
index a9c4b39..a4e72f7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
@@ -26,7 +26,7 @@
import java.util.concurrent.ConcurrentMap;
import org.apache.asterix.app.result.ResultReader;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.hyracks.api.client.IHyracksClientConnection;
@@ -35,7 +35,7 @@
import org.apache.hyracks.http.server.AbstractServlet;
public class AbstractQueryApiServlet extends AbstractServlet {
- protected final ICcApplicationContext appCtx;
+ protected final IApplicationContext appCtx;
public enum ResultFields {
REQUEST_ID("requestID"),
@@ -93,7 +93,7 @@
}
}
- AbstractQueryApiServlet(ICcApplicationContext appCtx, ConcurrentMap<String, Object> ctx, String[] paths) {
+ AbstractQueryApiServlet(IApplicationContext appCtx, ConcurrentMap<String, Object> ctx, String[] paths) {
super(ctx, paths);
this.appCtx = appCtx;
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
index 3384332..7874aa3 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
@@ -50,6 +50,7 @@
import org.apache.asterix.translator.IStatementExecutorFactory;
import org.apache.asterix.translator.SessionConfig;
import org.apache.asterix.translator.SessionConfig.OutputFormat;
+import org.apache.asterix.translator.SessionOutput;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.client.dataset.HyracksDataset;
@@ -137,19 +138,20 @@
}
IParser parser = parserFactory.createParser(query);
List<Statement> aqlStatements = parser.parse();
- SessionConfig sessionConfig = new SessionConfig(out, format, true, isSet(executeQuery), true);
+ SessionConfig sessionConfig = new SessionConfig(format, true, isSet(executeQuery), true);
sessionConfig.set(SessionConfig.FORMAT_HTML, true);
sessionConfig.set(SessionConfig.FORMAT_CSV_HEADER, csvAndHeader);
sessionConfig.set(SessionConfig.FORMAT_WRAPPER_ARRAY, isSet(wrapperArray));
sessionConfig.setOOBData(isSet(printExprParam), isSet(printRewrittenExprParam),
isSet(printLogicalPlanParam), isSet(printOptimizedLogicalPlanParam), isSet(printJob));
+ SessionOutput sessionOutput = new SessionOutput(sessionConfig, out);
MetadataManager.INSTANCE.init();
- IStatementExecutor translator = statementExectorFactory.create(appCtx, aqlStatements, sessionConfig,
+ IStatementExecutor translator = statementExectorFactory.create(appCtx, aqlStatements, sessionOutput,
compilationProvider, componentProvider);
double duration;
long startTime = System.currentTimeMillis();
translator.compileAndExecute(hcc, hds, IStatementExecutor.ResultDelivery.IMMEDIATE,
- new IStatementExecutor.Stats());
+ null, new IStatementExecutor.Stats());
long endTime = System.currentTimeMillis();
duration = (endTime - startTime) / 1000.00;
out.println(HTML_STATEMENT_SEPARATOR);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
new file mode 100644
index 0000000..2b70685
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.api.http.server;
+
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.asterix.algebra.base.ILangExtension;
+import org.apache.asterix.app.message.ExecuteStatementRequestMessage;
+import org.apache.asterix.app.message.ExecuteStatementResponseMessage;
+import org.apache.asterix.app.result.ResultReader;
+import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.common.messaging.api.MessageFuture;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.SessionOutput;
+import org.apache.commons.lang3.tuple.Triple;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.job.JobId;
+
+/**
+ * Query service servlet that can run on NC nodes.
+ * Delegates query execution to CC, then serves the result.
+ */
+public class NCQueryServiceServlet extends QueryServiceServlet {
+ public NCQueryServiceServlet(ConcurrentMap<String, Object> ctx, String[] paths, IApplicationContext appCtx,
+ ILangExtension.Language queryLanguage) {
+ super(ctx, paths, appCtx, queryLanguage, null, null, null);
+ }
+
+ @Override
+ protected void executeStatement(String statementsText, SessionOutput sessionOutput,
+ IStatementExecutor.ResultDelivery delivery, IStatementExecutor.Stats stats, RequestParameters param,
+ String handleUrl, long[] outExecStartEnd) throws Exception {
+ // Running on NC -> send 'execute' message to CC
+ INCServiceContext ncCtx = (INCServiceContext) serviceCtx;
+ INCMessageBroker ncMb = (INCMessageBroker) ncCtx.getMessageBroker();
+ IStatementExecutor.ResultDelivery ccDelivery = delivery == IStatementExecutor.ResultDelivery.IMMEDIATE
+ ? IStatementExecutor.ResultDelivery.DEFERRED : delivery;
+ ExecuteStatementResponseMessage responseMsg;
+ MessageFuture responseFuture = ncMb.registerMessageFuture();
+ try {
+ ExecuteStatementRequestMessage requestMsg =
+ new ExecuteStatementRequestMessage(ncCtx.getNodeId(), responseFuture.getFutureId(), queryLanguage,
+ statementsText, sessionOutput.config(), ccDelivery, param.clientContextID, handleUrl);
+ outExecStartEnd[0] = System.nanoTime();
+ ncMb.sendMessageToCC(requestMsg);
+ responseMsg = (ExecuteStatementResponseMessage) responseFuture.get(
+ ExecuteStatementResponseMessage.DEFAULT_TIMEOUT_MILLIS, java.util.concurrent.TimeUnit.MILLISECONDS);
+ outExecStartEnd[1] = System.nanoTime();
+ } finally {
+ ncMb.deregisterMessageFuture(responseFuture.getFutureId());
+ }
+
+ Throwable err = responseMsg.getError();
+ if (err != null) {
+ if (err instanceof Error) {
+ throw (Error) err;
+ } else if (err instanceof Exception) {
+ throw (Exception) err;
+ } else {
+ throw new Exception(err.toString(), err);
+ }
+ }
+
+ IStatementExecutor.ResultMetadata resultMetadata = responseMsg.getMetadata();
+ if (delivery == IStatementExecutor.ResultDelivery.IMMEDIATE && !resultMetadata.getResultSets().isEmpty()) {
+ for (Triple<JobId, ResultSetId, ARecordType> rsmd : resultMetadata.getResultSets()) {
+ ResultReader resultReader = new ResultReader(getHyracksDataset(), rsmd.getLeft(), rsmd.getMiddle());
+ ResultUtil.printResults(appCtx, resultReader, sessionOutput, stats, rsmd.getRight());
+ }
+ } else {
+ sessionOutput.out().append(responseMsg.getResult());
+ }
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
index 401f55e..42e23ba 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
@@ -25,9 +25,9 @@
import org.apache.asterix.app.result.ResultHandle;
import org.apache.asterix.app.result.ResultReader;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.translator.IStatementExecutor.Stats;
-import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
import org.apache.hyracks.api.dataset.DatasetJobRecord;
import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.api.exceptions.ErrorCode;
@@ -41,7 +41,7 @@
public class QueryResultApiServlet extends AbstractQueryApiServlet {
private static final Logger LOGGER = Logger.getLogger(QueryResultApiServlet.class.getName());
- public QueryResultApiServlet(ConcurrentMap<String, Object> ctx, String[] paths, ICcApplicationContext appCtx) {
+ public QueryResultApiServlet(ConcurrentMap<String, Object> ctx, String[] paths, IApplicationContext appCtx) {
super(appCtx, ctx, paths);
}
@@ -94,8 +94,8 @@
// way to send the same OutputFormat value here as was
// originally determined there. Need to save this value on
// some object that we can obtain here.
- SessionConfig sessionConfig = RestApiServlet.initResponse(request, response);
- ResultUtil.printResults(appCtx, resultReader, sessionConfig, new Stats(), null);
+ SessionOutput sessionOutput = RestApiServlet.initResponse(request, response);
+ ResultUtil.printResults(appCtx, resultReader, sessionOutput, new Stats(), null);
} catch (HyracksDataException e) {
final int errorCode = e.getErrorCode();
if (ErrorCode.NO_RESULTSET == errorCode) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index 20bffc4..c45f24a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -27,8 +27,9 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.asterix.api.http.ctx.StatementExecutorContext;
+import org.apache.asterix.algebra.base.ILangExtension;
import org.apache.asterix.api.http.servlet.ServletConstants;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.api.IClusterManagementWork;
import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.context.IStorageComponentProvider;
@@ -46,8 +47,8 @@
import org.apache.asterix.translator.IStatementExecutorContext;
import org.apache.asterix.translator.IStatementExecutorFactory;
import org.apache.asterix.translator.SessionConfig;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable;
+import org.apache.asterix.translator.SessionOutput;
+import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.http.api.IServletRequest;
import org.apache.hyracks.http.api.IServletResponse;
import org.apache.hyracks.http.server.utils.HttpUtil;
@@ -66,19 +67,23 @@
public class QueryServiceServlet extends AbstractQueryApiServlet {
private static final Logger LOGGER = Logger.getLogger(QueryServiceServlet.class.getName());
+ protected final ILangExtension.Language queryLanguage;
private final ILangCompilationProvider compilationProvider;
private final IStatementExecutorFactory statementExecutorFactory;
private final IStorageComponentProvider componentProvider;
- private final IStatementExecutorContext queryCtx = new StatementExecutorContext();
+ private final IStatementExecutorContext queryCtx;
+ protected final IServiceContext serviceCtx;
- public QueryServiceServlet(ConcurrentMap<String, Object> ctx, String[] paths, ICcApplicationContext appCtx,
- ILangCompilationProvider compilationProvider, IStatementExecutorFactory statementExecutorFactory,
- IStorageComponentProvider componentProvider) {
+ public QueryServiceServlet(ConcurrentMap<String, Object> ctx, String[] paths, IApplicationContext appCtx,
+ ILangExtension.Language queryLanguage, ILangCompilationProvider compilationProvider,
+ IStatementExecutorFactory statementExecutorFactory, IStorageComponentProvider componentProvider) {
super(appCtx, ctx, paths);
+ this.queryLanguage = queryLanguage;
this.compilationProvider = compilationProvider;
this.statementExecutorFactory = statementExecutorFactory;
this.componentProvider = componentProvider;
- ctx.put(ServletConstants.RUNNING_QUERIES_ATTR, queryCtx);
+ this.queryCtx = (IStatementExecutorContext) ctx.get(ServletConstants.RUNNING_QUERIES_ATTR);
+ this.serviceCtx = (IServiceContext) ctx.get(ServletConstants.SERVICE_CONTEXT_ATTR);
}
@Override
@@ -235,40 +240,22 @@
return SessionConfig.OutputFormat.CLEAN_JSON;
}
- private static SessionConfig createSessionConfig(RequestParameters param, String handleUrl,
+ private static SessionOutput createSessionOutput(RequestParameters param, String handleUrl,
PrintWriter resultWriter) {
- SessionConfig.ResultDecorator resultPrefix = new SessionConfig.ResultDecorator() {
- int resultNo = -1;
-
- @Override
- public AlgebricksAppendable append(AlgebricksAppendable app) throws AlgebricksException {
- app.append("\t\"");
- app.append(ResultFields.RESULTS.str());
- if (resultNo >= 0) {
- app.append('-').append(String.valueOf(resultNo));
- }
- ++resultNo;
- app.append("\": ");
- return app;
- }
- };
-
- SessionConfig.ResultDecorator resultPostfix = app -> app.append("\t,\n");
- SessionConfig.ResultAppender appendHandle = (app, handle) -> app.append("\t\"")
- .append(ResultFields.HANDLE.str()).append("\": \"").append(handleUrl).append(handle).append("\",\n");
- SessionConfig.ResultAppender appendStatus = (app, status) -> app.append("\t\"")
- .append(ResultFields.STATUS.str()).append("\": \"").append(status).append("\",\n");
+ SessionOutput.ResultDecorator resultPrefix = ResultUtil.createPreResultDecorator();
+ SessionOutput.ResultDecorator resultPostfix = ResultUtil.createPostResultDecorator();
+ SessionOutput.ResultAppender appendHandle = ResultUtil.createResultHandleAppender(handleUrl);
+ SessionOutput.ResultAppender appendStatus = ResultUtil.createResultStatusAppender();
SessionConfig.OutputFormat format = getFormat(param.format);
- SessionConfig sessionConfig =
- new SessionConfig(resultWriter, format, resultPrefix, resultPostfix, appendHandle, appendStatus);
+ SessionConfig sessionConfig = new SessionConfig(format);
sessionConfig.set(SessionConfig.FORMAT_WRAPPER_ARRAY, true);
sessionConfig.set(SessionConfig.FORMAT_INDENT_JSON, param.pretty);
sessionConfig.set(SessionConfig.FORMAT_QUOTE_RECORD,
format != SessionConfig.OutputFormat.CLEAN_JSON && format != SessionConfig.OutputFormat.LOSSLESS_JSON);
sessionConfig.set(SessionConfig.FORMAT_CSV_HEADER, format == SessionConfig.OutputFormat.CSV
&& "present".equals(getParameterValue(param.format, Attribute.HEADER.str())));
- return sessionConfig;
+ return new SessionOutput(sessionConfig, resultWriter, resultPrefix, resultPostfix, appendHandle, appendStatus);
}
private static void printClientContextID(PrintWriter pw, RequestParameters params) {
@@ -406,13 +393,13 @@
ResultDelivery delivery = parseResultDelivery(param.mode);
String handleUrl = getHandleUrl(param.host, param.path, delivery);
- SessionConfig sessionConfig = createSessionConfig(param, handleUrl, resultWriter);
+ SessionOutput sessionOutput = createSessionOutput(param, handleUrl, resultWriter);
+ SessionConfig sessionConfig = sessionOutput.config();
HttpUtil.setContentType(response, HttpUtil.ContentType.APPLICATION_JSON, HttpUtil.Encoding.UTF8);
HttpResponseStatus status = HttpResponseStatus.OK;
Stats stats = new Stats();
- long execStart = -1;
- long execEnd = -1;
+ long[] execStartEnd = new long[] { -1, -1 };
resultWriter.print("{\n");
printRequestId(resultWriter);
@@ -420,46 +407,33 @@
printSignature(resultWriter);
printType(resultWriter, sessionConfig);
try {
- final IClusterManagementWork.ClusterState clusterState = ClusterStateManager.INSTANCE.getState();
- if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) {
- // using a plain IllegalStateException here to get into the right catch clause for a 500
- throw new IllegalStateException("Cannot execute request, cluster is " + clusterState);
- }
if (param.statement == null || param.statement.isEmpty()) {
throw new AsterixException("Empty request, no statement provided");
}
- IParser parser = compilationProvider.getParserFactory().createParser(param.statement + ";");
- List<Statement> statements = parser.parse();
- MetadataManager.INSTANCE.init();
- IStatementExecutor translator =
- statementExecutorFactory.create(appCtx, statements, sessionConfig, compilationProvider,
- componentProvider);
- execStart = System.nanoTime();
- translator.compileAndExecute(getHyracksClientConnection(), getHyracksDataset(), delivery, stats,
- param.clientContextID, queryCtx);
- execEnd = System.nanoTime();
+ String statementsText = param.statement + ";";
+ executeStatement(statementsText, sessionOutput, delivery, stats, param, handleUrl, execStartEnd);
if (ResultDelivery.IMMEDIATE == delivery || ResultDelivery.DEFERRED == delivery) {
- ResultUtil.printStatus(sessionConfig, ResultStatus.SUCCESS);
+ ResultUtil.printStatus(sessionOutput, ResultStatus.SUCCESS);
}
} catch (AsterixException | TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, pe.getMessage(), pe);
ResultUtil.printError(resultWriter, pe);
- ResultUtil.printStatus(sessionConfig, ResultStatus.FATAL);
+ ResultUtil.printStatus(sessionOutput, ResultStatus.FATAL);
status = HttpResponseStatus.BAD_REQUEST;
} catch (Exception e) {
- GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, e.getMessage(), e);
+ GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, e.toString(), e);
ResultUtil.printError(resultWriter, e);
- ResultUtil.printStatus(sessionConfig, ResultStatus.FATAL);
+ ResultUtil.printStatus(sessionOutput, ResultStatus.FATAL);
status = HttpResponseStatus.INTERNAL_SERVER_ERROR;
} finally {
- if (execStart == -1) {
- execEnd = -1;
- } else if (execEnd == -1) {
- execEnd = System.nanoTime();
+ if (execStartEnd[0] == -1) {
+ execStartEnd[1] = -1;
+ } else if (execStartEnd[1] == -1) {
+ execStartEnd[1] = System.nanoTime();
}
}
- printMetrics(resultWriter, System.nanoTime() - elapsedStart, execEnd - execStart, stats.getCount(),
- stats.getSize());
+ printMetrics(resultWriter, System.nanoTime() - elapsedStart, execStartEnd[1] - execStartEnd[0],
+ stats.getCount(), stats.getSize());
resultWriter.print("}\n");
resultWriter.flush();
String result = stringWriter.toString();
@@ -472,4 +446,23 @@
LOGGER.warning("Error flushing output writer");
}
}
+
+ protected void executeStatement(String statementsText, SessionOutput sessionOutput, ResultDelivery delivery,
+ IStatementExecutor.Stats stats, RequestParameters param, String handleUrl, long[] outExecStartEnd)
+ throws Exception {
+ IClusterManagementWork.ClusterState clusterState = ClusterStateManager.INSTANCE.getState();
+ if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) {
+ // using a plain IllegalStateException here to get into the right catch clause for a 500
+ throw new IllegalStateException("Cannot execute request, cluster is " + clusterState);
+ }
+ IParser parser = compilationProvider.getParserFactory().createParser(statementsText);
+ List<Statement> statements = parser.parse();
+ MetadataManager.INSTANCE.init();
+ IStatementExecutor translator = statementExecutorFactory.create((ICcApplicationContext) appCtx, statements,
+ sessionOutput, compilationProvider, componentProvider);
+ outExecStartEnd[0] = System.nanoTime();
+ translator.compileAndExecute(getHyracksClientConnection(), getHyracksDataset(), delivery, null, stats,
+ param.clientContextID, queryCtx);
+ outExecStartEnd[1] = System.nanoTime();
+ }
}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
index d0c574e..71dddc0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
@@ -29,7 +29,7 @@
import org.apache.asterix.app.result.ResultHandle;
import org.apache.asterix.app.result.ResultReader;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.hyracks.api.dataset.DatasetJobRecord;
import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.http.api.IServletRequest;
@@ -41,7 +41,7 @@
public class QueryStatusApiServlet extends AbstractQueryApiServlet {
private static final Logger LOGGER = Logger.getLogger(QueryStatusApiServlet.class.getName());
- public QueryStatusApiServlet(ConcurrentMap<String, Object> ctx, String[] paths, ICcApplicationContext appCtx) {
+ public QueryStatusApiServlet(ConcurrentMap<String, Object> ctx, String[] paths, IApplicationContext appCtx) {
super(appCtx, ctx, paths);
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
index e339ba9..6b1e408 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
@@ -45,6 +45,7 @@
import org.apache.asterix.translator.IStatementExecutorFactory;
import org.apache.asterix.translator.SessionConfig;
import org.apache.asterix.translator.SessionConfig.OutputFormat;
+import org.apache.asterix.translator.SessionOutput;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.client.dataset.HyracksDataset;
@@ -84,7 +85,7 @@
* SessionConfig with the appropriate output writer and output-format
* based on the Accept: header and other servlet parameters.
*/
- static SessionConfig initResponse(IServletRequest request, IServletResponse response) throws IOException {
+ static SessionOutput initResponse(IServletRequest request, IServletResponse response) throws IOException {
HttpUtil.setContentType(response, HttpUtil.ContentType.TEXT_PLAIN, HttpUtil.Encoding.UTF8);
// CLEAN_JSON output is the default; most generally useful for a
// programmatic HTTP API
@@ -114,9 +115,9 @@
format = OutputFormat.LOSSLESS_JSON;
}
- SessionConfig.ResultAppender appendHandle = (app, handle) -> app.append("{ \"").append("handle")
+ SessionOutput.ResultAppender appendHandle = (app, handle) -> app.append("{ \"").append("handle")
.append("\":" + " \"").append(handle).append("\" }");
- SessionConfig sessionConfig = new SessionConfig(response.writer(), format, null, null, appendHandle, null);
+ SessionConfig sessionConfig = new SessionConfig(format);
// If it's JSON or ADM, check for the "wrapper-array" flag. Default is
// "true" for JSON and "false" for ADM. (Not applicable for CSV.)
@@ -152,7 +153,7 @@
default:
throw new IOException("Unknown format " + format);
}
- return sessionConfig;
+ return new SessionOutput(sessionConfig, response.writer(), null, null, appendHandle, null);
}
@Override
@@ -171,9 +172,9 @@
// enable cross-origin resource sharing
response.setHeader("Access-Control-Allow-Origin", "*");
response.setHeader("Access-Control-Allow-Headers", "Origin, X-Requested-With, Content-Type, Accept");
- SessionConfig sessionConfig = initResponse(request, response);
+ SessionOutput sessionOutput = initResponse(request, response);
QueryTranslator.ResultDelivery resultDelivery = whichResultDelivery(request);
- doHandle(response, query, sessionConfig, resultDelivery);
+ doHandle(response, query, sessionOutput, resultDelivery);
} catch (Exception e) {
response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
LOGGER.log(Level.WARNING, "Failure handling request", e);
@@ -181,7 +182,7 @@
}
}
- private void doHandle(IServletResponse response, String query, SessionConfig sessionConfig,
+ private void doHandle(IServletResponse response, String query, SessionOutput sessionOutput,
ResultDelivery resultDelivery) throws JsonProcessingException {
try {
response.setStatus(HttpResponseStatus.OK);
@@ -201,20 +202,20 @@
List<Statement> aqlStatements = parser.parse();
validate(aqlStatements);
MetadataManager.INSTANCE.init();
- IStatementExecutor translator = statementExecutorFactory.create(appCtx, aqlStatements, sessionConfig,
+ IStatementExecutor translator = statementExecutorFactory.create(appCtx, aqlStatements, sessionOutput,
compilationProvider, componentProvider);
- translator.compileAndExecute(hcc, hds, resultDelivery, new IStatementExecutor.Stats());
+ translator.compileAndExecute(hcc, hds, resultDelivery, null, new IStatementExecutor.Stats());
} catch (AsterixException | TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, pe.getMessage(), pe);
String errorMessage = ResultUtil.buildParseExceptionMessage(pe, query);
ObjectNode errorResp =
ResultUtil.getErrorResponse(2, errorMessage, "", ResultUtil.extractFullStackTrace(pe));
- sessionConfig.out().write(new ObjectMapper().writeValueAsString(errorResp));
+ sessionOutput.out().write(new ObjectMapper().writeValueAsString(errorResp));
} catch (Exception e) {
GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, e.getMessage(), e);
response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
- ResultUtil.apiErrorHandler(sessionConfig.out(), e);
+ ResultUtil.apiErrorHandler(sessionOutput.out(), e);
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java
index fe6fa89..3bcd670 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java
@@ -36,10 +36,10 @@
import org.apache.asterix.app.result.ResultHandle;
import org.apache.asterix.app.result.ResultPrinter;
import org.apache.asterix.app.result.ResultReader;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.translator.IStatementExecutor.Stats;
-import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
import org.apache.http.ParseException;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable;
@@ -77,29 +77,29 @@
return escaped;
}
- public static void printResults(ICcApplicationContext appCtx, ResultReader resultReader, SessionConfig conf,
+ public static void printResults(IApplicationContext appCtx, ResultReader resultReader, SessionOutput output,
Stats stats, ARecordType recordType) throws HyracksDataException {
- new ResultPrinter(appCtx, conf, stats, recordType).print(resultReader);
+ new ResultPrinter(appCtx, output, stats, recordType).print(resultReader);
}
- public static void printResults(ICcApplicationContext appCtx, String record, SessionConfig conf, Stats stats,
+ public static void printResults(IApplicationContext appCtx, String record, SessionOutput output, Stats stats,
ARecordType recordType) throws HyracksDataException {
- new ResultPrinter(appCtx, conf, stats, recordType).print(record);
+ new ResultPrinter(appCtx, output, stats, recordType).print(record);
}
- public static void printResultHandle(SessionConfig conf, ResultHandle handle) throws HyracksDataException {
+ public static void printResultHandle(SessionOutput output, ResultHandle handle) throws HyracksDataException {
try {
- final AlgebricksAppendable app = new AlgebricksAppendable(conf.out());
- conf.appendHandle(app, handle.toString());
+ final AlgebricksAppendable app = new AlgebricksAppendable(output.out());
+ output.appendHandle(app, handle.toString());
} catch (AlgebricksException e) {
LOGGER.warn("error printing handle", e);
}
}
- public static void printStatus(SessionConfig conf, AbstractQueryApiServlet.ResultStatus rs) {
+ public static void printStatus(SessionOutput output, AbstractQueryApiServlet.ResultStatus rs) {
try {
- final AlgebricksAppendable app = new AlgebricksAppendable(conf.out());
- conf.appendStatus(app, rs.str());
+ final AlgebricksAppendable app = new AlgebricksAppendable(output.out());
+ output.appendStatus(app, rs.str());
} catch (AlgebricksException e) {
LOGGER.warn("error printing status", e);
}
@@ -318,4 +318,35 @@
return errorTemplate;
}
+ public static SessionOutput.ResultDecorator createPreResultDecorator() {
+ return new SessionOutput.ResultDecorator() {
+ int resultNo = -1;
+
+ @Override
+ public AlgebricksAppendable append(AlgebricksAppendable app) throws AlgebricksException {
+ app.append("\t\"");
+ app.append(AbstractQueryApiServlet.ResultFields.RESULTS.str());
+ if (resultNo >= 0) {
+ app.append('-').append(String.valueOf(resultNo));
+ }
+ ++resultNo;
+ app.append("\": ");
+ return app;
+ }
+ };
+ }
+
+ public static SessionOutput.ResultDecorator createPostResultDecorator() {
+ return app -> app.append("\t,\n");
+ }
+
+ public static SessionOutput.ResultAppender createResultHandleAppender(String handleUrl) {
+ return (app, handle) -> app.append("\t\"").append(AbstractQueryApiServlet.ResultFields.HANDLE.str())
+ .append("\": \"").append(handleUrl).append(handle).append("\",\n");
+ }
+
+ public static SessionOutput.ResultAppender createResultStatusAppender() {
+ return (app, status) -> app.append("\t\"").append(AbstractQueryApiServlet.ResultFields.STATUS.str())
+ .append("\": \"").append(status).append("\",\n");
+ }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java
index a9d4e22..b815d76 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java
@@ -22,8 +22,9 @@
public static final String HYRACKS_CONNECTION_ATTR = "org.apache.asterix.HYRACKS_CONNECTION";
public static final String HYRACKS_DATASET_ATTR = "org.apache.asterix.HYRACKS_DATASET";
public static final String ASTERIX_APP_CONTEXT_INFO_ATTR = "org.apache.asterix.APP_CONTEXT_INFO";
- public static final String EXECUTOR_SERVICE_ATTR = "org.apache.asterix.EXECUTOR_SERVICE_ATTR";
+ public static final String EXECUTOR_SERVICE_ATTR = "org.apache.asterix.EXECUTOR_SERVICE";
public static final String RUNNING_QUERIES_ATTR = "org.apache.asterix.RUNINNG_QUERIES";
+ public static final String SERVICE_CONTEXT_ATTR = "org.apache.asterix.SERVICE_CONTEXT";
private ServletConstants() {
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
index ecf2c53..a9d24b9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
@@ -36,6 +36,7 @@
import org.apache.asterix.translator.IStatementExecutorFactory;
import org.apache.asterix.translator.SessionConfig;
import org.apache.asterix.translator.SessionConfig.OutputFormat;
+import org.apache.asterix.translator.SessionOutput;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.job.JobSpecification;
@@ -99,16 +100,17 @@
List<Statement> statements = parser.parse();
MetadataManager.INSTANCE.init();
- SessionConfig conf = new SessionConfig(writer, OutputFormat.ADM, optimize, true, generateBinaryRuntime);
+ SessionConfig conf = new SessionConfig(OutputFormat.ADM, optimize, true, generateBinaryRuntime);
conf.setOOBData(false, printRewrittenExpressions, printLogicalPlan, printOptimizedPlan, printJob);
if (printPhysicalOpsOnly) {
conf.set(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS, true);
}
+ SessionOutput output = new SessionOutput(conf, writer);
- IStatementExecutor translator = statementExecutorFactory.create(appCtx, statements, conf, compilationProvider,
+ IStatementExecutor translator = statementExecutorFactory.create(appCtx, statements, output, compilationProvider,
storageComponentProvider);
translator.compileAndExecute(hcc, null, QueryTranslator.ResultDelivery.IMMEDIATE,
- new IStatementExecutor.Stats());
+ null, new IStatementExecutor.Stats());
writer.flush();
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java
index 6c6f2af..0c6b2cc 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java
@@ -110,11 +110,11 @@
return statementExecutorFactory;
}
- public ILangCompilationProvider getAqlCompilationProvider() {
- return aqlCompilationProvider;
- }
-
- public ILangCompilationProvider getSqlppCompilationProvider() {
- return sqlppCompilationProvider;
+ public ILangCompilationProvider getCompilationProvider(Language lang) {
+ switch (lang) {
+ case AQL: return aqlCompilationProvider;
+ case SQLPP: return sqlppCompilationProvider;
+ default: throw new IllegalArgumentException(String.valueOf(lang));
+ }
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
new file mode 100644
index 0000000..defa180
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.app.message;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.algebra.base.ILangExtension;
+import org.apache.asterix.api.http.server.ResultUtil;
+import org.apache.asterix.app.cc.CCExtensionManager;
+import org.apache.asterix.common.api.IClusterManagementWork;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.hyracks.bootstrap.CCApplication;
+import org.apache.asterix.lang.aql.parser.TokenMgrError;
+import org.apache.asterix.lang.common.base.IParser;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.messaging.CCMessageBroker;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.runtime.utils.ClusterStateManager;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutorContext;
+import org.apache.asterix.translator.IStatementExecutorFactory;
+import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
+import org.apache.hyracks.api.application.ICCServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+
+public final class ExecuteStatementRequestMessage implements ICcAddressedMessage {
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOGGER = Logger.getLogger(ExecuteStatementRequestMessage.class.getName());
+
+ private final String requestNodeId;
+
+ private final long requestMessageId;
+
+ private final ILangExtension.Language lang;
+
+ private final String statementsText;
+
+ private final SessionConfig sessionConfig;
+
+ private final IStatementExecutor.ResultDelivery delivery;
+
+ private final String clientContextID;
+
+ private final String handleUrl;
+
+ public ExecuteStatementRequestMessage(String requestNodeId, long requestMessageId, ILangExtension.Language lang,
+ String statementsText, SessionConfig sessionConfig, IStatementExecutor.ResultDelivery delivery,
+ String clientContextID, String handleUrl) {
+ this.requestNodeId = requestNodeId;
+ this.requestMessageId = requestMessageId;
+ this.lang = lang;
+ this.statementsText = statementsText;
+ this.sessionConfig = sessionConfig;
+ this.delivery = delivery;
+ this.clientContextID = clientContextID;
+ this.handleUrl = handleUrl;
+ }
+
+ @Override
+ public void handle(ICcApplicationContext ccAppCtx) throws HyracksDataException, InterruptedException {
+ ICCServiceContext ccSrvContext = ccAppCtx.getServiceContext();
+ ClusterControllerService ccSrv = (ClusterControllerService) ccSrvContext.getControllerService();
+ CCApplication ccApp = (CCApplication) ccSrv.getApplication();
+ CCMessageBroker messageBroker = (CCMessageBroker) ccSrvContext.getMessageBroker();
+ CCExtensionManager ccExtMgr = (CCExtensionManager) ccAppCtx.getExtensionManager();
+ ILangCompilationProvider compilationProvider = ccExtMgr.getCompilationProvider(lang);
+ IStorageComponentProvider storageComponentProvider = ccAppCtx.getStorageComponentProvider();
+ IStatementExecutorFactory statementExecutorFactory = ccApp.getStatementExecutorFactory();
+ IStatementExecutorContext statementExecutorContext = ccApp.getStatementExecutorContext();
+
+ ccSrv.getExecutor().submit(() -> {
+ ExecuteStatementResponseMessage responseMsg = new ExecuteStatementResponseMessage(requestMessageId);
+
+ try {
+ final IClusterManagementWork.ClusterState clusterState = ClusterStateManager.INSTANCE.getState();
+ if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) {
+ throw new IllegalStateException("Cannot execute request, cluster is " + clusterState);
+ }
+
+ IParser parser = compilationProvider.getParserFactory().createParser(statementsText);
+ List<Statement> statements = parser.parse();
+
+ StringWriter outWriter = new StringWriter(256);
+ PrintWriter outPrinter = new PrintWriter(outWriter);
+ SessionOutput.ResultDecorator resultPrefix = ResultUtil.createPreResultDecorator();
+ SessionOutput.ResultDecorator resultPostfix = ResultUtil.createPostResultDecorator();
+ SessionOutput.ResultAppender appendHandle = ResultUtil.createResultHandleAppender(handleUrl);
+ SessionOutput.ResultAppender appendStatus = ResultUtil.createResultStatusAppender();
+ SessionOutput sessionOutput = new SessionOutput(sessionConfig, outPrinter, resultPrefix, resultPostfix,
+ appendHandle, appendStatus);
+
+ IStatementExecutor.ResultMetadata outMetadata = new IStatementExecutor.ResultMetadata();
+
+ MetadataManager.INSTANCE.init();
+ IStatementExecutor translator = statementExecutorFactory.create(ccAppCtx, statements, sessionOutput,
+ compilationProvider, storageComponentProvider);
+ translator.compileAndExecute(ccAppCtx.getHcc(), null, delivery, outMetadata,
+ new IStatementExecutor.Stats(), clientContextID, statementExecutorContext);
+
+ outPrinter.close();
+ responseMsg.setResult(outWriter.toString());
+ responseMsg.setMetadata(outMetadata);
+ } catch (TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
+ GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, pe.getMessage(), pe);
+ responseMsg.setError(pe);
+ } catch (AsterixException pe) {
+ GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, pe.getMessage(), pe);
+ responseMsg.setError(new AsterixException(pe.getMessage()));
+ } catch (Exception e) {
+ String estr = e.toString();
+ GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, estr, e);
+ responseMsg.setError(new Exception(estr));
+ }
+
+ try {
+ messageBroker.sendApplicationMessageToNC(responseMsg, requestNodeId);
+ } catch (Exception e) {
+ LOGGER.log(Level.WARNING, e.toString(), e);
+ }
+ });
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s(id=%s, from=%s): %s", getClass().getSimpleName(), requestMessageId, requestNodeId,
+ statementsText);
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java
new file mode 100644
index 0000000..4f9aa0c
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.app.message;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.asterix.common.messaging.api.MessageFuture;
+import org.apache.asterix.messaging.NCMessageBroker;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public final class ExecuteStatementResponseMessage implements INcAddressedMessage {
+ private static final long serialVersionUID = 1L;
+
+ public static final long DEFAULT_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(5);
+
+ private final long requestMessageId;
+
+ private String result;
+
+ private IStatementExecutor.ResultMetadata metadata;
+
+ private Throwable error;
+
+ public ExecuteStatementResponseMessage(long requestMessageId) {
+ this.requestMessageId = requestMessageId;
+ }
+
+ @Override
+ public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+ NCMessageBroker mb = (NCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+ MessageFuture future = mb.deregisterMessageFuture(requestMessageId);
+ if (future != null) {
+ future.complete(this);
+ }
+ }
+
+ public Throwable getError() {
+ return error;
+ }
+
+ public void setError(Throwable error) {
+ this.error = error;
+ }
+
+ public String getResult() {
+ return result;
+ }
+
+ public void setResult(String result) {
+ this.result = result;
+ }
+
+ public IStatementExecutor.ResultMetadata getMetadata() {
+ return metadata;
+ }
+
+ public void setMetadata(IStatementExecutor.ResultMetadata metadata) {
+ this.metadata = metadata;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s(id=%s): %d characters", getClass().getSimpleName(), requestMessageId,
+ result != null ? result.length() : 0);
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java
index 7ed3aef..452d13e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java
@@ -24,10 +24,11 @@
import java.io.StringWriter;
import java.nio.ByteBuffer;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.translator.IStatementExecutor.Stats;
import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable;
import org.apache.hyracks.api.comm.IFrame;
@@ -47,6 +48,7 @@
private final FrameManager resultDisplayFrameMgr;
+ private final SessionOutput output;
private final SessionConfig conf;
private final Stats stats;
private final ARecordType recordType;
@@ -62,8 +64,9 @@
private ObjectMapper om;
private ObjectWriter ow;
- public ResultPrinter(ICcApplicationContext appCtx, SessionConfig conf, Stats stats, ARecordType recordType) {
- this.conf = conf;
+ public ResultPrinter(IApplicationContext appCtx, SessionOutput output, Stats stats, ARecordType recordType) {
+ this.output = output;
+ this.conf = output.config();
this.stats = stats;
this.recordType = recordType;
this.indentJSON = conf.is(SessionConfig.FORMAT_INDENT_JSON);
@@ -112,18 +115,18 @@
// If we're outputting CSV with a header, the HTML header was already
// output by displayCSVHeader(), so skip it here
if (conf.is(SessionConfig.FORMAT_HTML)) {
- conf.out().println("<h4>Results:</h4>");
- conf.out().println("<pre class=\"result-content\">");
+ output.out().println("<h4>Results:</h4>");
+ output.out().println("<pre class=\"result-content\">");
}
try {
- conf.resultPrefix(new AlgebricksAppendable(conf.out()));
+ output.resultPrefix(new AlgebricksAppendable(output.out()));
} catch (AlgebricksException e) {
throw new HyracksDataException(e);
}
if (conf.is(SessionConfig.FORMAT_WRAPPER_ARRAY)) {
- conf.out().print("[ ");
+ output.out().print("[ ");
wrapArray = true;
}
@@ -134,29 +137,29 @@
if (quoteRecord) {
StringWriter sw = new StringWriter();
appendCSVHeader(sw, recordType);
- conf.out().print(JSONUtil.quoteAndEscape(sw.toString()));
- conf.out().print("\n");
+ output.out().print(JSONUtil.quoteAndEscape(sw.toString()));
+ output.out().print("\n");
notFirst = true;
} else {
- appendCSVHeader(conf.out(), recordType);
+ appendCSVHeader(output.out(), recordType);
}
}
}
private void printPostfix() throws HyracksDataException {
- conf.out().flush();
+ output.out().flush();
if (wrapArray) {
- conf.out().println(" ]");
+ output.out().println(" ]");
}
try {
- conf.resultPostfix(new AlgebricksAppendable(conf.out()));
+ output.resultPostfix(new AlgebricksAppendable(output.out()));
} catch (AlgebricksException e) {
throw new HyracksDataException(e);
}
if (conf.is(SessionConfig.FORMAT_HTML)) {
- conf.out().println("</pre>");
+ output.out().println("</pre>");
}
- conf.out().flush();
+ output.out().flush();
}
private void displayRecord(String result) throws HyracksDataException {
@@ -177,7 +180,7 @@
// TODO(tillw): this is inefficient as well
record = JSONUtil.quoteAndEscape(record);
}
- conf.out().print(record);
+ output.out().print(record);
stats.setCount(stats.getCount() + 1);
// TODO(tillw) fix this approximation
stats.setSize(stats.getSize() + record.length());
@@ -211,7 +214,7 @@
}
String result = new String(frameBytes, start, length, UTF_8);
if (wrapArray && notFirst) {
- conf.out().print(", ");
+ output.out().print(", ");
}
notFirst = true;
displayRecord(result);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java
index e2f17ac..00fb1b0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java
@@ -28,7 +28,7 @@
import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutorFactory;
-import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
import org.apache.hyracks.control.common.utils.HyracksThreadFactory;
public class DefaultStatementExecutorFactory implements IStatementExecutorFactory {
@@ -48,9 +48,9 @@
}
@Override
- public IStatementExecutor create(ICcApplicationContext appCtx, List<Statement> statements, SessionConfig conf,
+ public IStatementExecutor create(ICcApplicationContext appCtx, List<Statement> statements, SessionOutput output,
ILangCompilationProvider compilationProvider, IStorageComponentProvider storageComponentProvider) {
- return new QueryTranslator(appCtx, statements, conf, compilationProvider, storageComponentProvider,
+ return new QueryTranslator(appCtx, statements, output, compilationProvider, storageComponentProvider,
executorService);
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index d37de0d..e0648ce 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -161,6 +161,7 @@
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutorContext;
import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
import org.apache.asterix.translator.TypeTranslator;
import org.apache.asterix.translator.util.ValidateUtil;
import org.apache.asterix.utils.DataverseUtil;
@@ -170,6 +171,7 @@
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.commons.lang3.tuple.Triple;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -203,6 +205,7 @@
public static final boolean IS_DEBUG_MODE = false;// true
protected final List<Statement> statements;
protected final ICcApplicationContext appCtx;
+ protected final SessionOutput sessionOutput;
protected final SessionConfig sessionConfig;
protected Dataverse activeDataverse;
protected final List<FunctionDecl> declaredFunctions;
@@ -211,12 +214,13 @@
protected final IStorageComponentProvider componentProvider;
protected final ExecutorService executorService;
- public QueryTranslator(ICcApplicationContext appCtx, List<Statement> statements, SessionConfig conf,
+ public QueryTranslator(ICcApplicationContext appCtx, List<Statement> statements, SessionOutput output,
ILangCompilationProvider compliationProvider, IStorageComponentProvider componentProvider,
ExecutorService executorService) {
this.appCtx = appCtx;
this.statements = statements;
- this.sessionConfig = conf;
+ this.sessionOutput = output;
+ this.sessionConfig = output.config();
this.componentProvider = componentProvider;
declaredFunctions = getDeclaredFunctions(statements);
apiFramework = new APIFramework(compliationProvider);
@@ -241,13 +245,14 @@
@Override
public void compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery,
- Stats stats) throws Exception {
- compileAndExecute(hcc, hdc, resultDelivery, stats, null, null);
+ ResultMetadata outMetadata, Stats stats) throws Exception {
+ compileAndExecute(hcc, hdc, resultDelivery, outMetadata, stats, null, null);
}
@Override
public void compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery,
- Stats stats, String clientContextId, IStatementExecutorContext ctx) throws Exception {
+ ResultMetadata outMetadata, Stats stats, String clientContextId, IStatementExecutorContext ctx)
+ throws Exception {
int resultSetIdCounter = 0;
FileSplit outputFile = null;
IAWriterFactory writerFactory = PrinterBasedWriterFactory.INSTANCE;
@@ -262,7 +267,7 @@
try {
for (Statement stmt : statements) {
if (sessionConfig.is(SessionConfig.FORMAT_HTML)) {
- sessionConfig.out().println(ApiServlet.HTML_STATEMENT_SEPARATOR);
+ sessionOutput.out().println(ApiServlet.HTML_STATEMENT_SEPARATOR);
}
validateOperation(appCtx, activeDataverse, stmt);
rewriteStatement(stmt); // Rewrite the statement's AST.
@@ -324,8 +329,8 @@
metadataProvider.setResultAsyncMode(resultDelivery == ResultDelivery.ASYNC
|| resultDelivery == ResultDelivery.DEFERRED);
}
- handleInsertUpsertStatement(metadataProvider, stmt, hcc, hdc, resultDelivery, stats, false,
- clientContextId, ctx);
+ handleInsertUpsertStatement(metadataProvider, stmt, hcc, hdc, resultDelivery, outMetadata,
+ stats, false, clientContextId, ctx);
break;
case Statement.Kind.DELETE:
handleDeleteStatement(metadataProvider, stmt, hcc, false);
@@ -358,8 +363,8 @@
metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++));
metadataProvider.setResultAsyncMode(
resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.DEFERRED);
- handleQuery(metadataProvider, (Query) stmt, hcc, hdc, resultDelivery, stats, clientContextId,
- ctx);
+ handleQuery(metadataProvider, (Query) stmt, hcc, hdc, resultDelivery, outMetadata, stats,
+ clientContextId, ctx);
break;
case Statement.Kind.COMPACT:
handleCompactStatement(metadataProvider, stmt, hcc);
@@ -1694,7 +1699,7 @@
CompiledLoadFromFileStatement cls =
new CompiledLoadFromFileStatement(dataverseName, loadStmt.getDatasetName().getValue(),
loadStmt.getAdapter(), loadStmt.getProperties(), loadStmt.dataIsAlreadySorted());
- JobSpecification spec = apiFramework.compileQuery(hcc, metadataProvider, null, 0, null, sessionConfig, cls);
+ JobSpecification spec = apiFramework.compileQuery(hcc, metadataProvider, null, 0, null, sessionOutput, cls);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
if (spec != null) {
@@ -1712,8 +1717,8 @@
public JobSpecification handleInsertUpsertStatement(MetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery,
- IStatementExecutor.Stats stats, boolean compileOnly, String clientContextId, IStatementExecutorContext ctx)
- throws Exception {
+ ResultMetadata outMetadata, Stats stats, boolean compileOnly, String clientContextId,
+ IStatementExecutorContext ctx) throws Exception {
InsertStatement stmtInsertUpsert = (InsertStatement) stmt;
String dataverseName = getActiveDataverse(stmtInsertUpsert.getDataverseName());
final IMetadataLocker locker = new IMetadataLocker() {
@@ -1755,7 +1760,8 @@
}
if (stmtInsertUpsert.getReturnExpression() != null) {
- deliverResult(hcc, hdc, compiler, metadataProvider, locker, resultDelivery, stats, clientContextId, ctx);
+ deliverResult(hcc, hdc, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats,
+ clientContextId, ctx);
} else {
locker.lock();
try {
@@ -1811,11 +1817,11 @@
// Query Rewriting (happens under the same ongoing metadata transaction)
Pair<IReturningStatement, Integer> rewrittenResult =
- apiFramework.reWriteQuery(declaredFunctions, metadataProvider, query, sessionConfig);
+ apiFramework.reWriteQuery(declaredFunctions, metadataProvider, query, sessionOutput);
// Query Compilation (happens under the same ongoing metadata transaction)
return apiFramework.compileQuery(clusterInfoCollector, metadataProvider, (Query) rewrittenResult.first,
- rewrittenResult.second, stmt == null ? null : stmt.getDatasetName(), sessionConfig, stmt);
+ rewrittenResult.second, stmt == null ? null : stmt.getDatasetName(), sessionOutput, stmt);
}
private JobSpecification rewriteCompileInsertUpsert(IClusterInfoCollector clusterInfoCollector,
@@ -1824,7 +1830,7 @@
// Insert/upsert statement rewriting (happens under the same ongoing metadata transaction)
Pair<IReturningStatement, Integer> rewrittenResult =
- apiFramework.reWriteQuery(declaredFunctions, metadataProvider, insertUpsert, sessionConfig);
+ apiFramework.reWriteQuery(declaredFunctions, metadataProvider, insertUpsert, sessionOutput);
InsertStatement rewrittenInsertUpsert = (InsertStatement) rewrittenResult.first;
String dataverseName = getActiveDataverse(rewrittenInsertUpsert.getDataverseName());
@@ -1846,7 +1852,7 @@
}
// Insert/upsert statement compilation (happens under the same ongoing metadata transaction)
return apiFramework.compileQuery(clusterInfoCollector, metadataProvider, rewrittenInsertUpsert.getQuery(),
- rewrittenResult.second, datasetName, sessionConfig, clfrqs);
+ rewrittenResult.second, datasetName, sessionOutput, clfrqs);
}
protected void handleCreateFeedStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception {
@@ -2052,7 +2058,7 @@
datasets.add(ds);
}
org.apache.commons.lang3.tuple.Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> jobInfo =
- FeedOperations.buildStartFeedJob(sessionConfig, metadataProvider, feed, feedConnections,
+ FeedOperations.buildStartFeedJob(sessionOutput, metadataProvider, feed, feedConnections,
compilationProvider, storageComponentProvider, qtFactory, hcc);
JobSpecification feedJob = jobInfo.getLeft();
@@ -2287,8 +2293,8 @@
}
protected void handleQuery(MetadataProvider metadataProvider, Query query, IHyracksClientConnection hcc,
- IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats, String clientContextId,
- IStatementExecutorContext ctx) throws Exception {
+ IHyracksDataset hdc, ResultDelivery resultDelivery, ResultMetadata outMetadata, Stats stats,
+ String clientContextId, IStatementExecutorContext ctx) throws Exception {
final IMetadataLocker locker = new IMetadataLocker() {
@Override
public void lock() {
@@ -2318,12 +2324,14 @@
throw e;
}
};
- deliverResult(hcc, hdc, compiler, metadataProvider, locker, resultDelivery, stats, clientContextId, ctx);
+ deliverResult(hcc, hdc, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats, clientContextId,
+ ctx);
}
private void deliverResult(IHyracksClientConnection hcc, IHyracksDataset hdc, IStatementCompiler compiler,
- MetadataProvider metadataProvider, IMetadataLocker locker, ResultDelivery resultDelivery, Stats stats,
- String clientContextId, IStatementExecutorContext ctx) throws Exception {
+ MetadataProvider metadataProvider, IMetadataLocker locker, ResultDelivery resultDelivery,
+ ResultMetadata outMetadata, Stats stats, String clientContextId, IStatementExecutorContext ctx)
+ throws Exception {
final ResultSetId resultSetId = metadataProvider.getResultSetId();
switch (resultDelivery) {
case ASYNC:
@@ -2339,13 +2347,17 @@
case IMMEDIATE:
createAndRunJob(hcc, null, compiler, locker, resultDelivery, id -> {
final ResultReader resultReader = new ResultReader(hdc, id, resultSetId);
- ResultUtil.printResults(appCtx, resultReader, sessionConfig, stats,
+ ResultUtil.printResults(appCtx, resultReader, sessionOutput, stats,
metadataProvider.findOutputRecordType());
}, clientContextId, ctx);
break;
case DEFERRED:
createAndRunJob(hcc, null, compiler, locker, resultDelivery, id -> {
- ResultUtil.printResultHandle(sessionConfig, new ResultHandle(id, resultSetId));
+ ResultUtil.printResultHandle(sessionOutput, new ResultHandle(id, resultSetId));
+ if (outMetadata != null) {
+ outMetadata.getResultSets()
+ .add(Triple.of(id, resultSetId, metadataProvider.findOutputRecordType()));
+ }
}, clientContextId, ctx);
break;
default:
@@ -2360,8 +2372,8 @@
try {
createAndRunJob(hcc, jobId, compiler, locker, resultDelivery, id -> {
final ResultHandle handle = new ResultHandle(id, resultSetId);
- ResultUtil.printStatus(sessionConfig, AbstractQueryApiServlet.ResultStatus.RUNNING);
- ResultUtil.printResultHandle(sessionConfig, handle);
+ ResultUtil.printStatus(sessionOutput, AbstractQueryApiServlet.ResultStatus.RUNNING);
+ ResultUtil.printResultHandle(sessionOutput, handle);
synchronized (printed) {
printed.setTrue();
printed.notify();
@@ -2370,8 +2382,8 @@
} catch (Exception e) {
if (JobId.INVALID.equals(jobId.getValue())) {
// compilation failed
- ResultUtil.printStatus(sessionConfig, AbstractQueryApiServlet.ResultStatus.FAILED);
- ResultUtil.printError(sessionConfig.out(), e);
+ ResultUtil.printStatus(sessionOutput, AbstractQueryApiServlet.ResultStatus.FAILED);
+ ResultUtil.printError(sessionOutput.out(), e);
} else {
GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE,
resultDelivery.name() + " job with id " + jobId.getValue() + " " + "failed", e);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index bf7d5eb..57cc340 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -19,17 +19,19 @@
package org.apache.asterix.hyracks.bootstrap;
+import static org.apache.asterix.algebra.base.ILangExtension.Language.AQL;
+import static org.apache.asterix.algebra.base.ILangExtension.Language.SQLPP;
import static org.apache.asterix.api.http.servlet.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR;
import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
import java.util.Arrays;
import java.util.List;
-import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.active.ActiveLifecycleListener;
+import org.apache.asterix.api.http.ctx.StatementExecutorContext;
import org.apache.asterix.api.http.server.ApiServlet;
import org.apache.asterix.api.http.server.ClusterApiServlet;
import org.apache.asterix.api.http.server.ClusterControllerDetailsApiServlet;
@@ -73,6 +75,7 @@
import org.apache.asterix.runtime.job.resource.JobCapacityController;
import org.apache.asterix.runtime.utils.CcApplicationContext;
import org.apache.asterix.runtime.utils.ClusterStateManager;
+import org.apache.asterix.translator.IStatementExecutorContext;
import org.apache.asterix.translator.IStatementExecutorFactory;
import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.application.IServiceContext;
@@ -95,6 +98,7 @@
protected ICCServiceContext ccServiceCtx;
protected CCExtensionManager ccExtensionManager;
protected IStorageComponentProvider componentProvider;
+ protected StatementExecutorContext statementExecutorCtx;
protected WebManager webManager;
protected CcApplicationContext appCtx;
private IJobCapacityController jobCapacityController;
@@ -124,9 +128,10 @@
ExternalLibraryUtils.setUpExternaLibraries(libraryManager, false);
componentProvider = new StorageComponentProvider();
GlobalRecoveryManager.instantiate(ccServiceCtx, getHcc(), componentProvider);
+ statementExecutorCtx = new StatementExecutorContext();
appCtx = new CcApplicationContext(ccServiceCtx, getHcc(), libraryManager, resourceIdManager,
() -> MetadataManager.INSTANCE, GlobalRecoveryManager.instance(), ftStrategy,
- new ActiveLifecycleListener());
+ new ActiveLifecycleListener(), componentProvider);
ClusterStateManager.INSTANCE.setCcAppCtx(appCtx);
ccExtensionManager = new CCExtensionManager(getExtensions());
appCtx.setExtensionManager(ccExtensionManager);
@@ -184,7 +189,7 @@
IHyracksClientConnection hcc = getHcc();
webServer.setAttribute(HYRACKS_CONNECTION_ATTR, hcc);
webServer.addServlet(new ApiServlet(webServer.ctx(), new String[] { "/*" }, appCtx,
- ccExtensionManager.getAqlCompilationProvider(), ccExtensionManager.getSqlppCompilationProvider(),
+ ccExtensionManager.getCompilationProvider(AQL), ccExtensionManager.getCompilationProvider(SQLPP),
getStatementExecutorFactory(), componentProvider));
return webServer;
}
@@ -197,6 +202,8 @@
jsonAPIServer.setAttribute(ASTERIX_APP_CONTEXT_INFO_ATTR, appCtx);
jsonAPIServer.setAttribute(ServletConstants.EXECUTOR_SERVICE_ATTR,
ccServiceCtx.getControllerService().getExecutor());
+ jsonAPIServer.setAttribute(ServletConstants.RUNNING_QUERIES_ATTR, statementExecutorCtx);
+ jsonAPIServer.setAttribute(ServletConstants.SERVICE_CONTEXT_ATTR, ccServiceCtx);
// AQL rest APIs.
addServlet(jsonAPIServer, Servlets.AQL_QUERY);
@@ -241,28 +248,28 @@
protected IServlet createServlet(ConcurrentMap<String, Object> ctx, String key, String... paths) {
switch (key) {
case Servlets.AQL:
- return new FullApiServlet(ctx, paths, appCtx, ccExtensionManager.getAqlCompilationProvider(),
+ return new FullApiServlet(ctx, paths, appCtx, ccExtensionManager.getCompilationProvider(AQL),
getStatementExecutorFactory(), componentProvider);
case Servlets.AQL_QUERY:
- return new QueryApiServlet(ctx, paths, appCtx, ccExtensionManager.getAqlCompilationProvider(),
+ return new QueryApiServlet(ctx, paths, appCtx, ccExtensionManager.getCompilationProvider(AQL),
getStatementExecutorFactory(), componentProvider);
case Servlets.AQL_UPDATE:
- return new UpdateApiServlet(ctx, paths, appCtx, ccExtensionManager.getAqlCompilationProvider(),
+ return new UpdateApiServlet(ctx, paths, appCtx, ccExtensionManager.getCompilationProvider(AQL),
getStatementExecutorFactory(), componentProvider);
case Servlets.AQL_DDL:
- return new DdlApiServlet(ctx, paths, appCtx, ccExtensionManager.getAqlCompilationProvider(),
+ return new DdlApiServlet(ctx, paths, appCtx, ccExtensionManager.getCompilationProvider(AQL),
getStatementExecutorFactory(), componentProvider);
case Servlets.SQLPP:
- return new FullApiServlet(ctx, paths, appCtx, ccExtensionManager.getSqlppCompilationProvider(),
+ return new FullApiServlet(ctx, paths, appCtx, ccExtensionManager.getCompilationProvider(SQLPP),
getStatementExecutorFactory(), componentProvider);
case Servlets.SQLPP_QUERY:
- return new QueryApiServlet(ctx, paths, appCtx, ccExtensionManager.getSqlppCompilationProvider(),
+ return new QueryApiServlet(ctx, paths, appCtx, ccExtensionManager.getCompilationProvider(SQLPP),
getStatementExecutorFactory(), componentProvider);
case Servlets.SQLPP_UPDATE:
- return new UpdateApiServlet(ctx, paths, appCtx, ccExtensionManager.getSqlppCompilationProvider(),
+ return new UpdateApiServlet(ctx, paths, appCtx, ccExtensionManager.getCompilationProvider(SQLPP),
getStatementExecutorFactory(), componentProvider);
case Servlets.SQLPP_DDL:
- return new DdlApiServlet(ctx, paths, appCtx, ccExtensionManager.getSqlppCompilationProvider(),
+ return new DdlApiServlet(ctx, paths, appCtx, ccExtensionManager.getCompilationProvider(SQLPP),
getStatementExecutorFactory(), componentProvider);
case Servlets.RUNNING_REQUESTS:
return new QueryCancellationServlet(ctx, paths);
@@ -271,8 +278,9 @@
case Servlets.QUERY_RESULT:
return new QueryResultApiServlet(ctx, paths, appCtx);
case Servlets.QUERY_SERVICE:
- return new QueryServiceServlet(ctx, paths, appCtx, ccExtensionManager.getSqlppCompilationProvider(),
- getStatementExecutorFactory(), componentProvider);
+ return new QueryServiceServlet(ctx, paths, appCtx, SQLPP,
+ ccExtensionManager.getCompilationProvider(SQLPP), getStatementExecutorFactory(),
+ componentProvider);
case Servlets.CONNECTOR:
return new ConnectorApiServlet(ctx, paths, appCtx);
case Servlets.SHUTDOWN:
@@ -292,13 +300,17 @@
}
}
- private IStatementExecutorFactory getStatementExecutorFactory() {
+ public IStatementExecutorFactory getStatementExecutorFactory() {
return ccExtensionManager.getStatementExecutorFactory(ccServiceCtx.getControllerService().getExecutor());
}
+ public IStatementExecutorContext getStatementExecutorContext() {
+ return statementExecutorCtx;
+ }
+
@Override
public void startupCompleted() throws Exception {
- ccServiceCtx.getControllerService().getExecutor().submit((Callable) () -> {
+ ccServiceCtx.getControllerService().getExecutor().submit(() -> {
ClusterStateManager.INSTANCE.waitForState(ClusterState.ACTIVE);
ClusterManagerProvider.getClusterManager().notifyStartupCompleted();
return null;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 9c24acf..ec01e1c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -49,21 +49,26 @@
import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.client.ClusterControllerInfo;
+import org.apache.hyracks.api.client.HyracksConnection;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.config.IConfigManager;
import org.apache.hyracks.api.job.resource.NodeCapacity;
import org.apache.hyracks.api.messages.IMessageBroker;
import org.apache.hyracks.control.common.controllers.NCConfig;
import org.apache.hyracks.control.nc.BaseNCApplication;
import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.http.server.WebManager;
public class NCApplication extends BaseNCApplication {
private static final Logger LOGGER = Logger.getLogger(NCApplication.class.getName());
- private INCServiceContext ncServiceCtx;
+ protected INCServiceContext ncServiceCtx;
private INcApplicationContext runtimeContext;
private String nodeId;
private boolean stopInitiated = false;
private SystemState systemState;
+ protected WebManager webManager;
@Override
public void registerConfig(IConfigManager configManager) {
@@ -122,6 +127,8 @@
localResourceRepository.initializeNewUniverse(ClusterProperties.INSTANCE.getStorageDirectoryName());
}
+ webManager = new WebManager();
+
performLocalCleanUp();
}
@@ -131,6 +138,10 @@
Logger.getLogger("org.apache.asterix").setLevel(level);
}
+ protected void configureServers() throws Exception {
+ // override to start web services on NC nodes
+ }
+
protected List<AsterixExtension> getExtensions() {
return Collections.emptyList();
}
@@ -143,6 +154,9 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Stopping Asterix node controller: " + nodeId);
}
+
+ webManager.stop();
+
//Clean any temporary files
performLocalCleanUp();
@@ -163,6 +177,10 @@
@Override
public void startupCompleted() throws Exception {
+ // configure servlets after joining the cluster, so we can create HyracksClientConnection
+ configureServers();
+ webManager.start();
+
// Since we don't pass initial run flag in AsterixHyracksIntegrationUtil, we use the virtualNC flag
final NodeProperties nodeProperties = runtimeContext.getNodeProperties();
if (systemState == SystemState.PERMANENT_DATA_LOSS
@@ -262,4 +280,10 @@
public INcApplicationContext getApplicationContext() {
return runtimeContext;
}
+
+ protected IHyracksClientConnection getHcc() throws Exception {
+ NodeControllerService ncSrv = (NodeControllerService) ncServiceCtx.getControllerService();
+ ClusterControllerInfo ccInfo = ncSrv.getNodeParameters().getClusterControllerInfo();
+ return new HyracksConnection(ccInfo.getClientNetAddress(), ccInfo.getClientNetPort());
+ }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
index 630aabe..33e89f0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -30,12 +31,16 @@
import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
import org.apache.asterix.common.messaging.api.INCMessageBroker;
import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.asterix.common.messaging.api.MessageFuture;
import org.apache.hyracks.api.comm.IChannelControlBlock;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.messages.IMessage;
import org.apache.hyracks.api.util.JavaSerializationUtils;
import org.apache.hyracks.control.nc.NodeControllerService;
+import io.netty.util.collection.LongObjectHashMap;
+import io.netty.util.collection.LongObjectMap;
+
public class NCMessageBroker implements INCMessageBroker {
private static final Logger LOGGER = Logger.getLogger(NCMessageBroker.class.getName());
@@ -44,6 +49,8 @@
private final LinkedBlockingQueue<INcAddressedMessage> receivedMsgsQ;
private final ConcurrentFramePool messagingFramePool;
private final int maxMsgSize;
+ private final AtomicLong futureIdGenerator;
+ private final LongObjectMap<MessageFuture> futureMap;
public NCMessageBroker(NodeControllerService ncs, MessagingProperties messagingProperties) {
this.ncs = ncs;
@@ -53,6 +60,8 @@
messagingFramePool = new ConcurrentFramePool(ncs.getId(), messagingMemoryBudget,
messagingProperties.getFrameSize());
receivedMsgsQ = new LinkedBlockingQueue<>();
+ futureIdGenerator = new AtomicLong();
+ futureMap = new LongObjectHashMap<>();
MessageDeliveryService msgDeliverySvc = new MessageDeliveryService();
appContext.getThreadExecutor().execute(msgDeliverySvc);
}
@@ -104,6 +113,26 @@
ccb.getWriteInterface().getFullBufferAcceptor().accept(msgBuffer);
}
+ @Override
+ public MessageFuture registerMessageFuture() {
+ long futureId = futureIdGenerator.incrementAndGet();
+ MessageFuture future = new MessageFuture(futureId);
+ synchronized (futureMap) {
+ if (futureMap.containsKey(futureId)) {
+ throw new IllegalStateException();
+ }
+ futureMap.put(futureId, future);
+ }
+ return future;
+ }
+
+ @Override
+ public MessageFuture deregisterMessageFuture(long futureId) {
+ synchronized (futureMap) {
+ return futureMap.remove(futureId);
+ }
+ }
+
private class MessageDeliveryService implements Runnable {
/*
* TODO Currently this thread is not stopped when it is interrupted because
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index bffaeef..d1ff871 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -69,7 +69,7 @@
import org.apache.asterix.runtime.utils.RuntimeUtils;
import org.apache.asterix.translator.CompiledStatements;
import org.apache.asterix.translator.IStatementExecutor;
-import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -151,7 +151,7 @@
return spec;
}
- private static JobSpecification getConnectionJob(SessionConfig sessionConfig, MetadataProvider metadataProvider,
+ private static JobSpecification getConnectionJob(SessionOutput sessionOutput, MetadataProvider metadataProvider,
FeedConnection feedConnection, String[] locations, ILangCompilationProvider compilationProvider,
IStorageComponentProvider storageComponentProvider, DefaultStatementExecutorFactory qtFactory,
IHyracksClientConnection hcc) throws AlgebricksException, RemoteException, ACIDException {
@@ -165,7 +165,7 @@
statements.add(dataverseDecl);
statements.add(subscribeStmt);
IStatementExecutor translator = qtFactory.create(metadataProvider.getApplicationContext(), statements,
- sessionConfig, compilationProvider, storageComponentProvider);
+ sessionOutput, compilationProvider, storageComponentProvider);
// configure the metadata provider
metadataProvider.getConfig().put(FunctionUtil.IMPORT_PRIVATE_FUNCTIONS, "" + Boolean.TRUE);
metadataProvider.getConfig().put(FeedActivityDetails.FEED_POLICY_NAME, "" + subscribeStmt.getPolicy());
@@ -357,7 +357,7 @@
}
public static Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> buildStartFeedJob(
- SessionConfig sessionConfig, MetadataProvider metadataProvider, Feed feed,
+ SessionOutput sessionOutput, MetadataProvider metadataProvider, Feed feed,
List<FeedConnection> feedConnections, ILangCompilationProvider compilationProvider,
IStorageComponentProvider storageComponentProvider, DefaultStatementExecutorFactory qtFactory,
IHyracksClientConnection hcc) throws Exception {
@@ -372,7 +372,7 @@
String[] ingestionLocations = ingestionAdaptorFactory.getPartitionConstraint().getLocations();
// Add connection job
for (FeedConnection feedConnection : feedConnections) {
- JobSpecification connectionJob = getConnectionJob(sessionConfig, metadataProvider, feedConnection,
+ JobSpecification connectionJob = getConnectionJob(sessionOutput, metadataProvider, feedConnection,
ingestionLocations, compilationProvider, storageComponentProvider, qtFactory, hcc);
jobsList.add(connectionJob);
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
index e2885b3..5e3da5f 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
@@ -39,7 +39,7 @@
import org.apache.asterix.lang.common.statement.RunStatement;
import org.apache.asterix.runtime.utils.CcApplicationContext;
import org.apache.asterix.translator.IStatementExecutor;
-import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
import org.junit.Assert;
import org.junit.Test;
@@ -51,7 +51,7 @@
@Test
public void test() throws Exception {
List<Statement> statements = new ArrayList<>();
- SessionConfig mockSessionConfig = mock(SessionConfig.class);
+ SessionOutput mockSessionOutput = mock(SessionOutput.class);
RunStatement mockRunStatement = mock(RunStatement.class);
// Mocks AppContextInfo.
@@ -70,7 +70,7 @@
when(mockMasterNode.getClientIp()).thenReturn("127.0.0.1");
IStatementExecutor aqlTranslator = new DefaultStatementExecutorFactory().create(mockAsterixAppContextInfo,
- statements, mockSessionConfig, new AqlCompilationProvider(), new StorageComponentProvider());
+ statements, mockSessionOutput, new AqlCompilationProvider(), new StorageComponentProvider());
List<String> parameters = new ArrayList<>();
parameters.add("examples/pregelix-example-jar-with-dependencies.jar");
parameters.add("org.apache.pregelix.example.PageRankVertex");
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index f1e6aca..9913493 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -32,11 +32,13 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.Inet4Address;
+import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -118,19 +120,26 @@
/*
* Instance members
*/
- protected final String host;
- protected final int port;
+ protected final List<InetSocketAddress> endpoints;
+ protected int endpointSelector;
protected ITestLibrarian librarian;
- public TestExecutor(String host, int port) {
- this.host = host;
- this.port = port;
- }
-
public TestExecutor() {
this(Inet4Address.getLoopbackAddress().getHostAddress(), 19002);
}
+ public TestExecutor(String host, int port) {
+ this(InetSocketAddress.createUnresolved(host, port));
+ }
+
+ public TestExecutor(InetSocketAddress endpoint) {
+ this(Collections.singletonList(endpoint));
+ }
+
+ public TestExecutor(List<InetSocketAddress> endpoints) {
+ this.endpoints = endpoints;
+ }
+
public void setLibrarian(ITestLibrarian librarian) {
this.librarian = librarian;
}
@@ -373,7 +382,7 @@
continue;
}
throw new Exception(
- "Result for " + scriptFile + ": expected pattern '" + expression + "' not found in result.");
+ "Result for " + scriptFile + ": expected pattern '" + expression + "' not found in result: "+actual);
}
} catch (Exception e) {
System.err.println("Actual results file: " + actualFile.toString());
@@ -811,6 +820,7 @@
break;
case "mgx":
executeManagixCommand(stripLineComments(statement).trim());
+ Thread.sleep(8000);
break;
case "txnqbc": // qbc represents query before crash
InputStream resultStream = executeQuery(statement, OutputFormat.forCompilationUnit(cUnit),
@@ -1165,7 +1175,7 @@
protected InputStream executeHttp(String ctxType, String endpoint, OutputFormat fmt) throws Exception {
String[] split = endpoint.split("\\?");
- URI uri = new URI("http", null, host, port, split[0], split.length > 1 ? split[1] : null, null);
+ URI uri = createEndpointURI(split[0], split.length > 1 ? split[1] : null);
return executeURI(ctxType, uri, fmt);
}
@@ -1184,7 +1194,7 @@
//get node process id
OutputFormat fmt = OutputFormat.forCompilationUnit(cUnit);
String endpoint = "/admin/cluster/node/" + nodeId + "/config";
- InputStream executeJSONGet = executeJSONGet(fmt, new URI("http", null, host, port, endpoint, null, null));
+ InputStream executeJSONGet = executeJSONGet(fmt, createEndpointURI(endpoint, null));
StringWriter actual = new StringWriter();
IOUtils.copy(executeJSONGet, actual, StandardCharsets.UTF_8);
String config = actual.toString();
@@ -1201,7 +1211,7 @@
private void deleteNCTxnLogs(String nodeId, CompilationUnit cUnit) throws Exception {
OutputFormat fmt = OutputFormat.forCompilationUnit(cUnit);
String endpoint = "/admin/cluster/node/" + nodeId + "/config";
- InputStream executeJSONGet = executeJSONGet(fmt, new URI("http://" + host + ":" + port + endpoint));
+ InputStream executeJSONGet = executeJSONGet(fmt, createEndpointURI(endpoint, null));
StringWriter actual = new StringWriter();
IOUtils.copy(executeJSONGet, actual, StandardCharsets.UTF_8);
String config = actual.toString();
@@ -1280,8 +1290,16 @@
+ cUnit.getName() + "_qbc.adm");
}
+ protected URI createEndpointURI(String path, String query) throws URISyntaxException {
+ int endpointIdx = Math.abs(endpointSelector++ % endpoints.size());
+ InetSocketAddress endpoint = endpoints.get(endpointIdx);
+ URI uri = new URI("http", null, endpoint.getHostString(), endpoint.getPort(), path, query, null);
+ LOGGER.fine("Created endpoint URI: " + uri);
+ return uri;
+ }
+
protected URI getEndpoint(String servlet) throws URISyntaxException {
- return new URI("http", null, host, port, getPath(servlet).replaceAll("/\\*$", ""), null, null);
+ return createEndpointURI(getPath(servlet).replaceAll("/\\*$", ""), null);
}
public static String stripJavaComments(String text) {
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml
new file mode 100644
index 0000000..9a2ad3c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml
@@ -0,0 +1,52 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements. See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership. The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License. You may obtain a copy of the License at
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied. See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<test-group name="async-deferred">
+ <test-case FilePath="async-deferred">
+ <compilation-unit name="async-failed">
+ <output-dir compare="Text">async-failed</output-dir>
+ <expected-error>Injected failure in asterix:inject-failure</expected-error>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="async-deferred">
+ <compilation-unit name="async-compilation-failed">
+ <output-dir compare="Text">async-compilation-failed</output-dir>
+ <expected-error>Cannot find dataset gargel</expected-error>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="async-deferred">
+ <compilation-unit name="deferred">
+ <output-dir compare="Text">deferred</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="async-deferred">
+ <compilation-unit name="async">
+ <output-dir compare="Text">async</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="async-deferred">
+ <compilation-unit name="async-repeated">
+ <output-dir compare="Text">async-repeated</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="async-deferred">
+ <compilation-unit name="async-running">
+ <output-dir compare="Text">async-running</output-dir>
+ </compilation-unit>
+ </test-case>
+</test-group>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 8f7ffc3..d5716d1 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -18,43 +18,10 @@
!-->
<!DOCTYPE test-suite [
<!ENTITY RecordsQueries SYSTEM "queries_sqlpp/objects/ObjectsQueries.xml">
-
+ <!ENTITY AsyncDeferredQueries SYSTEM "queries_sqlpp/async-deferred/AsyncDeferredQueries.xml">
]>
<test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp" QueryFileExtension=".sqlpp">
- <test-group name="async-deferred">
- <test-case FilePath="async-deferred">
- <compilation-unit name="async-failed">
- <output-dir compare="Text">async-failed</output-dir>
- <expected-error>Injected failure in asterix:inject-failure</expected-error>
- </compilation-unit>
- </test-case>
- <test-case FilePath="async-deferred">
- <compilation-unit name="async-compilation-failed">
- <output-dir compare="Text">async-compilation-failed</output-dir>
- <expected-error>Cannot find dataset gargel</expected-error>
- </compilation-unit>
- </test-case>
- <test-case FilePath="async-deferred">
- <compilation-unit name="deferred">
- <output-dir compare="Text">deferred</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="async-deferred">
- <compilation-unit name="async">
- <output-dir compare="Text">async</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="async-deferred">
- <compilation-unit name="async-repeated">
- <output-dir compare="Text">async-repeated</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="async-deferred">
- <compilation-unit name="async-running">
- <output-dir compare="Text">async-running</output-dir>
- </compilation-unit>
- </test-case>
- </test-group>
+ &AsyncDeferredQueries;
<test-group name="flwor">
<test-case FilePath="flwor">
<compilation-unit name="at00">
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
index a9e6448..e5b78cf 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
@@ -20,6 +20,7 @@
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
+import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.transactions.IResourceIdManager;
import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.client.IHyracksClientConnection;
@@ -72,5 +73,24 @@
*/
public IHyracksClientConnection getHcc();
+ /**
+ * Returns the resource manager
+ *
+ * @return {@link IResourceIdManager} implementation instance
+ */
public IResourceIdManager getResourceIdManager();
+
+ /**
+ * Returns the storage component provider
+ *
+ * @return {@link IStorageComponentProvider} implementation instance
+ */
+ public IStorageComponentProvider getStorageComponentProvider();
+
+ /**
+ * Returns the extension manager
+ *
+ * @return the extension manager instance
+ */
+ public Object getExtensionManager();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
index e1101b3..86d1074 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
@@ -26,7 +26,6 @@
* Sends application message from this NC to the CC.
*
* @param message
- * @param callback
* @throws Exception
*/
public void sendMessageToCC(ICcAddressedMessage message) throws Exception;
@@ -35,7 +34,6 @@
* Sends application message from this NC to another NC.
*
* @param message
- * @param callback
* @throws Exception
*/
public void sendMessageToNC(String nodeId, INcAddressedMessage message)
@@ -47,4 +45,17 @@
* @param msg
*/
public void queueReceivedMessage(INcAddressedMessage msg);
+
+ /**
+ * Creates and registers a Future for a message that will be send through this broker
+ * @return new Future
+ */
+ MessageFuture registerMessageFuture();
+
+ /**
+ * Removes a previously registered Future
+ * @param futureId future identifier
+ * @return existing Future or {@code null} if there was no Future associated with this identifier
+ */
+ MessageFuture deregisterMessageFuture(long futureId);
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/MessageFuture.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/MessageFuture.java
new file mode 100644
index 0000000..f4ed1ff
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/MessageFuture.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.common.messaging.api;
+
+import org.apache.hyracks.api.messages.IMessage;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A {@link CompletableFuture} associated with an identifier
+ */
+public class MessageFuture extends CompletableFuture<IMessage> {
+
+ private final long futureId;
+
+ public MessageFuture(long futureId) {
+ this.futureId = futureId;
+ }
+
+ public long getFutureId() {
+ return futureId;
+ }
+}
diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml
index d9ca295..e341fef 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -281,6 +281,10 @@
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>xerces</groupId>
+ <artifactId>xercesImpl</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
diff --git a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixLifecycleIT.java b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixLifecycleIT.java
index 3c65a83..814e109 100644
--- a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixLifecycleIT.java
+++ b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixLifecycleIT.java
@@ -88,6 +88,7 @@
AsterixInstallerIntegrationUtil.transformIntoRequiredState(State.ACTIVE);
String command = "stop -n " + AsterixInstallerIntegrationUtil.ASTERIX_INSTANCE_NAME;
cmdHandler.processCommand(command.split(" "));
+ Thread.sleep(4000);
AsterixInstance instance = ServiceProvider.INSTANCE.getLookupService()
.getAsterixInstance(AsterixInstallerIntegrationUtil.ASTERIX_INSTANCE_NAME);
AsterixRuntimeState state = VerificationUtil.getAsterixRuntimeState(instance);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
index 8608d68..a4c271c 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
@@ -34,6 +34,7 @@
import org.apache.asterix.common.config.ReplicationProperties;
import org.apache.asterix.common.config.StorageProperties;
import org.apache.asterix.common.config.TransactionProperties;
+import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.library.ILibraryManager;
@@ -54,6 +55,7 @@
public class CcApplicationContext implements ICcApplicationContext {
private ICCServiceContext ccServiceCtx;
+ private IStorageComponentProvider storageComponentProvider;
private IGlobalRecoveryManager globalRecoveryManager;
private ILibraryManager libraryManager;
private IResourceIdManager resourceIdManager;
@@ -77,7 +79,8 @@
public CcApplicationContext(ICCServiceContext ccServiceCtx, IHyracksClientConnection hcc,
ILibraryManager libraryManager, IResourceIdManager resourceIdManager,
Supplier<IMetadataBootstrap> metadataBootstrapSupplier, IGlobalRecoveryManager globalRecoveryManager,
- IFaultToleranceStrategy ftStrategy, IJobLifecycleListener activeLifeCycleListener)
+ IFaultToleranceStrategy ftStrategy, IJobLifecycleListener activeLifeCycleListener,
+ IStorageComponentProvider storageComponentProvider)
throws AsterixException, IOException {
this.ccServiceCtx = ccServiceCtx;
this.hcc = hcc;
@@ -102,6 +105,7 @@
this.nodeProperties = new NodeProperties(propertiesAccessor);
this.metadataBootstrapSupplier = metadataBootstrapSupplier;
this.globalRecoveryManager = globalRecoveryManager;
+ this.storageComponentProvider = storageComponentProvider;
}
@Override
@@ -174,6 +178,7 @@
return libraryManager;
}
+ @Override
public Object getExtensionManager() {
return extensionManager;
}
@@ -213,4 +218,9 @@
public IJobLifecycleListener getActiveLifecycleListener() {
return activeLifeCycleListener;
}
+
+ @Override
+ public IStorageComponentProvider getStorageComponentProvider() {
+ return storageComponentProvider;
+ }
}
diff --git a/asterixdb/asterix-server/src/main/assembly/filter.properties b/asterixdb/asterix-server/src/main/assembly/filter.properties
index b01047f..69c5b02 100644
--- a/asterixdb/asterix-server/src/main/assembly/filter.properties
+++ b/asterixdb/asterix-server/src/main/assembly/filter.properties
@@ -21,4 +21,5 @@
NC_COMMAND=asterixnc
HELPER_COMMAND=asterixhelper
LISTEN_PORT=19002
-PRODUCT=AsterixDB
\ No newline at end of file
+PRODUCT=AsterixDB
+NC_BLUE_EXTRA=
\ No newline at end of file
diff --git a/asterixdb/asterix-server/src/main/opt/local/conf/cc.conf b/asterixdb/asterix-server/src/main/opt/local/conf/cc.conf
index d1f03bc..184728d 100644
--- a/asterixdb/asterix-server/src/main/opt/local/conf/cc.conf
+++ b/asterixdb/asterix-server/src/main/opt/local/conf/cc.conf
@@ -25,6 +25,7 @@
txn.log.dir=data/blue/txnlog
core.dump.dir=data/blue/coredump
iodevices=data/blue
+${NC_BLUE_EXTRA}
[nc]
storage.subdir=storage
diff --git a/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java b/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java
index f34dc6a..ebd945e 100644
--- a/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java
+++ b/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java
@@ -20,14 +20,29 @@
import java.io.File;
+import javax.xml.XMLConstants;
import javax.xml.bind.JAXBContext;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.parsers.SAXParser;
+import javax.xml.parsers.SAXParserFactory;
+import javax.xml.transform.sax.SAXSource;
+
+import org.xml.sax.InputSource;
public class TestSuiteParser {
public TestSuiteParser() {
}
public org.apache.asterix.testframework.xml.TestSuite parse(File testSuiteCatalog) throws Exception {
+ SAXParserFactory saxParserFactory = SAXParserFactory.newInstance();
+ saxParserFactory.setNamespaceAware(true);
+ saxParserFactory.setXIncludeAware(true);
+ SAXParser saxParser = saxParserFactory.newSAXParser();
+ saxParser.setProperty(XMLConstants.ACCESS_EXTERNAL_DTD, "file");
+
JAXBContext ctx = JAXBContext.newInstance(org.apache.asterix.testframework.xml.TestSuite.class);
- return (org.apache.asterix.testframework.xml.TestSuite) ctx.createUnmarshaller().unmarshal(testSuiteCatalog);
+ Unmarshaller um = ctx.createUnmarshaller();
+ return (org.apache.asterix.testframework.xml.TestSuite) um.unmarshal(new SAXSource(saxParser.getXMLReader(),
+ new InputSource(testSuiteCatalog.toURI().toString())));
}
}
diff --git a/asterixdb/asterix-yarn/pom.xml b/asterixdb/asterix-yarn/pom.xml
index 0fed1bb..b06c4b5 100644
--- a/asterixdb/asterix-yarn/pom.xml
+++ b/asterixdb/asterix-yarn/pom.xml
@@ -212,7 +212,6 @@
<usedDependency>org.apache.httpcomponents:httpclient</usedDependency>
<usedDependency>org.apache.httpcomponents:httpcore</usedDependency>
<usedDependency>org.slf4j:slf4j-simple</usedDependency>
- <usedDependency>xerces:xercesImpl</usedDependency>
</usedDependencies>
<ignoredUnusedDeclaredDependencies>
<ignoredUnusedDeclaredDependency>org.apache.asterix:asterix-external-data:zip:*</ignoredUnusedDeclaredDependency>
@@ -388,11 +387,6 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>xerces</groupId>
- <artifactId>xercesImpl</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minicluster</artifactId>
<scope>test</scope>
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
index cdfc492..ca20f4a 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
@@ -47,6 +47,8 @@
// Constants
private static final int LOW_WRITE_BUFFER_WATER_MARK = 8 * 1024;
private static final int HIGH_WRITE_BUFFER_WATER_MARK = 32 * 1024;
+ protected static final WriteBufferWaterMark WRITE_BUFFER_WATER_MARK =
+ new WriteBufferWaterMark(LOW_WRITE_BUFFER_WATER_MARK, HIGH_WRITE_BUFFER_WATER_MARK);
private static final Logger LOGGER = Logger.getLogger(HttpServer.class.getName());
private static final int FAILED = -1;
private static final int STOPPED = 0;
@@ -192,8 +194,7 @@
Collections.sort(servlets, (l1, l2) -> l2.getPaths()[0].length() - l1.getPaths()[0].length());
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
- .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
- new WriteBufferWaterMark(LOW_WRITE_BUFFER_WATER_MARK, HIGH_WRITE_BUFFER_WATER_MARK))
+ .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, WRITE_BUFFER_WATER_MARK)
.handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(new HttpServerInitializer(this));
channel = b.bind(port).sync().channel();
}
@@ -225,7 +226,6 @@
}
}
}
- LOGGER.warning("No servlet for " + uri);
return null;
}
@@ -254,7 +254,15 @@
return b && (path.length() == cpl || '/' == path.charAt(cpl));
}
+ protected HttpServerHandler createHttpHandler(int chunkSize) {
+ return new HttpServerHandler<>(this, chunkSize);
+ }
+
public ExecutorService getExecutor() {
return executor;
}
+
+ protected EventLoopGroup getWorkerGroup() {
+ return workerGroup;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
index 6f7ccba..00b3cb6 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
@@ -33,15 +33,16 @@
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
-public class HttpServerHandler extends SimpleChannelInboundHandler<Object> {
+public class HttpServerHandler<T extends HttpServer> extends SimpleChannelInboundHandler<Object> {
private static final Logger LOGGER = Logger.getLogger(HttpServerHandler.class.getName());
- protected final HttpServer server;
+ protected final T server;
protected final int chunkSize;
protected HttpRequestHandler handler;
- public HttpServerHandler(HttpServer server, int chunkSize) {
+ public HttpServerHandler(T server, int chunkSize) {
this.server = server;
this.chunkSize = chunkSize;
}
@@ -65,18 +66,18 @@
try {
IServlet servlet = server.getServlet(request);
if (servlet == null) {
- respond(ctx, request, HttpResponseStatus.NOT_FOUND);
+ handleServletNotFound(ctx, request);
} else {
submit(ctx, servlet, request);
}
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Failure Submitting HTTP Request", e);
- respond(ctx, request, HttpResponseStatus.INTERNAL_SERVER_ERROR);
+ respond(ctx, request.protocolVersion(), HttpResponseStatus.INTERNAL_SERVER_ERROR);
}
}
- private void respond(ChannelHandlerContext ctx, FullHttpRequest request, HttpResponseStatus status) {
- DefaultHttpResponse response = new DefaultHttpResponse(request.protocolVersion(), status);
+ protected void respond(ChannelHandlerContext ctx, HttpVersion httpVersion, HttpResponseStatus status) {
+ DefaultHttpResponse response = new DefaultHttpResponse(httpVersion, status);
ctx.write(response).addListener(ChannelFutureListener.CLOSE);
}
@@ -86,7 +87,7 @@
servletRequest = HttpUtil.toServletRequest(request);
} catch (IllegalArgumentException e) {
LOGGER.log(Level.WARNING, "Failure Decoding Request", e);
- respond(ctx, request, HttpResponseStatus.BAD_REQUEST);
+ respond(ctx, request.protocolVersion(), HttpResponseStatus.BAD_REQUEST);
return;
}
handler = new HttpRequestHandler(ctx, servlet, servletRequest, chunkSize);
@@ -102,6 +103,13 @@
}
}
+ protected void handleServletNotFound(ChannelHandlerContext ctx, FullHttpRequest request) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("No servlet for " + request.uri());
+ }
+ respond(ctx, request.protocolVersion(), HttpResponseStatus.NOT_FOUND);
+ }
+
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
LOGGER.log(Level.SEVERE, "Failure handling HTTP Request", cause);
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
index bc67865..4f8655f 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
@@ -44,6 +44,6 @@
MAX_REQUEST_CHUNK_SIZE));
p.addLast(new HttpResponseEncoder());
p.addLast(new HttpObjectAggregator(Integer.MAX_VALUE));
- p.addLast(new HttpServerHandler(server, RESPONSE_CHUNK_SIZE));
+ p.addLast(server.createHttpHandler(RESPONSE_CHUNK_SIZE));
}
}