Fix async result delivery
- add new result status "started" for async requests
- add support for result status to the test framework
- stabilize result distribution for error in async requests
- add support for "pollget" to test async execution
- add sleep function
- allow inject-failure function to return any first argument
- use URIs instead of Strings in the TestExecutor to ensure that URIs are
  correctly escaped
- add a few tests
Change-Id: Iafba65d9c7bd8643c42e5126c8d89164ae328908
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1394
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/FunctionCollection.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/FunctionCollection.java
index d14b8e2..6f24fc5 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/FunctionCollection.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/FunctionCollection.java
@@ -205,6 +205,7 @@
import org.apache.asterix.runtime.evaluators.functions.SimilarityJaccardPrefixDescriptor;
import org.apache.asterix.runtime.evaluators.functions.SimilarityJaccardSortedCheckDescriptor;
import org.apache.asterix.runtime.evaluators.functions.SimilarityJaccardSortedDescriptor;
+import org.apache.asterix.runtime.evaluators.functions.SleepDescriptor;
import org.apache.asterix.runtime.evaluators.functions.SpatialAreaDescriptor;
import org.apache.asterix.runtime.evaluators.functions.SpatialCellDescriptor;
import org.apache.asterix.runtime.evaluators.functions.SpatialDistanceDescriptor;
@@ -412,6 +413,9 @@
temp.add(OrderedListConstructorDescriptor.FACTORY);
temp.add(UnorderedListConstructorDescriptor.FACTORY);
+ // Sleep function
+ temp.add(SleepDescriptor.FACTORY);
+
// Inject failure function
temp.add(InjectFailureDescriptor.FACTORY);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index bdce0ca..83233f1 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -111,7 +111,7 @@
ccConfig.clientNetPort = DEFAULT_HYRACKS_CC_CLIENT_PORT;
ccConfig.clusterNetPort = DEFAULT_HYRACKS_CC_CLUSTER_PORT;
ccConfig.defaultMaxJobAttempts = 0;
- ccConfig.resultTTL = 30000;
+ ccConfig.resultTTL = 120000;
ccConfig.resultSweepThreshold = 1000;
ccConfig.appCCMainClass = CCApplicationEntryPoint.class.getName();
return ccConfig;
@@ -126,7 +126,7 @@
ncConfig.resultIPAddress = Inet4Address.getLoopbackAddress().getHostAddress();
ncConfig.messagingIPAddress = Inet4Address.getLoopbackAddress().getHostAddress();
ncConfig.nodeId = ncName;
- ncConfig.resultTTL = 30000;
+ ncConfig.resultTTL = 120000;
ncConfig.resultSweepThreshold = 1000;
ncConfig.appArgs = Collections.singletonList("-virtual-NC");
ncConfig.appNCMainClass = NCApplicationEntryPoint.class.getName();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index 410fd1e..744b929 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -20,6 +20,7 @@
import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_DATASET_ATTR;
+import static org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
import java.io.IOException;
import java.io.PrintWriter;
@@ -33,7 +34,6 @@
import org.apache.asterix.app.result.ResultReader;
import org.apache.asterix.app.result.ResultUtil;
-import org.apache.asterix.app.translator.QueryTranslator;
import org.apache.asterix.common.api.IClusterManagementWork;
import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.context.IStorageComponentProvider;
@@ -156,6 +156,7 @@
}
public enum ResultStatus {
+ STARTED("started"),
SUCCESS("success"),
TIMEOUT("timeout"),
ERRORS("errors"),
@@ -458,13 +459,13 @@
return request.getHttpRequest().content().toString(StandardCharsets.UTF_8);
}
- private static QueryTranslator.ResultDelivery parseResultDelivery(String mode) {
+ private static ResultDelivery parseResultDelivery(String mode) {
if ("async".equals(mode)) {
- return QueryTranslator.ResultDelivery.ASYNC;
+ return ResultDelivery.ASYNC;
} else if ("deferred".equals(mode)) {
- return QueryTranslator.ResultDelivery.DEFERRED;
+ return ResultDelivery.DEFERRED;
} else {
- return QueryTranslator.ResultDelivery.IMMEDIATE;
+ return ResultDelivery.IMMEDIATE;
}
}
@@ -474,7 +475,7 @@
final StringWriter stringWriter = new StringWriter();
final PrintWriter resultWriter = new PrintWriter(stringWriter);
- QueryTranslator.ResultDelivery delivery = parseResultDelivery(param.mode);
+ ResultDelivery delivery = parseResultDelivery(param.mode);
SessionConfig sessionConfig = createSessionConfig(param, resultWriter);
ServletUtils.setContentType(response, IServlet.ContentType.APPLICATION_JSON, IServlet.Encoding.UTF8);
@@ -517,7 +518,7 @@
execStart = System.nanoTime();
translator.compileAndExecute(hcc, hds, delivery, stats);
execEnd = System.nanoTime();
- printStatus(resultWriter, ResultStatus.SUCCESS);
+ printStatus(resultWriter, ResultDelivery.ASYNC == delivery ? ResultStatus.STARTED : ResultStatus.SUCCESS);
} catch (AsterixException | TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, pe.getMessage(), pe);
printError(resultWriter, pe);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
index 039c740..3d31616 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
@@ -94,19 +94,7 @@
resultReader.open(jobId, rsId);
ObjectNode jsonResponse = om.createObjectNode();
- String status;
- switch (resultReader.getStatus()) {
- case RUNNING:
- status = "RUNNING";
- break;
- case SUCCESS:
- status = "SUCCESS";
- break;
- default:
- status = "ERROR";
- break;
- }
- jsonResponse.put("status", status);
+ jsonResponse.put("status", resultReader.getStatus().name());
out.write(jsonResponse.toString());
} catch (Exception e) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java
index 31ace22..f7e70a3 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java
@@ -21,6 +21,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
import org.apache.asterix.algebra.base.ILangExtension;
import org.apache.asterix.algebra.base.ILangExtension.Language;
@@ -50,12 +51,13 @@
private final IStatementExecutorExtension statementExecutorExtension;
private final ILangCompilationProvider aqlCompilationProvider;
private final ILangCompilationProvider sqlppCompilationProvider;
- private final DefaultStatementExecutorFactory defaultQueryTranslatorFactory;
+ private transient IStatementExecutorFactory statementExecutorFactory;
/**
* Initialize {@code CompilerExtensionManager} from configuration
*
* @param list
+ * a list of extensions
* @throws InstantiationException
* @throws IllegalAccessException
* @throws ClassNotFoundException
@@ -66,7 +68,6 @@
Pair<ExtensionId, ILangCompilationProvider> aqlcp = null;
Pair<ExtensionId, ILangCompilationProvider> sqlppcp = null;
IStatementExecutorExtension see = null;
- defaultQueryTranslatorFactory = new DefaultStatementExecutorFactory();
if (list != null) {
for (AsterixExtension extensionConf : list) {
IExtension extension = (IExtension) Class.forName(extensionConf.getClassName()).newInstance();
@@ -94,9 +95,19 @@
this.sqlppCompilationProvider = sqlppcp == null ? new SqlppCompilationProvider() : sqlppcp.second;
}
+ /** @deprecated use getStatementExecutorFactory instead */
+ @Deprecated
public IStatementExecutorFactory getQueryTranslatorFactory() {
- return statementExecutorExtension == null ? defaultQueryTranslatorFactory
- : statementExecutorExtension.getQueryTranslatorFactory();
+ return getStatementExecutorFactory(null);
+ }
+
+ public IStatementExecutorFactory getStatementExecutorFactory(ExecutorService executorService) {
+ if (statementExecutorFactory == null) {
+ statementExecutorFactory = statementExecutorExtension == null
+ ? new DefaultStatementExecutorFactory(executorService)
+ : statementExecutorExtension.getStatementExecutorFactory(executorService);
+ }
+ return statementExecutorFactory;
}
public ILangCompilationProvider getAqlCompilationProvider() {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/IStatementExecutorExtension.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/IStatementExecutorExtension.java
index f7b6842..f77b25b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/IStatementExecutorExtension.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/IStatementExecutorExtension.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.app.cc;
+import java.util.concurrent.ExecutorService;
+
import org.apache.asterix.common.api.IExtension;
import org.apache.asterix.translator.IStatementExecutorFactory;
@@ -33,6 +35,15 @@
/**
* @return The extension implementation of the {@code IStatementExecutorFactory}
+ * @deprecated use getStatementExecutorFactory instead
*/
+ @Deprecated
IStatementExecutorFactory getQueryTranslatorFactory();
+
+ /**
+ * @return The extension implementation of the {@code IStatementExecutorFactory}
+ */
+ default IStatementExecutorFactory getStatementExecutorFactory(ExecutorService executorService) {
+ return getQueryTranslatorFactory();
+ }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java
index 99dcc83..15ed1b4 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java
@@ -19,6 +19,8 @@
package org.apache.asterix.app.translator;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.compiler.provider.ILangCompilationProvider;
@@ -26,12 +28,27 @@
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutorFactory;
import org.apache.asterix.translator.SessionConfig;
+import org.apache.hyracks.control.common.utils.HyracksThreadFactory;
public class DefaultStatementExecutorFactory implements IStatementExecutorFactory {
+ protected final ExecutorService executorService;
+
+ /*
+ * @deprecated use other constructor
+ */
+ public DefaultStatementExecutorFactory() {
+ this(Executors.newSingleThreadExecutor(
+ new HyracksThreadFactory(DefaultStatementExecutorFactory.class.getSimpleName())));
+ }
+
+ public DefaultStatementExecutorFactory(ExecutorService executorService) {
+ this.executorService = executorService;
+ }
+
@Override
public IStatementExecutor create(List<Statement> statements, SessionConfig conf,
ILangCompilationProvider compilationProvider, IStorageComponentProvider storageComponentProvider) {
- return new QueryTranslator(statements, conf, compilationProvider, storageComponentProvider);
+ return new QueryTranslator(statements, conf, compilationProvider, storageComponentProvider, executorService);
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 6fbf2a5..3c69d83 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -34,6 +34,7 @@
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Random;
+import java.util.concurrent.ExecutorService;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -218,9 +219,10 @@
protected final APIFramework apiFramework;
protected final IRewriterFactory rewriterFactory;
protected final IStorageComponentProvider componentProvider;
+ protected final ExecutorService executorService;
- public QueryTranslator(List<Statement> statements, SessionConfig conf,
- ILangCompilationProvider compliationProvider, IStorageComponentProvider componentProvider) {
+ public QueryTranslator(List<Statement> statements, SessionConfig conf, ILangCompilationProvider compliationProvider,
+ IStorageComponentProvider componentProvider, ExecutorService executorService) {
this.statements = statements;
this.sessionConfig = conf;
this.componentProvider = componentProvider;
@@ -228,6 +230,7 @@
apiFramework = new APIFramework(compliationProvider);
rewriterFactory = compliationProvider.getRewriterFactory();
activeDataverse = MetadataBuiltinEntities.DEFAULT_DATAVERSE;
+ this.executorService = executorService;
}
protected List<FunctionDecl> getDeclaredFunctions(List<Statement> statements) {
@@ -1327,11 +1330,6 @@
}
}
- protected Dataset getDataset(MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName)
- throws MetadataException {
- return MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
- }
-
public void handleDatasetDropStatement(MetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc) throws Exception {
DropDatasetStatement stmtDelete = (DropDatasetStatement) stmt;
@@ -1826,41 +1824,60 @@
}
}
- public JobSpecification handleInsertUpsertStatement(MetadataProvider metadataProvider, Statement stmt,
+ public void handleInsertUpsertStatement(MetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery,
IStatementExecutor.Stats stats, boolean compileOnly) throws Exception {
InsertStatement stmtInsertUpsert = (InsertStatement) stmt;
String dataverseName = getActiveDataverse(stmtInsertUpsert.getDataverseName());
Query query = stmtInsertUpsert.getQuery();
- MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
- boolean bActiveTxn = true;
- metadataProvider.setMetadataTxnContext(mdTxnCtx);
- MetadataLockManager.INSTANCE.insertDeleteUpsertBegin(dataverseName,
- dataverseName + "." + stmtInsertUpsert.getDatasetName(), query.getDataverses(), query.getDatasets());
- try {
- metadataProvider.setWriteTransaction(true);
- JobSpecification jobSpec = rewriteCompileInsertUpsert(hcc, metadataProvider, stmtInsertUpsert);
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- bActiveTxn = false;
- if (jobSpec != null && !compileOnly) {
- if (stmtInsertUpsert.getReturnExpression() != null) {
- handleQueryResult(metadataProvider, hcc, hdc, jobSpec, resultDelivery, stats);
- } else {
- JobUtils.runJob(hcc, jobSpec, true);
+ final IMetadataLocker locker = new IMetadataLocker() {
+ @Override
+ public void lock() {
+ MetadataLockManager.INSTANCE.insertDeleteUpsertBegin(dataverseName,
+ dataverseName + "." + stmtInsertUpsert.getDatasetName(), query.getDataverses(),
+ query.getDatasets());
+ }
+
+ @Override
+ public void unlock() {
+ MetadataLockManager.INSTANCE.insertDeleteUpsertEnd(dataverseName,
+ dataverseName + "." + stmtInsertUpsert.getDatasetName(), query.getDataverses(),
+ query.getDatasets());
+ }
+ };
+ final IStatementCompiler compiler = () -> {
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ boolean bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ try {
+ metadataProvider.setWriteTransaction(true);
+ final JobSpecification jobSpec = rewriteCompileInsertUpsert(hcc, metadataProvider, stmtInsertUpsert);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ bActiveTxn = false;
+ return compileOnly ? null : jobSpec;
+ } catch (Exception e) {
+ if (bActiveTxn) {
+ abort(e, e, mdTxnCtx);
}
+ throw e;
}
- return jobSpec;
- } catch (Exception e) {
- if (bActiveTxn) {
- abort(e, e, mdTxnCtx);
+ };
+
+ if (stmtInsertUpsert.getReturnExpression() != null) {
+ deliverResult(hcc, hdc, compiler, metadataProvider, locker, resultDelivery, stats);
+ } else {
+ locker.lock();
+ try {
+ final JobSpecification jobSpec = compiler.compile();
+ if (jobSpec == null) {
+ return;
+ }
+ JobUtils.runJob(hcc, jobSpec, true);
+ } finally {
+ locker.unlock();
}
- throw e;
- } finally {
- MetadataLockManager.INSTANCE.insertDeleteUpsertEnd(dataverseName,
- dataverseName + "." + stmtInsertUpsert.getDatasetName(), query.getDataverses(),
- query.getDatasets());
}
}
@@ -2515,69 +2532,124 @@
new StorageComponentProvider()));
}
- protected JobSpecification handleQuery(MetadataProvider metadataProvider, Query query,
- IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats)
- throws Exception {
- MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
- boolean bActiveTxn = true;
- metadataProvider.setMetadataTxnContext(mdTxnCtx);
- MetadataLockManager.INSTANCE.queryBegin(activeDataverse, query.getDataverses(), query.getDatasets());
- try {
- JobSpecification jobSpec = rewriteCompileQuery(hcc, metadataProvider, query, null);
+ private interface IMetadataLocker {
+ void lock();
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- bActiveTxn = false;
-
- if (query.isExplain()) {
- sessionConfig.out().flush();
- return jobSpec;
- } else if (sessionConfig.isExecuteQuery() && jobSpec != null) {
- handleQueryResult(metadataProvider, hcc, hdc, jobSpec, resultDelivery, stats);
- }
- return jobSpec;
- } catch (Exception e) {
- LOGGER.log(Level.INFO, e.getMessage(), e);
- if (bActiveTxn) {
- abort(e, e, mdTxnCtx);
- }
- throw e;
- } finally {
- MetadataLockManager.INSTANCE.queryEnd(query.getDataverses(), query.getDatasets());
- // release external datasets' locks acquired during compilation of the query
- ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
- }
+ void unlock();
}
- private void handleQueryResult(MetadataProvider metadataProvider, IHyracksClientConnection hcc,
- IHyracksDataset hdc, JobSpecification jobSpec, ResultDelivery resultDelivery, Stats stats)
- throws Exception {
- JobId jobId = JobUtils.runJob(hcc, jobSpec, false);
+ private interface IResultPrinter {
+ void print(JobId jobId) throws HyracksDataException, AlgebricksException;
+ }
- ResultHandle hand;
+ private interface IStatementCompiler {
+ JobSpecification compile() throws AlgebricksException, RemoteException, ACIDException;
+ }
+
+ protected void handleQuery(MetadataProvider metadataProvider, Query query, IHyracksClientConnection hcc,
+ IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats) throws Exception {
+ final IMetadataLocker locker = new IMetadataLocker() {
+ @Override
+ public void lock() {
+ MetadataLockManager.INSTANCE.queryBegin(activeDataverse, query.getDataverses(), query.getDatasets());
+ }
+
+ @Override
+ public void unlock() {
+ MetadataLockManager.INSTANCE.queryEnd(query.getDataverses(), query.getDatasets());
+ // release external datasets' locks acquired during compilation of the query
+ ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
+ }
+ };
+ final IStatementCompiler compiler = () -> {
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ boolean bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ try {
+ final JobSpecification jobSpec = rewriteCompileQuery(hcc, metadataProvider, query, null);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ bActiveTxn = false;
+ return query.isExplain() || !sessionConfig.isExecuteQuery() ? null : jobSpec;
+ } catch (Exception e) {
+ LOGGER.log(Level.INFO, e.getMessage(), e);
+ if (bActiveTxn) {
+ abort(e, e, mdTxnCtx);
+ }
+ throw e;
+ }
+ };
+ deliverResult(hcc, hdc, compiler, metadataProvider, locker, resultDelivery, stats);
+ }
+
+ private void deliverResult(IHyracksClientConnection hcc, IHyracksDataset hdc, IStatementCompiler compiler,
+ MetadataProvider metadataProvider, IMetadataLocker locker, ResultDelivery resultDelivery, Stats stats)
+ throws Exception {
+ final ResultSetId resultSetId = metadataProvider.getResultSetId();
switch (resultDelivery) {
case ASYNC:
- hand = new ResultHandle(jobId, metadataProvider.getResultSetId());
- ResultUtil.printResultHandle(hand, sessionConfig);
- hcc.waitForCompletion(jobId);
- sessionConfig.out().flush();
+ MutableBoolean printed = new MutableBoolean(false);
+ executorService.submit(() -> {
+ JobId jobId = null;
+ try {
+ jobId = createAndRunJob(hcc, compiler, locker, resultDelivery, id -> {
+ final ResultHandle handle = new ResultHandle(id, resultSetId);
+ ResultUtil.printResultHandle(handle, sessionConfig);
+ synchronized (printed) {
+ printed.setTrue();
+ printed.notify();
+ }
+ });
+ } catch (Exception e) {
+ GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE,
+ resultDelivery.name() + " job " + "with id " + jobId + " failed", e);
+ }
+ });
+ synchronized (printed) {
+ while (!printed.booleanValue()) {
+ printed.wait();
+ }
+ }
break;
case IMMEDIATE:
- hcc.waitForCompletion(jobId);
- ResultReader resultReader = new ResultReader(hdc);
- resultReader.open(jobId, metadataProvider.getResultSetId());
- ResultUtil.printResults(resultReader, sessionConfig, stats, metadataProvider.findOutputRecordType());
+ createAndRunJob(hcc, compiler, locker, resultDelivery, id -> {
+ final ResultReader resultReader = new ResultReader(hdc);
+ resultReader.open(id, resultSetId);
+ ResultUtil.printResults(resultReader, sessionConfig, stats,
+ metadataProvider.findOutputRecordType());
+ });
break;
case DEFERRED:
- hcc.waitForCompletion(jobId);
- hand = new ResultHandle(jobId, metadataProvider.getResultSetId());
- ResultUtil.printResultHandle(hand, sessionConfig);
- sessionConfig.out().flush();
+ createAndRunJob(hcc, compiler, locker, resultDelivery, id -> {
+ ResultUtil.printResultHandle(new ResultHandle(id, resultSetId), sessionConfig);
+ });
break;
default:
break;
}
}
+ private static JobId createAndRunJob(IHyracksClientConnection hcc, IStatementCompiler compiler,
+ IMetadataLocker locker, ResultDelivery resultDelivery, IResultPrinter printer) throws Exception {
+ locker.lock();
+ try {
+ final JobSpecification jobSpec = compiler.compile();
+ if (jobSpec == null) {
+ return JobId.INVALID;
+ }
+ final JobId jobId = JobUtils.runJob(hcc, jobSpec, false);
+ if (ResultDelivery.ASYNC == resultDelivery) {
+ printer.print(jobId);
+ hcc.waitForCompletion(jobId);
+ } else {
+ hcc.waitForCompletion(jobId);
+ printer.print(jobId);
+ }
+ return jobId;
+ } finally {
+ locker.unlock();
+ }
+ }
+
protected void handleCreateNodeGroupStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception {
NodegroupDecl stmtCreateNodegroup = (NodegroupDecl) stmt;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 92f3501..8ea4193 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -63,6 +63,7 @@
import org.apache.asterix.metadata.cluster.ClusterManagerProvider;
import org.apache.asterix.runtime.job.resource.JobCapacityController;
import org.apache.asterix.runtime.utils.AppContextInfo;
+import org.apache.asterix.translator.IStatementExecutorFactory;
import org.apache.hyracks.api.application.ICCApplicationContext;
import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
import org.apache.hyracks.api.client.HyracksConnection;
@@ -164,7 +165,7 @@
webServer.setAttribute(HYRACKS_CONNECTION_ATTR, hcc);
webServer.addLet(new ApiServlet(webServer.ctx(), new String[] { "/*" },
ccExtensionManager.getAqlCompilationProvider(), ccExtensionManager.getSqlppCompilationProvider(),
- ccExtensionManager.getQueryTranslatorFactory(), componentProvider));
+ getStatementExecutorFactory(), componentProvider));
return webServer;
}
@@ -228,35 +229,35 @@
switch (key) {
case AQL:
return new FullApiServlet(server.ctx(), paths, ccExtensionManager.getAqlCompilationProvider(),
- ccExtensionManager.getQueryTranslatorFactory(), componentProvider);
+ getStatementExecutorFactory(), componentProvider);
case AQL_QUERY:
return new QueryApiServlet(server.ctx(), paths, ccExtensionManager.getAqlCompilationProvider(),
- ccExtensionManager.getQueryTranslatorFactory(), componentProvider);
+ getStatementExecutorFactory(), componentProvider);
case AQL_UPDATE:
return new UpdateApiServlet(server.ctx(), paths, ccExtensionManager.getAqlCompilationProvider(),
- ccExtensionManager.getQueryTranslatorFactory(), componentProvider);
+ getStatementExecutorFactory(), componentProvider);
case AQL_DDL:
return new DdlApiServlet(server.ctx(), paths, ccExtensionManager.getAqlCompilationProvider(),
- ccExtensionManager.getQueryTranslatorFactory(), componentProvider);
+ getStatementExecutorFactory(), componentProvider);
case SQLPP:
return new FullApiServlet(server.ctx(), paths, ccExtensionManager.getSqlppCompilationProvider(),
- ccExtensionManager.getQueryTranslatorFactory(), componentProvider);
+ getStatementExecutorFactory(), componentProvider);
case SQLPP_QUERY:
return new QueryApiServlet(server.ctx(), paths, ccExtensionManager.getSqlppCompilationProvider(),
- ccExtensionManager.getQueryTranslatorFactory(), componentProvider);
+ getStatementExecutorFactory(), componentProvider);
case SQLPP_UPDATE:
return new UpdateApiServlet(server.ctx(), paths, ccExtensionManager.getSqlppCompilationProvider(),
- ccExtensionManager.getQueryTranslatorFactory(), componentProvider);
+ getStatementExecutorFactory(), componentProvider);
case SQLPP_DDL:
return new DdlApiServlet(server.ctx(), paths, ccExtensionManager.getSqlppCompilationProvider(),
- ccExtensionManager.getQueryTranslatorFactory(), componentProvider);
+ getStatementExecutorFactory(), componentProvider);
case QUERY_STATUS:
return new QueryStatusApiServlet(server.ctx(), paths);
case QUERY_RESULT:
return new QueryResultApiServlet(server.ctx(), paths);
case QUERY_SERVICE:
return new QueryServiceServlet(server.ctx(), paths, ccExtensionManager.getSqlppCompilationProvider(),
- ccExtensionManager.getQueryTranslatorFactory(), componentProvider);
+ getStatementExecutorFactory(), componentProvider);
case CONNECTOR:
return new ConnectorApiServlet(server.ctx(), paths);
case SHUTDOWN:
@@ -276,6 +277,11 @@
}
}
+ private IStatementExecutorFactory getStatementExecutorFactory() {
+ return ccExtensionManager.getStatementExecutorFactory(
+ ((ClusterControllerService) appCtx.getControllerService()).getExecutorService());
+ }
+
@Override
public void startupCompleted() throws Exception {
ClusterManagerProvider.getClusterManager().notifyStartupCompleted();
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.deferred.aql
similarity index 97%
rename from asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
rename to asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.deferred.aql
index 29f8eab..e7ad205 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.deferred.aql
@@ -22,6 +22,8 @@
* Date : 09/17/2013
*/
+//handlevariable=handle
+
use dataverse test;
for $i in dataset LineItem
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.5.async.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.5.async.aql
deleted file mode 100644
index 29f8eab..0000000
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.5.async.aql
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/*
- * Description : Test for clause of the position variable in FLWOR expression
- * Expected Result : Success
- * Date : 09/17/2013
- */
-
-use dataverse test;
-
-for $i in dataset LineItem
-group by $partkey := $i.l_partkey with $i
-for $j at $p in ( for $x in $i order by $x.l_shipdate return $x)
-where $p < 4
-order by $partkey
-return { "partkey": $partkey, "pid": $p, "shipdate": $j.l_shipdate }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.5.get.http
similarity index 66%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.5.get.http
index 29f8eab..a88991c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.5.get.http
@@ -16,17 +16,5 @@
* specific language governing permissions and limitations
* under the License.
*/
-/*
- * Description : Test for clause of the position variable in FLWOR expression
- * Expected Result : Success
- * Date : 09/17/2013
- */
-use dataverse test;
-
-for $i in dataset LineItem
-group by $partkey := $i.l_partkey with $i
-for $j at $p in ( for $x in $i order by $x.l_shipdate return $x)
-where $p < 4
-order by $partkey
-return { "partkey": $partkey, "pid": $p, "shipdate": $j.l_shipdate }
+/query/result?handle=$handle
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.6.async.aql
similarity index 97%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.6.async.aql
index 29f8eab..e7ad205 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.6.async.aql
@@ -22,6 +22,8 @@
* Date : 09/17/2013
*/
+//handlevariable=handle
+
use dataverse test;
for $i in dataset LineItem
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.7.pollget.http
similarity index 66%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.7.pollget.http
index 29f8eab..5d59ca3 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.7.pollget.http
@@ -16,17 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-/*
- * Description : Test for clause of the position variable in FLWOR expression
- * Expected Result : Success
- * Date : 09/17/2013
- */
-use dataverse test;
+//polltimeoutsecs=10
-for $i in dataset LineItem
-group by $partkey := $i.l_partkey with $i
-for $j at $p in ( for $x in $i order by $x.l_shipdate return $x)
-where $p < 4
-order by $partkey
-return { "partkey": $partkey, "pid": $p, "shipdate": $j.l_shipdate }
+/query/status?handle=$handle
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.8.get.http
similarity index 66%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.8.get.http
index 29f8eab..a88991c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.8.get.http
@@ -16,17 +16,5 @@
* specific language governing permissions and limitations
* under the License.
*/
-/*
- * Description : Test for clause of the position variable in FLWOR expression
- * Expected Result : Success
- * Date : 09/17/2013
- */
-use dataverse test;
-
-for $i in dataset LineItem
-group by $partkey := $i.l_partkey with $i
-for $j at $p in ( for $x in $i order by $x.l_shipdate return $x)
-where $p < 4
-order by $partkey
-return { "partkey": $partkey, "pid": $p, "shipdate": $j.l_shipdate }
+/query/result?handle=$handle
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-failed/async-failed.1.async.sqlpp
similarity index 65%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-failed/async-failed.1.async.sqlpp
index 29f8eab..89ef35e 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-failed/async-failed.1.async.sqlpp
@@ -16,17 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
-/*
- * Description : Test for clause of the position variable in FLWOR expression
- * Expected Result : Success
- * Date : 09/17/2013
- */
-use dataverse test;
+#handlevariable=handle
-for $i in dataset LineItem
-group by $partkey := $i.l_partkey with $i
-for $j at $p in ( for $x in $i order by $x.l_shipdate return $x)
-where $p < 4
-order by $partkey
-return { "partkey": $partkey, "pid": $p, "shipdate": $j.l_shipdate }
+set `import-private-functions` `true`;
+select value inject_failure(sleep("result", 5000), true);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-failed/async-failed.2.pollget.http
similarity index 65%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-failed/async-failed.2.pollget.http
index 29f8eab..916aadf 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-failed/async-failed.2.pollget.http
@@ -16,17 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-/*
- * Description : Test for clause of the position variable in FLWOR expression
- * Expected Result : Success
- * Date : 09/17/2013
- */
-use dataverse test;
+#polltimeoutsecs=10
-for $i in dataset LineItem
-group by $partkey := $i.l_partkey with $i
-for $j at $p in ( for $x in $i order by $x.l_shipdate return $x)
-where $p < 4
-order by $partkey
-return { "partkey": $partkey, "pid": $p, "shipdate": $j.l_shipdate }
+/query/status?handle=$handle
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-failed/async-failed.3.get.http
similarity index 65%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-failed/async-failed.3.get.http
index 29f8eab..a88991c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-failed/async-failed.3.get.http
@@ -16,17 +16,5 @@
* specific language governing permissions and limitations
* under the License.
*/
-/*
- * Description : Test for clause of the position variable in FLWOR expression
- * Expected Result : Success
- * Date : 09/17/2013
- */
-use dataverse test;
-
-for $i in dataset LineItem
-group by $partkey := $i.l_partkey with $i
-for $j at $p in ( for $x in $i order by $x.l_shipdate return $x)
-where $p < 4
-order by $partkey
-return { "partkey": $partkey, "pid": $p, "shipdate": $j.l_shipdate }
+/query/result?handle=$handle
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-running/async-running.1.async.sqlpp
similarity index 65%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-running/async-running.1.async.sqlpp
index 29f8eab..866b388 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-running/async-running.1.async.sqlpp
@@ -16,17 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-/*
- * Description : Test for clause of the position variable in FLWOR expression
- * Expected Result : Success
- * Date : 09/17/2013
- */
-use dataverse test;
+#handlevariable=handle
-for $i in dataset LineItem
-group by $partkey := $i.l_partkey with $i
-for $j at $p in ( for $x in $i order by $x.l_shipdate return $x)
-where $p < 4
-order by $partkey
-return { "partkey": $partkey, "pid": $p, "shipdate": $j.l_shipdate }
+select value sleep("result", 3000);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-running/async-running.2.pollget.http
similarity index 65%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-running/async-running.2.pollget.http
index 29f8eab..916aadf 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-running/async-running.2.pollget.http
@@ -16,17 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-/*
- * Description : Test for clause of the position variable in FLWOR expression
- * Expected Result : Success
- * Date : 09/17/2013
- */
-use dataverse test;
+#polltimeoutsecs=10
-for $i in dataset LineItem
-group by $partkey := $i.l_partkey with $i
-for $j at $p in ( for $x in $i order by $x.l_shipdate return $x)
-where $p < 4
-order by $partkey
-return { "partkey": $partkey, "pid": $p, "shipdate": $j.l_shipdate }
+/query/status?handle=$handle
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-running/async-running.3.pollget.http
similarity index 65%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-running/async-running.3.pollget.http
index 29f8eab..916aadf 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-running/async-running.3.pollget.http
@@ -16,17 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-/*
- * Description : Test for clause of the position variable in FLWOR expression
- * Expected Result : Success
- * Date : 09/17/2013
- */
-use dataverse test;
+#polltimeoutsecs=10
-for $i in dataset LineItem
-group by $partkey := $i.l_partkey with $i
-for $j at $p in ( for $x in $i order by $x.l_shipdate return $x)
-where $p < 4
-order by $partkey
-return { "partkey": $partkey, "pid": $p, "shipdate": $j.l_shipdate }
+/query/status?handle=$handle
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-running/async-running.4.get.http
similarity index 65%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-running/async-running.4.get.http
index 29f8eab..a88991c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-running/async-running.4.get.http
@@ -16,17 +16,5 @@
* specific language governing permissions and limitations
* under the License.
*/
-/*
- * Description : Test for clause of the position variable in FLWOR expression
- * Expected Result : Success
- * Date : 09/17/2013
- */
-use dataverse test;
-
-for $i in dataset LineItem
-group by $partkey := $i.l_partkey with $i
-for $j at $p in ( for $x in $i order by $x.l_shipdate return $x)
-where $p < 4
-order by $partkey
-return { "partkey": $partkey, "pid": $p, "shipdate": $j.l_shipdate }
+/query/result?handle=$handle
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async/async.1.async.sqlpp
similarity index 65%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async/async.1.async.sqlpp
index 29f8eab..a44b911 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async/async.1.async.sqlpp
@@ -16,17 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-/*
- * Description : Test for clause of the position variable in FLWOR expression
- * Expected Result : Success
- * Date : 09/17/2013
- */
-use dataverse test;
+#handlevariable=handle
-for $i in dataset LineItem
-group by $partkey := $i.l_partkey with $i
-for $j at $p in ( for $x in $i order by $x.l_shipdate return $x)
-where $p < 4
-order by $partkey
-return { "partkey": $partkey, "pid": $p, "shipdate": $j.l_shipdate }
+select i, i * i as i2 from range(1, 10) i;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async/async.2.pollget.http
similarity index 65%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async/async.2.pollget.http
index 29f8eab..916aadf 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async/async.2.pollget.http
@@ -16,17 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-/*
- * Description : Test for clause of the position variable in FLWOR expression
- * Expected Result : Success
- * Date : 09/17/2013
- */
-use dataverse test;
+#polltimeoutsecs=10
-for $i in dataset LineItem
-group by $partkey := $i.l_partkey with $i
-for $j at $p in ( for $x in $i order by $x.l_shipdate return $x)
-where $p < 4
-order by $partkey
-return { "partkey": $partkey, "pid": $p, "shipdate": $j.l_shipdate }
+/query/status?handle=$handle
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async/async.3.get.http
similarity index 65%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async/async.3.get.http
index 29f8eab..a88991c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async/async.3.get.http
@@ -16,17 +16,5 @@
* specific language governing permissions and limitations
* under the License.
*/
-/*
- * Description : Test for clause of the position variable in FLWOR expression
- * Expected Result : Success
- * Date : 09/17/2013
- */
-use dataverse test;
-
-for $i in dataset LineItem
-group by $partkey := $i.l_partkey with $i
-for $j at $p in ( for $x in $i order by $x.l_shipdate return $x)
-where $p < 4
-order by $partkey
-return { "partkey": $partkey, "pid": $p, "shipdate": $j.l_shipdate }
+/query/result?handle=$handle
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/deferred/deferred.1.deferred.sqlpp
similarity index 65%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/deferred/deferred.1.deferred.sqlpp
index 29f8eab..a44b911 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/deferred/deferred.1.deferred.sqlpp
@@ -16,17 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-/*
- * Description : Test for clause of the position variable in FLWOR expression
- * Expected Result : Success
- * Date : 09/17/2013
- */
-use dataverse test;
+#handlevariable=handle
-for $i in dataset LineItem
-group by $partkey := $i.l_partkey with $i
-for $j at $p in ( for $x in $i order by $x.l_shipdate return $x)
-where $p < 4
-order by $partkey
-return { "partkey": $partkey, "pid": $p, "shipdate": $j.l_shipdate }
+select i, i * i as i2 from range(1, 10) i;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/deferred/deferred.2.get.http
similarity index 65%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/deferred/deferred.2.get.http
index 29f8eab..a88991c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/deferred/deferred.2.get.http
@@ -16,17 +16,5 @@
* specific language governing permissions and limitations
* under the License.
*/
-/*
- * Description : Test for clause of the position variable in FLWOR expression
- * Expected Result : Success
- * Date : 09/17/2013
- */
-use dataverse test;
-
-for $i in dataset LineItem
-group by $partkey := $i.l_partkey with $i
-for $j at $p in ( for $x in $i order by $x.l_shipdate return $x)
-where $p < 4
-order by $partkey
-return { "partkey": $partkey, "pid": $p, "shipdate": $j.l_shipdate }
+/query/result?handle=$handle
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/at00/at00.4.asyncdefer.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/at00/at00.4.deferred.sqlpp
similarity index 97%
rename from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/at00/at00.4.asyncdefer.sqlpp
rename to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/at00/at00.4.deferred.sqlpp
index 7bae0d4..df7826f 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/at00/at00.4.asyncdefer.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/at00/at00.4.deferred.sqlpp
@@ -22,8 +22,9 @@
* Date : 09/17/2013
*/
-use test;
+#handlevariable=handle
+use test;
select element {'partkey':gen0.partkey,'pid':p,'shipdate':j.l_shipdate}
from
@@ -34,5 +35,3 @@
gen0.i as j at p
where p < 4
order by partkey, shipdate;
-
-
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/at00/at00.5.get.http
similarity index 65%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/at00/at00.5.get.http
index 29f8eab..a88991c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/at00/at00.5.get.http
@@ -16,17 +16,5 @@
* specific language governing permissions and limitations
* under the License.
*/
-/*
- * Description : Test for clause of the position variable in FLWOR expression
- * Expected Result : Success
- * Date : 09/17/2013
- */
-use dataverse test;
-
-for $i in dataset LineItem
-group by $partkey := $i.l_partkey with $i
-for $j at $p in ( for $x in $i order by $x.l_shipdate return $x)
-where $p < 4
-order by $partkey
-return { "partkey": $partkey, "pid": $p, "shipdate": $j.l_shipdate }
+/query/result?handle=$handle
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/at00/at00.5.async.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/at00/at00.6.async.sqlpp
similarity index 98%
rename from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/at00/at00.5.async.sqlpp
rename to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/at00/at00.6.async.sqlpp
index cb47163..df7826f 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/at00/at00.5.async.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/at00/at00.6.async.sqlpp
@@ -22,8 +22,9 @@
* Date : 09/17/2013
*/
-use test;
+#handlevariable=handle
+use test;
select element {'partkey':gen0.partkey,'pid':p,'shipdate':j.l_shipdate}
from
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/at00/at00.7.pollget.http
similarity index 65%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/at00/at00.7.pollget.http
index 29f8eab..916aadf 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/at00/at00.7.pollget.http
@@ -16,17 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-/*
- * Description : Test for clause of the position variable in FLWOR expression
- * Expected Result : Success
- * Date : 09/17/2013
- */
-use dataverse test;
+#polltimeoutsecs=10
-for $i in dataset LineItem
-group by $partkey := $i.l_partkey with $i
-for $j at $p in ( for $x in $i order by $x.l_shipdate return $x)
-where $p < 4
-order by $partkey
-return { "partkey": $partkey, "pid": $p, "shipdate": $j.l_shipdate }
+/query/status?handle=$handle
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/at00/at00.8.get.http
similarity index 65%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/at00/at00.8.get.http
index 29f8eab..a88991c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/at00/at00.8.get.http
@@ -16,17 +16,5 @@
* specific language governing permissions and limitations
* under the License.
*/
-/*
- * Description : Test for clause of the position variable in FLWOR expression
- * Expected Result : Success
- * Date : 09/17/2013
- */
-use dataverse test;
-
-for $i in dataset LineItem
-group by $partkey := $i.l_partkey with $i
-for $j at $p in ( for $x in $i order by $x.l_shipdate return $x)
-where $p < 4
-order by $partkey
-return { "partkey": $partkey, "pid": $p, "shipdate": $j.l_shipdate }
+/query/result?handle=$handle
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-failed/async-failed.1.ignore b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-failed/async-failed.1.ignore
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-failed/async-failed.1.ignore
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-failed/async-failed.2.json b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-failed/async-failed.2.json
new file mode 100644
index 0000000..dd665eb
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-failed/async-failed.2.json
@@ -0,0 +1 @@
+{"status":"FAILED"}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-running/async-running.1.ignore b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-running/async-running.1.ignore
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-running/async-running.1.ignore
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-running/async-running.2.json b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-running/async-running.2.json
new file mode 100644
index 0000000..6cffe65
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-running/async-running.2.json
@@ -0,0 +1 @@
+{"status":"RUNNING"}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-running/async-running.3.json b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-running/async-running.3.json
new file mode 100644
index 0000000..6213a6b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-running/async-running.3.json
@@ -0,0 +1 @@
+{"status":"SUCCESS"}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-running/async-running.4.json b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-running/async-running.4.json
new file mode 100644
index 0000000..859d906
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-running/async-running.4.json
@@ -0,0 +1 @@
+"result"
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async/async.1.ignore b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async/async.1.ignore
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async/async.1.ignore
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async/async.2.json b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async/async.2.json
new file mode 100644
index 0000000..6213a6b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async/async.2.json
@@ -0,0 +1 @@
+{"status":"SUCCESS"}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async/async.3.json b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async/async.3.json
new file mode 100644
index 0000000..09e86cc
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async/async.3.json
@@ -0,0 +1,10 @@
+{ "i": 1, "i2": 1 }
+{ "i": 2, "i2": 4 }
+{ "i": 3, "i2": 9 }
+{ "i": 4, "i2": 16 }
+{ "i": 5, "i2": 25 }
+{ "i": 6, "i2": 36 }
+{ "i": 7, "i2": 49 }
+{ "i": 8, "i2": 64 }
+{ "i": 9, "i2": 81 }
+{ "i": 10, "i2": 100 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/deferred/deferred.1.ignore b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/deferred/deferred.1.ignore
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/deferred/deferred.1.ignore
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/deferred/deferred.2.json b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/deferred/deferred.2.json
new file mode 100644
index 0000000..09e86cc
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/deferred/deferred.2.json
@@ -0,0 +1,10 @@
+{ "i": 1, "i2": 1 }
+{ "i": 2, "i2": 4 }
+{ "i": 3, "i2": 9 }
+{ "i": 4, "i2": 16 }
+{ "i": 5, "i2": 25 }
+{ "i": 6, "i2": 36 }
+{ "i": 7, "i2": 49 }
+{ "i": 8, "i2": 64 }
+{ "i": 9, "i2": 81 }
+{ "i": 10, "i2": 100 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/flwor/at00/at00.2.ignore b/asterixdb/asterix-app/src/test/resources/runtimets/results/flwor/at00/at00.2.ignore
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/flwor/at00/at00.2.ignore
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/flwor/at00/at00.4.ignore b/asterixdb/asterix-app/src/test/resources/runtimets/results/flwor/at00/at00.4.ignore
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/flwor/at00/at00.4.ignore
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/flwor/at00/at00.5.json b/asterixdb/asterix-app/src/test/resources/runtimets/results/flwor/at00/at00.5.json
new file mode 100644
index 0000000..6213a6b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/flwor/at00/at00.5.json
@@ -0,0 +1 @@
+{"status":"SUCCESS"}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/flwor/at00/at00.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/flwor/at00/at00.6.adm
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/results/flwor/at00/at00.2.adm
rename to asterixdb/asterix-app/src/test/resources/runtimets/results/flwor/at00/at00.6.adm
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/flwor/at00/at00.1.ast b/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/flwor/at00/at00.1.ast
deleted file mode 100644
index 17dc8b5..0000000
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/flwor/at00/at00.1.ast
+++ /dev/null
@@ -1,22 +0,0 @@
-DataverseUse test
-TypeDecl LineItemType [
- closed RecordType {
- l_orderkey : bigint,
- l_partkey : bigint,
- l_suppkey : bigint,
- l_linenumber : bigint,
- l_quantity : double,
- l_extendedprice : double,
- l_discount : double,
- l_tax : double,
- l_returnflag : string,
- l_linestatus : string,
- l_shipdate : string,
- l_commitdate : string,
- l_receiptdate : string,
- l_shipinstruct : string,
- l_shipmode : string,
- l_comment : string
- }
-]
-DatasetDecl LineItem(LineItemType) partitioned by [[l_orderkey], [l_linenumber]]
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/flwor/at00/at00.2.ast b/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/flwor/at00/at00.2.ast
deleted file mode 100644
index 916a59e..0000000
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/flwor/at00/at00.2.ast
+++ /dev/null
@@ -1 +0,0 @@
-DataverseUse test
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/flwor/at00/at00.3.ast b/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/flwor/at00/at00.3.ast
deleted file mode 100644
index cd20331..0000000
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/flwor/at00/at00.3.ast
+++ /dev/null
@@ -1,111 +0,0 @@
-DataverseUse test
-Query:
-SELECT ELEMENT [
-RecordConstructor [
- (
- LiteralExpr [STRING] [partkey]
- :
- FieldAccessor [
- Variable [ Name=$gen0 ]
- Field=partkey
- ]
- )
- (
- LiteralExpr [STRING] [pid]
- :
- Variable [ Name=$p ]
- )
- (
- LiteralExpr [STRING] [shipdate]
- :
- FieldAccessor [
- Variable [ Name=$j ]
- Field=l_shipdate
- ]
- )
-]
-]
-FROM [ (
- SELECT ELEMENT [
- RecordConstructor [
- (
- LiteralExpr [STRING] [partkey]
- :
- FieldAccessor [
- Variable [ Name=$i ]
- Field=l_partkey
- ]
- )
- (
- LiteralExpr [STRING] [i]
- :
- (
- SELECT ELEMENT [
- FieldAccessor [
- Variable [ Name=$x ]
- Field=i
- ]
- ]
- FROM [ Variable [ Name=$g ]
- AS Variable [ Name=$x ]
- ]
- Orderby
- FieldAccessor [
- FieldAccessor [
- Variable [ Name=$x ]
- Field=i
- ]
- Field=l_shipdate
- ]
- ASC
-
- )
- )
- ]
- ]
- FROM [ FunctionCall Metadata.dataset@1[
- LiteralExpr [STRING] [LineItem]
- ]
- AS Variable [ Name=$i ]
- ]
- Groupby
- Variable [ Name=$l_partkey ]
- :=
- FieldAccessor [
- Variable [ Name=$i ]
- Field=l_partkey
- ]
- GROUP AS Variable [ Name=$g ]
- (
- i:=Variable [ Name=$i ]
- )
-
- )
- AS Variable [ Name=$gen0 ]
-,
- FieldAccessor [
- Variable [ Name=$gen0 ]
- Field=i
- ]
- AS Variable [ Name=$j ]
- AT
-Variable [ Name=$p ]
-]
-Where
- OperatorExpr [
- Variable [ Name=$p ]
- <
- LiteralExpr [LONG] [4]
- ]
-Orderby
- FieldAccessor [
- Variable [ Name=$gen0 ]
- Field=partkey
- ]
- ASC
- FieldAccessor [
- Variable [ Name=$j ]
- Field=l_shipdate
- ]
- ASC
-
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/flwor/at00/at00.4.ast b/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/flwor/at00/at00.4.ast
deleted file mode 100644
index cd20331..0000000
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/flwor/at00/at00.4.ast
+++ /dev/null
@@ -1,111 +0,0 @@
-DataverseUse test
-Query:
-SELECT ELEMENT [
-RecordConstructor [
- (
- LiteralExpr [STRING] [partkey]
- :
- FieldAccessor [
- Variable [ Name=$gen0 ]
- Field=partkey
- ]
- )
- (
- LiteralExpr [STRING] [pid]
- :
- Variable [ Name=$p ]
- )
- (
- LiteralExpr [STRING] [shipdate]
- :
- FieldAccessor [
- Variable [ Name=$j ]
- Field=l_shipdate
- ]
- )
-]
-]
-FROM [ (
- SELECT ELEMENT [
- RecordConstructor [
- (
- LiteralExpr [STRING] [partkey]
- :
- FieldAccessor [
- Variable [ Name=$i ]
- Field=l_partkey
- ]
- )
- (
- LiteralExpr [STRING] [i]
- :
- (
- SELECT ELEMENT [
- FieldAccessor [
- Variable [ Name=$x ]
- Field=i
- ]
- ]
- FROM [ Variable [ Name=$g ]
- AS Variable [ Name=$x ]
- ]
- Orderby
- FieldAccessor [
- FieldAccessor [
- Variable [ Name=$x ]
- Field=i
- ]
- Field=l_shipdate
- ]
- ASC
-
- )
- )
- ]
- ]
- FROM [ FunctionCall Metadata.dataset@1[
- LiteralExpr [STRING] [LineItem]
- ]
- AS Variable [ Name=$i ]
- ]
- Groupby
- Variable [ Name=$l_partkey ]
- :=
- FieldAccessor [
- Variable [ Name=$i ]
- Field=l_partkey
- ]
- GROUP AS Variable [ Name=$g ]
- (
- i:=Variable [ Name=$i ]
- )
-
- )
- AS Variable [ Name=$gen0 ]
-,
- FieldAccessor [
- Variable [ Name=$gen0 ]
- Field=i
- ]
- AS Variable [ Name=$j ]
- AT
-Variable [ Name=$p ]
-]
-Where
- OperatorExpr [
- Variable [ Name=$p ]
- <
- LiteralExpr [LONG] [4]
- ]
-Orderby
- FieldAccessor [
- Variable [ Name=$gen0 ]
- Field=partkey
- ]
- ASC
- FieldAccessor [
- Variable [ Name=$j ]
- Field=l_shipdate
- ]
- ASC
-
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/flwor/at00/at00.5.ast b/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/flwor/at00/at00.5.ast
deleted file mode 100644
index cd20331..0000000
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/flwor/at00/at00.5.ast
+++ /dev/null
@@ -1,111 +0,0 @@
-DataverseUse test
-Query:
-SELECT ELEMENT [
-RecordConstructor [
- (
- LiteralExpr [STRING] [partkey]
- :
- FieldAccessor [
- Variable [ Name=$gen0 ]
- Field=partkey
- ]
- )
- (
- LiteralExpr [STRING] [pid]
- :
- Variable [ Name=$p ]
- )
- (
- LiteralExpr [STRING] [shipdate]
- :
- FieldAccessor [
- Variable [ Name=$j ]
- Field=l_shipdate
- ]
- )
-]
-]
-FROM [ (
- SELECT ELEMENT [
- RecordConstructor [
- (
- LiteralExpr [STRING] [partkey]
- :
- FieldAccessor [
- Variable [ Name=$i ]
- Field=l_partkey
- ]
- )
- (
- LiteralExpr [STRING] [i]
- :
- (
- SELECT ELEMENT [
- FieldAccessor [
- Variable [ Name=$x ]
- Field=i
- ]
- ]
- FROM [ Variable [ Name=$g ]
- AS Variable [ Name=$x ]
- ]
- Orderby
- FieldAccessor [
- FieldAccessor [
- Variable [ Name=$x ]
- Field=i
- ]
- Field=l_shipdate
- ]
- ASC
-
- )
- )
- ]
- ]
- FROM [ FunctionCall Metadata.dataset@1[
- LiteralExpr [STRING] [LineItem]
- ]
- AS Variable [ Name=$i ]
- ]
- Groupby
- Variable [ Name=$l_partkey ]
- :=
- FieldAccessor [
- Variable [ Name=$i ]
- Field=l_partkey
- ]
- GROUP AS Variable [ Name=$g ]
- (
- i:=Variable [ Name=$i ]
- )
-
- )
- AS Variable [ Name=$gen0 ]
-,
- FieldAccessor [
- Variable [ Name=$gen0 ]
- Field=i
- ]
- AS Variable [ Name=$j ]
- AT
-Variable [ Name=$p ]
-]
-Where
- OperatorExpr [
- Variable [ Name=$p ]
- <
- LiteralExpr [LONG] [4]
- ]
-Orderby
- FieldAccessor [
- Variable [ Name=$gen0 ]
- Field=partkey
- ]
- ASC
- FieldAccessor [
- Variable [ Name=$j ]
- Field=l_shipdate
- ]
- ASC
-
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index a76ecbc..0d8da65 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -21,6 +21,29 @@
]>
<test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp" QueryFileExtension=".sqlpp">
+ <test-group name="async-deferred">
+ <test-case FilePath="async-deferred">
+ <compilation-unit name="async-failed">
+ <output-dir compare="Text">async-failed</output-dir>
+ <expected-error>Error in processing tuple 0</expected-error>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="async-deferred">
+ <compilation-unit name="deferred">
+ <output-dir compare="Text">deferred</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="async-deferred">
+ <compilation-unit name="async">
+ <output-dir compare="Text">async</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="async-deferred">
+ <compilation-unit name="async-running">
+ <output-dir compare="Text">async-running</output-dir>
+ </compilation-unit>
+ </test-case>
+ </test-group>
<test-group name="flwor">
<test-case FilePath="flwor">
<compilation-unit name="at00">
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp_parser.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp_parser.xml
index df7e5a0..fcd1a70 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp_parser.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp_parser.xml
@@ -28,11 +28,6 @@
QueryFileExtension=".sqlpp">
<test-group name="flwor">
<test-case FilePath="flwor">
- <compilation-unit name="at00">
- <output-dir compare="AST">at00</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="flwor">
<compilation-unit name="at01">
<output-dir compare="AST">at01</output-dir>
</compilation-unit>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java
index 9fb6a31..41f9e67 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java
@@ -32,23 +32,11 @@
public static JobId runJob(IHyracksClientConnection hcc, JobSpecification spec, boolean waitForCompletion)
throws Exception {
- JobId[] jobIds = executeJobArray(hcc, new Job[] { new Job(spec) }, waitForCompletion);
- return jobIds[0];
- }
-
- public static JobId[] executeJobArray(IHyracksClientConnection hcc, Job[] jobs, boolean waitForCompletion)
- throws Exception {
- JobId[] startedJobIds = new JobId[jobs.length];
- for (int i = 0; i < jobs.length; i++) {
- JobSpecification spec = jobs[i].getJobSpec();
- spec.setMaxReattempts(0);
- JobId jobId = hcc.startJob(spec);
- startedJobIds[i] = jobId;
- if (waitForCompletion) {
- hcc.waitForCompletion(jobId);
- }
+ spec.setMaxReattempts(0);
+ final JobId jobId = hcc.startJob(spec);
+ if (waitForCompletion) {
+ hcc.waitForCompletion(jobId);
}
- return startedJobIds;
+ return jobId;
}
-
}
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
index ff30df3..b5fbcf5 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
@@ -32,12 +32,15 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.Inet4Address;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -72,6 +75,7 @@
import org.apache.http.impl.client.StandardHttpRequestRetryHandler;
import org.apache.http.util.EntityUtils;
import org.apache.hyracks.util.StorageUtil;
+import org.junit.Assert;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
@@ -91,12 +95,21 @@
private static final long MAX_URL_LENGTH = 2000l;
private static final Pattern JAVA_BLOCK_COMMENT_PATTERN = Pattern.compile("/\\*.*\\*/",
Pattern.MULTILINE | Pattern.DOTALL);
+ private static final Pattern JAVA_LINE_COMMENT_PATTERN = Pattern.compile("//.*$", Pattern.MULTILINE);
+ private static final Pattern SHELL_LINE_COMMENT_PATTERN = Pattern.compile("#.*$", Pattern.MULTILINE);
private static final Pattern REGEX_LINES_PATTERN = Pattern.compile("^(-)?/(.*)/([im]*)$");
private static final Pattern POLL_TIMEOUT_PATTERN = Pattern.compile("polltimeoutsecs=(\\d+)(\\D|$)",
Pattern.MULTILINE);
private static final Pattern POLL_DELAY_PATTERN = Pattern.compile("polldelaysecs=(\\d+)(\\D|$)", Pattern.MULTILINE);
+ private static final Pattern HANDLE_VARIABLE_PATTERN = Pattern.compile("handlevariable=(\\w+)");
+ private static final Pattern VARIABLE_REF_PATTERN = Pattern.compile("\\$(\\w+)");
+
public static final int TRUNCATE_THRESHOLD = 16384;
+ public static final String DELIVERY_ASYNC = "async";
+ public static final String DELIVERY_DEFERRED = "deferred";
+ public static final String DELIVERY_IMMEDIATE = "immediate";
+
private static Method managixExecuteMethod = null;
private static final HashMap<Integer, ITestServer> runningTestServers = new HashMap<>();
@@ -376,8 +389,13 @@
// For tests where you simply want the byte-for-byte output.
private static void writeOutputToFile(File actualFile, InputStream resultStream) throws Exception {
- if (!actualFile.getParentFile().mkdirs()) {
- LOGGER.warning("Unable to create actual file parent dir: " + actualFile.getParentFile());
+ final File parentDir = actualFile.getParentFile();
+ if (!parentDir.isDirectory()) {
+ if (parentDir.exists()) {
+ LOGGER.warning("Actual file parent \"" + parentDir + "\" exists but is not a directory");
+ } else if (!parentDir.mkdirs()) {
+ LOGGER.warning("Unable to create actual file parent dir: " + parentDir);
+ }
}
try (FileOutputStream out = new FileOutputStream(actualFile)) {
IOUtils.copy(resultStream, out);
@@ -424,38 +442,30 @@
return httpResponse;
}
- public InputStream executeQuery(String str, OutputFormat fmt, String url, List<CompilationUnit.Parameter> params)
+ public InputStream executeQuery(String str, OutputFormat fmt, URI uri, List<CompilationUnit.Parameter> params)
throws Exception {
- HttpUriRequest method = constructHttpMethod(str, url, "query", false, params);
+ HttpUriRequest method = constructHttpMethod(str, uri, "query", false, params);
// Set accepted output response type
method.setHeader("Accept", fmt.mimeType());
HttpResponse response = executeAndCheckHttpRequest(method);
return response.getEntity().getContent();
}
- public InputStream executeQueryService(String str, String url) throws Exception {
- return executeQueryService(str, OutputFormat.CLEAN_JSON, url, new ArrayList<>(), false);
+ public InputStream executeQueryService(String str, URI uri) throws Exception {
+ return executeQueryService(str, OutputFormat.CLEAN_JSON, uri, new ArrayList<>(), false);
}
- public InputStream executeQueryService(String str, OutputFormat fmt, String url,
+ public InputStream executeQueryService(String str, OutputFormat fmt, URI uri,
List<CompilationUnit.Parameter> params, boolean jsonEncoded) throws Exception {
setParam(params, "format", fmt.mimeType());
- HttpUriRequest method = jsonEncoded ? constructPostMethodJson(str, url, "statement", params)
- : constructPostMethodUrl(str, url, "statement", params);
+ HttpUriRequest method = jsonEncoded ? constructPostMethodJson(str, uri, "statement", params)
+ : constructPostMethodUrl(str, uri, "statement", params);
// Set accepted output response type
method.setHeader("Accept", OutputFormat.CLEAN_JSON.mimeType());
HttpResponse response = executeHttpRequest(method);
return response.getEntity().getContent();
}
- public InputStream executeQueryService(String statement, OutputFormat fmt, String url,
- List<CompilationUnit.Parameter> params, boolean jsonEncoded, String deferred) throws Exception {
- setParam(params, "mode", deferred);
- InputStream resultStream = executeQueryService(statement, fmt, url, params, jsonEncoded);
- String handle = ResultExtractor.extractHandle(resultStream);
- return getHandleResult(handle, fmt);
- }
-
protected void setParam(List<CompilationUnit.Parameter> params, String name, String value) {
for (CompilationUnit.Parameter param : params) {
if (name.equals(param.getName())) {
@@ -479,19 +489,19 @@
return params;
}
- private HttpUriRequest constructHttpMethod(String statement, String endpoint, String stmtParam,
- boolean postStmtAsParam, List<CompilationUnit.Parameter> otherParams) {
- if (statement.length() + endpoint.length() < MAX_URL_LENGTH) {
+ private HttpUriRequest constructHttpMethod(String statement, URI uri, String stmtParam,
+ boolean postStmtAsParam, List<CompilationUnit.Parameter> otherParams) throws URISyntaxException {
+ if (statement.length() + uri.toString().length() < MAX_URL_LENGTH) {
// Use GET for small-ish queries
- return constructGetMethod(endpoint, injectStatement(statement, stmtParam, otherParams));
+ return constructGetMethod(uri, injectStatement(statement, stmtParam, otherParams));
} else {
// Use POST for bigger ones to avoid 413 FULL_HEAD
String stmtParamName = (postStmtAsParam ? stmtParam : null);
- return constructPostMethodUrl(statement, endpoint, stmtParamName, otherParams);
+ return constructPostMethodUrl(statement, uri, stmtParamName, otherParams);
}
}
- private HttpUriRequest constructGetMethod(String endpoint, List<CompilationUnit.Parameter> params) {
+ private HttpUriRequest constructGetMethod(URI endpoint, List<CompilationUnit.Parameter> params) {
RequestBuilder builder = RequestBuilder.get(endpoint);
for (CompilationUnit.Parameter param : params) {
builder.addParameter(param.getName(), param.getValue());
@@ -500,17 +510,16 @@
return builder.build();
}
- private HttpUriRequest constructGetMethod(String endpoint, OutputFormat fmt,
+ private HttpUriRequest constructGetMethod(URI endpoint, OutputFormat fmt,
List<CompilationUnit.Parameter> params) {
-
HttpUriRequest method = constructGetMethod(endpoint, params);
// Set accepted output response type
method.setHeader("Accept", fmt.mimeType());
return method;
}
- private HttpUriRequest constructPostMethod(String endpoint, List<CompilationUnit.Parameter> params) {
- RequestBuilder builder = RequestBuilder.post(endpoint);
+ private HttpUriRequest constructPostMethod(URI uri, List<CompilationUnit.Parameter> params) {
+ RequestBuilder builder = RequestBuilder.post(uri);
for (CompilationUnit.Parameter param : params) {
builder.addParameter(param.getName(), param.getValue());
}
@@ -518,18 +527,17 @@
return builder.build();
}
- private HttpUriRequest constructPostMethod(String endpoint, OutputFormat fmt,
+ private HttpUriRequest constructPostMethod(URI uri, OutputFormat fmt,
List<CompilationUnit.Parameter> params) {
-
- HttpUriRequest method = constructPostMethod(endpoint, params);
+ HttpUriRequest method = constructPostMethod(uri, params);
// Set accepted output response type
method.setHeader("Accept", fmt.mimeType());
return method;
}
- protected HttpUriRequest constructPostMethodUrl(String statement, String endpoint, String stmtParam,
+ protected HttpUriRequest constructPostMethodUrl(String statement, URI uri, String stmtParam,
List<CompilationUnit.Parameter> otherParams) {
- RequestBuilder builder = RequestBuilder.post(endpoint);
+ RequestBuilder builder = RequestBuilder.post(uri);
if (stmtParam != null) {
for (CompilationUnit.Parameter param : injectStatement(statement, stmtParam, otherParams)) {
builder.addParameter(param.getName(), param.getValue());
@@ -543,12 +551,12 @@
return builder.build();
}
- protected HttpUriRequest constructPostMethodJson(String statement, String endpoint, String stmtParam,
+ protected HttpUriRequest constructPostMethodJson(String statement, URI uri, String stmtParam,
List<CompilationUnit.Parameter> otherParams) {
if (stmtParam == null) {
throw new NullPointerException("Statement parameter required.");
}
- RequestBuilder builder = RequestBuilder.post(endpoint);
+ RequestBuilder builder = RequestBuilder.post(uri);
ObjectMapper om = new ObjectMapper();
ObjectNode content = om.createObjectNode();
for (CompilationUnit.Parameter param : injectStatement(statement, stmtParam, otherParams)) {
@@ -563,23 +571,23 @@
return builder.build();
}
- public InputStream executeJSONGet(OutputFormat fmt, String url) throws Exception {
- HttpUriRequest request = constructGetMethod(url, fmt, new ArrayList<>());
+ public InputStream executeJSONGet(OutputFormat fmt, URI uri) throws Exception {
+ HttpUriRequest request = constructGetMethod(uri, fmt, new ArrayList<>());
HttpResponse response = executeAndCheckHttpRequest(request);
return response.getEntity().getContent();
}
- public InputStream executeJSONPost(OutputFormat fmt, String url) throws Exception {
- HttpUriRequest request = constructPostMethod(url, fmt, new ArrayList<>());
+ public InputStream executeJSONPost(OutputFormat fmt, URI uri) throws Exception {
+ HttpUriRequest request = constructPostMethod(uri, fmt, new ArrayList<>());
HttpResponse response = executeAndCheckHttpRequest(request);
return response.getEntity().getContent();
}
// To execute Update statements
// Insert and Delete statements are executed here
- public void executeUpdate(String str, String url) throws Exception {
+ public void executeUpdate(String str, URI uri) throws Exception {
// Create a method instance.
- HttpUriRequest request = RequestBuilder.post(url).setEntity(new StringEntity(str, StandardCharsets.UTF_8))
+ HttpUriRequest request = RequestBuilder.post(uri).setEntity(new StringEntity(str, StandardCharsets.UTF_8))
.build();
// Execute the method.
@@ -587,30 +595,25 @@
}
// Executes AQL in either async or async-defer mode.
- public InputStream executeAnyAQLAsync(String str, boolean defer, OutputFormat fmt, String url) throws Exception {
+ public InputStream executeAnyAQLAsync(String statement, boolean defer, OutputFormat fmt, URI uri,
+ Map<String, Object> variableCtx) throws Exception {
// Create a method instance.
- HttpUriRequest request = RequestBuilder.post(url)
+ HttpUriRequest request = RequestBuilder.post(uri)
.addParameter("mode", defer ? "asynchronous-deferred" : "asynchronous")
- .setEntity(new StringEntity(str, StandardCharsets.UTF_8)).setHeader("Accept", fmt.mimeType()).build();
+ .setEntity(new StringEntity(statement, StandardCharsets.UTF_8)).setHeader("Accept", fmt.mimeType())
+ .build();
+
+ String handleVar = getHandleVariable(statement);
HttpResponse response = executeAndCheckHttpRequest(request);
InputStream resultStream = response.getEntity().getContent();
+ String handle = IOUtils.toString(resultStream, "UTF-8");
- String theHandle = IOUtils.toString(resultStream, "UTF-8");
-
- // take the handle and parse it so results can be retrieved
- return getHandleResult(theHandle, fmt);
- }
-
- private InputStream getHandleResult(String handle, OutputFormat fmt) throws Exception {
- final String url = getEndpoint(Lets.QUERY_RESULT);
-
- // Create a method instance.
- HttpUriRequest request = RequestBuilder.get(url).addParameter("handle", handle)
- .setHeader("Accept", fmt.mimeType()).build();
-
- HttpResponse response = executeAndCheckHttpRequest(request);
- return response.getEntity().getContent();
+ if (handleVar != null) {
+ variableCtx.put(handleVar, handle);
+ return resultStream;
+ }
+ return null;
}
// To execute DDL and Update statements
@@ -619,9 +622,9 @@
// create index statement
// create dataverse statement
// create function statement
- public void executeDDL(String str, String url) throws Exception {
+ public void executeDDL(String str, URI uri) throws Exception {
// Create a method instance.
- HttpUriRequest request = RequestBuilder.post(url).setEntity(new StringEntity(str, StandardCharsets.UTF_8))
+ HttpUriRequest request = RequestBuilder.post(uri).setEntity(new StringEntity(str, StandardCharsets.UTF_8))
.build();
// Execute the method.
@@ -735,9 +738,10 @@
executeTest(actualPath, testCaseCtx, pb, isDmlRecoveryTest, null);
}
- public void executeTest(TestCaseContext testCaseCtx, TestFileContext ctx, String statement,
- boolean isDmlRecoveryTest, ProcessBuilder pb, CompilationUnit cUnit, MutableInt queryCount,
- List<TestFileContext> expectedResultFileCtxs, File testFile, String actualPath) throws Exception {
+ public void executeTestFile(TestCaseContext testCaseCtx, TestFileContext ctx, Map<String, Object> variableCtx,
+ String statement, boolean isDmlRecoveryTest, ProcessBuilder pb, CompilationUnit cUnit,
+ MutableInt queryCount, List<TestFileContext> expectedResultFileCtxs, File testFile, String actualPath)
+ throws Exception {
File qbcFile;
boolean failed = false;
File expectedResultFile;
@@ -762,17 +766,11 @@
ResultExtractor.extract(resultStream);
}
break;
+ case "pollget":
case "pollquery":
// polltimeoutsecs=nnn, polldelaysecs=nnn
- final Matcher timeoutMatcher = POLL_TIMEOUT_PATTERN.matcher(statement);
- int timeoutSecs;
- if (timeoutMatcher.find()) {
- timeoutSecs = Integer.parseInt(timeoutMatcher.group(1));
- } else {
- throw new IllegalArgumentException("ERROR: polltimeoutsecs=nnn must be present in poll file");
- }
- final Matcher retryDelayMatcher = POLL_DELAY_PATTERN.matcher(statement);
- int retryDelaySecs = retryDelayMatcher.find() ? Integer.parseInt(timeoutMatcher.group(1)) : 1;
+ int timeoutSecs = getTimeoutSecs(statement);
+ int retryDelaySecs = getRetryDelaySecs(statement);
long startTime = System.currentTimeMillis();
long limitTime = startTime + TimeUnit.SECONDS.toMillis(timeoutSecs);
ctx.setType(ctx.getType().substring("poll".length()));
@@ -780,7 +778,7 @@
LOGGER.fine("polling for up to " + timeoutSecs + " seconds w/ " + retryDelaySecs + " second(s) delay");
while (true) {
try {
- executeTest(testCaseCtx, ctx, statement, isDmlRecoveryTest, pb, cUnit, queryCount,
+ executeTestFile(testCaseCtx, ctx, variableCtx, statement, isDmlRecoveryTest, pb, cUnit, queryCount,
expectedResultFileCtxs, testFile, actualPath);
finalException = null;
break;
@@ -800,7 +798,7 @@
break;
case "query":
case "async":
- case "asyncdefer":
+ case "deferred":
// isDmlRecoveryTest: insert Crash and Recovery
if (isDmlRecoveryTest) {
executeScript(pb, pb.environment().get("SCRIPT_HOME") + File.separator + "dml_recovery"
@@ -810,31 +808,43 @@
}
InputStream resultStream = null;
OutputFormat fmt = OutputFormat.forCompilationUnit(cUnit);
+ final String reqType = ctx.getType();
+ final List<CompilationUnit.Parameter> params = cUnit.getParameter();
if (ctx.getFile().getName().endsWith("aql")) {
- if (ctx.getType().equalsIgnoreCase("query")) {
- resultStream = executeQuery(statement, fmt, getEndpoint(Lets.AQL_QUERY),
- cUnit.getParameter());
- } else if (ctx.getType().equalsIgnoreCase("async")) {
- resultStream = executeAnyAQLAsync(statement, false, fmt, getEndpoint(Lets.AQL));
- } else if (ctx.getType().equalsIgnoreCase("asyncdefer")) {
- resultStream = executeAnyAQLAsync(statement, true, fmt, getEndpoint(Lets.AQL));
+ if (reqType.equalsIgnoreCase("query")) {
+ resultStream = executeQuery(statement, fmt, getEndpoint(Lets.AQL_QUERY), params);
+ } else {
+ final URI endpoint = getEndpoint(Lets.AQL);
+ if (reqType.equalsIgnoreCase("async")) {
+ resultStream = executeAnyAQLAsync(statement, false, fmt, endpoint, variableCtx);
+ } else if (reqType.equalsIgnoreCase("deferred")) {
+ resultStream = executeAnyAQLAsync(statement, true, fmt, endpoint, variableCtx);
+ }
+ Assert.assertNotNull("no handle for " + reqType + " test " + testFile.toString(), resultStream);
}
} else {
- final String reqType = ctx.getType();
- final String url = getEndpoint(Lets.QUERY_SERVICE);
- final List<CompilationUnit.Parameter> params = cUnit.getParameter();
- if (reqType.equalsIgnoreCase("query")) {
- resultStream = executeQueryService(statement, fmt, url, params, true);
+ String delivery = DELIVERY_IMMEDIATE;
+ if (reqType.equalsIgnoreCase("async")) {
+ delivery = DELIVERY_ASYNC;
+ } else if (reqType.equalsIgnoreCase("deferred")) {
+ delivery = DELIVERY_DEFERRED;
+ }
+ final URI uri = getEndpoint(Lets.QUERY_SERVICE);
+ if (DELIVERY_IMMEDIATE.equals(delivery)) {
+ resultStream = executeQueryService(statement, fmt, uri, params, true);
resultStream = ResultExtractor.extract(resultStream);
- } else if (reqType.equalsIgnoreCase("async")) {
- resultStream = executeQueryService(statement, fmt, url, params, true, "async");
- } else if (reqType.equalsIgnoreCase("asyncdefer")) {
- resultStream = executeQueryService(statement, fmt, url, params, true, "deferred");
+ } else {
+ String handleVar = getHandleVariable(statement);
+ setParam(params, "mode", delivery);
+ resultStream = executeQueryService(statement, fmt, uri, params, true);
+ String handle = ResultExtractor.extractHandle(resultStream);
+ Assert.assertNotNull("no handle for " + reqType + " test " + testFile.toString(), handleVar);
+ variableCtx.put(handleVar, handle);
}
}
if (queryCount.intValue() >= expectedResultFileCtxs.size()) {
- throw new IllegalStateException("no result file for " + testFile.toString() + "; queryCount: "
- + queryCount + ", filectxs.size: " + expectedResultFileCtxs.size());
+ Assert.fail("no result file for " + testFile.toString() + "; queryCount: " + queryCount
+ + ", filectxs.size: " + expectedResultFileCtxs.size());
}
expectedResultFile = expectedResultFileCtxs.get(queryCount.intValue()).getFile();
@@ -938,17 +948,9 @@
"Unexpected format for method " + ctx.getType() + ": " + ctx.extension());
}
fmt = OutputFormat.forCompilationUnit(cUnit);
- String endpoint = stripJavaComments(statement).trim();
- switch (ctx.getType()) {
- case "get":
- resultStream = executeJSONGet(fmt, "http://" + host + ":" + port + endpoint);
- break;
- case "post":
- resultStream = executeJSONPost(fmt, "http://" + host + ":" + port + endpoint);
- break;
- default:
- throw new IllegalStateException("NYI: " + ctx.getType());
- }
+ final String trimmedPathAndQuery = stripLineComments(stripJavaComments(statement)).trim();
+ final String variablesReplaced = replaceVarRef(trimmedPathAndQuery, variableCtx);
+ resultStream = executeHttp(ctx.getType(), variablesReplaced, fmt);
expectedResultFile = expectedResultFileCtxs.get(queryCount.intValue()).getFile();
actualResultFile = testCaseCtx.getActualResultFile(cUnit, expectedResultFile, new File(actualPath));
writeOutputToFile(actualResultFile, resultStream);
@@ -1047,11 +1049,56 @@
}
}
+ protected int getTimeoutSecs(String statement) {
+ final Matcher timeoutMatcher = POLL_TIMEOUT_PATTERN.matcher(statement);
+ if (timeoutMatcher.find()) {
+ return Integer.parseInt(timeoutMatcher.group(1));
+ } else {
+ throw new IllegalArgumentException("ERROR: polltimeoutsecs=nnn must be present in poll file");
+ }
+ }
+
+ protected static int getRetryDelaySecs(String statement) {
+ final Matcher retryDelayMatcher = POLL_DELAY_PATTERN.matcher(statement);
+ return retryDelayMatcher.find() ? Integer.parseInt(retryDelayMatcher.group(1)) : 1;
+ }
+
+ protected static String getHandleVariable(String statement) {
+ final Matcher handleVariableMatcher = HANDLE_VARIABLE_PATTERN.matcher(statement);
+ return handleVariableMatcher.find() ? handleVariableMatcher.group(1) : null;
+ }
+
+ protected static String replaceVarRef(String statement, Map<String, Object> variableCtx) {
+ String tmpStmt = statement;
+ Matcher variableReferenceMatcher = VARIABLE_REF_PATTERN.matcher(tmpStmt);
+ while (variableReferenceMatcher.find()) {
+ String var = variableReferenceMatcher.group(1);
+ Object value = variableCtx.get(var);
+ Assert.assertNotNull("No value for variable reference $" + var, value);
+ tmpStmt = tmpStmt.replace("$" + var, String.valueOf(value));
+ variableReferenceMatcher = VARIABLE_REF_PATTERN.matcher(tmpStmt);
+ }
+ return tmpStmt;
+ }
+
+ protected InputStream executeHttp(String ctxType, String endpoint, OutputFormat fmt) throws Exception {
+ String[] split = endpoint.split("\\?");
+ URI uri = new URI("http", null, host, port, split[0], split.length > 1 ? split[1] : null, null);
+ switch (ctxType) {
+ case "get":
+ return executeJSONGet(fmt, uri);
+ case "post":
+ return executeJSONPost(fmt, uri);
+ default:
+ throw new AssertionError("Not implemented: " + ctxType);
+ }
+ }
+
private void killNC(String nodeId, CompilationUnit cUnit) throws Exception {
//get node process id
OutputFormat fmt = OutputFormat.forCompilationUnit(cUnit);
String endpoint = "/admin/cluster/node/" + nodeId + "/config";
- InputStream executeJSONGet = executeJSONGet(fmt, "http://" + host + ":" + port + endpoint);
+ InputStream executeJSONGet = executeJSONGet(fmt, new URI("http", null, host, port, endpoint, null, null));
StringWriter actual = new StringWriter();
IOUtils.copy(executeJSONGet, actual, StandardCharsets.UTF_8);
String config = actual.toString();
@@ -1065,10 +1112,6 @@
public void executeTest(String actualPath, TestCaseContext testCaseCtx, ProcessBuilder pb,
boolean isDmlRecoveryTest, TestGroup failedGroup) throws Exception {
- File testFile;
- String statement;
- List<TestFileContext> expectedResultFileCtxs;
- List<TestFileContext> testFileCtxs;
MutableInt queryCount = new MutableInt(0);
int numOfErrors = 0;
int numOfFiles = 0;
@@ -1076,14 +1119,15 @@
for (CompilationUnit cUnit : cUnits) {
LOGGER.info(
"Starting [TEST]: " + testCaseCtx.getTestCase().getFilePath() + "/" + cUnit.getName() + " ... ");
- testFileCtxs = testCaseCtx.getTestFiles(cUnit);
- expectedResultFileCtxs = testCaseCtx.getExpectedResultFiles(cUnit);
+ Map<String, Object> variableCtx = new HashMap<>();
+ List<TestFileContext> testFileCtxs = testCaseCtx.getTestFiles(cUnit);
+ List<TestFileContext> expectedResultFileCtxs = testCaseCtx.getExpectedResultFiles(cUnit);
for (TestFileContext ctx : testFileCtxs) {
numOfFiles++;
- testFile = ctx.getFile();
- statement = readTestFile(testFile);
+ final File testFile = ctx.getFile();
+ final String statement = readTestFile(testFile);
try {
- executeTest(testCaseCtx, ctx, statement, isDmlRecoveryTest, pb, cUnit, queryCount,
+ executeTestFile(testCaseCtx, ctx, variableCtx, statement, isDmlRecoveryTest, pb, cUnit, queryCount,
expectedResultFileCtxs, testFile, actualPath);
} catch (Exception e) {
System.err.println("testFile " + testFile.toString() + " raised an exception: " + e);
@@ -1140,14 +1184,19 @@
return servlet.getPath();
}
- protected String getEndpoint(Lets servlet) {
- return "http://" + host + ":" + port + getPath(servlet).replaceAll("/\\*$", "");
+ protected URI getEndpoint(Lets servlet) throws URISyntaxException {
+ return new URI("http", null, host, port, getPath(servlet).replaceAll("/\\*$", ""), null, null);
}
public static String stripJavaComments(String text) {
return JAVA_BLOCK_COMMENT_PATTERN.matcher(text).replaceAll("");
}
+ public static String stripLineComments(String text) {
+ final String s = SHELL_LINE_COMMENT_PATTERN.matcher(text).replaceAll("");
+ return JAVA_LINE_COMMENT_PATTERN.matcher(s).replaceAll("");
+ }
+
public void cleanup(String testCase, List<String> badtestcases) throws Exception {
try {
ArrayList<String> toBeDropped = new ArrayList<>();
diff --git a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/ManagixSqlppExecutionIT.java b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/ManagixSqlppExecutionIT.java
index 50d103a..2eada38 100644
--- a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/ManagixSqlppExecutionIT.java
+++ b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/ManagixSqlppExecutionIT.java
@@ -29,7 +29,7 @@
@RunWith(Parameterized.class)
public class ManagixSqlppExecutionIT extends ManagixExecutionIT {
- @Parameters
+ @Parameters(name = "ManagixSqlppExecutionIT {index}: {0}")
public static Collection<Object[]> tests() throws Exception {
Collection<Object[]> testArgs = buildTestsInXml("only_sqlpp.xml");
if (testArgs.size() == 0) {
@@ -48,10 +48,7 @@
}
- private TestCaseContext tcCtx;
-
public ManagixSqlppExecutionIT(TestCaseContext tcCtx) {
super(tcCtx);
- this.tcCtx = tcCtx;
}
}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
index 6eaf4ae..0769596 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
@@ -92,6 +92,7 @@
import org.apache.asterix.om.typecomputer.impl.RecordPairsTypeComputer;
import org.apache.asterix.om.typecomputer.impl.RecordRemoveFieldsTypeComputer;
import org.apache.asterix.om.typecomputer.impl.ScalarVersionOfAggregateResultType;
+import org.apache.asterix.om.typecomputer.impl.SleepTypeComputer;
import org.apache.asterix.om.typecomputer.impl.StringBooleanTypeComputer;
import org.apache.asterix.om.typecomputer.impl.StringInt32TypeComputer;
import org.apache.asterix.om.typecomputer.impl.StringIntToStringTypeComputer;
@@ -651,6 +652,8 @@
"spatial-cell", 4);
public static final FunctionIdentifier SWITCH_CASE = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
"switch-case", FunctionIdentifier.VARARGS);
+ public static final FunctionIdentifier SLEEP = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+ "sleep", 2);
public static final FunctionIdentifier INJECT_FAILURE = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
"inject-failure", 2);
public static final FunctionIdentifier FLOW_RECORD = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
@@ -1058,6 +1061,7 @@
addPrivateFunction(SUBSET_COLLECTION, SubsetCollectionTypeComputer.INSTANCE, true);
addFunction(SUBSTRING, SubstringTypeComputer.INSTANCE, true);
addFunction(SWITCH_CASE, SwitchCaseComputer.INSTANCE, true);
+ addFunction(SLEEP, SleepTypeComputer.INSTANCE, false);
addPrivateFunction(INJECT_FAILURE, InjectFailureTypeComputer.INSTANCE, true);
addPrivateFunction(CAST_TYPE, CastTypeComputer.INSTANCE, true);
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/InjectFailureTypeComputer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/InjectFailureTypeComputer.java
index df050be..cc19ac4 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/InjectFailureTypeComputer.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/InjectFailureTypeComputer.java
@@ -21,29 +21,31 @@
import org.apache.asterix.om.exceptions.TypeMismatchException;
import org.apache.asterix.om.typecomputer.base.AbstractResultTypeComputer;
import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+/**
+ * The first argument of INJECT_FAILURE can be any data model instance and will be passed verbatim to the
+ * caller. The second argument is a boolean that determines if the invocation throws an exception.
+ *
+ * Consequently {@link #checkArgType(String, int, IAType)} validates that the second argument is a
+ * boolean and {@link #getResultType(ILogicalExpression, IAType...)} returns the type of the first
+ * argument.
+ */
public class InjectFailureTypeComputer extends AbstractResultTypeComputer {
public static final InjectFailureTypeComputer INSTANCE = new InjectFailureTypeComputer();
- protected InjectFailureTypeComputer() {
- }
-
@Override
protected void checkArgType(String funcName, int argIndex, IAType type) throws AlgebricksException {
- ATypeTag actualTypeTag = type.getTypeTag();
- if (actualTypeTag != ATypeTag.BOOLEAN) {
- throw new TypeMismatchException(funcName, argIndex, actualTypeTag, ATypeTag.BOOLEAN);
+ if (argIndex == 1 && type.getTypeTag() != ATypeTag.BOOLEAN) {
+ throw new TypeMismatchException(funcName, argIndex, type.getTypeTag(), ATypeTag.BOOLEAN);
}
}
@Override
protected IAType getResultType(ILogicalExpression expr, IAType... strippedInputTypes) throws AlgebricksException {
- return BuiltinType.ABOOLEAN;
+ return strippedInputTypes[0];
}
-
}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/SleepTypeComputer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/SleepTypeComputer.java
new file mode 100644
index 0000000..6b885e3
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/SleepTypeComputer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.om.typecomputer.impl;
+
+import org.apache.asterix.om.exceptions.TypeMismatchException;
+import org.apache.asterix.om.typecomputer.base.AbstractResultTypeComputer;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+
+public class SleepTypeComputer extends AbstractResultTypeComputer {
+ public static final SleepTypeComputer INSTANCE = new SleepTypeComputer();
+
+ @Override
+ public void checkArgType(String funcName, int argIndex, IAType type) throws AlgebricksException {
+ if (argIndex == 1) {
+ switch (type.getTypeTag()) {
+ case INT8:
+ case INT16:
+ case INT32:
+ case INT64:
+ break;
+ default:
+ throw new TypeMismatchException(funcName, argIndex, type.getTypeTag(), ATypeTag.INT8,
+ ATypeTag.INT16, ATypeTag.INT32, ATypeTag.INT64);
+ }
+ }
+ }
+
+ @Override
+ public IAType getResultType(ILogicalExpression expr, IAType... types) throws AlgebricksException {
+ return types[0];
+ }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/InjectFailureDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/InjectFailureDescriptor.java
index 164f369..af5f690 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/InjectFailureDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/InjectFailureDescriptor.java
@@ -18,6 +18,9 @@
*/
package org.apache.asterix.runtime.evaluators.functions;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.dataflow.data.nontagged.serde.ABooleanSerializerDeserializer;
@@ -39,6 +42,9 @@
public class InjectFailureDescriptor extends AbstractScalarFunctionDynamicDescriptor {
private static final long serialVersionUID = 1L;
+
+ private static final Logger LOGGER = Logger.getLogger(InjectFailureDescriptor.class.getSimpleName());
+
public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
@Override
public IFunctionDescriptor createFunctionDescriptor() {
@@ -75,6 +81,7 @@
boolean argResult = ABooleanSerializerDeserializer.getBoolean(argPtr.getByteArray(),
argPtr.getStartOffset() + 1);
if (argResult) {
+ LOGGER.log(Level.SEVERE, ctx.getTaskAttemptId() + " injecting failure");
throw new RuntimeDataException(ErrorCode.INJECTED_FAILURE, getIdentifier());
}
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java
new file mode 100644
index 0000000..a186b32
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.evaluators.functions;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
+import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class SleepDescriptor extends AbstractScalarFunctionDynamicDescriptor {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOGGER = Logger.getLogger(SleepDescriptor.class.getSimpleName());
+
+ public static final IFunctionDescriptorFactory FACTORY = SleepDescriptor::new;
+
+ @Override
+ public IScalarEvaluatorFactory createEvaluatorFactory(final IScalarEvaluatorFactory[] args) {
+ return new IScalarEvaluatorFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws HyracksDataException {
+ return new IScalarEvaluator() {
+
+ private IPointable argTime = new VoidPointable();
+ private final IScalarEvaluator evalValue = args[0].createScalarEvaluator(ctx);
+ private final IScalarEvaluator evalTime = args[1].createScalarEvaluator(ctx);
+
+ @Override
+ public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
+ evalValue.evaluate(tuple, result);
+ evalTime.evaluate(tuple, argTime);
+
+ final byte[] bytes = argTime.getByteArray();
+ final int offset = argTime.getStartOffset();
+ final long time = ATypeHierarchy.getLongValue(getIdentifier().getName(), 1, bytes, offset);
+
+ try {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.log(Level.INFO, ctx.getTaskAttemptId() + " sleeping for " + time + " ms");
+ }
+ Thread.sleep(time);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.log(Level.INFO, ctx.getTaskAttemptId() + " done sleeping for " + time + " ms");
+ }
+ }
+ }
+ };
+ }
+ };
+ }
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return BuiltinFunctions.SLEEP;
+ }
+
+}
diff --git a/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java b/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java
index c926ec0..175ecc4 100644
--- a/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java
+++ b/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java
@@ -24,6 +24,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
+import java.net.URI;
import java.net.URL;
import java.util.Collections;
@@ -89,7 +90,8 @@
public void test1_sanityQuery() throws Exception {
TestExecutor testExecutor = new TestExecutor();
InputStream resultStream = testExecutor.executeQuery("1+1", OutputFormat.ADM,
- "http://127.0.0.1:19002" + Lets.AQL_QUERY.getPath(), Collections.emptyList());
+ new URI("http", null, "127.0.0.1", 19002, Lets.AQL_QUERY.getPath(), null, null),
+ Collections.emptyList());
StringWriter sw = new StringWriter();
IOUtils.copy(resultStream, sw);
Assert.assertEquals("2", sw.toString().trim());
diff --git a/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/context/TestFileContext.java b/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/context/TestFileContext.java
index e41a624..7a53a9e 100644
--- a/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/context/TestFileContext.java
+++ b/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/context/TestFileContext.java
@@ -59,10 +59,16 @@
@Override
public int compareTo(TestFileContext o) {
- if (this.seqNum > o.seqNum)
+ if (this.seqNum > o.seqNum) {
return 1;
- else if (this.seqNum < o.seqNum)
+ } else if (this.seqNum < o.seqNum) {
return -1;
+ }
return 0;
}
+
+ @Override
+ public String toString() {
+ return String.valueOf(seqNum) + ":" + type + ":" + file;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetDirectoryRecord.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetDirectoryRecord.java
index a50e1ee..3165840 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetDirectoryRecord.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetDirectoryRecord.java
@@ -71,17 +71,24 @@
}
public void start() {
- status = Status.RUNNING;
+ updateStatus(Status.RUNNING);
}
public void writeEOS() {
- status = Status.SUCCESS;
+ updateStatus(Status.SUCCESS);
}
public void fail() {
status = Status.FAILED;
}
+ private void updateStatus(final DatasetDirectoryRecord.Status newStatus) {
+ // FAILED is a stable status
+ if (status != Status.FAILED) {
+ status = newStatus;
+ }
+ }
+
public Status getStatus() {
return status;
}
@@ -99,6 +106,6 @@
@Override
public String toString() {
- return address.toString() + " " + status + (empty ? " (empty)" : "") + (readEOS ? " (EOS)" : "");
+ return String.valueOf(address) + " " + status + (empty ? " (empty)" : "") + (readEOS ? " (EOS)" : "");
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java
index 34ed65c..f29ff4a 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java
@@ -20,9 +20,14 @@
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
-public class DatasetJobRecord extends HashMap<ResultSetId, ResultSetMetaData> implements IDatasetStateRecord {
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class DatasetJobRecord implements IDatasetStateRecord {
public enum Status {
+ IDLE,
RUNNING,
SUCCESS,
FAILED
@@ -36,20 +41,30 @@
private List<Exception> exceptions;
+ private Map<ResultSetId, ResultSetMetaData> resultSetMetadataMap = new HashMap<>();
+
public DatasetJobRecord() {
this.timestamp = System.currentTimeMillis();
- this.status = Status.RUNNING;
+ this.status = Status.IDLE;
+ }
+
+ private void updateStatus(Status newStatus) {
+ // FAILED is a stable status
+ if (status != Status.FAILED) {
+ status = newStatus;
+ }
}
public void start() {
- status = Status.RUNNING;
+ updateStatus(Status.RUNNING);
}
public void success() {
- status = Status.SUCCESS;
+ updateStatus(Status.SUCCESS);
}
- public void fail() {
+ public void fail(ResultSetId rsId, int partition) {
+ getOrCreateDirectoryRecord(rsId, partition).fail();
status = Status.FAILED;
}
@@ -58,6 +73,7 @@
this.exceptions = exceptions;
}
+ @Override
public long getTimestamp() {
return timestamp;
}
@@ -66,7 +82,57 @@
return status;
}
+ @Override
+ public String toString() {
+ return resultSetMetadataMap.toString();
+ }
+
public List<Exception> getExceptions() {
return exceptions;
}
+
+ public void setResultSetMetaData(ResultSetId rsId, boolean orderedResult, int nPartitions) throws
+ HyracksDataException {
+ ResultSetMetaData rsMd = resultSetMetadataMap.get(rsId);
+ if (rsMd == null) {
+ resultSetMetadataMap.put(rsId, new ResultSetMetaData(nPartitions, orderedResult));
+ } else if (rsMd.getOrderedResult() != orderedResult || rsMd.getRecords().length != nPartitions) {
+ throw HyracksDataException.create(ErrorCode.INCONSISTENT_RESULT_METADATA, rsId.toString());
+ }
+ //TODO(tillw) throwing a HyracksDataException here hangs the execution tests
+ }
+
+ public ResultSetMetaData getResultSetMetaData(ResultSetId rsId) {
+ return resultSetMetadataMap.get(rsId);
+ }
+
+ public synchronized DatasetDirectoryRecord getOrCreateDirectoryRecord(ResultSetId rsId, int partition) {
+ DatasetDirectoryRecord[] records = getResultSetMetaData(rsId).getRecords();
+ if (records[partition] == null) {
+ records[partition] = new DatasetDirectoryRecord();
+ }
+ return records[partition];
+ }
+
+ public synchronized DatasetDirectoryRecord getDirectoryRecord(ResultSetId rsId, int partition) throws
+ HyracksDataException {
+ DatasetDirectoryRecord[] records = getResultSetMetaData(rsId).getRecords();
+ if (records[partition] == null) {
+ throw new HyracksDataException("no record for partition " + partition + " of result set " + rsId);
+ }
+ return records[partition];
+ }
+
+ public synchronized void updateStatus(ResultSetId rsId) {
+ int successCount = 0;
+ DatasetDirectoryRecord[] records = getResultSetMetaData(rsId).getRecords();
+ for (DatasetDirectoryRecord record : records) {
+ if ((record != null) && (record.getStatus() == DatasetDirectoryRecord.Status.SUCCESS)) {
+ successCount++;
+ }
+ }
+ if (successCount == records.length) {
+ success();
+ }
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/ResultSetMetaData.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/ResultSetMetaData.java
index 2285981..8e9e3dc 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/ResultSetMetaData.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/ResultSetMetaData.java
@@ -18,14 +18,15 @@
*/
package org.apache.hyracks.api.dataset;
+import java.util.Arrays;
+
public class ResultSetMetaData {
+ private final DatasetDirectoryRecord[] records;
private final boolean ordered;
- private final DatasetDirectoryRecord[] records;
-
- public ResultSetMetaData(boolean ordered, DatasetDirectoryRecord[] records) {
+ ResultSetMetaData(int len, boolean ordered) {
+ this.records = new DatasetDirectoryRecord[len];
this.ordered = ordered;
- this.records = records;
}
public boolean getOrderedResult() {
@@ -35,4 +36,11 @@
public DatasetDirectoryRecord[] getRecords() {
return records;
}
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("{ordered: ").append(ordered).append(", records: ").append(Arrays.toString(records)).append("}");
+ return sb.toString();
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index d094368..35002f8 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -49,6 +49,10 @@
public static final int ILLEGAL_WRITE_AFTER_FLUSH_ATTEMPT = 12;
public static final int DUPLICATE_IODEVICE = 13;
public static final int NESTED_IODEVICES = 14;
+ public static final int MORE_THAN_ONE_RESULT = 15;
+ public static final int RESULT_FAILURE_EXCEPTION = 16;
+ public static final int RESULT_FAILURE_NO_EXCEPTION = 17;
+ public static final int INCONSISTENT_RESULT_METADATA = 18;
// Compilation error codes.
public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10001;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
index 0fd6923..404104d 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
@@ -36,6 +36,11 @@
return new HyracksDataException(ErrorCode.HYRACKS, code, ErrorCode.getErrorMessage(code), cause, params);
}
+ public static HyracksDataException create(HyracksDataException e, String nodeId) {
+ return new HyracksDataException(e.getComponent(), e.getErrorCode(), e.getMessage(), e.getCause(), nodeId, e
+ .getParams());
+ }
+
public HyracksDataException(String component, int errorCode, String message, Throwable cause, String nodeId,
Serializable... params) {
super(component, errorCode, message, cause, nodeId, params);
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java
index b1fa494..7969700 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java
@@ -26,6 +26,9 @@
import org.apache.hyracks.api.io.IWritable;
public final class JobId implements IWritable, Serializable {
+
+ public static final JobId INVALID = new JobId(-1L);
+
private static final long serialVersionUID = 1L;
private long id;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 72f7c65..de58f33 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -33,5 +33,9 @@
12 = Invalid attempt to write to a flushed append only metadata page
13 = Duplicate IODevices are not allowed
14 = IODevices should not be nested within each other
+15 = More than 1 result for job %1$s
+16 = Failure producing result set %1$s for job %2$s
+17 = No exception for failed result set %1$s for job %2$s
+18 = Inconsistent metadata for result set %1$s
10000 = The given rule collection %1$s is not an instance of the List class.
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index ae0f361..37c4177 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -325,10 +325,14 @@
return workQueue;
}
- public Executor getExecutor() {
+ public ExecutorService getExecutorService() {
return executor;
}
+ public Executor getExecutor() {
+ return getExecutorService();
+ }
+
public CCConfig getConfig() {
return ccConfig;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
index 4d7d1c3..46a173e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -25,6 +25,8 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.dataset.DatasetDirectoryRecord;
@@ -33,6 +35,7 @@
import org.apache.hyracks.api.dataset.IDatasetStateRecord;
import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.dataset.ResultSetMetaData;
+import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
@@ -48,6 +51,9 @@
* job.
*/
public class DatasetDirectoryService implements IDatasetDirectoryService {
+
+ private static final Logger LOGGER = Logger.getLogger(DatasetDirectoryService.class.getName());
+
private final long resultTTL;
private final long resultSweepThreshold;
@@ -62,22 +68,24 @@
@Override
public void init(ExecutorService executor) {
- executor.execute(new ResultStateSweeper(this, resultTTL, resultSweepThreshold));
+ executor.execute(new ResultStateSweeper(this, resultTTL, resultSweepThreshold, LOGGER));
}
@Override
public synchronized void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf)
throws HyracksException {
- DatasetJobRecord djr = getDatasetJobRecord(jobId);
- if (djr == null) {
- djr = new DatasetJobRecord();
- jobResultLocations.put(jobId, new JobResultInfo(djr, null));
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info(getClass().getSimpleName() + " notified of new job " + jobId);
}
+ if (jobResultLocations.get(jobId) != null) {
+ throw HyracksDataException.create(ErrorCode.MORE_THAN_ONE_RESULT, jobId);
+ }
+ jobResultLocations.put(jobId, new JobResultInfo(new DatasetJobRecord(), null));
}
@Override
public void notifyJobStart(JobId jobId) throws HyracksException {
- // Auto-generated method stub
+ jobResultLocations.get(jobId).getRecord().start();
}
@Override
@@ -87,35 +95,36 @@
private DatasetJobRecord getDatasetJobRecord(JobId jobId) {
final JobResultInfo jri = jobResultLocations.get(jobId);
- return jri == null ? null : jri.record;
+ return jri == null ? null : jri.getRecord();
+ }
+
+ private DatasetJobRecord getNonNullDatasetJobRecord(JobId jobId) {
+ final DatasetJobRecord djr = getDatasetJobRecord(jobId);
+ if (djr == null) {
+ throw new NullPointerException("no result record for job " + jobId);
+ }
+ return djr;
}
@Override
public synchronized void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
- boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress) {
- DatasetJobRecord djr = getDatasetJobRecord(jobId);
+ boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress) throws
+ HyracksDataException {
+ DatasetJobRecord djr = getNonNullDatasetJobRecord(jobId);
+ djr.setResultSetMetaData(rsId, orderedResult, nPartitions);
+ DatasetDirectoryRecord record = djr.getOrCreateDirectoryRecord(rsId, partition);
- ResultSetMetaData resultSetMetaData = djr.get(rsId);
- if (resultSetMetaData == null) {
- resultSetMetaData = new ResultSetMetaData(orderedResult, new DatasetDirectoryRecord[nPartitions]);
- djr.put(rsId, resultSetMetaData);
- }
+ record.setNetworkAddress(networkAddress);
+ record.setEmpty(emptyResult);
+ record.start();
- DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
- if (records[partition] == null) {
- records[partition] = new DatasetDirectoryRecord();
- }
- records[partition].setNetworkAddress(networkAddress);
- records[partition].setEmpty(emptyResult);
- records[partition].start();
-
- Waiters waiters = jobResultLocations.get(jobId).waiters;
- Waiter waiter = waiters != null ? waiters.get(rsId) : null;
+ final JobResultInfo jobResultInfo = jobResultLocations.get(jobId);
+ Waiter waiter = jobResultInfo.getWaiter(rsId);
if (waiter != null) {
try {
DatasetDirectoryRecord[] updatedRecords = updatedRecords(jobId, rsId, waiter.knownRecords);
if (updatedRecords != null) {
- waiters.remove(rsId);
+ jobResultInfo.removeWaiter(rsId);
waiter.callback.setValue(updatedRecords);
}
} catch (Exception e) {
@@ -126,51 +135,28 @@
}
@Override
- public synchronized void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) {
- int successCount = 0;
-
- DatasetJobRecord djr = getDatasetJobRecord(jobId);
- ResultSetMetaData resultSetMetaData = djr.get(rsId);
- DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
- records[partition].writeEOS();
-
- for (DatasetDirectoryRecord record : records) {
- if ((record != null) && (record.getStatus() == DatasetDirectoryRecord.Status.SUCCESS)) {
- successCount++;
- }
- }
- if (successCount == records.length) {
- djr.success();
- }
+ public synchronized void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition)
+ throws HyracksDataException {
+ DatasetJobRecord djr = getNonNullDatasetJobRecord(jobId);
+ djr.getDirectoryRecord(rsId, partition).writeEOS();
+ djr.updateStatus(rsId);
notifyAll();
}
@Override
public synchronized void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) {
- DatasetJobRecord djr = getDatasetJobRecord(jobId);
- if (djr != null) {
- djr.fail();
- }
- final Waiters waiters = jobResultLocations.get(jobId).waiters;
- if (waiters != null) {
- waiters.get(rsId).callback.setException(new Exception());
- waiters.remove(rsId);
- }
+ DatasetJobRecord djr = getNonNullDatasetJobRecord(jobId);
+ djr.fail(rsId, partition);
+ jobResultLocations.get(jobId).setException(new Exception());
notifyAll();
}
@Override
public synchronized void reportJobFailure(JobId jobId, List<Exception> exceptions) {
- DatasetJobRecord djr = getDatasetJobRecord(jobId);
- if (djr != null) {
- djr.fail(exceptions);
- }
- final Waiters waiters = jobResultLocations.get(jobId).waiters;
- if (waiters != null) {
- for (ResultSetId rsId : waiters.keySet()) {
- waiters.remove(rsId).callback.setException(exceptions.get(0));
- }
- }
+ DatasetJobRecord djr = getNonNullDatasetJobRecord(jobId);
+ djr.fail(exceptions);
+ // TODO(tillw) throwing an NPE here hangs the system, why?
+ jobResultLocations.get(jobId).setException(exceptions.isEmpty() ? null : exceptions.get(0));
notifyAll();
}
@@ -184,7 +170,6 @@
throw new HyracksDataException(e);
}
}
-
return djr.getStatus();
}
@@ -195,7 +180,7 @@
@Override
public IDatasetStateRecord getState(JobId jobId) {
- return jobResultLocations.get(jobId).record;
+ return getDatasetJobRecord(jobId);
}
@Override
@@ -210,20 +195,7 @@
throws HyracksDataException {
DatasetDirectoryRecord[] updatedRecords = updatedRecords(jobId, rsId, knownRecords);
if (updatedRecords == null) {
- JobResultInfo jri = jobResultLocations.get(jobId);
- Waiters waiters;
- if (jri == null) {
- waiters = new Waiters();
- jri = new JobResultInfo(null, waiters);
- jobResultLocations.put(jobId, jri);
- } else {
- waiters = jri.waiters;
- if (waiters == null) {
- waiters = new Waiters();
- jri.waiters = waiters;
- }
- }
- waiters.put(rsId, new Waiter(knownRecords, callback));
+ jobResultLocations.get(jobId).addWaiter(rsId, knownRecords, callback);
} else {
callback.setValue(updatedRecords);
}
@@ -248,26 +220,25 @@
*/
private DatasetDirectoryRecord[] updatedRecords(JobId jobId, ResultSetId rsId, DatasetDirectoryRecord[] knownRecords)
throws HyracksDataException {
- DatasetJobRecord djr = getDatasetJobRecord(jobId);
-
- if (djr == null) {
- throw new HyracksDataException("Requested JobId " + jobId + " doesn't exist");
- }
+ DatasetJobRecord djr = getNonNullDatasetJobRecord(jobId);
if (djr.getStatus() == Status.FAILED) {
List<Exception> caughtExceptions = djr.getExceptions();
- if (caughtExceptions == null) {
- throw new HyracksDataException("Job failed.");
+ if (caughtExceptions != null && !caughtExceptions.isEmpty()) {
+ final Exception cause = caughtExceptions.get(caughtExceptions.size() - 1);
+ if (cause instanceof HyracksDataException) {
+ throw (HyracksDataException) cause;
+ }
+ throw HyracksDataException.create(ErrorCode.RESULT_FAILURE_EXCEPTION, cause, rsId, jobId);
} else {
- throw new HyracksDataException(caughtExceptions.get(caughtExceptions.size() - 1));
+ throw HyracksDataException.create(ErrorCode.RESULT_FAILURE_NO_EXCEPTION, rsId, jobId);
}
}
- ResultSetMetaData resultSetMetaData = djr.get(rsId);
- if (resultSetMetaData == null || resultSetMetaData.getRecords() == null) {
+ final ResultSetMetaData resultSetMetaData = djr.getResultSetMetaData(rsId);
+ if (resultSetMetaData == null) {
return null;
}
-
DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
return Arrays.equals(records, knownRecords) ? null : records;
@@ -275,13 +246,42 @@
}
class JobResultInfo {
+
+ private DatasetJobRecord record;
+ private Waiters waiters;
+
JobResultInfo(DatasetJobRecord record, Waiters waiters) {
this.record = record;
this.waiters = waiters;
}
- DatasetJobRecord record;
- Waiters waiters;
+ DatasetJobRecord getRecord() {
+ return record;
+ }
+
+ void addWaiter(ResultSetId rsId, DatasetDirectoryRecord[] knownRecords,
+ IResultCallback<DatasetDirectoryRecord[]> callback) {
+ if (waiters == null) {
+ waiters = new Waiters();
+ }
+ waiters.put(rsId, new Waiter(knownRecords, callback));
+ }
+
+ Waiter removeWaiter(ResultSetId rsId) {
+ return waiters.remove(rsId);
+ }
+
+ Waiter getWaiter(ResultSetId rsId) {
+ return waiters != null ? waiters.get(rsId) : null;
+ }
+
+ void setException(Exception exception) {
+ if (waiters != null) {
+ for (ResultSetId rsId : waiters.keySet()) {
+ waiters.remove(rsId).callback.setException(exception);
+ }
+ }
+ }
}
class Waiters extends HashMap<ResultSetId, Waiter> {
@@ -289,11 +289,11 @@
}
class Waiter {
+ DatasetDirectoryRecord[] knownRecords;
+ IResultCallback<DatasetDirectoryRecord[]> callback;
+
Waiter(DatasetDirectoryRecord[] knownRecords, IResultCallback<DatasetDirectoryRecord[]> callback) {
this.knownRecords = knownRecords;
this.callback = callback;
}
-
- DatasetDirectoryRecord[] knownRecords;
- IResultCallback<DatasetDirectoryRecord[]> callback;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java
index 9e4e03e..663a53a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java
@@ -35,9 +35,11 @@
public void init(ExecutorService executor);
public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
- boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress);
+ boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress)
+ throws HyracksDataException;
- public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition);
+ public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition)
+ throws HyracksDataException;
public void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
index 4e4732d..f51dd06 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
@@ -20,6 +20,7 @@
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.common.work.AbstractWork;
@@ -42,7 +43,8 @@
private final NetworkAddress networkAddress;
public RegisterResultPartitionLocationWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId,
- boolean orderedResult, boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress) {
+ boolean orderedResult, boolean emptyResult, int partition, int nPartitions, NetworkAddress
+ networkAddress) {
this.ccs = ccs;
this.jobId = jobId;
this.rsId = rsId;
@@ -55,8 +57,13 @@
@Override
public void run() {
- ccs.getDatasetDirectoryService().registerResultPartitionLocation(jobId, rsId, orderedResult, emptyResult,
- partition, nPartitions, networkAddress);
+ try {
+ ccs.getDatasetDirectoryService()
+ .registerResultPartitionLocation(jobId, rsId, orderedResult, emptyResult, partition, nPartitions,
+ networkAddress);
+ } catch (HyracksDataException e) {
+ throw new RuntimeException(e);
+ }
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java
index ffae76a..d63bc8a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.control.cc.work;
import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.common.work.AbstractWork;
@@ -42,7 +43,11 @@
@Override
public void run() {
- ccs.getDatasetDirectoryService().reportResultPartitionWriteCompletion(jobId, rsId, partition);
+ try {
+ ccs.getDatasetDirectoryService().reportResultPartitionWriteCompletion(jobId, rsId, partition);
+ } catch (HyracksDataException e) {
+ throw new RuntimeException(e);
+ }
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java
index 150875b..67b87f5 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java
@@ -31,7 +31,6 @@
* Sweeper to clean up the stale result distribution files and result states.
*/
public class ResultStateSweeper implements Runnable {
- private static final Logger LOGGER = Logger.getLogger(ResultStateSweeper.class.getName());
private final IDatasetManager datasetManager;
@@ -39,12 +38,16 @@
private final long resultSweepThreshold;
+ private final Logger logger;
+
private final List<JobId> toBeCollected;
- public ResultStateSweeper(IDatasetManager datasetManager, long resultTTL, long resultSweepThreshold) {
+ public ResultStateSweeper(IDatasetManager datasetManager, long resultTTL, long resultSweepThreshold,
+ Logger logger) {
this.datasetManager = datasetManager;
this.resultTTL = resultTTL;
this.resultSweepThreshold = resultSweepThreshold;
+ this.logger = logger;
toBeCollected = new ArrayList<JobId>();
}
@@ -56,11 +59,10 @@
Thread.sleep(resultSweepThreshold);
sweep();
} catch (InterruptedException e) {
- LOGGER.severe("Result cleaner thread interrupted, shutting down.");
+ logger.log(Level.SEVERE, "Result cleaner thread interrupted, shutting down.", e);
break; // the interrupt was explicit from another thread. This thread should shut down...
}
}
-
}
private void sweep() {
@@ -75,8 +77,8 @@
datasetManager.deinitState(jobId);
}
}
- if (LOGGER.isLoggable(Level.FINER)) {
- LOGGER.finer("Result state cleanup instance successfully completed.");
+ if (logger.isLoggable(Level.FINER)) {
+ logger.finer("Result state cleanup instance successfully completed.");
}
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ExceptionUtils.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ExceptionUtils.java
index 73c680f..3bc549e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ExceptionUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ExceptionUtils.java
@@ -58,8 +58,12 @@
public static void setNodeIds(Collection<Exception> exceptions, String nodeId) {
List<Exception> newExceptions = new ArrayList<>();
for (Exception e : exceptions) {
- newExceptions.add(
- new HyracksDataException(ErrorCode.HYRACKS, ErrorCode.FAILURE_ON_NODE, e.getMessage(), e, nodeId));
+ if (e instanceof HyracksDataException) {
+ newExceptions.add(HyracksDataException.create((HyracksDataException) e, nodeId));
+ } else {
+ newExceptions.add(new HyracksDataException(ErrorCode.HYRACKS, ErrorCode.FAILURE_ON_NODE, e.getMessage(),
+ e, nodeId));
+ }
}
exceptions.clear();
exceptions.addAll(newExceptions);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
index a594f95..5fac823 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -18,10 +18,8 @@
*/
package org.apache.hyracks.control.nc.dataset;
-import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.logging.Logger;
@@ -66,29 +64,22 @@
datasetMemoryManager = null;
}
partitionResultStateMap = new LinkedHashMap<>();
- executor.execute(new ResultStateSweeper(this, resultTTL, resultSweepThreshold));
+ executor.execute(new ResultStateSweeper(this, resultTTL, resultSweepThreshold, LOGGER));
}
@Override
public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult,
boolean asyncMode, int partition, int nPartitions) throws HyracksException {
- DatasetPartitionWriter dpw = null;
+ DatasetPartitionWriter dpw;
JobId jobId = ctx.getJobletContext().getJobId();
synchronized (this) {
dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, asyncMode, orderedResult, partition, nPartitions,
datasetMemoryManager, fileFactory);
- ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.get(jobId);
- if (rsIdMap == null) {
- rsIdMap = new ResultSetMap();
- partitionResultStateMap.put(jobId, rsIdMap);
- }
+ ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.computeIfAbsent(jobId,
+ k -> new ResultSetMap());
- ResultState[] resultStates = rsIdMap.get(rsId);
- if (resultStates == null) {
- resultStates = new ResultState[nPartitions];
- rsIdMap.put(rsId, resultStates);
- }
+ ResultState[] resultStates = rsIdMap.createOrGetResultStates(rsId, nPartitions);
resultStates[partition] = dpw.getResultState();
}
@@ -141,7 +132,7 @@
throw new HyracksException("Unknown JobId " + jobId);
}
- ResultState[] resultStates = rsIdMap.get(resultSetId);
+ ResultState[] resultStates = rsIdMap.getResultStates(resultSetId);
if (resultStates == null) {
throw new HyracksException("Unknown JobId: " + jobId + " ResultSetId: " + resultSetId);
}
@@ -161,49 +152,16 @@
@Override
public synchronized void removePartition(JobId jobId, ResultSetId resultSetId, int partition) {
ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.get(jobId);
- if (rsIdMap != null) {
- ResultState[] resultStates = rsIdMap.get(resultSetId);
- if (resultStates != null) {
- ResultState state = resultStates[partition];
- if (state != null) {
- state.closeAndDelete();
- LOGGER.fine("Removing partition: " + partition + " for JobId: " + jobId);
- }
- resultStates[partition] = null;
- boolean stateEmpty = true;
- for (int i = 0; i < resultStates.length; i++) {
- if (resultStates[i] != null) {
- stateEmpty = false;
- break;
- }
- }
- if (stateEmpty) {
- rsIdMap.remove(resultSetId);
- }
- }
- if (rsIdMap.isEmpty()) {
- partitionResultStateMap.remove(jobId);
- }
+ if (rsIdMap != null && rsIdMap.removePartition(jobId, resultSetId, partition)) {
+ partitionResultStateMap.remove(jobId);
}
}
@Override
public synchronized void abortReader(JobId jobId) {
ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.get(jobId);
-
- if (rsIdMap == null) {
- return;
- }
-
- for (Entry<ResultSetId, ResultState[]> mapEntry : rsIdMap.entrySet()) {
- ResultState[] resultStates = mapEntry.getValue();
- if (resultStates != null) {
- for (ResultState state : resultStates) {
- if (state != null) {
- state.abort();
- }
- }
- }
+ if (rsIdMap != null) {
+ rsIdMap.abortAll();
}
}
@@ -214,59 +172,33 @@
@Override
public synchronized void close() {
- for (Entry<JobId, IDatasetStateRecord> entry : partitionResultStateMap.entrySet()) {
- deinit(entry.getKey());
+ for (JobId jobId : getJobIds()) {
+ deinit(jobId);
}
deallocatableRegistry.close();
}
@Override
- public Set<JobId> getJobIds() {
+ public synchronized Set<JobId> getJobIds() {
return partitionResultStateMap.keySet();
}
@Override
- public IDatasetStateRecord getState(JobId jobId) {
+ public synchronized IDatasetStateRecord getState(JobId jobId) {
return partitionResultStateMap.get(jobId);
}
@Override
- public void deinitState(JobId jobId) {
+ public synchronized void deinitState(JobId jobId) {
deinit(jobId);
partitionResultStateMap.remove(jobId);
}
- private void deinit(JobId jobId) {
+ private synchronized void deinit(JobId jobId) {
ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.get(jobId);
if (rsIdMap != null) {
- for (Entry<ResultSetId, ResultState[]> mapEntry : rsIdMap.entrySet()) {
- ResultState[] resultStates = mapEntry.getValue();
- if (resultStates != null) {
- for (int i = 0; i < resultStates.length; i++) {
- ResultState state = resultStates[i];
- if (state != null) {
- state.closeAndDelete();
- LOGGER.fine("Removing partition: " + i + " for JobId: " + jobId);
- }
- }
- }
- }
+ rsIdMap.closeAndDeleteAll();
}
}
- private class ResultSetMap extends HashMap<ResultSetId, ResultState[]> implements IDatasetStateRecord {
- private static final long serialVersionUID = 1L;
-
- long timestamp;
-
- public ResultSetMap() {
- super();
- timestamp = System.currentTimeMillis();
- }
-
- @Override
- public long getTimestamp() {
- return timestamp;
- }
- }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
index e007050..f7aa2e8 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -86,10 +86,7 @@
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- if (!partitionRegistered) {
- registerResultPartitionLocation(false);
- partitionRegistered = true;
- }
+ registerResultPartitionLocation(false);
if (datasetMemoryManager == null) {
resultState.write(buffer);
} else {
@@ -102,6 +99,7 @@
try {
resultState.closeAndDelete();
resultState.abort();
+ registerResultPartitionLocation(false);
manager.reportPartitionFailure(jobId, resultSetId, partition);
} catch (HyracksException e) {
throw new HyracksDataException(e);
@@ -113,10 +111,7 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("close(" + partition + ")");
}
- if (!partitionRegistered) {
- registerResultPartitionLocation(true);
- partitionRegistered = true;
- }
+ registerResultPartitionLocation(true);
resultState.close();
try {
manager.reportPartitionWriteCompletion(jobId, resultSetId, partition);
@@ -127,7 +122,11 @@
void registerResultPartitionLocation(boolean empty) throws HyracksDataException {
try {
- manager.registerResultPartitionLocation(jobId, resultSetId, partition, nPartitions, orderedResult, empty);
+ if (!partitionRegistered) {
+ manager.registerResultPartitionLocation(jobId, resultSetId, partition, nPartitions, orderedResult,
+ empty);
+ partitionRegistered = true;
+ }
} catch (HyracksException e) {
if (e instanceof HyracksDataException) {
throw (HyracksDataException) e;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultSetMap.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultSetMap.java
new file mode 100644
index 0000000..579f68b
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultSetMap.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.dataset;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.dataset.IDatasetStateRecord;
+import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.job.JobId;
+
+class ResultSetMap implements IDatasetStateRecord, Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOGGER = Logger.getLogger(DatasetPartitionManager.class.getName());
+
+ private final long timestamp;
+ private final HashMap<ResultSetId, ResultState[]> resultStateMap;
+
+ ResultSetMap() {
+ timestamp = System.currentTimeMillis();
+ resultStateMap = new HashMap<>();
+ }
+
+ @Override
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ ResultState[] getResultStates(ResultSetId rsId) {
+ return resultStateMap.get(rsId);
+ }
+
+ ResultState[] createOrGetResultStates(ResultSetId rsId, int nPartitions) {
+ return resultStateMap.computeIfAbsent(rsId, (k) -> new ResultState[nPartitions]);
+ }
+
+ /**
+ * removes a result partition for a result set
+ *
+ * @param jobId
+ * the id of the job that produced the result set
+ * @param resultSetId
+ * the id of the result set
+ * @param partition
+ * the partition number
+ * @return true, if all partitions for the resultSetId have been removed
+ */
+ boolean removePartition(JobId jobId, ResultSetId resultSetId, int partition) {
+ final ResultState[] resultStates = resultStateMap.get(resultSetId);
+ if (resultStates != null) {
+ final ResultState state = resultStates[partition];
+ if (state != null) {
+ state.closeAndDelete();
+ LOGGER.fine("Removing partition: " + partition + " for JobId: " + jobId);
+ }
+ resultStates[partition] = null;
+ boolean stateEmpty = true;
+ for (ResultState resState : resultStates) {
+ if (resState != null) {
+ stateEmpty = false;
+ break;
+ }
+ }
+ if (stateEmpty) {
+ resultStateMap.remove(resultSetId);
+ }
+ return resultStateMap.isEmpty();
+ }
+ return true;
+ }
+
+ void abortAll() {
+ applyToAllStates((rsId, state, i) -> state.abort());
+ }
+
+ void closeAndDeleteAll() {
+ applyToAllStates((rsId, state, i) -> {
+ state.closeAndDelete();
+ LOGGER.fine("Removing partition: " + i + " for result set " + rsId);
+ });
+ }
+
+ @FunctionalInterface
+ private interface StateModifier {
+ void modify(ResultSetId rsId, ResultState entry, int partition);
+ }
+
+ private void applyToAllStates(StateModifier modifier) {
+ for (Map.Entry<ResultSetId, ResultState[]> entry : resultStateMap.entrySet()) {
+ final ResultSetId rsId = entry.getKey();
+ final ResultState[] resultStates = entry.getValue();
+ if (resultStates == null) {
+ continue;
+ }
+ for (int i = 0; i < resultStates.length; i++) {
+ final ResultState state = resultStates[i];
+ if (state != null) {
+ modifier.modify(rsId, state, i);
+ }
+ }
+ }
+ }
+}