Fix async result delivery

- add new result status "started" for async requests
- add support for result status to the test framework
- stabilize result distribution for error in async requests
- add support for "pollget" to test async execution
- add sleep function
- allow inject-failure function to return any first argument
- use URIs instead on Strings in the TestExecutor to ensure that URIs are
  correctly escaped
- add a few tests

Change-Id: Iafba65d9c7bd8643c42e5126c8d89164ae328908
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1394
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/FunctionCollection.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/FunctionCollection.java
index d14b8e2..6f24fc5 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/FunctionCollection.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/FunctionCollection.java
@@ -205,6 +205,7 @@
 import org.apache.asterix.runtime.evaluators.functions.SimilarityJaccardPrefixDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.SimilarityJaccardSortedCheckDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.SimilarityJaccardSortedDescriptor;
+import org.apache.asterix.runtime.evaluators.functions.SleepDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.SpatialAreaDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.SpatialCellDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.SpatialDistanceDescriptor;
@@ -412,6 +413,9 @@
         temp.add(OrderedListConstructorDescriptor.FACTORY);
         temp.add(UnorderedListConstructorDescriptor.FACTORY);
 
+        // Sleep function
+        temp.add(SleepDescriptor.FACTORY);
+
         // Inject failure function
         temp.add(InjectFailureDescriptor.FACTORY);
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index bdce0ca..83233f1 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -111,7 +111,7 @@
         ccConfig.clientNetPort = DEFAULT_HYRACKS_CC_CLIENT_PORT;
         ccConfig.clusterNetPort = DEFAULT_HYRACKS_CC_CLUSTER_PORT;
         ccConfig.defaultMaxJobAttempts = 0;
-        ccConfig.resultTTL = 30000;
+        ccConfig.resultTTL = 120000;
         ccConfig.resultSweepThreshold = 1000;
         ccConfig.appCCMainClass = CCApplicationEntryPoint.class.getName();
         return ccConfig;
@@ -126,7 +126,7 @@
         ncConfig.resultIPAddress = Inet4Address.getLoopbackAddress().getHostAddress();
         ncConfig.messagingIPAddress = Inet4Address.getLoopbackAddress().getHostAddress();
         ncConfig.nodeId = ncName;
-        ncConfig.resultTTL = 30000;
+        ncConfig.resultTTL = 120000;
         ncConfig.resultSweepThreshold = 1000;
         ncConfig.appArgs = Collections.singletonList("-virtual-NC");
         ncConfig.appNCMainClass = NCApplicationEntryPoint.class.getName();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index 410fd1e..744b929 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -20,6 +20,7 @@
 
 import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
 import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_DATASET_ATTR;
+import static org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
 
 import java.io.IOException;
 import java.io.PrintWriter;
@@ -33,7 +34,6 @@
 
 import org.apache.asterix.app.result.ResultReader;
 import org.apache.asterix.app.result.ResultUtil;
-import org.apache.asterix.app.translator.QueryTranslator;
 import org.apache.asterix.common.api.IClusterManagementWork;
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.context.IStorageComponentProvider;
@@ -156,6 +156,7 @@
     }
 
     public enum ResultStatus {
+        STARTED("started"),
         SUCCESS("success"),
         TIMEOUT("timeout"),
         ERRORS("errors"),
@@ -458,13 +459,13 @@
         return request.getHttpRequest().content().toString(StandardCharsets.UTF_8);
     }
 
-    private static QueryTranslator.ResultDelivery parseResultDelivery(String mode) {
+    private static ResultDelivery parseResultDelivery(String mode) {
         if ("async".equals(mode)) {
-            return QueryTranslator.ResultDelivery.ASYNC;
+            return ResultDelivery.ASYNC;
         } else if ("deferred".equals(mode)) {
-            return QueryTranslator.ResultDelivery.DEFERRED;
+            return ResultDelivery.DEFERRED;
         } else {
-            return QueryTranslator.ResultDelivery.IMMEDIATE;
+            return ResultDelivery.IMMEDIATE;
         }
     }
 
@@ -474,7 +475,7 @@
         final StringWriter stringWriter = new StringWriter();
         final PrintWriter resultWriter = new PrintWriter(stringWriter);
 
-        QueryTranslator.ResultDelivery delivery = parseResultDelivery(param.mode);
+        ResultDelivery delivery = parseResultDelivery(param.mode);
 
         SessionConfig sessionConfig = createSessionConfig(param, resultWriter);
         ServletUtils.setContentType(response, IServlet.ContentType.APPLICATION_JSON, IServlet.Encoding.UTF8);
@@ -517,7 +518,7 @@
             execStart = System.nanoTime();
             translator.compileAndExecute(hcc, hds, delivery, stats);
             execEnd = System.nanoTime();
-            printStatus(resultWriter, ResultStatus.SUCCESS);
+            printStatus(resultWriter, ResultDelivery.ASYNC == delivery ? ResultStatus.STARTED : ResultStatus.SUCCESS);
         } catch (AsterixException | TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
             GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, pe.getMessage(), pe);
             printError(resultWriter, pe);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
index 039c740..3d31616 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
@@ -94,19 +94,7 @@
             resultReader.open(jobId, rsId);
 
             ObjectNode jsonResponse = om.createObjectNode();
-            String status;
-            switch (resultReader.getStatus()) {
-                case RUNNING:
-                    status = "RUNNING";
-                    break;
-                case SUCCESS:
-                    status = "SUCCESS";
-                    break;
-                default:
-                    status = "ERROR";
-                    break;
-            }
-            jsonResponse.put("status", status);
+            jsonResponse.put("status", resultReader.getStatus().name());
             out.write(jsonResponse.toString());
 
         } catch (Exception e) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java
index 31ace22..f7e70a3 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java
@@ -21,6 +21,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.asterix.algebra.base.ILangExtension;
 import org.apache.asterix.algebra.base.ILangExtension.Language;
@@ -50,12 +51,13 @@
     private final IStatementExecutorExtension statementExecutorExtension;
     private final ILangCompilationProvider aqlCompilationProvider;
     private final ILangCompilationProvider sqlppCompilationProvider;
-    private final DefaultStatementExecutorFactory defaultQueryTranslatorFactory;
+    private transient IStatementExecutorFactory statementExecutorFactory;
 
     /**
      * Initialize {@code CompilerExtensionManager} from configuration
      *
      * @param list
+     *            a list of extensions
      * @throws InstantiationException
      * @throws IllegalAccessException
      * @throws ClassNotFoundException
@@ -66,7 +68,6 @@
         Pair<ExtensionId, ILangCompilationProvider> aqlcp = null;
         Pair<ExtensionId, ILangCompilationProvider> sqlppcp = null;
         IStatementExecutorExtension see = null;
-        defaultQueryTranslatorFactory = new DefaultStatementExecutorFactory();
         if (list != null) {
             for (AsterixExtension extensionConf : list) {
                 IExtension extension = (IExtension) Class.forName(extensionConf.getClassName()).newInstance();
@@ -94,9 +95,19 @@
         this.sqlppCompilationProvider = sqlppcp == null ? new SqlppCompilationProvider() : sqlppcp.second;
     }
 
+    /** @deprecated use getStatementExecutorFactory instead */
+    @Deprecated
     public IStatementExecutorFactory getQueryTranslatorFactory() {
-        return statementExecutorExtension == null ? defaultQueryTranslatorFactory
-                : statementExecutorExtension.getQueryTranslatorFactory();
+        return getStatementExecutorFactory(null);
+    }
+
+    public IStatementExecutorFactory getStatementExecutorFactory(ExecutorService executorService) {
+        if (statementExecutorFactory == null) {
+            statementExecutorFactory = statementExecutorExtension == null
+                    ? new DefaultStatementExecutorFactory(executorService)
+                    : statementExecutorExtension.getStatementExecutorFactory(executorService);
+        }
+        return statementExecutorFactory;
     }
 
     public ILangCompilationProvider getAqlCompilationProvider() {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/IStatementExecutorExtension.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/IStatementExecutorExtension.java
index f7b6842..f77b25b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/IStatementExecutorExtension.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/IStatementExecutorExtension.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.app.cc;
 
+import java.util.concurrent.ExecutorService;
+
 import org.apache.asterix.common.api.IExtension;
 import org.apache.asterix.translator.IStatementExecutorFactory;
 
@@ -33,6 +35,15 @@
 
     /**
      * @return The extension implementation of the {@code IStatementExecutorFactory}
+     * @deprecated use getStatementExecutorFactory instead
      */
+    @Deprecated
     IStatementExecutorFactory getQueryTranslatorFactory();
+
+    /**
+     * @return The extension implementation of the {@code IStatementExecutorFactory}
+     */
+    default IStatementExecutorFactory getStatementExecutorFactory(ExecutorService executorService) {
+        return getQueryTranslatorFactory();
+    }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java
index 99dcc83..15ed1b4 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java
@@ -19,6 +19,8 @@
 package org.apache.asterix.app.translator;
 
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.compiler.provider.ILangCompilationProvider;
@@ -26,12 +28,27 @@
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutorFactory;
 import org.apache.asterix.translator.SessionConfig;
+import org.apache.hyracks.control.common.utils.HyracksThreadFactory;
 
 public class DefaultStatementExecutorFactory implements IStatementExecutorFactory {
 
+    protected final ExecutorService executorService;
+
+    /*
+     * @deprecated use other constructor
+     */
+    public DefaultStatementExecutorFactory() {
+        this(Executors.newSingleThreadExecutor(
+                new HyracksThreadFactory(DefaultStatementExecutorFactory.class.getSimpleName())));
+    }
+
+    public DefaultStatementExecutorFactory(ExecutorService executorService) {
+        this.executorService = executorService;
+    }
+
     @Override
     public IStatementExecutor create(List<Statement> statements, SessionConfig conf,
             ILangCompilationProvider compilationProvider, IStorageComponentProvider storageComponentProvider) {
-        return new QueryTranslator(statements, conf, compilationProvider, storageComponentProvider);
+        return new QueryTranslator(statements, conf, compilationProvider, storageComponentProvider, executorService);
     }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 6fbf2a5..3c69d83 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -34,6 +34,7 @@
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Random;
+import java.util.concurrent.ExecutorService;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -218,9 +219,10 @@
     protected final APIFramework apiFramework;
     protected final IRewriterFactory rewriterFactory;
     protected final IStorageComponentProvider componentProvider;
+    protected final ExecutorService executorService;
 
-    public QueryTranslator(List<Statement> statements, SessionConfig conf,
-            ILangCompilationProvider compliationProvider, IStorageComponentProvider componentProvider) {
+    public QueryTranslator(List<Statement> statements, SessionConfig conf, ILangCompilationProvider compliationProvider,
+            IStorageComponentProvider componentProvider, ExecutorService executorService) {
         this.statements = statements;
         this.sessionConfig = conf;
         this.componentProvider = componentProvider;
@@ -228,6 +230,7 @@
         apiFramework = new APIFramework(compliationProvider);
         rewriterFactory = compliationProvider.getRewriterFactory();
         activeDataverse = MetadataBuiltinEntities.DEFAULT_DATAVERSE;
+        this.executorService = executorService;
     }
 
     protected List<FunctionDecl> getDeclaredFunctions(List<Statement> statements) {
@@ -1327,11 +1330,6 @@
         }
     }
 
-    protected Dataset getDataset(MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName)
-            throws MetadataException {
-        return MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
-    }
-
     public void handleDatasetDropStatement(MetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc) throws Exception {
         DropDatasetStatement stmtDelete = (DropDatasetStatement) stmt;
@@ -1826,41 +1824,60 @@
         }
     }
 
-    public JobSpecification handleInsertUpsertStatement(MetadataProvider metadataProvider, Statement stmt,
+    public void handleInsertUpsertStatement(MetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery,
             IStatementExecutor.Stats stats, boolean compileOnly) throws Exception {
 
         InsertStatement stmtInsertUpsert = (InsertStatement) stmt;
         String dataverseName = getActiveDataverse(stmtInsertUpsert.getDataverseName());
         Query query = stmtInsertUpsert.getQuery();
-        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-        boolean bActiveTxn = true;
-        metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        MetadataLockManager.INSTANCE.insertDeleteUpsertBegin(dataverseName,
-                dataverseName + "." + stmtInsertUpsert.getDatasetName(), query.getDataverses(), query.getDatasets());
-        try {
-            metadataProvider.setWriteTransaction(true);
-            JobSpecification jobSpec = rewriteCompileInsertUpsert(hcc, metadataProvider, stmtInsertUpsert);
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-            bActiveTxn = false;
 
-            if (jobSpec != null && !compileOnly) {
-                if (stmtInsertUpsert.getReturnExpression() != null) {
-                    handleQueryResult(metadataProvider, hcc, hdc, jobSpec, resultDelivery, stats);
-                } else {
-                    JobUtils.runJob(hcc, jobSpec, true);
+        final IMetadataLocker locker = new IMetadataLocker() {
+            @Override
+            public void lock() {
+                MetadataLockManager.INSTANCE.insertDeleteUpsertBegin(dataverseName,
+                        dataverseName + "." + stmtInsertUpsert.getDatasetName(), query.getDataverses(),
+                        query.getDatasets());
+            }
+
+            @Override
+            public void unlock() {
+                MetadataLockManager.INSTANCE.insertDeleteUpsertEnd(dataverseName,
+                        dataverseName + "." + stmtInsertUpsert.getDatasetName(), query.getDataverses(),
+                        query.getDatasets());
+            }
+        };
+        final IStatementCompiler compiler = () -> {
+            MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            boolean bActiveTxn = true;
+            metadataProvider.setMetadataTxnContext(mdTxnCtx);
+            try {
+                metadataProvider.setWriteTransaction(true);
+                final JobSpecification jobSpec = rewriteCompileInsertUpsert(hcc, metadataProvider, stmtInsertUpsert);
+                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                bActiveTxn = false;
+                return compileOnly ? null : jobSpec;
+            } catch (Exception e) {
+                if (bActiveTxn) {
+                    abort(e, e, mdTxnCtx);
                 }
+                throw e;
             }
-            return jobSpec;
-        } catch (Exception e) {
-            if (bActiveTxn) {
-                abort(e, e, mdTxnCtx);
+        };
+
+        if (stmtInsertUpsert.getReturnExpression() != null) {
+            deliverResult(hcc, hdc, compiler, metadataProvider, locker, resultDelivery, stats);
+        } else {
+            locker.lock();
+            try {
+                final JobSpecification jobSpec = compiler.compile();
+                if (jobSpec == null) {
+                    return;
+                }
+                JobUtils.runJob(hcc, jobSpec, true);
+            } finally {
+                locker.unlock();
             }
-            throw e;
-        } finally {
-            MetadataLockManager.INSTANCE.insertDeleteUpsertEnd(dataverseName,
-                    dataverseName + "." + stmtInsertUpsert.getDatasetName(), query.getDataverses(),
-                    query.getDatasets());
         }
     }
 
@@ -2515,69 +2532,124 @@
                 new StorageComponentProvider()));
     }
 
-    protected JobSpecification handleQuery(MetadataProvider metadataProvider, Query query,
-            IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats)
-            throws Exception {
-        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-        boolean bActiveTxn = true;
-        metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        MetadataLockManager.INSTANCE.queryBegin(activeDataverse, query.getDataverses(), query.getDatasets());
-        try {
-            JobSpecification jobSpec = rewriteCompileQuery(hcc, metadataProvider, query, null);
+    private interface IMetadataLocker {
+        void lock();
 
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-            bActiveTxn = false;
-
-            if (query.isExplain()) {
-                sessionConfig.out().flush();
-                return jobSpec;
-            } else if (sessionConfig.isExecuteQuery() && jobSpec != null) {
-                handleQueryResult(metadataProvider, hcc, hdc, jobSpec, resultDelivery, stats);
-            }
-            return jobSpec;
-        } catch (Exception e) {
-            LOGGER.log(Level.INFO, e.getMessage(), e);
-            if (bActiveTxn) {
-                abort(e, e, mdTxnCtx);
-            }
-            throw e;
-        } finally {
-            MetadataLockManager.INSTANCE.queryEnd(query.getDataverses(), query.getDatasets());
-            // release external datasets' locks acquired during compilation of the query
-            ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
-        }
+        void unlock();
     }
 
-    private void handleQueryResult(MetadataProvider metadataProvider, IHyracksClientConnection hcc,
-            IHyracksDataset hdc, JobSpecification jobSpec, ResultDelivery resultDelivery, Stats stats)
-            throws Exception {
-        JobId jobId = JobUtils.runJob(hcc, jobSpec, false);
+    private interface IResultPrinter {
+        void print(JobId jobId) throws HyracksDataException, AlgebricksException;
+    }
 
-        ResultHandle hand;
+    private interface IStatementCompiler {
+        JobSpecification compile() throws AlgebricksException, RemoteException, ACIDException;
+    }
+
+    protected void handleQuery(MetadataProvider metadataProvider, Query query, IHyracksClientConnection hcc,
+            IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats) throws Exception {
+        final IMetadataLocker locker = new IMetadataLocker() {
+            @Override
+            public void lock() {
+                MetadataLockManager.INSTANCE.queryBegin(activeDataverse, query.getDataverses(), query.getDatasets());
+            }
+
+            @Override
+            public void unlock() {
+                MetadataLockManager.INSTANCE.queryEnd(query.getDataverses(), query.getDatasets());
+                // release external datasets' locks acquired during compilation of the query
+                ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
+            }
+        };
+        final IStatementCompiler compiler = () -> {
+            MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            boolean bActiveTxn = true;
+            metadataProvider.setMetadataTxnContext(mdTxnCtx);
+            try {
+                final JobSpecification jobSpec = rewriteCompileQuery(hcc, metadataProvider, query, null);
+                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                bActiveTxn = false;
+                return query.isExplain() || !sessionConfig.isExecuteQuery() ? null : jobSpec;
+            } catch (Exception e) {
+                LOGGER.log(Level.INFO, e.getMessage(), e);
+                if (bActiveTxn) {
+                    abort(e, e, mdTxnCtx);
+                }
+                throw e;
+            }
+        };
+        deliverResult(hcc, hdc, compiler, metadataProvider, locker, resultDelivery, stats);
+    }
+
+    private void deliverResult(IHyracksClientConnection hcc, IHyracksDataset hdc, IStatementCompiler compiler,
+            MetadataProvider metadataProvider, IMetadataLocker locker, ResultDelivery resultDelivery, Stats stats)
+            throws Exception {
+        final ResultSetId resultSetId = metadataProvider.getResultSetId();
         switch (resultDelivery) {
             case ASYNC:
-                hand = new ResultHandle(jobId, metadataProvider.getResultSetId());
-                ResultUtil.printResultHandle(hand, sessionConfig);
-                hcc.waitForCompletion(jobId);
-                sessionConfig.out().flush();
+                MutableBoolean printed = new MutableBoolean(false);
+                executorService.submit(() -> {
+                    JobId jobId = null;
+                    try {
+                        jobId = createAndRunJob(hcc, compiler, locker, resultDelivery, id -> {
+                            final ResultHandle handle = new ResultHandle(id, resultSetId);
+                            ResultUtil.printResultHandle(handle, sessionConfig);
+                            synchronized (printed) {
+                                printed.setTrue();
+                                printed.notify();
+                            }
+                        });
+                    } catch (Exception e) {
+                        GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE,
+                                resultDelivery.name() + " job " + "with id " + jobId + " failed", e);
+                    }
+                });
+                synchronized (printed) {
+                    while (!printed.booleanValue()) {
+                        printed.wait();
+                    }
+                }
                 break;
             case IMMEDIATE:
-                hcc.waitForCompletion(jobId);
-                ResultReader resultReader = new ResultReader(hdc);
-                resultReader.open(jobId, metadataProvider.getResultSetId());
-                ResultUtil.printResults(resultReader, sessionConfig, stats, metadataProvider.findOutputRecordType());
+                createAndRunJob(hcc, compiler, locker, resultDelivery, id -> {
+                    final ResultReader resultReader = new ResultReader(hdc);
+                    resultReader.open(id, resultSetId);
+                    ResultUtil.printResults(resultReader, sessionConfig, stats,
+                            metadataProvider.findOutputRecordType());
+                });
                 break;
             case DEFERRED:
-                hcc.waitForCompletion(jobId);
-                hand = new ResultHandle(jobId, metadataProvider.getResultSetId());
-                ResultUtil.printResultHandle(hand, sessionConfig);
-                sessionConfig.out().flush();
+                createAndRunJob(hcc, compiler, locker, resultDelivery, id -> {
+                    ResultUtil.printResultHandle(new ResultHandle(id, resultSetId), sessionConfig);
+                });
                 break;
             default:
                 break;
         }
     }
 
+    private static JobId createAndRunJob(IHyracksClientConnection hcc, IStatementCompiler compiler,
+            IMetadataLocker locker, ResultDelivery resultDelivery, IResultPrinter printer) throws Exception {
+        locker.lock();
+        try {
+            final JobSpecification jobSpec = compiler.compile();
+            if (jobSpec == null) {
+                return JobId.INVALID;
+            }
+            final JobId jobId = JobUtils.runJob(hcc, jobSpec, false);
+            if (ResultDelivery.ASYNC == resultDelivery) {
+                printer.print(jobId);
+                hcc.waitForCompletion(jobId);
+            } else {
+                hcc.waitForCompletion(jobId);
+                printer.print(jobId);
+            }
+            return jobId;
+        } finally {
+            locker.unlock();
+        }
+    }
+
     protected void handleCreateNodeGroupStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception {
 
         NodegroupDecl stmtCreateNodegroup = (NodegroupDecl) stmt;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 92f3501..8ea4193 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -63,6 +63,7 @@
 import org.apache.asterix.metadata.cluster.ClusterManagerProvider;
 import org.apache.asterix.runtime.job.resource.JobCapacityController;
 import org.apache.asterix.runtime.utils.AppContextInfo;
+import org.apache.asterix.translator.IStatementExecutorFactory;
 import org.apache.hyracks.api.application.ICCApplicationContext;
 import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
 import org.apache.hyracks.api.client.HyracksConnection;
@@ -164,7 +165,7 @@
         webServer.setAttribute(HYRACKS_CONNECTION_ATTR, hcc);
         webServer.addLet(new ApiServlet(webServer.ctx(), new String[] { "/*" },
                 ccExtensionManager.getAqlCompilationProvider(), ccExtensionManager.getSqlppCompilationProvider(),
-                ccExtensionManager.getQueryTranslatorFactory(), componentProvider));
+                getStatementExecutorFactory(), componentProvider));
         return webServer;
     }
 
@@ -228,35 +229,35 @@
         switch (key) {
             case AQL:
                 return new FullApiServlet(server.ctx(), paths, ccExtensionManager.getAqlCompilationProvider(),
-                        ccExtensionManager.getQueryTranslatorFactory(), componentProvider);
+                        getStatementExecutorFactory(), componentProvider);
             case AQL_QUERY:
                 return new QueryApiServlet(server.ctx(), paths, ccExtensionManager.getAqlCompilationProvider(),
-                        ccExtensionManager.getQueryTranslatorFactory(), componentProvider);
+                        getStatementExecutorFactory(), componentProvider);
             case AQL_UPDATE:
                 return new UpdateApiServlet(server.ctx(), paths, ccExtensionManager.getAqlCompilationProvider(),
-                        ccExtensionManager.getQueryTranslatorFactory(), componentProvider);
+                        getStatementExecutorFactory(), componentProvider);
             case AQL_DDL:
                 return new DdlApiServlet(server.ctx(), paths, ccExtensionManager.getAqlCompilationProvider(),
-                        ccExtensionManager.getQueryTranslatorFactory(), componentProvider);
+                        getStatementExecutorFactory(), componentProvider);
             case SQLPP:
                 return new FullApiServlet(server.ctx(), paths, ccExtensionManager.getSqlppCompilationProvider(),
-                        ccExtensionManager.getQueryTranslatorFactory(), componentProvider);
+                        getStatementExecutorFactory(), componentProvider);
             case SQLPP_QUERY:
                 return new QueryApiServlet(server.ctx(), paths, ccExtensionManager.getSqlppCompilationProvider(),
-                        ccExtensionManager.getQueryTranslatorFactory(), componentProvider);
+                        getStatementExecutorFactory(), componentProvider);
             case SQLPP_UPDATE:
                 return new UpdateApiServlet(server.ctx(), paths, ccExtensionManager.getSqlppCompilationProvider(),
-                        ccExtensionManager.getQueryTranslatorFactory(), componentProvider);
+                        getStatementExecutorFactory(), componentProvider);
             case SQLPP_DDL:
                 return new DdlApiServlet(server.ctx(), paths, ccExtensionManager.getSqlppCompilationProvider(),
-                        ccExtensionManager.getQueryTranslatorFactory(), componentProvider);
+                        getStatementExecutorFactory(), componentProvider);
             case QUERY_STATUS:
                 return new QueryStatusApiServlet(server.ctx(), paths);
             case QUERY_RESULT:
                 return new QueryResultApiServlet(server.ctx(), paths);
             case QUERY_SERVICE:
                 return new QueryServiceServlet(server.ctx(), paths, ccExtensionManager.getSqlppCompilationProvider(),
-                        ccExtensionManager.getQueryTranslatorFactory(), componentProvider);
+                        getStatementExecutorFactory(), componentProvider);
             case CONNECTOR:
                 return new ConnectorApiServlet(server.ctx(), paths);
             case SHUTDOWN:
@@ -276,6 +277,11 @@
         }
     }
 
+    private IStatementExecutorFactory getStatementExecutorFactory() {
+        return ccExtensionManager.getStatementExecutorFactory(
+                ((ClusterControllerService) appCtx.getControllerService()).getExecutorService());
+    }
+
     @Override
     public void startupCompleted() throws Exception {
         ClusterManagerProvider.getClusterManager().notifyStartupCompleted();
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.deferred.aql
similarity index 97%
rename from asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
rename to asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.deferred.aql
index 29f8eab..e7ad205 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.deferred.aql
@@ -22,6 +22,8 @@
  * Date             :  09/17/2013
  */
 
+//handlevariable=handle
+
 use dataverse test;
 
 for $i in dataset LineItem
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.5.async.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.5.async.aql
deleted file mode 100644
index 29f8eab..0000000
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.5.async.aql
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/*
- * Description      :  Test for clause of the position variable in FLWOR expression
- * Expected Result  :  Success
- * Date             :  09/17/2013
- */
-
-use dataverse test;
-
-for $i in dataset LineItem
-group by $partkey := $i.l_partkey with $i
-for $j at $p in ( for $x in $i order by $x.l_shipdate return $x)
-where $p < 4
-order by $partkey
-return { "partkey": $partkey, "pid": $p, "shipdate": $j.l_shipdate }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.5.get.http
similarity index 66%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.5.get.http
index 29f8eab..a88991c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.5.get.http
@@ -16,17 +16,5 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-/*
- * Description      :  Test for clause of the position variable in FLWOR expression
- * Expected Result  :  Success
- * Date             :  09/17/2013
- */
 
-use dataverse test;
-
-for $i in dataset LineItem
-group by $partkey := $i.l_partkey with $i
-for $j at $p in ( for $x in $i order by $x.l_shipdate return $x)
-where $p < 4
-order by $partkey
-return { "partkey": $partkey, "pid": $p, "shipdate": $j.l_shipdate }
+/query/result?handle=$handle
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.6.async.aql
similarity index 97%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.6.async.aql
index 29f8eab..e7ad205 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.6.async.aql
@@ -22,6 +22,8 @@
  * Date             :  09/17/2013
  */
 
+//handlevariable=handle
+
 use dataverse test;
 
 for $i in dataset LineItem
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.7.pollget.http
similarity index 66%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.7.pollget.http
index 29f8eab..5d59ca3 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.7.pollget.http
@@ -16,17 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-/*
- * Description      :  Test for clause of the position variable in FLWOR expression
- * Expected Result  :  Success
- * Date             :  09/17/2013
- */
 
-use dataverse test;
+//polltimeoutsecs=10
 
-for $i in dataset LineItem
-group by $partkey := $i.l_partkey with $i
-for $j at $p in ( for $x in $i order by $x.l_shipdate return $x)
-where $p < 4
-order by $partkey
-return { "partkey": $partkey, "pid": $p, "shipdate": $j.l_shipdate }
+/query/status?handle=$handle
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.8.get.http
similarity index 66%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.8.get.http
index 29f8eab..a88991c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.8.get.http
@@ -16,17 +16,5 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-/*
- * Description      :  Test for clause of the position variable in FLWOR expression
- * Expected Result  :  Success
- * Date             :  09/17/2013
- */
 
-use dataverse test;
-
-for $i in dataset LineItem
-group by $partkey := $i.l_partkey with $i
-for $j at $p in ( for $x in $i order by $x.l_shipdate return $x)
-where $p < 4
-order by $partkey
-return { "partkey": $partkey, "pid": $p, "shipdate": $j.l_shipdate }
+/query/result?handle=$handle
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-failed/async-failed.1.async.sqlpp
similarity index 65%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-failed/async-failed.1.async.sqlpp
index 29f8eab..89ef35e 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-failed/async-failed.1.async.sqlpp
@@ -16,17 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-/*
- * Description      :  Test for clause of the position variable in FLWOR expression
- * Expected Result  :  Success
- * Date             :  09/17/2013
- */
 
-use dataverse test;
+#handlevariable=handle
 
-for $i in dataset LineItem
-group by $partkey := $i.l_partkey with $i
-for $j at $p in ( for $x in $i order by $x.l_shipdate return $x)
-where $p < 4
-order by $partkey
-return { "partkey": $partkey, "pid": $p, "shipdate": $j.l_shipdate }
+set `import-private-functions` `true`;
+select value inject_failure(sleep("result", 5000), true);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-failed/async-failed.2.pollget.http
similarity index 65%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-failed/async-failed.2.pollget.http
index 29f8eab..916aadf 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-failed/async-failed.2.pollget.http
@@ -16,17 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-/*
- * Description      :  Test for clause of the position variable in FLWOR expression
- * Expected Result  :  Success
- * Date             :  09/17/2013
- */
 
-use dataverse test;
+#polltimeoutsecs=10
 
-for $i in dataset LineItem
-group by $partkey := $i.l_partkey with $i
-for $j at $p in ( for $x in $i order by $x.l_shipdate return $x)
-where $p < 4
-order by $partkey
-return { "partkey": $partkey, "pid": $p, "shipdate": $j.l_shipdate }
+/query/status?handle=$handle
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-failed/async-failed.3.get.http
similarity index 65%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-failed/async-failed.3.get.http
index 29f8eab..a88991c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-failed/async-failed.3.get.http
@@ -16,17 +16,5 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-/*
- * Description      :  Test for clause of the position variable in FLWOR expression
- * Expected Result  :  Success
- * Date             :  09/17/2013
- */
 
-use dataverse test;
-
-for $i in dataset LineItem
-group by $partkey := $i.l_partkey with $i
-for $j at $p in ( for $x in $i order by $x.l_shipdate return $x)
-where $p < 4
-order by $partkey
-return { "partkey": $partkey, "pid": $p, "shipdate": $j.l_shipdate }
+/query/result?handle=$handle
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-running/async-running.1.async.sqlpp
similarity index 65%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-running/async-running.1.async.sqlpp
index 29f8eab..866b388 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-running/async-running.1.async.sqlpp
@@ -16,17 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-/*
- * Description      :  Test for clause of the position variable in FLWOR expression
- * Expected Result  :  Success
- * Date             :  09/17/2013
- */
 
-use dataverse test;
+#handlevariable=handle
 
-for $i in dataset LineItem
-group by $partkey := $i.l_partkey with $i
-for $j at $p in ( for $x in $i order by $x.l_shipdate return $x)
-where $p < 4
-order by $partkey
-return { "partkey": $partkey, "pid": $p, "shipdate": $j.l_shipdate }
+select value sleep("result", 3000);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-running/async-running.2.pollget.http
similarity index 65%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-running/async-running.2.pollget.http
index 29f8eab..916aadf 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-running/async-running.2.pollget.http
@@ -16,17 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-/*
- * Description      :  Test for clause of the position variable in FLWOR expression
- * Expected Result  :  Success
- * Date             :  09/17/2013
- */
 
-use dataverse test;
+#polltimeoutsecs=10
 
-for $i in dataset LineItem
-group by $partkey := $i.l_partkey with $i
-for $j at $p in ( for $x in $i order by $x.l_shipdate return $x)
-where $p < 4
-order by $partkey
-return { "partkey": $partkey, "pid": $p, "shipdate": $j.l_shipdate }
+/query/status?handle=$handle
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-running/async-running.3.pollget.http
similarity index 65%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-running/async-running.3.pollget.http
index 29f8eab..916aadf 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-running/async-running.3.pollget.http
@@ -16,17 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-/*
- * Description      :  Test for clause of the position variable in FLWOR expression
- * Expected Result  :  Success
- * Date             :  09/17/2013
- */
 
-use dataverse test;
+#polltimeoutsecs=10
 
-for $i in dataset LineItem
-group by $partkey := $i.l_partkey with $i
-for $j at $p in ( for $x in $i order by $x.l_shipdate return $x)
-where $p < 4
-order by $partkey
-return { "partkey": $partkey, "pid": $p, "shipdate": $j.l_shipdate }
+/query/status?handle=$handle
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-running/async-running.4.get.http
similarity index 65%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-running/async-running.4.get.http
index 29f8eab..a88991c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-running/async-running.4.get.http
@@ -16,17 +16,5 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-/*
- * Description      :  Test for clause of the position variable in FLWOR expression
- * Expected Result  :  Success
- * Date             :  09/17/2013
- */
 
-use dataverse test;
-
-for $i in dataset LineItem
-group by $partkey := $i.l_partkey with $i
-for $j at $p in ( for $x in $i order by $x.l_shipdate return $x)
-where $p < 4
-order by $partkey
-return { "partkey": $partkey, "pid": $p, "shipdate": $j.l_shipdate }
+/query/result?handle=$handle
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async/async.1.async.sqlpp
similarity index 65%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async/async.1.async.sqlpp
index 29f8eab..a44b911 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async/async.1.async.sqlpp
@@ -16,17 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-/*
- * Description      :  Test for clause of the position variable in FLWOR expression
- * Expected Result  :  Success
- * Date             :  09/17/2013
- */
 
-use dataverse test;
+#handlevariable=handle
 
-for $i in dataset LineItem
-group by $partkey := $i.l_partkey with $i
-for $j at $p in ( for $x in $i order by $x.l_shipdate return $x)
-where $p < 4
-order by $partkey
-return { "partkey": $partkey, "pid": $p, "shipdate": $j.l_shipdate }
+select i, i * i as i2 from range(1, 10) i;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async/async.2.pollget.http
similarity index 65%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async/async.2.pollget.http
index 29f8eab..916aadf 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async/async.2.pollget.http
@@ -16,17 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-/*
- * Description      :  Test for clause of the position variable in FLWOR expression
- * Expected Result  :  Success
- * Date             :  09/17/2013
- */
 
-use dataverse test;
+#polltimeoutsecs=10
 
-for $i in dataset LineItem
-group by $partkey := $i.l_partkey with $i
-for $j at $p in ( for $x in $i order by $x.l_shipdate return $x)
-where $p < 4
-order by $partkey
-return { "partkey": $partkey, "pid": $p, "shipdate": $j.l_shipdate }
+/query/status?handle=$handle
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async/async.3.get.http
similarity index 65%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async/async.3.get.http
index 29f8eab..a88991c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async/async.3.get.http
@@ -16,17 +16,5 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-/*
- * Description      :  Test for clause of the position variable in FLWOR expression
- * Expected Result  :  Success
- * Date             :  09/17/2013
- */
 
-use dataverse test;
-
-for $i in dataset LineItem
-group by $partkey := $i.l_partkey with $i
-for $j at $p in ( for $x in $i order by $x.l_shipdate return $x)
-where $p < 4
-order by $partkey
-return { "partkey": $partkey, "pid": $p, "shipdate": $j.l_shipdate }
+/query/result?handle=$handle
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/deferred/deferred.1.deferred.sqlpp
similarity index 65%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/deferred/deferred.1.deferred.sqlpp
index 29f8eab..a44b911 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/deferred/deferred.1.deferred.sqlpp
@@ -16,17 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-/*
- * Description      :  Test for clause of the position variable in FLWOR expression
- * Expected Result  :  Success
- * Date             :  09/17/2013
- */
 
-use dataverse test;
+#handlevariable=handle
 
-for $i in dataset LineItem
-group by $partkey := $i.l_partkey with $i
-for $j at $p in ( for $x in $i order by $x.l_shipdate return $x)
-where $p < 4
-order by $partkey
-return { "partkey": $partkey, "pid": $p, "shipdate": $j.l_shipdate }
+select i, i * i as i2 from range(1, 10) i;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/deferred/deferred.2.get.http
similarity index 65%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/deferred/deferred.2.get.http
index 29f8eab..a88991c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/deferred/deferred.2.get.http
@@ -16,17 +16,5 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-/*
- * Description      :  Test for clause of the position variable in FLWOR expression
- * Expected Result  :  Success
- * Date             :  09/17/2013
- */
 
-use dataverse test;
-
-for $i in dataset LineItem
-group by $partkey := $i.l_partkey with $i
-for $j at $p in ( for $x in $i order by $x.l_shipdate return $x)
-where $p < 4
-order by $partkey
-return { "partkey": $partkey, "pid": $p, "shipdate": $j.l_shipdate }
+/query/result?handle=$handle
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/at00/at00.4.asyncdefer.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/at00/at00.4.deferred.sqlpp
similarity index 97%
rename from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/at00/at00.4.asyncdefer.sqlpp
rename to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/at00/at00.4.deferred.sqlpp
index 7bae0d4..df7826f 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/at00/at00.4.asyncdefer.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/at00/at00.4.deferred.sqlpp
@@ -22,8 +22,9 @@
  * Date             :  09/17/2013
  */
 
-use test;
+#handlevariable=handle
 
+use test;
 
 select element {'partkey':gen0.partkey,'pid':p,'shipdate':j.l_shipdate}
 from
@@ -34,5 +35,3 @@
     gen0.i as j at p
 where p < 4
 order by partkey, shipdate;
-
-
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/at00/at00.5.get.http
similarity index 65%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/at00/at00.5.get.http
index 29f8eab..a88991c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/at00/at00.5.get.http
@@ -16,17 +16,5 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-/*
- * Description      :  Test for clause of the position variable in FLWOR expression
- * Expected Result  :  Success
- * Date             :  09/17/2013
- */
 
-use dataverse test;
-
-for $i in dataset LineItem
-group by $partkey := $i.l_partkey with $i
-for $j at $p in ( for $x in $i order by $x.l_shipdate return $x)
-where $p < 4
-order by $partkey
-return { "partkey": $partkey, "pid": $p, "shipdate": $j.l_shipdate }
+/query/result?handle=$handle
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/at00/at00.5.async.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/at00/at00.6.async.sqlpp
similarity index 98%
rename from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/at00/at00.5.async.sqlpp
rename to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/at00/at00.6.async.sqlpp
index cb47163..df7826f 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/at00/at00.5.async.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/at00/at00.6.async.sqlpp
@@ -22,8 +22,9 @@
  * Date             :  09/17/2013
  */
 
-use test;
+#handlevariable=handle
 
+use test;
 
 select element {'partkey':gen0.partkey,'pid':p,'shipdate':j.l_shipdate}
 from
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/at00/at00.7.pollget.http
similarity index 65%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/at00/at00.7.pollget.http
index 29f8eab..916aadf 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/at00/at00.7.pollget.http
@@ -16,17 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-/*
- * Description      :  Test for clause of the position variable in FLWOR expression
- * Expected Result  :  Success
- * Date             :  09/17/2013
- */
 
-use dataverse test;
+#polltimeoutsecs=10
 
-for $i in dataset LineItem
-group by $partkey := $i.l_partkey with $i
-for $j at $p in ( for $x in $i order by $x.l_shipdate return $x)
-where $p < 4
-order by $partkey
-return { "partkey": $partkey, "pid": $p, "shipdate": $j.l_shipdate }
+/query/status?handle=$handle
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/at00/at00.8.get.http
similarity index 65%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/at00/at00.8.get.http
index 29f8eab..a88991c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/flwor/at00/at00.4.asyncdefer.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/at00/at00.8.get.http
@@ -16,17 +16,5 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-/*
- * Description      :  Test for clause of the position variable in FLWOR expression
- * Expected Result  :  Success
- * Date             :  09/17/2013
- */
 
-use dataverse test;
-
-for $i in dataset LineItem
-group by $partkey := $i.l_partkey with $i
-for $j at $p in ( for $x in $i order by $x.l_shipdate return $x)
-where $p < 4
-order by $partkey
-return { "partkey": $partkey, "pid": $p, "shipdate": $j.l_shipdate }
+/query/result?handle=$handle
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-failed/async-failed.1.ignore b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-failed/async-failed.1.ignore
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-failed/async-failed.1.ignore
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-failed/async-failed.2.json b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-failed/async-failed.2.json
new file mode 100644
index 0000000..dd665eb
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-failed/async-failed.2.json
@@ -0,0 +1 @@
+{"status":"FAILED"}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-running/async-running.1.ignore b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-running/async-running.1.ignore
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-running/async-running.1.ignore
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-running/async-running.2.json b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-running/async-running.2.json
new file mode 100644
index 0000000..6cffe65
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-running/async-running.2.json
@@ -0,0 +1 @@
+{"status":"RUNNING"}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-running/async-running.3.json b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-running/async-running.3.json
new file mode 100644
index 0000000..6213a6b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-running/async-running.3.json
@@ -0,0 +1 @@
+{"status":"SUCCESS"}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-running/async-running.4.json b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-running/async-running.4.json
new file mode 100644
index 0000000..859d906
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-running/async-running.4.json
@@ -0,0 +1 @@
+"result"
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async/async.1.ignore b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async/async.1.ignore
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async/async.1.ignore
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async/async.2.json b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async/async.2.json
new file mode 100644
index 0000000..6213a6b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async/async.2.json
@@ -0,0 +1 @@
+{"status":"SUCCESS"}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async/async.3.json b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async/async.3.json
new file mode 100644
index 0000000..09e86cc
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async/async.3.json
@@ -0,0 +1,10 @@
+{ "i": 1, "i2": 1 }
+{ "i": 2, "i2": 4 }
+{ "i": 3, "i2": 9 }
+{ "i": 4, "i2": 16 }
+{ "i": 5, "i2": 25 }
+{ "i": 6, "i2": 36 }
+{ "i": 7, "i2": 49 }
+{ "i": 8, "i2": 64 }
+{ "i": 9, "i2": 81 }
+{ "i": 10, "i2": 100 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/deferred/deferred.1.ignore b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/deferred/deferred.1.ignore
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/deferred/deferred.1.ignore
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/deferred/deferred.2.json b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/deferred/deferred.2.json
new file mode 100644
index 0000000..09e86cc
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/deferred/deferred.2.json
@@ -0,0 +1,10 @@
+{ "i": 1, "i2": 1 }
+{ "i": 2, "i2": 4 }
+{ "i": 3, "i2": 9 }
+{ "i": 4, "i2": 16 }
+{ "i": 5, "i2": 25 }
+{ "i": 6, "i2": 36 }
+{ "i": 7, "i2": 49 }
+{ "i": 8, "i2": 64 }
+{ "i": 9, "i2": 81 }
+{ "i": 10, "i2": 100 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/flwor/at00/at00.2.ignore b/asterixdb/asterix-app/src/test/resources/runtimets/results/flwor/at00/at00.2.ignore
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/flwor/at00/at00.2.ignore
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/flwor/at00/at00.4.ignore b/asterixdb/asterix-app/src/test/resources/runtimets/results/flwor/at00/at00.4.ignore
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/flwor/at00/at00.4.ignore
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/flwor/at00/at00.5.json b/asterixdb/asterix-app/src/test/resources/runtimets/results/flwor/at00/at00.5.json
new file mode 100644
index 0000000..6213a6b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/flwor/at00/at00.5.json
@@ -0,0 +1 @@
+{"status":"SUCCESS"}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/flwor/at00/at00.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/flwor/at00/at00.6.adm
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/results/flwor/at00/at00.2.adm
rename to asterixdb/asterix-app/src/test/resources/runtimets/results/flwor/at00/at00.6.adm
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/flwor/at00/at00.1.ast b/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/flwor/at00/at00.1.ast
deleted file mode 100644
index 17dc8b5..0000000
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/flwor/at00/at00.1.ast
+++ /dev/null
@@ -1,22 +0,0 @@
-DataverseUse test
-TypeDecl LineItemType [
-  closed RecordType {
-    l_orderkey : bigint,
-    l_partkey : bigint,
-    l_suppkey : bigint,
-    l_linenumber : bigint,
-    l_quantity : double,
-    l_extendedprice : double,
-    l_discount : double,
-    l_tax : double,
-    l_returnflag : string,
-    l_linestatus : string,
-    l_shipdate : string,
-    l_commitdate : string,
-    l_receiptdate : string,
-    l_shipinstruct : string,
-    l_shipmode : string,
-    l_comment : string
-  }
-]
-DatasetDecl LineItem(LineItemType) partitioned by [[l_orderkey], [l_linenumber]]
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/flwor/at00/at00.2.ast b/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/flwor/at00/at00.2.ast
deleted file mode 100644
index 916a59e..0000000
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/flwor/at00/at00.2.ast
+++ /dev/null
@@ -1 +0,0 @@
-DataverseUse test
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/flwor/at00/at00.3.ast b/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/flwor/at00/at00.3.ast
deleted file mode 100644
index cd20331..0000000
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/flwor/at00/at00.3.ast
+++ /dev/null
@@ -1,111 +0,0 @@
-DataverseUse test
-Query:
-SELECT ELEMENT [
-RecordConstructor [
-  (
-    LiteralExpr [STRING] [partkey]
-    :
-    FieldAccessor [
-      Variable [ Name=$gen0 ]
-      Field=partkey
-    ]
-  )
-  (
-    LiteralExpr [STRING] [pid]
-    :
-    Variable [ Name=$p ]
-  )
-  (
-    LiteralExpr [STRING] [shipdate]
-    :
-    FieldAccessor [
-      Variable [ Name=$j ]
-      Field=l_shipdate
-    ]
-  )
-]
-]
-FROM [  (
-    SELECT ELEMENT [
-    RecordConstructor [
-      (
-        LiteralExpr [STRING] [partkey]
-        :
-        FieldAccessor [
-          Variable [ Name=$i ]
-          Field=l_partkey
-        ]
-      )
-      (
-        LiteralExpr [STRING] [i]
-        :
-        (
-          SELECT ELEMENT [
-          FieldAccessor [
-            Variable [ Name=$x ]
-            Field=i
-          ]
-          ]
-          FROM [            Variable [ Name=$g ]
-            AS Variable [ Name=$x ]
-          ]
-          Orderby
-            FieldAccessor [
-              FieldAccessor [
-                Variable [ Name=$x ]
-                Field=i
-              ]
-              Field=l_shipdate
-            ]
-            ASC
-
-        )
-      )
-    ]
-    ]
-    FROM [      FunctionCall Metadata.dataset@1[
-        LiteralExpr [STRING] [LineItem]
-      ]
-      AS Variable [ Name=$i ]
-    ]
-    Groupby
-      Variable [ Name=$l_partkey ]
-      :=
-      FieldAccessor [
-        Variable [ Name=$i ]
-        Field=l_partkey
-      ]
-      GROUP AS Variable [ Name=$g ]
-      (
-        i:=Variable [ Name=$i ]
-      )
-
-  )
-  AS Variable [ Name=$gen0 ]
-,
-  FieldAccessor [
-    Variable [ Name=$gen0 ]
-    Field=i
-  ]
-  AS Variable [ Name=$j ]
- AT 
-Variable [ Name=$p ]
-]
-Where
-  OperatorExpr [
-    Variable [ Name=$p ]
-    <
-    LiteralExpr [LONG] [4]
-  ]
-Orderby
-  FieldAccessor [
-    Variable [ Name=$gen0 ]
-    Field=partkey
-  ]
-  ASC
-  FieldAccessor [
-    Variable [ Name=$j ]
-    Field=l_shipdate
-  ]
-  ASC
-
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/flwor/at00/at00.4.ast b/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/flwor/at00/at00.4.ast
deleted file mode 100644
index cd20331..0000000
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/flwor/at00/at00.4.ast
+++ /dev/null
@@ -1,111 +0,0 @@
-DataverseUse test
-Query:
-SELECT ELEMENT [
-RecordConstructor [
-  (
-    LiteralExpr [STRING] [partkey]
-    :
-    FieldAccessor [
-      Variable [ Name=$gen0 ]
-      Field=partkey
-    ]
-  )
-  (
-    LiteralExpr [STRING] [pid]
-    :
-    Variable [ Name=$p ]
-  )
-  (
-    LiteralExpr [STRING] [shipdate]
-    :
-    FieldAccessor [
-      Variable [ Name=$j ]
-      Field=l_shipdate
-    ]
-  )
-]
-]
-FROM [  (
-    SELECT ELEMENT [
-    RecordConstructor [
-      (
-        LiteralExpr [STRING] [partkey]
-        :
-        FieldAccessor [
-          Variable [ Name=$i ]
-          Field=l_partkey
-        ]
-      )
-      (
-        LiteralExpr [STRING] [i]
-        :
-        (
-          SELECT ELEMENT [
-          FieldAccessor [
-            Variable [ Name=$x ]
-            Field=i
-          ]
-          ]
-          FROM [            Variable [ Name=$g ]
-            AS Variable [ Name=$x ]
-          ]
-          Orderby
-            FieldAccessor [
-              FieldAccessor [
-                Variable [ Name=$x ]
-                Field=i
-              ]
-              Field=l_shipdate
-            ]
-            ASC
-
-        )
-      )
-    ]
-    ]
-    FROM [      FunctionCall Metadata.dataset@1[
-        LiteralExpr [STRING] [LineItem]
-      ]
-      AS Variable [ Name=$i ]
-    ]
-    Groupby
-      Variable [ Name=$l_partkey ]
-      :=
-      FieldAccessor [
-        Variable [ Name=$i ]
-        Field=l_partkey
-      ]
-      GROUP AS Variable [ Name=$g ]
-      (
-        i:=Variable [ Name=$i ]
-      )
-
-  )
-  AS Variable [ Name=$gen0 ]
-,
-  FieldAccessor [
-    Variable [ Name=$gen0 ]
-    Field=i
-  ]
-  AS Variable [ Name=$j ]
- AT 
-Variable [ Name=$p ]
-]
-Where
-  OperatorExpr [
-    Variable [ Name=$p ]
-    <
-    LiteralExpr [LONG] [4]
-  ]
-Orderby
-  FieldAccessor [
-    Variable [ Name=$gen0 ]
-    Field=partkey
-  ]
-  ASC
-  FieldAccessor [
-    Variable [ Name=$j ]
-    Field=l_shipdate
-  ]
-  ASC
-
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/flwor/at00/at00.5.ast b/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/flwor/at00/at00.5.ast
deleted file mode 100644
index cd20331..0000000
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/flwor/at00/at00.5.ast
+++ /dev/null
@@ -1,111 +0,0 @@
-DataverseUse test
-Query:
-SELECT ELEMENT [
-RecordConstructor [
-  (
-    LiteralExpr [STRING] [partkey]
-    :
-    FieldAccessor [
-      Variable [ Name=$gen0 ]
-      Field=partkey
-    ]
-  )
-  (
-    LiteralExpr [STRING] [pid]
-    :
-    Variable [ Name=$p ]
-  )
-  (
-    LiteralExpr [STRING] [shipdate]
-    :
-    FieldAccessor [
-      Variable [ Name=$j ]
-      Field=l_shipdate
-    ]
-  )
-]
-]
-FROM [  (
-    SELECT ELEMENT [
-    RecordConstructor [
-      (
-        LiteralExpr [STRING] [partkey]
-        :
-        FieldAccessor [
-          Variable [ Name=$i ]
-          Field=l_partkey
-        ]
-      )
-      (
-        LiteralExpr [STRING] [i]
-        :
-        (
-          SELECT ELEMENT [
-          FieldAccessor [
-            Variable [ Name=$x ]
-            Field=i
-          ]
-          ]
-          FROM [            Variable [ Name=$g ]
-            AS Variable [ Name=$x ]
-          ]
-          Orderby
-            FieldAccessor [
-              FieldAccessor [
-                Variable [ Name=$x ]
-                Field=i
-              ]
-              Field=l_shipdate
-            ]
-            ASC
-
-        )
-      )
-    ]
-    ]
-    FROM [      FunctionCall Metadata.dataset@1[
-        LiteralExpr [STRING] [LineItem]
-      ]
-      AS Variable [ Name=$i ]
-    ]
-    Groupby
-      Variable [ Name=$l_partkey ]
-      :=
-      FieldAccessor [
-        Variable [ Name=$i ]
-        Field=l_partkey
-      ]
-      GROUP AS Variable [ Name=$g ]
-      (
-        i:=Variable [ Name=$i ]
-      )
-
-  )
-  AS Variable [ Name=$gen0 ]
-,
-  FieldAccessor [
-    Variable [ Name=$gen0 ]
-    Field=i
-  ]
-  AS Variable [ Name=$j ]
- AT 
-Variable [ Name=$p ]
-]
-Where
-  OperatorExpr [
-    Variable [ Name=$p ]
-    <
-    LiteralExpr [LONG] [4]
-  ]
-Orderby
-  FieldAccessor [
-    Variable [ Name=$gen0 ]
-    Field=partkey
-  ]
-  ASC
-  FieldAccessor [
-    Variable [ Name=$j ]
-    Field=l_shipdate
-  ]
-  ASC
-
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index a76ecbc..0d8da65 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -21,6 +21,29 @@
 
 ]>
 <test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp" QueryFileExtension=".sqlpp">
+  <test-group name="async-deferred">
+    <test-case FilePath="async-deferred">
+      <compilation-unit name="async-failed">
+        <output-dir compare="Text">async-failed</output-dir>
+        <expected-error>Error in processing tuple 0</expected-error>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="async-deferred">
+      <compilation-unit name="deferred">
+        <output-dir compare="Text">deferred</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="async-deferred">
+      <compilation-unit name="async">
+        <output-dir compare="Text">async</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="async-deferred">
+      <compilation-unit name="async-running">
+        <output-dir compare="Text">async-running</output-dir>
+      </compilation-unit>
+    </test-case>
+  </test-group>
   <test-group name="flwor">
     <test-case FilePath="flwor">
       <compilation-unit name="at00">
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp_parser.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp_parser.xml
index df7e5a0..fcd1a70 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp_parser.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp_parser.xml
@@ -28,11 +28,6 @@
              QueryFileExtension=".sqlpp">
   <test-group name="flwor">
     <test-case FilePath="flwor">
-      <compilation-unit name="at00">
-        <output-dir compare="AST">at00</output-dir>
-      </compilation-unit>
-    </test-case>
-    <test-case FilePath="flwor">
       <compilation-unit name="at01">
         <output-dir compare="AST">at01</output-dir>
       </compilation-unit>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java
index 9fb6a31..41f9e67 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java
@@ -32,23 +32,11 @@
 
     public static JobId runJob(IHyracksClientConnection hcc, JobSpecification spec, boolean waitForCompletion)
             throws Exception {
-        JobId[] jobIds = executeJobArray(hcc, new Job[] { new Job(spec) }, waitForCompletion);
-        return jobIds[0];
-    }
-
-    public static JobId[] executeJobArray(IHyracksClientConnection hcc, Job[] jobs, boolean waitForCompletion)
-            throws Exception {
-        JobId[] startedJobIds = new JobId[jobs.length];
-        for (int i = 0; i < jobs.length; i++) {
-            JobSpecification spec = jobs[i].getJobSpec();
-            spec.setMaxReattempts(0);
-            JobId jobId = hcc.startJob(spec);
-            startedJobIds[i] = jobId;
-            if (waitForCompletion) {
-                hcc.waitForCompletion(jobId);
-            }
+        spec.setMaxReattempts(0);
+        final JobId jobId = hcc.startJob(spec);
+        if (waitForCompletion) {
+            hcc.waitForCompletion(jobId);
         }
-        return startedJobIds;
+        return jobId;
     }
-
 }
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
index ff30df3..b5fbcf5 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
@@ -32,12 +32,15 @@
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.Inet4Address;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -72,6 +75,7 @@
 import org.apache.http.impl.client.StandardHttpRequestRetryHandler;
 import org.apache.http.util.EntityUtils;
 import org.apache.hyracks.util.StorageUtil;
+import org.junit.Assert;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.DeserializationFeature;
@@ -91,12 +95,21 @@
     private static final long MAX_URL_LENGTH = 2000l;
     private static final Pattern JAVA_BLOCK_COMMENT_PATTERN = Pattern.compile("/\\*.*\\*/",
             Pattern.MULTILINE | Pattern.DOTALL);
+    private static final Pattern JAVA_LINE_COMMENT_PATTERN = Pattern.compile("//.*$", Pattern.MULTILINE);
+    private static final Pattern SHELL_LINE_COMMENT_PATTERN = Pattern.compile("#.*$", Pattern.MULTILINE);
     private static final Pattern REGEX_LINES_PATTERN = Pattern.compile("^(-)?/(.*)/([im]*)$");
     private static final Pattern POLL_TIMEOUT_PATTERN = Pattern.compile("polltimeoutsecs=(\\d+)(\\D|$)",
             Pattern.MULTILINE);
     private static final Pattern POLL_DELAY_PATTERN = Pattern.compile("polldelaysecs=(\\d+)(\\D|$)", Pattern.MULTILINE);
+    private static final Pattern HANDLE_VARIABLE_PATTERN = Pattern.compile("handlevariable=(\\w+)");
+    private static final Pattern VARIABLE_REF_PATTERN = Pattern.compile("\\$(\\w+)");
+
     public static final int TRUNCATE_THRESHOLD = 16384;
 
+    public static final String DELIVERY_ASYNC = "async";
+    public static final String DELIVERY_DEFERRED = "deferred";
+    public static final String DELIVERY_IMMEDIATE = "immediate";
+
     private static Method managixExecuteMethod = null;
     private static final HashMap<Integer, ITestServer> runningTestServers = new HashMap<>();
 
@@ -376,8 +389,13 @@
 
     // For tests where you simply want the byte-for-byte output.
     private static void writeOutputToFile(File actualFile, InputStream resultStream) throws Exception {
-        if (!actualFile.getParentFile().mkdirs()) {
-            LOGGER.warning("Unable to create actual file parent dir: " + actualFile.getParentFile());
+        final File parentDir = actualFile.getParentFile();
+        if (!parentDir.isDirectory()) {
+            if (parentDir.exists()) {
+                LOGGER.warning("Actual file parent \"" + parentDir + "\" exists but is not a directory");
+            } else if (!parentDir.mkdirs()) {
+                LOGGER.warning("Unable to create actual file parent dir: " + parentDir);
+            }
         }
         try (FileOutputStream out = new FileOutputStream(actualFile)) {
             IOUtils.copy(resultStream, out);
@@ -424,38 +442,30 @@
         return httpResponse;
     }
 
-    public InputStream executeQuery(String str, OutputFormat fmt, String url, List<CompilationUnit.Parameter> params)
+    public InputStream executeQuery(String str, OutputFormat fmt, URI uri, List<CompilationUnit.Parameter> params)
             throws Exception {
-        HttpUriRequest method = constructHttpMethod(str, url, "query", false, params);
+        HttpUriRequest method = constructHttpMethod(str, uri, "query", false, params);
         // Set accepted output response type
         method.setHeader("Accept", fmt.mimeType());
         HttpResponse response = executeAndCheckHttpRequest(method);
         return response.getEntity().getContent();
     }
 
-    public InputStream executeQueryService(String str, String url) throws Exception {
-        return executeQueryService(str, OutputFormat.CLEAN_JSON, url, new ArrayList<>(), false);
+    public InputStream executeQueryService(String str, URI uri) throws Exception {
+        return executeQueryService(str, OutputFormat.CLEAN_JSON, uri, new ArrayList<>(), false);
     }
 
-    public InputStream executeQueryService(String str, OutputFormat fmt, String url,
+    public InputStream executeQueryService(String str, OutputFormat fmt, URI uri,
             List<CompilationUnit.Parameter> params, boolean jsonEncoded) throws Exception {
         setParam(params, "format", fmt.mimeType());
-        HttpUriRequest method = jsonEncoded ? constructPostMethodJson(str, url, "statement", params)
-                : constructPostMethodUrl(str, url, "statement", params);
+        HttpUriRequest method = jsonEncoded ? constructPostMethodJson(str, uri, "statement", params)
+                : constructPostMethodUrl(str, uri, "statement", params);
         // Set accepted output response type
         method.setHeader("Accept", OutputFormat.CLEAN_JSON.mimeType());
         HttpResponse response = executeHttpRequest(method);
         return response.getEntity().getContent();
     }
 
-    public InputStream executeQueryService(String statement, OutputFormat fmt, String url,
-            List<CompilationUnit.Parameter> params, boolean jsonEncoded, String deferred) throws Exception {
-        setParam(params, "mode", deferred);
-        InputStream resultStream = executeQueryService(statement, fmt, url, params, jsonEncoded);
-        String handle = ResultExtractor.extractHandle(resultStream);
-        return getHandleResult(handle, fmt);
-    }
-
     protected void setParam(List<CompilationUnit.Parameter> params, String name, String value) {
         for (CompilationUnit.Parameter param : params) {
             if (name.equals(param.getName())) {
@@ -479,19 +489,19 @@
         return params;
     }
 
-    private HttpUriRequest constructHttpMethod(String statement, String endpoint, String stmtParam,
-            boolean postStmtAsParam, List<CompilationUnit.Parameter> otherParams) {
-        if (statement.length() + endpoint.length() < MAX_URL_LENGTH) {
+    private HttpUriRequest constructHttpMethod(String statement, URI uri, String stmtParam,
+            boolean postStmtAsParam, List<CompilationUnit.Parameter> otherParams) throws URISyntaxException {
+        if (statement.length() + uri.toString().length() < MAX_URL_LENGTH) {
             // Use GET for small-ish queries
-            return constructGetMethod(endpoint, injectStatement(statement, stmtParam, otherParams));
+            return constructGetMethod(uri, injectStatement(statement, stmtParam, otherParams));
         } else {
             // Use POST for bigger ones to avoid 413 FULL_HEAD
             String stmtParamName = (postStmtAsParam ? stmtParam : null);
-            return constructPostMethodUrl(statement, endpoint, stmtParamName, otherParams);
+            return constructPostMethodUrl(statement, uri, stmtParamName, otherParams);
         }
     }
 
-    private HttpUriRequest constructGetMethod(String endpoint, List<CompilationUnit.Parameter> params) {
+    private HttpUriRequest constructGetMethod(URI endpoint, List<CompilationUnit.Parameter> params) {
         RequestBuilder builder = RequestBuilder.get(endpoint);
         for (CompilationUnit.Parameter param : params) {
             builder.addParameter(param.getName(), param.getValue());
@@ -500,17 +510,16 @@
         return builder.build();
     }
 
-    private HttpUriRequest constructGetMethod(String endpoint, OutputFormat fmt,
+    private HttpUriRequest constructGetMethod(URI endpoint, OutputFormat fmt,
             List<CompilationUnit.Parameter> params) {
-
         HttpUriRequest method = constructGetMethod(endpoint, params);
         // Set accepted output response type
         method.setHeader("Accept", fmt.mimeType());
         return method;
     }
 
-    private HttpUriRequest constructPostMethod(String endpoint, List<CompilationUnit.Parameter> params) {
-        RequestBuilder builder = RequestBuilder.post(endpoint);
+    private HttpUriRequest constructPostMethod(URI uri, List<CompilationUnit.Parameter> params) {
+        RequestBuilder builder = RequestBuilder.post(uri);
         for (CompilationUnit.Parameter param : params) {
             builder.addParameter(param.getName(), param.getValue());
         }
@@ -518,18 +527,17 @@
         return builder.build();
     }
 
-    private HttpUriRequest constructPostMethod(String endpoint, OutputFormat fmt,
+    private HttpUriRequest constructPostMethod(URI uri, OutputFormat fmt,
             List<CompilationUnit.Parameter> params) {
-
-        HttpUriRequest method = constructPostMethod(endpoint, params);
+        HttpUriRequest method = constructPostMethod(uri, params);
         // Set accepted output response type
         method.setHeader("Accept", fmt.mimeType());
         return method;
     }
 
-    protected HttpUriRequest constructPostMethodUrl(String statement, String endpoint, String stmtParam,
+    protected HttpUriRequest constructPostMethodUrl(String statement, URI uri, String stmtParam,
             List<CompilationUnit.Parameter> otherParams) {
-        RequestBuilder builder = RequestBuilder.post(endpoint);
+        RequestBuilder builder = RequestBuilder.post(uri);
         if (stmtParam != null) {
             for (CompilationUnit.Parameter param : injectStatement(statement, stmtParam, otherParams)) {
                 builder.addParameter(param.getName(), param.getValue());
@@ -543,12 +551,12 @@
         return builder.build();
     }
 
-    protected HttpUriRequest constructPostMethodJson(String statement, String endpoint, String stmtParam,
+    protected HttpUriRequest constructPostMethodJson(String statement, URI uri, String stmtParam,
             List<CompilationUnit.Parameter> otherParams) {
         if (stmtParam == null) {
             throw new NullPointerException("Statement parameter required.");
         }
-        RequestBuilder builder = RequestBuilder.post(endpoint);
+        RequestBuilder builder = RequestBuilder.post(uri);
         ObjectMapper om = new ObjectMapper();
         ObjectNode content = om.createObjectNode();
         for (CompilationUnit.Parameter param : injectStatement(statement, stmtParam, otherParams)) {
@@ -563,23 +571,23 @@
         return builder.build();
     }
 
-    public InputStream executeJSONGet(OutputFormat fmt, String url) throws Exception {
-        HttpUriRequest request = constructGetMethod(url, fmt, new ArrayList<>());
+    public InputStream executeJSONGet(OutputFormat fmt, URI uri) throws Exception {
+        HttpUriRequest request = constructGetMethod(uri, fmt, new ArrayList<>());
         HttpResponse response = executeAndCheckHttpRequest(request);
         return response.getEntity().getContent();
     }
 
-    public InputStream executeJSONPost(OutputFormat fmt, String url) throws Exception {
-        HttpUriRequest request = constructPostMethod(url, fmt, new ArrayList<>());
+    public InputStream executeJSONPost(OutputFormat fmt, URI uri) throws Exception {
+        HttpUriRequest request = constructPostMethod(uri, fmt, new ArrayList<>());
         HttpResponse response = executeAndCheckHttpRequest(request);
         return response.getEntity().getContent();
     }
 
     // To execute Update statements
     // Insert and Delete statements are executed here
-    public void executeUpdate(String str, String url) throws Exception {
+    public void executeUpdate(String str, URI uri) throws Exception {
         // Create a method instance.
-        HttpUriRequest request = RequestBuilder.post(url).setEntity(new StringEntity(str, StandardCharsets.UTF_8))
+        HttpUriRequest request = RequestBuilder.post(uri).setEntity(new StringEntity(str, StandardCharsets.UTF_8))
                 .build();
 
         // Execute the method.
@@ -587,30 +595,25 @@
     }
 
     // Executes AQL in either async or async-defer mode.
-    public InputStream executeAnyAQLAsync(String str, boolean defer, OutputFormat fmt, String url) throws Exception {
+    public InputStream executeAnyAQLAsync(String statement, boolean defer, OutputFormat fmt, URI uri,
+            Map<String, Object> variableCtx) throws Exception {
         // Create a method instance.
-        HttpUriRequest request = RequestBuilder.post(url)
+        HttpUriRequest request = RequestBuilder.post(uri)
                 .addParameter("mode", defer ? "asynchronous-deferred" : "asynchronous")
-                .setEntity(new StringEntity(str, StandardCharsets.UTF_8)).setHeader("Accept", fmt.mimeType()).build();
+                .setEntity(new StringEntity(statement, StandardCharsets.UTF_8)).setHeader("Accept", fmt.mimeType())
+                .build();
+
+        String handleVar = getHandleVariable(statement);
 
         HttpResponse response = executeAndCheckHttpRequest(request);
         InputStream resultStream = response.getEntity().getContent();
+        String handle = IOUtils.toString(resultStream, "UTF-8");
 
-        String theHandle = IOUtils.toString(resultStream, "UTF-8");
-
-        // take the handle and parse it so results can be retrieved
-        return getHandleResult(theHandle, fmt);
-    }
-
-    private InputStream getHandleResult(String handle, OutputFormat fmt) throws Exception {
-        final String url = getEndpoint(Lets.QUERY_RESULT);
-
-        // Create a method instance.
-        HttpUriRequest request = RequestBuilder.get(url).addParameter("handle", handle)
-                .setHeader("Accept", fmt.mimeType()).build();
-
-        HttpResponse response = executeAndCheckHttpRequest(request);
-        return response.getEntity().getContent();
+        if (handleVar != null) {
+            variableCtx.put(handleVar, handle);
+            return resultStream;
+        }
+        return null;
     }
 
     // To execute DDL and Update statements
@@ -619,9 +622,9 @@
     // create index statement
     // create dataverse statement
     // create function statement
-    public void executeDDL(String str, String url) throws Exception {
+    public void executeDDL(String str, URI uri) throws Exception {
         // Create a method instance.
-        HttpUriRequest request = RequestBuilder.post(url).setEntity(new StringEntity(str, StandardCharsets.UTF_8))
+        HttpUriRequest request = RequestBuilder.post(uri).setEntity(new StringEntity(str, StandardCharsets.UTF_8))
                 .build();
 
         // Execute the method.
@@ -735,9 +738,10 @@
         executeTest(actualPath, testCaseCtx, pb, isDmlRecoveryTest, null);
     }
 
-    public void executeTest(TestCaseContext testCaseCtx, TestFileContext ctx, String statement,
-            boolean isDmlRecoveryTest, ProcessBuilder pb, CompilationUnit cUnit, MutableInt queryCount,
-            List<TestFileContext> expectedResultFileCtxs, File testFile, String actualPath) throws Exception {
+    public void executeTestFile(TestCaseContext testCaseCtx, TestFileContext ctx, Map<String, Object> variableCtx,
+            String statement, boolean isDmlRecoveryTest, ProcessBuilder pb, CompilationUnit cUnit,
+            MutableInt queryCount, List<TestFileContext> expectedResultFileCtxs, File testFile, String actualPath)
+            throws Exception {
         File qbcFile;
         boolean failed = false;
         File expectedResultFile;
@@ -762,17 +766,11 @@
                     ResultExtractor.extract(resultStream);
                 }
                 break;
+            case "pollget":
             case "pollquery":
                 // polltimeoutsecs=nnn, polldelaysecs=nnn
-                final Matcher timeoutMatcher = POLL_TIMEOUT_PATTERN.matcher(statement);
-                int timeoutSecs;
-                if (timeoutMatcher.find()) {
-                    timeoutSecs = Integer.parseInt(timeoutMatcher.group(1));
-                } else {
-                    throw new IllegalArgumentException("ERROR: polltimeoutsecs=nnn must be present in poll file");
-                }
-                final Matcher retryDelayMatcher = POLL_DELAY_PATTERN.matcher(statement);
-                int retryDelaySecs = retryDelayMatcher.find() ? Integer.parseInt(timeoutMatcher.group(1)) : 1;
+                int timeoutSecs = getTimeoutSecs(statement);
+                int retryDelaySecs = getRetryDelaySecs(statement);
                 long startTime = System.currentTimeMillis();
                 long limitTime = startTime + TimeUnit.SECONDS.toMillis(timeoutSecs);
                 ctx.setType(ctx.getType().substring("poll".length()));
@@ -780,7 +778,7 @@
                 LOGGER.fine("polling for up to " + timeoutSecs + " seconds w/ " + retryDelaySecs + " second(s) delay");
                 while (true) {
                     try {
-                        executeTest(testCaseCtx, ctx, statement, isDmlRecoveryTest, pb, cUnit, queryCount,
+                        executeTestFile(testCaseCtx, ctx, variableCtx, statement, isDmlRecoveryTest, pb, cUnit, queryCount,
                                 expectedResultFileCtxs, testFile, actualPath);
                         finalException = null;
                         break;
@@ -800,7 +798,7 @@
                 break;
             case "query":
             case "async":
-            case "asyncdefer":
+            case "deferred":
                 // isDmlRecoveryTest: insert Crash and Recovery
                 if (isDmlRecoveryTest) {
                     executeScript(pb, pb.environment().get("SCRIPT_HOME") + File.separator + "dml_recovery"
@@ -810,31 +808,43 @@
                 }
                 InputStream resultStream = null;
                 OutputFormat fmt = OutputFormat.forCompilationUnit(cUnit);
+                final String reqType = ctx.getType();
+                final List<CompilationUnit.Parameter> params = cUnit.getParameter();
                 if (ctx.getFile().getName().endsWith("aql")) {
-                    if (ctx.getType().equalsIgnoreCase("query")) {
-                        resultStream = executeQuery(statement, fmt, getEndpoint(Lets.AQL_QUERY),
-                                cUnit.getParameter());
-                    } else if (ctx.getType().equalsIgnoreCase("async")) {
-                        resultStream = executeAnyAQLAsync(statement, false, fmt, getEndpoint(Lets.AQL));
-                    } else if (ctx.getType().equalsIgnoreCase("asyncdefer")) {
-                        resultStream = executeAnyAQLAsync(statement, true, fmt, getEndpoint(Lets.AQL));
+                    if (reqType.equalsIgnoreCase("query")) {
+                        resultStream = executeQuery(statement, fmt, getEndpoint(Lets.AQL_QUERY), params);
+                    } else {
+                        final URI endpoint = getEndpoint(Lets.AQL);
+                        if (reqType.equalsIgnoreCase("async")) {
+                            resultStream = executeAnyAQLAsync(statement, false, fmt, endpoint, variableCtx);
+                        } else if (reqType.equalsIgnoreCase("deferred")) {
+                            resultStream = executeAnyAQLAsync(statement, true, fmt, endpoint, variableCtx);
+                        }
+                        Assert.assertNotNull("no handle for " + reqType + " test " + testFile.toString(), resultStream);
                     }
                 } else {
-                    final String reqType = ctx.getType();
-                    final String url = getEndpoint(Lets.QUERY_SERVICE);
-                    final List<CompilationUnit.Parameter> params = cUnit.getParameter();
-                    if (reqType.equalsIgnoreCase("query")) {
-                        resultStream = executeQueryService(statement, fmt, url, params, true);
+                    String delivery = DELIVERY_IMMEDIATE;
+                    if (reqType.equalsIgnoreCase("async")) {
+                        delivery = DELIVERY_ASYNC;
+                    } else if (reqType.equalsIgnoreCase("deferred")) {
+                        delivery = DELIVERY_DEFERRED;
+                    }
+                    final URI uri = getEndpoint(Lets.QUERY_SERVICE);
+                    if (DELIVERY_IMMEDIATE.equals(delivery)) {
+                        resultStream = executeQueryService(statement, fmt, uri, params, true);
                         resultStream = ResultExtractor.extract(resultStream);
-                    } else if (reqType.equalsIgnoreCase("async")) {
-                        resultStream = executeQueryService(statement, fmt, url, params, true, "async");
-                    } else if (reqType.equalsIgnoreCase("asyncdefer")) {
-                        resultStream = executeQueryService(statement, fmt, url, params, true, "deferred");
+                    } else {
+                        String handleVar = getHandleVariable(statement);
+                        setParam(params, "mode", delivery);
+                        resultStream = executeQueryService(statement, fmt, uri, params, true);
+                        String handle = ResultExtractor.extractHandle(resultStream);
+                        Assert.assertNotNull("no handle for " + reqType + " test " + testFile.toString(), handleVar);
+                        variableCtx.put(handleVar, handle);
                     }
                 }
                 if (queryCount.intValue() >= expectedResultFileCtxs.size()) {
-                    throw new IllegalStateException("no result file for " + testFile.toString() + "; queryCount: "
-                            + queryCount + ", filectxs.size: " + expectedResultFileCtxs.size());
+                    Assert.fail("no result file for " + testFile.toString() + "; queryCount: " + queryCount
+                            + ", filectxs.size: " + expectedResultFileCtxs.size());
                 }
                 expectedResultFile = expectedResultFileCtxs.get(queryCount.intValue()).getFile();
 
@@ -938,17 +948,9 @@
                             "Unexpected format for method " + ctx.getType() + ": " + ctx.extension());
                 }
                 fmt = OutputFormat.forCompilationUnit(cUnit);
-                String endpoint = stripJavaComments(statement).trim();
-                switch (ctx.getType()) {
-                    case "get":
-                        resultStream = executeJSONGet(fmt, "http://" + host + ":" + port + endpoint);
-                        break;
-                    case "post":
-                        resultStream = executeJSONPost(fmt, "http://" + host + ":" + port + endpoint);
-                        break;
-                    default:
-                        throw new IllegalStateException("NYI: " + ctx.getType());
-                }
+                final String trimmedPathAndQuery = stripLineComments(stripJavaComments(statement)).trim();
+                final String variablesReplaced = replaceVarRef(trimmedPathAndQuery, variableCtx);
+                resultStream = executeHttp(ctx.getType(), variablesReplaced, fmt);
                 expectedResultFile = expectedResultFileCtxs.get(queryCount.intValue()).getFile();
                 actualResultFile = testCaseCtx.getActualResultFile(cUnit, expectedResultFile, new File(actualPath));
                 writeOutputToFile(actualResultFile, resultStream);
@@ -1047,11 +1049,56 @@
         }
     }
 
+    protected int getTimeoutSecs(String statement) {
+        final Matcher timeoutMatcher = POLL_TIMEOUT_PATTERN.matcher(statement);
+        if (timeoutMatcher.find()) {
+            return Integer.parseInt(timeoutMatcher.group(1));
+        } else {
+            throw new IllegalArgumentException("ERROR: polltimeoutsecs=nnn must be present in poll file");
+        }
+    }
+
+    protected static int getRetryDelaySecs(String statement) {
+        final Matcher retryDelayMatcher = POLL_DELAY_PATTERN.matcher(statement);
+        return retryDelayMatcher.find() ? Integer.parseInt(retryDelayMatcher.group(1)) : 1;
+    }
+
+    protected static String getHandleVariable(String statement) {
+        final Matcher handleVariableMatcher = HANDLE_VARIABLE_PATTERN.matcher(statement);
+        return handleVariableMatcher.find() ? handleVariableMatcher.group(1) : null;
+    }
+
+    protected static String replaceVarRef(String statement, Map<String, Object> variableCtx) {
+        String tmpStmt = statement;
+        Matcher variableReferenceMatcher = VARIABLE_REF_PATTERN.matcher(tmpStmt);
+        while (variableReferenceMatcher.find()) {
+            String var = variableReferenceMatcher.group(1);
+            Object value = variableCtx.get(var);
+            Assert.assertNotNull("No value for variable reference $" + var, value);
+            tmpStmt = tmpStmt.replace("$" + var, String.valueOf(value));
+            variableReferenceMatcher = VARIABLE_REF_PATTERN.matcher(tmpStmt);
+        }
+        return tmpStmt;
+    }
+
+    protected InputStream executeHttp(String ctxType, String endpoint, OutputFormat fmt) throws Exception {
+        String[] split = endpoint.split("\\?");
+        URI uri = new URI("http", null, host, port, split[0], split.length > 1 ? split[1] : null, null);
+        switch (ctxType) {
+            case "get":
+                return executeJSONGet(fmt, uri);
+            case "post":
+                return executeJSONPost(fmt, uri);
+            default:
+                throw new AssertionError("Not implemented: " + ctxType);
+        }
+    }
+
     private void killNC(String nodeId, CompilationUnit cUnit) throws Exception {
         //get node process id
         OutputFormat fmt = OutputFormat.forCompilationUnit(cUnit);
         String endpoint = "/admin/cluster/node/" + nodeId + "/config";
-        InputStream executeJSONGet = executeJSONGet(fmt, "http://" + host + ":" + port + endpoint);
+        InputStream executeJSONGet = executeJSONGet(fmt, new URI("http", null, host, port, endpoint, null, null));
         StringWriter actual = new StringWriter();
         IOUtils.copy(executeJSONGet, actual, StandardCharsets.UTF_8);
         String config = actual.toString();
@@ -1065,10 +1112,6 @@
 
     public void executeTest(String actualPath, TestCaseContext testCaseCtx, ProcessBuilder pb,
             boolean isDmlRecoveryTest, TestGroup failedGroup) throws Exception {
-        File testFile;
-        String statement;
-        List<TestFileContext> expectedResultFileCtxs;
-        List<TestFileContext> testFileCtxs;
         MutableInt queryCount = new MutableInt(0);
         int numOfErrors = 0;
         int numOfFiles = 0;
@@ -1076,14 +1119,15 @@
         for (CompilationUnit cUnit : cUnits) {
             LOGGER.info(
                     "Starting [TEST]: " + testCaseCtx.getTestCase().getFilePath() + "/" + cUnit.getName() + " ... ");
-            testFileCtxs = testCaseCtx.getTestFiles(cUnit);
-            expectedResultFileCtxs = testCaseCtx.getExpectedResultFiles(cUnit);
+            Map<String, Object> variableCtx = new HashMap<>();
+            List<TestFileContext> testFileCtxs = testCaseCtx.getTestFiles(cUnit);
+            List<TestFileContext> expectedResultFileCtxs = testCaseCtx.getExpectedResultFiles(cUnit);
             for (TestFileContext ctx : testFileCtxs) {
                 numOfFiles++;
-                testFile = ctx.getFile();
-                statement = readTestFile(testFile);
+                final File testFile = ctx.getFile();
+                final String statement = readTestFile(testFile);
                 try {
-                    executeTest(testCaseCtx, ctx, statement, isDmlRecoveryTest, pb, cUnit, queryCount,
+                    executeTestFile(testCaseCtx, ctx, variableCtx, statement, isDmlRecoveryTest, pb, cUnit, queryCount,
                             expectedResultFileCtxs, testFile, actualPath);
                 } catch (Exception e) {
                     System.err.println("testFile " + testFile.toString() + " raised an exception: " + e);
@@ -1140,14 +1184,19 @@
         return servlet.getPath();
     }
 
-    protected String getEndpoint(Lets servlet) {
-        return "http://" + host + ":" + port + getPath(servlet).replaceAll("/\\*$", "");
+    protected URI getEndpoint(Lets servlet) throws URISyntaxException {
+        return new URI("http", null, host, port, getPath(servlet).replaceAll("/\\*$", ""), null, null);
     }
 
     public static String stripJavaComments(String text) {
         return JAVA_BLOCK_COMMENT_PATTERN.matcher(text).replaceAll("");
     }
 
+    public static String stripLineComments(String text) {
+        final String s = SHELL_LINE_COMMENT_PATTERN.matcher(text).replaceAll("");
+        return JAVA_LINE_COMMENT_PATTERN.matcher(s).replaceAll("");
+    }
+
     public void cleanup(String testCase, List<String> badtestcases) throws Exception {
         try {
             ArrayList<String> toBeDropped = new ArrayList<>();
diff --git a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/ManagixSqlppExecutionIT.java b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/ManagixSqlppExecutionIT.java
index 50d103a..2eada38 100644
--- a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/ManagixSqlppExecutionIT.java
+++ b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/ManagixSqlppExecutionIT.java
@@ -29,7 +29,7 @@
 @RunWith(Parameterized.class)
 public class ManagixSqlppExecutionIT extends ManagixExecutionIT {
 
-    @Parameters
+    @Parameters(name = "ManagixSqlppExecutionIT {index}: {0}")
     public static Collection<Object[]> tests() throws Exception {
         Collection<Object[]> testArgs = buildTestsInXml("only_sqlpp.xml");
         if (testArgs.size() == 0) {
@@ -48,10 +48,7 @@
 
     }
 
-    private TestCaseContext tcCtx;
-
     public ManagixSqlppExecutionIT(TestCaseContext tcCtx) {
         super(tcCtx);
-        this.tcCtx = tcCtx;
     }
 }
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
index 6eaf4ae..0769596 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
@@ -92,6 +92,7 @@
 import org.apache.asterix.om.typecomputer.impl.RecordPairsTypeComputer;
 import org.apache.asterix.om.typecomputer.impl.RecordRemoveFieldsTypeComputer;
 import org.apache.asterix.om.typecomputer.impl.ScalarVersionOfAggregateResultType;
+import org.apache.asterix.om.typecomputer.impl.SleepTypeComputer;
 import org.apache.asterix.om.typecomputer.impl.StringBooleanTypeComputer;
 import org.apache.asterix.om.typecomputer.impl.StringInt32TypeComputer;
 import org.apache.asterix.om.typecomputer.impl.StringIntToStringTypeComputer;
@@ -651,6 +652,8 @@
             "spatial-cell", 4);
     public static final FunctionIdentifier SWITCH_CASE = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
             "switch-case", FunctionIdentifier.VARARGS);
+    public static final FunctionIdentifier SLEEP = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "sleep", 2);
     public static final FunctionIdentifier INJECT_FAILURE = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
             "inject-failure", 2);
     public static final FunctionIdentifier FLOW_RECORD = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
@@ -1058,6 +1061,7 @@
         addPrivateFunction(SUBSET_COLLECTION, SubsetCollectionTypeComputer.INSTANCE, true);
         addFunction(SUBSTRING, SubstringTypeComputer.INSTANCE, true);
         addFunction(SWITCH_CASE, SwitchCaseComputer.INSTANCE, true);
+        addFunction(SLEEP, SleepTypeComputer.INSTANCE, false);
         addPrivateFunction(INJECT_FAILURE, InjectFailureTypeComputer.INSTANCE, true);
         addPrivateFunction(CAST_TYPE, CastTypeComputer.INSTANCE, true);
 
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/InjectFailureTypeComputer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/InjectFailureTypeComputer.java
index df050be..cc19ac4 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/InjectFailureTypeComputer.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/InjectFailureTypeComputer.java
@@ -21,29 +21,31 @@
 import org.apache.asterix.om.exceptions.TypeMismatchException;
 import org.apache.asterix.om.typecomputer.base.AbstractResultTypeComputer;
 import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 
+/**
+ * The first argument of INJECT_FAILURE can be any data model instance and will be passed verbatim to the
+ * caller. The second argument is a boolean that determines if the invocation throws an exception.
+ *
+ * Consequently {@link #checkArgType(String, int, IAType)} validates that the second argument is a
+ * boolean and {@link #getResultType(ILogicalExpression, IAType...)} returns the type of the first
+ * argument.
+ */
 public class InjectFailureTypeComputer extends AbstractResultTypeComputer {
 
     public static final InjectFailureTypeComputer INSTANCE = new InjectFailureTypeComputer();
 
-    protected InjectFailureTypeComputer() {
-    }
-
     @Override
     protected void checkArgType(String funcName, int argIndex, IAType type) throws AlgebricksException {
-        ATypeTag actualTypeTag = type.getTypeTag();
-        if (actualTypeTag != ATypeTag.BOOLEAN) {
-            throw new TypeMismatchException(funcName, argIndex, actualTypeTag, ATypeTag.BOOLEAN);
+        if (argIndex == 1 && type.getTypeTag() != ATypeTag.BOOLEAN) {
+            throw new TypeMismatchException(funcName, argIndex, type.getTypeTag(), ATypeTag.BOOLEAN);
         }
     }
 
     @Override
     protected IAType getResultType(ILogicalExpression expr, IAType... strippedInputTypes) throws AlgebricksException {
-        return BuiltinType.ABOOLEAN;
+        return strippedInputTypes[0];
     }
-
 }
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/SleepTypeComputer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/SleepTypeComputer.java
new file mode 100644
index 0000000..6b885e3
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/SleepTypeComputer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.om.typecomputer.impl;
+
+import org.apache.asterix.om.exceptions.TypeMismatchException;
+import org.apache.asterix.om.typecomputer.base.AbstractResultTypeComputer;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+
+public class SleepTypeComputer extends AbstractResultTypeComputer {
+    public static final SleepTypeComputer INSTANCE = new SleepTypeComputer();
+
+    @Override
+    public void checkArgType(String funcName, int argIndex, IAType type) throws AlgebricksException {
+        if (argIndex == 1) {
+            switch (type.getTypeTag()) {
+                case INT8:
+                case INT16:
+                case INT32:
+                case INT64:
+                    break;
+                default:
+                    throw new TypeMismatchException(funcName, argIndex, type.getTypeTag(), ATypeTag.INT8,
+                            ATypeTag.INT16, ATypeTag.INT32, ATypeTag.INT64);
+            }
+        }
+    }
+
+    @Override
+    public IAType getResultType(ILogicalExpression expr, IAType... types) throws AlgebricksException {
+        return types[0];
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/InjectFailureDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/InjectFailureDescriptor.java
index 164f369..af5f690 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/InjectFailureDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/InjectFailureDescriptor.java
@@ -18,6 +18,9 @@
  */
 package org.apache.asterix.runtime.evaluators.functions;
 
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.dataflow.data.nontagged.serde.ABooleanSerializerDeserializer;
@@ -39,6 +42,9 @@
 public class InjectFailureDescriptor extends AbstractScalarFunctionDynamicDescriptor {
 
     private static final long serialVersionUID = 1L;
+
+    private static final Logger LOGGER = Logger.getLogger(SleepDescriptor.class.getSimpleName());
+
     public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
         @Override
         public IFunctionDescriptor createFunctionDescriptor() {
@@ -75,6 +81,7 @@
                             boolean argResult = ABooleanSerializerDeserializer.getBoolean(argPtr.getByteArray(),
                                     argPtr.getStartOffset() + 1);
                             if (argResult) {
+                                LOGGER.log(Level.SEVERE, ctx.getTaskAttemptId() + " injecting failure");
                                 throw new RuntimeDataException(ErrorCode.INJECTED_FAILURE, getIdentifier());
                             }
                         }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java
new file mode 100644
index 0000000..a186b32
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.evaluators.functions;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
+import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class SleepDescriptor extends AbstractScalarFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOGGER = Logger.getLogger(SleepDescriptor.class.getSimpleName());
+
+    public static final IFunctionDescriptorFactory FACTORY = SleepDescriptor::new;
+
+    @Override
+    public IScalarEvaluatorFactory createEvaluatorFactory(final IScalarEvaluatorFactory[] args) {
+        return new IScalarEvaluatorFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws HyracksDataException {
+                return new IScalarEvaluator() {
+
+                    private IPointable argTime = new VoidPointable();
+                    private final IScalarEvaluator evalValue = args[0].createScalarEvaluator(ctx);
+                    private final IScalarEvaluator evalTime = args[1].createScalarEvaluator(ctx);
+
+                    @Override
+                    public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
+                        evalValue.evaluate(tuple, result);
+                        evalTime.evaluate(tuple, argTime);
+
+                        final byte[] bytes = argTime.getByteArray();
+                        final int offset = argTime.getStartOffset();
+                        final long time = ATypeHierarchy.getLongValue(getIdentifier().getName(), 1, bytes, offset);
+
+                        try {
+                            if (LOGGER.isLoggable(Level.INFO)) {
+                                LOGGER.log(Level.INFO, ctx.getTaskAttemptId() + " sleeping for " + time + " ms");
+                            }
+                            Thread.sleep(time);
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                        } finally {
+                            if (LOGGER.isLoggable(Level.INFO)) {
+                                LOGGER.log(Level.INFO, ctx.getTaskAttemptId() + " done sleeping for " + time + " ms");
+                            }
+                        }
+                    }
+                };
+            }
+        };
+    }
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return BuiltinFunctions.SLEEP;
+    }
+
+}
diff --git a/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java b/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java
index c926ec0..175ecc4 100644
--- a/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java
+++ b/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java
@@ -24,6 +24,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.StringWriter;
+import java.net.URI;
 import java.net.URL;
 import java.util.Collections;
 
@@ -89,7 +90,8 @@
     public void test1_sanityQuery() throws Exception {
         TestExecutor testExecutor = new TestExecutor();
         InputStream resultStream = testExecutor.executeQuery("1+1", OutputFormat.ADM,
-                "http://127.0.0.1:19002" + Lets.AQL_QUERY.getPath(), Collections.emptyList());
+                new URI("http", null, "127.0.0.1", 19002, Lets.AQL_QUERY.getPath(), null, null),
+                Collections.emptyList());
         StringWriter sw = new StringWriter();
         IOUtils.copy(resultStream, sw);
         Assert.assertEquals("2", sw.toString().trim());
diff --git a/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/context/TestFileContext.java b/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/context/TestFileContext.java
index e41a624..7a53a9e 100644
--- a/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/context/TestFileContext.java
+++ b/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/context/TestFileContext.java
@@ -59,10 +59,16 @@
 
     @Override
     public int compareTo(TestFileContext o) {
-        if (this.seqNum > o.seqNum)
+        if (this.seqNum > o.seqNum) {
             return 1;
-        else if (this.seqNum < o.seqNum)
+        } else if (this.seqNum < o.seqNum) {
             return -1;
+        }
         return 0;
     }
+
+    @Override
+    public String toString() {
+        return String.valueOf(seqNum) + ":" + type + ":" + file;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetDirectoryRecord.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetDirectoryRecord.java
index a50e1ee..3165840 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetDirectoryRecord.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetDirectoryRecord.java
@@ -71,17 +71,24 @@
     }
 
     public void start() {
-        status = Status.RUNNING;
+        updateStatus(Status.RUNNING);
     }
 
     public void writeEOS() {
-        status = Status.SUCCESS;
+        updateStatus(Status.SUCCESS);
     }
 
     public void fail() {
         status = Status.FAILED;
     }
 
+    private void updateStatus(final DatasetDirectoryRecord.Status newStatus) {
+        // FAILED is a stable status
+        if (status != Status.FAILED) {
+            status = newStatus;
+        }
+    }
+
     public Status getStatus() {
         return status;
     }
@@ -99,6 +106,6 @@
 
     @Override
     public String toString() {
-        return address.toString() + " " + status + (empty ? " (empty)" : "") + (readEOS ? " (EOS)" : "");
+        return String.valueOf(address) + " " + status + (empty ? " (empty)" : "") + (readEOS ? " (EOS)" : "");
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java
index 34ed65c..f29ff4a 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java
@@ -20,9 +20,14 @@
 
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
-public class DatasetJobRecord extends HashMap<ResultSetId, ResultSetMetaData> implements IDatasetStateRecord {
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class DatasetJobRecord implements IDatasetStateRecord {
     public enum Status {
+        IDLE,
         RUNNING,
         SUCCESS,
         FAILED
@@ -36,20 +41,30 @@
 
     private List<Exception> exceptions;
 
+    private Map<ResultSetId, ResultSetMetaData> resultSetMetadataMap = new HashMap<>();
+
     public DatasetJobRecord() {
         this.timestamp = System.currentTimeMillis();
-        this.status = Status.RUNNING;
+        this.status = Status.IDLE;
+    }
+
+    private void updateStatus(Status newStatus) {
+        // FAILED is a stable status
+        if (status != Status.FAILED) {
+            status = newStatus;
+        }
     }
 
     public void start() {
-        status = Status.RUNNING;
+        updateStatus(Status.RUNNING);
     }
 
     public void success() {
-        status = Status.SUCCESS;
+        updateStatus(Status.SUCCESS);
     }
 
-    public void fail() {
+    public void fail(ResultSetId rsId, int partition) {
+        getOrCreateDirectoryRecord(rsId, partition).fail();
         status = Status.FAILED;
     }
 
@@ -58,6 +73,7 @@
         this.exceptions = exceptions;
     }
 
+    @Override
     public long getTimestamp() {
         return timestamp;
     }
@@ -66,7 +82,57 @@
         return status;
     }
 
+    @Override
+    public String toString() {
+        return resultSetMetadataMap.toString();
+    }
+
     public List<Exception> getExceptions() {
         return exceptions;
     }
+
+    public void setResultSetMetaData(ResultSetId rsId, boolean orderedResult, int nPartitions) throws
+            HyracksDataException {
+        ResultSetMetaData rsMd = resultSetMetadataMap.get(rsId);
+        if (rsMd == null) {
+            resultSetMetadataMap.put(rsId, new ResultSetMetaData(nPartitions, orderedResult));
+        } else if (rsMd.getOrderedResult() != orderedResult || rsMd.getRecords().length != nPartitions) {
+            throw HyracksDataException.create(ErrorCode.INCONSISTENT_RESULT_METADATA, rsId.toString());
+        }
+        //TODO(tillw) throwing a HyracksDataException here hangs the execution tests
+    }
+
+    public ResultSetMetaData getResultSetMetaData(ResultSetId rsId) {
+        return resultSetMetadataMap.get(rsId);
+    }
+
+    public synchronized DatasetDirectoryRecord getOrCreateDirectoryRecord(ResultSetId rsId, int partition) {
+        DatasetDirectoryRecord[] records = getResultSetMetaData(rsId).getRecords();
+        if (records[partition] == null) {
+            records[partition] = new DatasetDirectoryRecord();
+        }
+        return records[partition];
+    }
+
+    public synchronized DatasetDirectoryRecord getDirectoryRecord(ResultSetId rsId, int partition) throws
+            HyracksDataException {
+        DatasetDirectoryRecord[] records = getResultSetMetaData(rsId).getRecords();
+        if (records[partition] == null) {
+            throw new HyracksDataException("no record for partition " + partition + " of result set " + rsId);
+        }
+        return records[partition];
+    }
+
+    public synchronized void updateStatus(ResultSetId rsId) {
+        int successCount = 0;
+        DatasetDirectoryRecord[] records = getResultSetMetaData(rsId).getRecords();
+        for (DatasetDirectoryRecord record : records) {
+            if ((record != null) && (record.getStatus() == DatasetDirectoryRecord.Status.SUCCESS)) {
+                successCount++;
+            }
+        }
+        if (successCount == records.length) {
+            success();
+        }
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/ResultSetMetaData.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/ResultSetMetaData.java
index 2285981..8e9e3dc 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/ResultSetMetaData.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/ResultSetMetaData.java
@@ -18,14 +18,15 @@
  */
 package org.apache.hyracks.api.dataset;
 
+import java.util.Arrays;
+
 public class ResultSetMetaData {
+    private final DatasetDirectoryRecord[] records;
     private final boolean ordered;
 
-    private final DatasetDirectoryRecord[] records;
-
-    public ResultSetMetaData(boolean ordered, DatasetDirectoryRecord[] records) {
+    ResultSetMetaData(int len, boolean ordered) {
+        this.records = new DatasetDirectoryRecord[len];
         this.ordered = ordered;
-        this.records = records;
     }
 
     public boolean getOrderedResult() {
@@ -35,4 +36,11 @@
     public DatasetDirectoryRecord[] getRecords() {
         return records;
     }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("{ordered: ").append(ordered).append(", records: ").append(Arrays.toString(records));
+        return sb.toString();
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index d094368..35002f8 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -49,6 +49,10 @@
     public static final int ILLEGAL_WRITE_AFTER_FLUSH_ATTEMPT = 12;
     public static final int DUPLICATE_IODEVICE = 13;
     public static final int NESTED_IODEVICES = 14;
+    public static final int MORE_THAN_ONE_RESULT = 15;
+    public static final int RESULT_FAILURE_EXCEPTION = 16;
+    public static final int RESULT_FAILURE_NO_EXCEPTION = 17;
+    public static final int INCONSISTENT_RESULT_METADATA = 18;
 
     // Compilation error codes.
     public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10001;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
index 0fd6923..404104d 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
@@ -36,6 +36,11 @@
         return new HyracksDataException(ErrorCode.HYRACKS, code, ErrorCode.getErrorMessage(code), cause, params);
     }
 
+    public static HyracksDataException create(HyracksDataException e, String nodeId) {
+        return new HyracksDataException(e.getComponent(), e.getErrorCode(), e.getMessage(), e.getCause(), nodeId, e
+                .getParams());
+    }
+
     public HyracksDataException(String component, int errorCode, String message, Throwable cause, String nodeId,
             Serializable... params) {
         super(component, errorCode, message, cause, nodeId, params);
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java
index b1fa494..7969700 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java
@@ -26,6 +26,9 @@
 import org.apache.hyracks.api.io.IWritable;
 
 public final class JobId implements IWritable, Serializable {
+
+    public static final JobId INVALID = new JobId(-1l);
+
     private static final long serialVersionUID = 1L;
     private long id;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 72f7c65..de58f33 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -33,5 +33,9 @@
 12 = Invalid attempt to write to a flushed append only metadata page
 13 = Duplicate IODevices are not allowed
 14 = IODevices should not be nested within each other
+15 = More than 1 result for job %1$s
+16 = Failure producing result set %1$s for job %2$s
+17 = No exception for failed result set %1$s for job %2$s
+18 = Inconsistent metadata for result set %1$s"
 
 10000 = The given rule collection %1$s is not an instance of the List class.
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index ae0f361..37c4177 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -325,10 +325,14 @@
         return workQueue;
     }
 
-    public Executor getExecutor() {
+    public ExecutorService getExecutorService() {
         return executor;
     }
 
+    public Executor getExecutor() {
+        return getExecutorService();
+    }
+
     public CCConfig getConfig() {
         return ccConfig;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
index 4d7d1c3..46a173e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -25,6 +25,8 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.dataset.DatasetDirectoryRecord;
@@ -33,6 +35,7 @@
 import org.apache.hyracks.api.dataset.IDatasetStateRecord;
 import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.dataset.ResultSetMetaData;
+import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
@@ -48,6 +51,9 @@
  * job.
  */
 public class DatasetDirectoryService implements IDatasetDirectoryService {
+
+    private static final Logger LOGGER = Logger.getLogger(DatasetDirectoryService.class.getName());
+
     private final long resultTTL;
 
     private final long resultSweepThreshold;
@@ -62,22 +68,24 @@
 
     @Override
     public void init(ExecutorService executor) {
-        executor.execute(new ResultStateSweeper(this, resultTTL, resultSweepThreshold));
+        executor.execute(new ResultStateSweeper(this, resultTTL, resultSweepThreshold, LOGGER));
     }
 
     @Override
     public synchronized void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf)
             throws HyracksException {
-        DatasetJobRecord djr = getDatasetJobRecord(jobId);
-        if (djr == null) {
-            djr = new DatasetJobRecord();
-            jobResultLocations.put(jobId, new JobResultInfo(djr, null));
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info(getClass().getSimpleName() + " notified of new job " + jobId);
         }
+        if (jobResultLocations.get(jobId) != null) {
+            throw HyracksDataException.create(ErrorCode.MORE_THAN_ONE_RESULT, jobId);
+        }
+        jobResultLocations.put(jobId, new JobResultInfo(new DatasetJobRecord(), null));
     }
 
     @Override
     public void notifyJobStart(JobId jobId) throws HyracksException {
-        // Auto-generated method stub
+        jobResultLocations.get(jobId).getRecord().start();
     }
 
     @Override
@@ -87,35 +95,36 @@
 
     private DatasetJobRecord getDatasetJobRecord(JobId jobId) {
         final JobResultInfo jri = jobResultLocations.get(jobId);
-        return jri == null ? null : jri.record;
+        return jri == null ? null : jri.getRecord();
+    }
+
+    private DatasetJobRecord getNonNullDatasetJobRecord(JobId jobId) {
+        final DatasetJobRecord djr = getDatasetJobRecord(jobId);
+        if (djr == null) {
+            throw new NullPointerException();
+        }
+        return djr;
     }
 
     @Override
     public synchronized void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
-            boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress) {
-        DatasetJobRecord djr = getDatasetJobRecord(jobId);
+            boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress) throws
+            HyracksDataException {
+        DatasetJobRecord djr = getNonNullDatasetJobRecord(jobId);
+        djr.setResultSetMetaData(rsId, orderedResult, nPartitions);
+        DatasetDirectoryRecord record = djr.getOrCreateDirectoryRecord(rsId, partition);
 
-        ResultSetMetaData resultSetMetaData = djr.get(rsId);
-        if (resultSetMetaData == null) {
-            resultSetMetaData = new ResultSetMetaData(orderedResult, new DatasetDirectoryRecord[nPartitions]);
-            djr.put(rsId, resultSetMetaData);
-        }
+        record.setNetworkAddress(networkAddress);
+        record.setEmpty(emptyResult);
+        record.start();
 
-        DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
-        if (records[partition] == null) {
-            records[partition] = new DatasetDirectoryRecord();
-        }
-        records[partition].setNetworkAddress(networkAddress);
-        records[partition].setEmpty(emptyResult);
-        records[partition].start();
-
-        Waiters waiters = jobResultLocations.get(jobId).waiters;
-        Waiter waiter = waiters != null ? waiters.get(rsId) : null;
+        final JobResultInfo jobResultInfo = jobResultLocations.get(jobId);
+        Waiter waiter = jobResultInfo.getWaiter(rsId);
         if (waiter != null) {
             try {
                 DatasetDirectoryRecord[] updatedRecords = updatedRecords(jobId, rsId, waiter.knownRecords);
                 if (updatedRecords != null) {
-                    waiters.remove(rsId);
+                    jobResultInfo.removeWaiter(rsId);
                     waiter.callback.setValue(updatedRecords);
                 }
             } catch (Exception e) {
@@ -126,51 +135,28 @@
     }
 
     @Override
-    public synchronized void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) {
-        int successCount = 0;
-
-        DatasetJobRecord djr = getDatasetJobRecord(jobId);
-        ResultSetMetaData resultSetMetaData = djr.get(rsId);
-        DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
-        records[partition].writeEOS();
-
-        for (DatasetDirectoryRecord record : records) {
-            if ((record != null) && (record.getStatus() == DatasetDirectoryRecord.Status.SUCCESS)) {
-                successCount++;
-            }
-        }
-        if (successCount == records.length) {
-            djr.success();
-        }
+    public synchronized void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition)
+            throws HyracksDataException {
+        DatasetJobRecord djr = getNonNullDatasetJobRecord(jobId);
+        djr.getDirectoryRecord(rsId, partition).writeEOS();
+        djr.updateStatus(rsId);
         notifyAll();
     }
 
     @Override
     public synchronized void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) {
-        DatasetJobRecord djr = getDatasetJobRecord(jobId);
-        if (djr != null) {
-            djr.fail();
-        }
-        final Waiters waiters = jobResultLocations.get(jobId).waiters;
-        if (waiters != null) {
-            waiters.get(rsId).callback.setException(new Exception());
-            waiters.remove(rsId);
-        }
+        DatasetJobRecord djr = getNonNullDatasetJobRecord(jobId);
+        djr.fail(rsId, partition);
+        jobResultLocations.get(jobId).setException(new Exception());
         notifyAll();
     }
 
     @Override
     public synchronized void reportJobFailure(JobId jobId, List<Exception> exceptions) {
-        DatasetJobRecord djr = getDatasetJobRecord(jobId);
-        if (djr != null) {
-            djr.fail(exceptions);
-        }
-        final Waiters waiters = jobResultLocations.get(jobId).waiters;
-        if (waiters != null) {
-            for (ResultSetId rsId : waiters.keySet()) {
-                waiters.remove(rsId).callback.setException(exceptions.get(0));
-            }
-        }
+        DatasetJobRecord djr = getNonNullDatasetJobRecord(jobId);
+        djr.fail(exceptions);
+        // TODO(tillw) throwing an NPE here hangs the system, why?
+        jobResultLocations.get(jobId).setException(exceptions.isEmpty() ? null : exceptions.get(0));
         notifyAll();
     }
 
@@ -184,7 +170,6 @@
                 throw new HyracksDataException(e);
             }
         }
-
         return djr.getStatus();
     }
 
@@ -195,7 +180,7 @@
 
     @Override
     public IDatasetStateRecord getState(JobId jobId) {
-        return jobResultLocations.get(jobId).record;
+        return getDatasetJobRecord(jobId);
     }
 
     @Override
@@ -210,20 +195,7 @@
             throws HyracksDataException {
         DatasetDirectoryRecord[] updatedRecords = updatedRecords(jobId, rsId, knownRecords);
         if (updatedRecords == null) {
-            JobResultInfo jri = jobResultLocations.get(jobId);
-            Waiters waiters;
-            if (jri == null) {
-                waiters = new Waiters();
-                jri = new JobResultInfo(null, waiters);
-                jobResultLocations.put(jobId, jri);
-            } else {
-                waiters = jri.waiters;
-                if (waiters == null) {
-                    waiters = new Waiters();
-                    jri.waiters = waiters;
-                }
-            }
-            waiters.put(rsId, new Waiter(knownRecords, callback));
+            jobResultLocations.get(jobId).addWaiter(rsId, knownRecords, callback);
         } else {
             callback.setValue(updatedRecords);
         }
@@ -248,26 +220,25 @@
      */
     private DatasetDirectoryRecord[] updatedRecords(JobId jobId, ResultSetId rsId, DatasetDirectoryRecord[] knownRecords)
             throws HyracksDataException {
-        DatasetJobRecord djr = getDatasetJobRecord(jobId);
-
-        if (djr == null) {
-            throw new HyracksDataException("Requested JobId " + jobId + " doesn't exist");
-        }
+        DatasetJobRecord djr = getNonNullDatasetJobRecord(jobId);
 
         if (djr.getStatus() == Status.FAILED) {
             List<Exception> caughtExceptions = djr.getExceptions();
-            if (caughtExceptions == null) {
-                throw new HyracksDataException("Job failed.");
+            if (caughtExceptions != null && !caughtExceptions.isEmpty()) {
+                final Exception cause = caughtExceptions.get(caughtExceptions.size() - 1);
+                if (cause instanceof HyracksDataException) {
+                    throw (HyracksDataException) cause;
+                }
+                throw HyracksDataException.create(ErrorCode.RESULT_FAILURE_EXCEPTION, cause, rsId, jobId);
             } else {
-                throw new HyracksDataException(caughtExceptions.get(caughtExceptions.size() - 1));
+                throw HyracksDataException.create(ErrorCode.RESULT_FAILURE_NO_EXCEPTION, rsId, jobId);
             }
         }
 
-        ResultSetMetaData resultSetMetaData = djr.get(rsId);
-        if (resultSetMetaData == null || resultSetMetaData.getRecords() == null) {
+        final ResultSetMetaData resultSetMetaData = djr.getResultSetMetaData(rsId);
+        if (resultSetMetaData == null) {
             return null;
         }
-
         DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
 
         return Arrays.equals(records, knownRecords) ? null : records;
@@ -275,13 +246,42 @@
 }
 
 class JobResultInfo {
+
+    private DatasetJobRecord record;
+    private Waiters waiters;
+
     JobResultInfo(DatasetJobRecord record, Waiters waiters) {
         this.record = record;
         this.waiters = waiters;
     }
 
-    DatasetJobRecord record;
-    Waiters waiters;
+    DatasetJobRecord getRecord() {
+        return record;
+    }
+
+    void addWaiter(ResultSetId rsId, DatasetDirectoryRecord[] knownRecords,
+            IResultCallback<DatasetDirectoryRecord[]> callback) {
+        if (waiters == null) {
+            waiters = new Waiters();
+        }
+        waiters.put(rsId, new Waiter(knownRecords, callback));
+    }
+
+    Waiter removeWaiter(ResultSetId rsId) {
+        return waiters.remove(rsId);
+    }
+
+    Waiter getWaiter(ResultSetId rsId) {
+        return waiters != null ? waiters.get(rsId) : null;
+    }
+
+    void setException(Exception exception) {
+        if (waiters != null) {
+            for (ResultSetId rsId : waiters.keySet()) {
+                waiters.remove(rsId).callback.setException(exception);
+            }
+        }
+    }
 }
 
 class Waiters extends HashMap<ResultSetId, Waiter> {
@@ -289,11 +289,11 @@
 }
 
 class Waiter {
+    DatasetDirectoryRecord[] knownRecords;
+    IResultCallback<DatasetDirectoryRecord[]> callback;
+
     Waiter(DatasetDirectoryRecord[] knownRecords, IResultCallback<DatasetDirectoryRecord[]> callback) {
         this.knownRecords = knownRecords;
         this.callback = callback;
     }
-
-    DatasetDirectoryRecord[] knownRecords;
-    IResultCallback<DatasetDirectoryRecord[]> callback;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java
index 9e4e03e..663a53a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java
@@ -35,9 +35,11 @@
     public void init(ExecutorService executor);
 
     public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
-            boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress);
+            boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress)
+            throws HyracksDataException;
 
-    public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition);
+    public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition)
+            throws HyracksDataException;
 
     public void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition);
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
index 4e4732d..f51dd06 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
@@ -20,6 +20,7 @@
 
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.common.work.AbstractWork;
@@ -42,7 +43,8 @@
     private final NetworkAddress networkAddress;
 
     public RegisterResultPartitionLocationWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId,
-            boolean orderedResult, boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress) {
+            boolean orderedResult, boolean emptyResult, int partition, int nPartitions, NetworkAddress
+            networkAddress) {
         this.ccs = ccs;
         this.jobId = jobId;
         this.rsId = rsId;
@@ -55,8 +57,13 @@
 
     @Override
     public void run() {
-        ccs.getDatasetDirectoryService().registerResultPartitionLocation(jobId, rsId, orderedResult, emptyResult,
-                partition, nPartitions, networkAddress);
+        try {
+            ccs.getDatasetDirectoryService()
+                    .registerResultPartitionLocation(jobId, rsId, orderedResult, emptyResult, partition, nPartitions,
+                            networkAddress);
+        } catch (HyracksDataException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java
index ffae76a..d63bc8a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.control.cc.work;
 
 import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.common.work.AbstractWork;
@@ -42,7 +43,11 @@
 
     @Override
     public void run() {
-        ccs.getDatasetDirectoryService().reportResultPartitionWriteCompletion(jobId, rsId, partition);
+        try {
+            ccs.getDatasetDirectoryService().reportResultPartitionWriteCompletion(jobId, rsId, partition);
+        } catch (HyracksDataException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java
index 150875b..67b87f5 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java
@@ -31,7 +31,6 @@
  * Sweeper to clean up the stale result distribution files and result states.
  */
 public class ResultStateSweeper implements Runnable {
-    private static final Logger LOGGER = Logger.getLogger(ResultStateSweeper.class.getName());
 
     private final IDatasetManager datasetManager;
 
@@ -39,12 +38,16 @@
 
     private final long resultSweepThreshold;
 
+    private final Logger logger;
+
     private final List<JobId> toBeCollected;
 
-    public ResultStateSweeper(IDatasetManager datasetManager, long resultTTL, long resultSweepThreshold) {
+    public ResultStateSweeper(IDatasetManager datasetManager, long resultTTL, long resultSweepThreshold,
+            Logger logger) {
         this.datasetManager = datasetManager;
         this.resultTTL = resultTTL;
         this.resultSweepThreshold = resultSweepThreshold;
+        this.logger = logger;
         toBeCollected = new ArrayList<JobId>();
     }
 
@@ -56,11 +59,10 @@
                 Thread.sleep(resultSweepThreshold);
                 sweep();
             } catch (InterruptedException e) {
-                LOGGER.severe("Result cleaner thread interrupted, shutting down.");
+                logger.log(Level.SEVERE, "Result cleaner thread interrupted, shutting down.", e);
                 break; // the interrupt was explicit from another thread. This thread should shut down...
             }
         }
-
     }
 
     private void sweep() {
@@ -75,8 +77,8 @@
                 datasetManager.deinitState(jobId);
             }
         }
-        if (LOGGER.isLoggable(Level.FINER)) {
-            LOGGER.finer("Result state cleanup instance successfully completed.");
+        if (logger.isLoggable(Level.FINER)) {
+            logger.finer("Result state cleanup instance successfully completed.");
         }
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ExceptionUtils.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ExceptionUtils.java
index 73c680f..3bc549e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ExceptionUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ExceptionUtils.java
@@ -58,8 +58,12 @@
     public static void setNodeIds(Collection<Exception> exceptions, String nodeId) {
         List<Exception> newExceptions = new ArrayList<>();
         for (Exception e : exceptions) {
-            newExceptions.add(
-                    new HyracksDataException(ErrorCode.HYRACKS, ErrorCode.FAILURE_ON_NODE, e.getMessage(), e, nodeId));
+            if (e instanceof HyracksDataException) {
+                newExceptions.add(HyracksDataException.create((HyracksDataException) e, nodeId));
+            } else {
+                newExceptions.add(new HyracksDataException(ErrorCode.HYRACKS, ErrorCode.FAILURE_ON_NODE, e.getMessage(),
+                        e, nodeId));
+            }
         }
         exceptions.clear();
         exceptions.addAll(newExceptions);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
index a594f95..5fac823 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -18,10 +18,8 @@
  */
 package org.apache.hyracks.control.nc.dataset;
 
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.logging.Logger;
@@ -66,29 +64,22 @@
             datasetMemoryManager = null;
         }
         partitionResultStateMap = new LinkedHashMap<>();
-        executor.execute(new ResultStateSweeper(this, resultTTL, resultSweepThreshold));
+        executor.execute(new ResultStateSweeper(this, resultTTL, resultSweepThreshold, LOGGER));
     }
 
     @Override
     public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult,
             boolean asyncMode, int partition, int nPartitions) throws HyracksException {
-        DatasetPartitionWriter dpw = null;
+        DatasetPartitionWriter dpw;
         JobId jobId = ctx.getJobletContext().getJobId();
         synchronized (this) {
             dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, asyncMode, orderedResult, partition, nPartitions,
                     datasetMemoryManager, fileFactory);
 
-            ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.get(jobId);
-            if (rsIdMap == null) {
-                rsIdMap = new ResultSetMap();
-                partitionResultStateMap.put(jobId, rsIdMap);
-            }
+            ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.computeIfAbsent(jobId,
+                    k -> new ResultSetMap());
 
-            ResultState[] resultStates = rsIdMap.get(rsId);
-            if (resultStates == null) {
-                resultStates = new ResultState[nPartitions];
-                rsIdMap.put(rsId, resultStates);
-            }
+            ResultState[] resultStates = rsIdMap.createOrGetResultStates(rsId, nPartitions);
             resultStates[partition] = dpw.getResultState();
         }
 
@@ -141,7 +132,7 @@
                 throw new HyracksException("Unknown JobId " + jobId);
             }
 
-            ResultState[] resultStates = rsIdMap.get(resultSetId);
+            ResultState[] resultStates = rsIdMap.getResultStates(resultSetId);
             if (resultStates == null) {
                 throw new HyracksException("Unknown JobId: " + jobId + " ResultSetId: " + resultSetId);
             }
@@ -161,49 +152,16 @@
     @Override
     public synchronized void removePartition(JobId jobId, ResultSetId resultSetId, int partition) {
         ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.get(jobId);
-        if (rsIdMap != null) {
-            ResultState[] resultStates = rsIdMap.get(resultSetId);
-            if (resultStates != null) {
-                ResultState state = resultStates[partition];
-                if (state != null) {
-                    state.closeAndDelete();
-                    LOGGER.fine("Removing partition: " + partition + " for JobId: " + jobId);
-                }
-                resultStates[partition] = null;
-                boolean stateEmpty = true;
-                for (int i = 0; i < resultStates.length; i++) {
-                    if (resultStates[i] != null) {
-                        stateEmpty = false;
-                        break;
-                    }
-                }
-                if (stateEmpty) {
-                    rsIdMap.remove(resultSetId);
-                }
-            }
-            if (rsIdMap.isEmpty()) {
-                partitionResultStateMap.remove(jobId);
-            }
+        if (rsIdMap != null && rsIdMap.removePartition(jobId, resultSetId, partition)) {
+            partitionResultStateMap.remove(jobId);
         }
     }
 
     @Override
     public synchronized void abortReader(JobId jobId) {
         ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.get(jobId);
-
-        if (rsIdMap == null) {
-            return;
-        }
-
-        for (Entry<ResultSetId, ResultState[]> mapEntry : rsIdMap.entrySet()) {
-            ResultState[] resultStates = mapEntry.getValue();
-            if (resultStates != null) {
-                for (ResultState state : resultStates) {
-                    if (state != null) {
-                        state.abort();
-                    }
-                }
-            }
+        if (rsIdMap != null) {
+            rsIdMap.abortAll();
         }
     }
 
@@ -214,59 +172,33 @@
 
     @Override
     public synchronized void close() {
-        for (Entry<JobId, IDatasetStateRecord> entry : partitionResultStateMap.entrySet()) {
-            deinit(entry.getKey());
+        for (JobId jobId : getJobIds()) {
+            deinit(jobId);
         }
         deallocatableRegistry.close();
     }
 
     @Override
-    public Set<JobId> getJobIds() {
+    public synchronized Set<JobId> getJobIds() {
         return partitionResultStateMap.keySet();
     }
 
     @Override
-    public IDatasetStateRecord getState(JobId jobId) {
+    public synchronized IDatasetStateRecord getState(JobId jobId) {
         return partitionResultStateMap.get(jobId);
     }
 
     @Override
-    public void deinitState(JobId jobId) {
+    public synchronized void deinitState(JobId jobId) {
         deinit(jobId);
         partitionResultStateMap.remove(jobId);
     }
 
-    private void deinit(JobId jobId) {
+    private synchronized void deinit(JobId jobId) {
         ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.get(jobId);
         if (rsIdMap != null) {
-            for (Entry<ResultSetId, ResultState[]> mapEntry : rsIdMap.entrySet()) {
-                ResultState[] resultStates = mapEntry.getValue();
-                if (resultStates != null) {
-                    for (int i = 0; i < resultStates.length; i++) {
-                        ResultState state = resultStates[i];
-                        if (state != null) {
-                            state.closeAndDelete();
-                            LOGGER.fine("Removing partition: " + i + " for JobId: " + jobId);
-                        }
-                    }
-                }
-            }
+            rsIdMap.closeAndDeleteAll();
         }
     }
 
-    private class ResultSetMap extends HashMap<ResultSetId, ResultState[]> implements IDatasetStateRecord {
-        private static final long serialVersionUID = 1L;
-
-        long timestamp;
-
-        public ResultSetMap() {
-            super();
-            timestamp = System.currentTimeMillis();
-        }
-
-        @Override
-        public long getTimestamp() {
-            return timestamp;
-        }
-    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
index e007050..f7aa2e8 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -86,10 +86,7 @@
 
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        if (!partitionRegistered) {
-            registerResultPartitionLocation(false);
-            partitionRegistered = true;
-        }
+        registerResultPartitionLocation(false);
         if (datasetMemoryManager == null) {
             resultState.write(buffer);
         } else {
@@ -102,6 +99,7 @@
         try {
             resultState.closeAndDelete();
             resultState.abort();
+            registerResultPartitionLocation(false);
             manager.reportPartitionFailure(jobId, resultSetId, partition);
         } catch (HyracksException e) {
             throw new HyracksDataException(e);
@@ -113,10 +111,7 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("close(" + partition + ")");
         }
-        if (!partitionRegistered) {
-            registerResultPartitionLocation(true);
-            partitionRegistered = true;
-        }
+        registerResultPartitionLocation(true);
         resultState.close();
         try {
             manager.reportPartitionWriteCompletion(jobId, resultSetId, partition);
@@ -127,7 +122,11 @@
 
     void registerResultPartitionLocation(boolean empty) throws HyracksDataException {
         try {
-            manager.registerResultPartitionLocation(jobId, resultSetId, partition, nPartitions, orderedResult, empty);
+            if (!partitionRegistered) {
+                manager.registerResultPartitionLocation(jobId, resultSetId, partition, nPartitions, orderedResult,
+                        empty);
+                partitionRegistered = true;
+            }
         } catch (HyracksException e) {
             if (e instanceof HyracksDataException) {
                 throw (HyracksDataException) e;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultSetMap.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultSetMap.java
new file mode 100644
index 0000000..579f68b
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultSetMap.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.dataset;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.dataset.IDatasetStateRecord;
+import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.job.JobId;
+
+class ResultSetMap implements IDatasetStateRecord, Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOGGER = Logger.getLogger(DatasetPartitionManager.class.getName());
+
+    private final long timestamp;
+    private final HashMap<ResultSetId, ResultState[]> resultStateMap;
+
+    ResultSetMap() {
+        timestamp = System.currentTimeMillis();
+        resultStateMap = new HashMap<>();
+    }
+
+    @Override
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    ResultState[] getResultStates(ResultSetId rsId) {
+        return resultStateMap.get(rsId);
+    }
+
+    ResultState[] createOrGetResultStates(ResultSetId rsId, int nPartitions) {
+        return resultStateMap.computeIfAbsent(rsId, (k) -> new ResultState[nPartitions]);
+    }
+
+    /**
+     * removes a result partition for a result set
+     *
+     * @param jobId
+     *            the id of the job that produced the result set
+     * @param resultSetId
+     *            the id of the result set
+     * @param partition
+     *            the partition number
+     * @return true, if all partitions for the resultSetId have been removed
+     */
+    boolean removePartition(JobId jobId, ResultSetId resultSetId, int partition) {
+        final ResultState[] resultStates = resultStateMap.get(resultSetId);
+        if (resultStates != null) {
+            final ResultState state = resultStates[partition];
+            if (state != null) {
+                state.closeAndDelete();
+                LOGGER.fine("Removing partition: " + partition + " for JobId: " + jobId);
+            }
+            resultStates[partition] = null;
+            boolean stateEmpty = true;
+            for (ResultState resState : resultStates) {
+                if (resState != null) {
+                    stateEmpty = false;
+                    break;
+                }
+            }
+            if (stateEmpty) {
+                resultStateMap.remove(resultSetId);
+            }
+            return resultStateMap.isEmpty();
+        }
+        return true;
+    }
+
+    void abortAll() {
+        applyToAllStates((rsId, state, i) -> state.abort());
+    }
+
+    void closeAndDeleteAll() {
+        applyToAllStates((rsId, state, i) -> {
+            state.closeAndDelete();
+            LOGGER.fine("Removing partition: " + i + " for result set " + rsId);
+        });
+    }
+
+    @FunctionalInterface
+    private interface StateModifier {
+        void modify(ResultSetId rsId, ResultState entry, int partition);
+    }
+
+    private void applyToAllStates(StateModifier modifier) {
+        for (Map.Entry<ResultSetId, ResultState[]> entry : resultStateMap.entrySet()) {
+            final ResultSetId rsId = entry.getKey();
+            final ResultState[] resultStates = entry.getValue();
+            if (resultStates == null) {
+                continue;
+            }
+            for (int i = 0; i < resultStates.length; i++) {
+                final ResultState state = resultStates[i];
+                if (state != null) {
+                    modifier.modify(rsId, state, i);
+                }
+            }
+        }
+    }
+}