Merge branch gerrit/mad-hatter

Change-Id: I24af1b0da23f056277b10c6d9064813c564ab628
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
index c2227a5..458008b 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
@@ -38,6 +38,7 @@
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.memory.ConcurrentFramePool;
 import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.client.NodeStatus;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.util.JavaSerializationUtils;
 import org.apache.hyracks.control.nc.NodeControllerService;
@@ -72,6 +73,11 @@
     }
 
     public void registerRuntime(IActiveRuntime runtime) throws HyracksDataException {
+        NodeControllerService controllerService = (NodeControllerService) serviceCtx.getControllerService();
+        if (controllerService.getNodeStatus() != NodeStatus.ACTIVE) {
+            throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.NODE_IS_NOT_ACTIVE,
+                    serviceCtx.getNodeId());
+        }
         if (shutdown) {
             throw new RuntimeDataException(ErrorCode.ACTIVE_MANAGER_SHUTDOWN);
         }
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventSubscriber.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventSubscriber.java
index e01d0a7..3c2f8e8 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventSubscriber.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventSubscriber.java
@@ -34,7 +34,7 @@
     void notify(ActiveEvent event);
 
     /**
-     * Checkcs whether the subscriber is done receiving events
+     * Checks whether the subscriber is done receiving events
      *
      * @return
      */
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
index ca610aa..8338b2b 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
@@ -75,7 +75,7 @@
      * refresh the stats
      *
      * @param timeout
-     * @throws HyracksDataException
+     * @throws HyracksDataException throws ASX3118 if active entity is not currently running
      */
     void refreshStats(long timeout) throws HyracksDataException;
 
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
index b8c44a6..1a2af13 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
@@ -56,6 +56,6 @@
 
     @Override
     public String toString() {
-        return ActiveManagerMessage.class.getSimpleName();
+        return getClass().getSimpleName();
     }
 }
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java
index 0dbba52..117a68c 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java
@@ -29,7 +29,17 @@
         this.reqId = reqId;
     }
 
+    @Override
+    public boolean isWhispered() {
+        return true;
+    }
+
     public long getReqId() {
         return reqId;
     }
+
+    @Override
+    public String toString() {
+        return "ActiveStatsRequestMessage{" + "reqId=" + reqId + '}';
+    }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
index e98e3f6..a7b91c6 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
@@ -107,7 +107,11 @@
             DataverseName dataverseName = ServletUtil.getDataverseName(request, "dataverseName");
             String datasetName = request.getParameter("datasetName");
             String nodes = request.getParameter("nodes");
-
+            boolean forceRebalance = true;
+            String force = request.getParameter("force");
+            if (force != null) {
+                forceRebalance = Boolean.parseBoolean(force);
+            }
             // Parses and check target nodes.
             if (nodes == null) {
                 sendResponse(response, HttpResponseStatus.BAD_REQUEST, "nodes are not given");
@@ -133,7 +137,8 @@
                 return;
             }
             // Schedules a rebalance task and wait for its completion.
-            CountDownLatch terminated = scheduleRebalance(dataverseName, datasetName, targetNodes, response);
+            CountDownLatch terminated =
+                    scheduleRebalance(dataverseName, datasetName, targetNodes, response, forceRebalance);
             terminated.await();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
@@ -156,10 +161,10 @@
 
     // Schedules a rebalance task.
     private synchronized CountDownLatch scheduleRebalance(DataverseName dataverseName, String datasetName,
-            String[] targetNodes, IServletResponse response) {
+            String[] targetNodes, IServletResponse response, boolean force) {
         CountDownLatch terminated = new CountDownLatch(1);
-        Future<Void> task =
-                executor.submit(() -> doRebalance(dataverseName, datasetName, targetNodes, response, terminated));
+        Future<Void> task = executor
+                .submit(() -> doRebalance(dataverseName, datasetName, targetNodes, response, terminated, force));
         rebalanceTasks.add(task);
         rebalanceFutureTerminated.add(terminated);
         return terminated;
@@ -167,7 +172,7 @@
 
     // Performs the actual rebalance.
     private Void doRebalance(DataverseName dataverseName, String datasetName, String[] targetNodes,
-            IServletResponse response, CountDownLatch terminated) {
+            IServletResponse response, CountDownLatch terminated, boolean force) {
         try {
             // Sets the content type.
             HttpUtil.setContentType(response, HttpUtil.ContentType.APPLICATION_JSON, StandardCharsets.UTF_8);
@@ -179,11 +184,11 @@
                 for (Dataset dataset : datasets) {
                     // By the time rebalanceDataset(...) is called, the dataset could have been dropped.
                     // If that's the case, rebalanceDataset(...) would be a no-op.
-                    rebalanceDataset(dataset.getDataverseName(), dataset.getDatasetName(), targetNodes);
+                    rebalanceDataset(dataset.getDataverseName(), dataset.getDatasetName(), targetNodes, force);
                 }
             } else {
                 // Rebalances a given dataset from its current locations to the target nodes.
-                rebalanceDataset(dataverseName, datasetName, targetNodes);
+                rebalanceDataset(dataverseName, datasetName, targetNodes, force);
             }
 
             // Sends response.
@@ -243,7 +248,7 @@
     }
 
     // Rebalances a given dataset.
-    private void rebalanceDataset(DataverseName dataverseName, String datasetName, String[] targetNodes)
+    private void rebalanceDataset(DataverseName dataverseName, String datasetName, String[] targetNodes, boolean force)
             throws Exception {
         IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR);
         MetadataProvider metadataProvider = MetadataProvider.create(appCtx, null);
@@ -256,7 +261,7 @@
                 lockManager.acquireDatasetExclusiveModificationLock(metadataProvider.getLocks(), dataverseName,
                         datasetName);
                 RebalanceUtil.rebalance(dataverseName, datasetName, new LinkedHashSet<>(Arrays.asList(targetNodes)),
-                        metadataProvider, hcc, NoOpDatasetRebalanceCallback.INSTANCE);
+                        metadataProvider, hcc, NoOpDatasetRebalanceCallback.INSTANCE, force);
             } finally {
                 activeNotificationHandler.resume(metadataProvider);
             }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index 55022f9..276a04b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.app.active;
 
+import static org.apache.asterix.common.exceptions.ErrorCode.ACTIVE_ENTITY_NOT_RUNNING;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.EnumSet;
@@ -93,7 +95,7 @@
     protected ActivityState prevState;
     protected JobId jobId;
     protected volatile long statsTimestamp;
-    protected String stats;
+    protected volatile String stats;
     protected volatile boolean isFetchingStats;
     protected int numRegistered;
     protected int numDeRegistered;
@@ -292,12 +294,17 @@
     @Override
     public void refreshStats(long timeout) throws HyracksDataException {
         LOGGER.log(level, "refreshStats called");
+        // first check state & if we are fetching outside of the lock- in the event we are recovering it may take some
+        // time to obtain the lock...
+        ensureRunning();
+        if (isFetchingStats) {
+            LOGGER.log(level, "returning immediately since fetchingStats = " + isFetchingStats);
+            return;
+        }
         synchronized (this) {
-            if (state != ActivityState.RUNNING) {
-                LOGGER.log(level, "returning immediately since state = " + state);
-                notifySubscribers(statsUpdatedEvent);
-                return;
-            } else if (isFetchingStats) {
+            // now that we have the lock, again verify the state & ensure we are not already fetching new stats
+            ensureRunning();
+            if (isFetchingStats) {
                 LOGGER.log(level, "returning immediately since fetchingStats = " + isFetchingStats);
                 return;
             } else {
@@ -323,6 +330,12 @@
         isFetchingStats = false;
     }
 
+    protected void ensureRunning() throws RuntimeDataException {
+        if (state != ActivityState.RUNNING) {
+            throw new RuntimeDataException(ACTIVE_ENTITY_NOT_RUNNING, runtimeName, String.valueOf(state).toLowerCase());
+        }
+    }
+
     protected synchronized void notifySubscribers(ActiveEvent event) {
         notifyAll();
         Iterator<IActiveEntityEventSubscriber> it = subscribers.iterator();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index f912c76..bccd2a9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -214,6 +214,7 @@
 import org.apache.hyracks.api.client.IClusterInfoCollector;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.api.exceptions.Warning;
 import org.apache.hyracks.api.io.FileSplit;
@@ -281,6 +282,10 @@
         return sessionOutput;
     }
 
+    public IWarningCollector getWarningCollector() {
+        return warningCollector;
+    }
+
     @Override
     public void compileAndExecute(IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception {
         validateStatements(requestParameters);
@@ -1646,6 +1651,10 @@
             Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx.getValue(), dataverseName);
             if (dv == null) {
                 if (ifExists) {
+                    if (warningCollector.shouldWarn()) {
+                        warningCollector
+                                .warn(WarningUtil.forAsterix(sourceLoc, ErrorCode.UNKNOWN_DATAVERSE, dataverseName));
+                    }
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
                     return false;
                 } else {
@@ -1903,7 +1912,16 @@
             // Check if the dataverse exists
             Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
             if (dv == null) {
-                throw new CompilationException(ErrorCode.UNKNOWN_DATAVERSE, sourceLoc, dataverseName);
+                if (stmtTypeDrop.getIfExists()) {
+                    if (warningCollector.shouldWarn()) {
+                        warningCollector
+                                .warn(WarningUtil.forAsterix(sourceLoc, ErrorCode.UNKNOWN_DATAVERSE, dataverseName));
+                    }
+                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                    return;
+                } else {
+                    throw new CompilationException(ErrorCode.UNKNOWN_DATAVERSE, sourceLoc, dataverseName);
+                }
             }
 
             Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
index 84f9a94..d32bfb2 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -94,7 +94,7 @@
      */
     public static void rebalance(DataverseName dataverseName, String datasetName, Set<String> targetNcNames,
             MetadataProvider metadataProvider, IHyracksClientConnection hcc,
-            IDatasetRebalanceCallback datasetRebalanceCallback) throws Exception {
+            IDatasetRebalanceCallback datasetRebalanceCallback, boolean forceRebalance) throws Exception {
         Dataset sourceDataset;
         Dataset targetDataset;
         // Executes the first Metadata transaction.
@@ -113,8 +113,7 @@
 
             Set<String> sourceNodes = new HashSet<>(metadataProvider.findNodes(sourceDataset.getNodeGroupName()));
 
-            // The the source nodes are identical to the target nodes.
-            if (sourceNodes.equals(targetNcNames)) {
+            if (!forceRebalance && sourceNodes.equals(targetNcNames)) {
                 return;
             }
 
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
index dfe696f..06380fe 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
@@ -168,7 +168,7 @@
                 lockManager.acquireDatasetExclusiveModificationLock(metadataProvider.getLocks(), dataverseName,
                         datasetName);
                 RebalanceUtil.rebalance(dataverseName, datasetName, new LinkedHashSet<>(Arrays.asList(targetNodes)),
-                        metadataProvider, ccAppCtx.getHcc(), NoOpDatasetRebalanceCallback.INSTANCE);
+                        metadataProvider, ccAppCtx.getHcc(), NoOpDatasetRebalanceCallback.INSTANCE, false);
             } finally {
                 activeNotificationHandler.resume(metadataProvider);
             }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
index b23b3cf..963e227 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
@@ -19,6 +19,9 @@
 
 package org.apache.asterix.test.active;
 
+import static org.apache.asterix.common.exceptions.ErrorCode.ACTIVE_ENTITY_NOT_RUNNING;
+import static org.apache.asterix.common.exceptions.ErrorCode.ASTERIX;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -118,7 +121,13 @@
         Assert.assertTrue(requestedStats.contains("N/A"));
 
         // Update stats of not-started job
-        eventsListener.refreshStats(1000);
+        try {
+            eventsListener.refreshStats(1000);
+            Assert.fail("expected exception on refresh stats on not-started job");
+        } catch (HyracksDataException e) {
+            Assert.assertTrue("incorrect exception thrown (expected: ACTIVE_ENTITY_NOT_RUNNING, was: " + e,
+                    e.matches(ASTERIX, ACTIVE_ENTITY_NOT_RUNNING));
+        }
         requestedStats = eventsListener.getStats();
         Assert.assertTrue(requestedStats.contains("N/A"));
         WaitForStateSubscriber startingSubscriber =
@@ -129,8 +138,12 @@
         startingSubscriber.sync();
         activeJobNotificationHandler.notifyJobCreation(jobId, jobSpec);
         activeJobNotificationHandler.notifyJobStart(jobId);
-        eventsListener.refreshStats(1000);
-        requestedStats = eventsListener.getStats();
+        try {
+            eventsListener.refreshStats(1000);
+        } catch (HyracksDataException e) {
+            Assert.assertTrue("incorrect exception thrown (expected: ACTIVE_ENTITY_NOT_RUNNING, was: " + e,
+                    e.matches(ASTERIX, ACTIVE_ENTITY_NOT_RUNNING));
+        }
         Assert.assertTrue(requestedStats.contains("N/A"));
         // Fake partition message and notify eventListener
         ActivePartitionMessage partitionMessage =
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index 2399064..cd0dee2 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -1006,11 +1006,14 @@
         File expectedResultFile;
         switch (ctx.getType()) {
             case "ddl":
+                ExtractedResult ddlExtractedResult;
                 if (ctx.getFile().getName().endsWith("aql")) {
-                    executeAqlUpdateOrDdl(statement, OutputFormat.CLEAN_JSON);
+                    ddlExtractedResult = executeAqlUpdateOrDdl(statement, OutputFormat.CLEAN_JSON);
                 } else {
-                    executeSqlppUpdateOrDdl(statement, OutputFormat.CLEAN_JSON);
+                    ddlExtractedResult = executeSqlppUpdateOrDdl(statement, OutputFormat.CLEAN_JSON);
                 }
+
+                validateWarning(ddlExtractedResult, testCaseCtx, cUnit, testFile, expectedWarnings);
                 break;
             case "update":
                 // isDmlRecoveryTest: set IP address
@@ -1056,11 +1059,7 @@
                         variableCtx, ctx, expectedResultFile, actualResultFile, queryCount,
                         expectedResultFileCtxs.size(), cUnit.getParameter(), ComparisonEnum.TEXT);
 
-                if (testCaseCtx.getTestCase().isCheckWarnings()) {
-                    boolean expectedSourceLoc = testCaseCtx.isSourceLocationExpected(cUnit);
-                    validateWarnings(extractedResult.getWarnings(), cUnit.getExpectedWarn(), expectedWarnings,
-                            expectedSourceLoc);
-                }
+                validateWarning(extractedResult, testCaseCtx, cUnit, testFile, expectedWarnings);
                 break;
             case "store":
                 // This is a query that returns the expected output of a subsequent query
@@ -1606,18 +1605,19 @@
         }
     }
 
-    public InputStream executeSqlppUpdateOrDdl(String statement, OutputFormat outputFormat) throws Exception {
+    public ExtractedResult executeSqlppUpdateOrDdl(String statement, OutputFormat outputFormat) throws Exception {
         return executeUpdateOrDdl(statement, outputFormat, getQueryServiceUri(SQLPP));
     }
 
-    private InputStream executeAqlUpdateOrDdl(String statement, OutputFormat outputFormat) throws Exception {
+    private ExtractedResult executeAqlUpdateOrDdl(String statement, OutputFormat outputFormat) throws Exception {
         return executeUpdateOrDdl(statement, outputFormat, getQueryServiceUri(AQL));
     }
 
-    private InputStream executeUpdateOrDdl(String statement, OutputFormat outputFormat, URI serviceUri)
+    private ExtractedResult executeUpdateOrDdl(String statement, OutputFormat outputFormat, URI serviceUri)
             throws Exception {
-        InputStream resultStream = executeQueryService(statement, serviceUri, outputFormat, UTF_8);
-        return ResultExtractor.extract(resultStream, UTF_8, outputFormat).getResult();
+        try (InputStream resultStream = executeQueryService(statement, serviceUri, outputFormat, UTF_8)) {
+            return ResultExtractor.extract(resultStream, UTF_8, outputFormat);
+        }
     }
 
     protected static boolean isExpected(Exception e, CompilationUnit cUnit) {
@@ -2420,8 +2420,17 @@
         return extension.endsWith(AQL) ? getEndpoint(Servlets.QUERY_AQL) : getEndpoint(Servlets.QUERY_SERVICE);
     }
 
-    private void validateWarnings(List<String> actualWarnings, List<String> expectedWarn, BitSet expectedWarnings,
-            boolean expectedSourceLoc) throws Exception {
+    protected void validateWarning(ExtractedResult result, TestCaseContext testCaseCtx, CompilationUnit cUnit,
+            File testFile, BitSet expectedWarnings) throws Exception {
+        if (testCaseCtx.getTestCase().isCheckWarnings()) {
+            boolean expectedSourceLoc = testCaseCtx.isSourceLocationExpected(cUnit);
+            validateWarnings(result.getWarnings(), cUnit.getExpectedWarn(), expectedWarnings, expectedSourceLoc,
+                    testFile);
+        }
+    }
+
+    protected void validateWarnings(List<String> actualWarnings, List<String> expectedWarn, BitSet expectedWarnings,
+            boolean expectedSourceLoc, File testFile) throws Exception {
         if (actualWarnings != null) {
             for (String actualWarn : actualWarnings) {
                 OptionalInt first = IntStream.range(0, expectedWarn.size())
@@ -2444,6 +2453,7 @@
                 }
                 int warningIndex = first.getAsInt();
                 expectedWarnings.clear(warningIndex);
+                LOGGER.info("testFile {} issued an (expected) warning", testFile);
             }
         }
     }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ResultStreamingFailureTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ResultStreamingFailureTest.java
index 3a4823f..c4fca1a 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ResultStreamingFailureTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ResultStreamingFailureTest.java
@@ -58,7 +58,7 @@
     public void resultStreamingFailureTest() throws Exception {
         queryAndDropConnection();
         // allow result sender to terminate and ensure no leaks
-        Span timeout = Span.start(5, TimeUnit.SECONDS);
+        Span timeout = Span.start(30, TimeUnit.SECONDS);
         while (!timeout.elapsed()) {
             String threadDump = ThreadDumpUtil.takeDumpString();
             if (!threadDump.contains(ResultPartitionReader.class.getName())) {
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/only_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/only_sqlpp.xml
index 334dd52..f521dd9 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/only_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/only_sqlpp.xml
@@ -19,5 +19,15 @@
  !-->
 <test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp" QueryFileExtension=".sqlpp">
   <test-group name="failed">
+    <test-case FilePath="limit">
+      <compilation-unit name="push-limit-to-primary-scan">
+        <output-dir compare="Text">push-limit-to-primary-scan</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="limit">
+      <compilation-unit name="push-limit-to-primary-scan-select">
+        <output-dir compare="Text">push-limit-to-primary-scan-select</output-dir>
+      </compilation-unit>
+    </test-case>
   </test-group>
 </test-suite>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/drop_dataset_invalid_dataverse/test.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/drop_dataset_invalid_dataverse/test.000.ddl.sqlpp
new file mode 100644
index 0000000..16f4604
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/drop_dataset_invalid_dataverse/test.000.ddl.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+-- param max-warnings:json=1000
+
+create dataverse realDataverse;
+create type realDataverse.realType as open { id: uuid };
+create dataset realDataverse.realDataset1(realDataverse.realType) primary key id autogenerated;
+create dataset realDataverse.realDataset2(realDataverse.realType) primary key id autogenerated;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/drop_dataset_invalid_dataverse/test.001.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/drop_dataset_invalid_dataverse/test.001.ddl.sqlpp
new file mode 100644
index 0000000..6d9fcee
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/drop_dataset_invalid_dataverse/test.001.ddl.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+-- param max-warnings:json=1000
+
+drop type fakeDataverse.fakeType; // fails, error dataverse not found
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/drop_dataset_invalid_dataverse/test.002.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/drop_dataset_invalid_dataverse/test.002.ddl.sqlpp
new file mode 100644
index 0000000..e864dd6
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/drop_dataset_invalid_dataverse/test.002.ddl.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+-- param max-warnings:json=1000
+
+drop type fakeDataverse.realType; // fails, error dataverse not found
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/drop_dataset_invalid_dataverse/test.003.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/drop_dataset_invalid_dataverse/test.003.ddl.sqlpp
new file mode 100644
index 0000000..1de1496
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/drop_dataset_invalid_dataverse/test.003.ddl.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+-- param max-warnings:json=1000
+
+drop type fakeDataverse.realType if exists; // succeeds, warn dataverse not found
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/drop_dataset_invalid_dataverse/test.004.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/drop_dataset_invalid_dataverse/test.004.ddl.sqlpp
new file mode 100644
index 0000000..0d1de03
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/drop_dataset_invalid_dataverse/test.004.ddl.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+-- param max-warnings:json=1000
+
+drop dataset fakeDataverse.fakeDataset1; // fails, error dataverse not found
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/drop_dataset_invalid_dataverse/test.005.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/drop_dataset_invalid_dataverse/test.005.ddl.sqlpp
new file mode 100644
index 0000000..5abfc5e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/drop_dataset_invalid_dataverse/test.005.ddl.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+-- param max-warnings:json=1000
+
+drop dataset fakeDataverse.realDataset1; // fails, error dataverse not found
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/drop_dataset_invalid_dataverse/test.006.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/drop_dataset_invalid_dataverse/test.006.ddl.sqlpp
new file mode 100644
index 0000000..6c7670e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/drop_dataset_invalid_dataverse/test.006.ddl.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+-- param max-warnings:json=1000
+
+drop dataset fakeDataverse.fakeDataset1 if exists; // succeeds, warn dataverse not found
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/drop_dataset_invalid_dataverse/test.007.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/drop_dataset_invalid_dataverse/test.007.ddl.sqlpp
new file mode 100644
index 0000000..1a07955
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/drop_dataset_invalid_dataverse/test.007.ddl.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+-- param max-warnings:json=1000
+
+drop dataset fakeDataverse.realDataset1 if exists; // succeeds, warn dataverse not found
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/drop_dataset_invalid_dataverse/test.008.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/drop_dataset_invalid_dataverse/test.008.ddl.sqlpp
new file mode 100644
index 0000000..6744022
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/drop_dataset_invalid_dataverse/test.008.ddl.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+-- param max-warnings:json=1000
+
+drop dataset realDataverse.fakeDataset1; // fails, error dataset not found
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/drop_dataset_invalid_dataverse/test.009.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/drop_dataset_invalid_dataverse/test.009.ddl.sqlpp
new file mode 100644
index 0000000..d807d1f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/drop_dataset_invalid_dataverse/test.009.ddl.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+-- param max-warnings:json=1000
+
+drop dataset realDataverse.realDataset1; // succeeds
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/drop_dataset_invalid_dataverse/test.010.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/drop_dataset_invalid_dataverse/test.010.ddl.sqlpp
new file mode 100644
index 0000000..228c6c7
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/drop_dataset_invalid_dataverse/test.010.ddl.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+-- param max-warnings:json=1000
+
+drop dataset realDataverse.fakeDataset2 if exists; // succeeds, no warning
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/drop_dataset_invalid_dataverse/test.011.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/drop_dataset_invalid_dataverse/test.011.ddl.sqlpp
new file mode 100644
index 0000000..e5be8cb
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/drop_dataset_invalid_dataverse/test.011.ddl.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+-- param max-warnings:json=1000
+
+drop dataset realDataverse.realDataset2 if exists; // succeeds
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/drop_dataset_invalid_dataverse/test.999.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/drop_dataset_invalid_dataverse/test.999.ddl.sqlpp
new file mode 100644
index 0000000..304387f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/drop_dataset_invalid_dataverse/test.999.ddl.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+-- param max-warnings:json=1000
+
+drop dataverse realDataverse if exists;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/invalid-dataverse/invalid-dataverse.001.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/invalid-dataverse/invalid-dataverse.001.ddl.sqlpp
index 0fb09b8..67ff279 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/invalid-dataverse/invalid-dataverse.001.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/invalid-dataverse/invalid-dataverse.001.ddl.sqlpp
@@ -17,5 +17,7 @@
  * under the License.
  */
 
-drop type fakeDataverse.myType if exists;
+-- param max-warnings:json=1000
+
+drop type fakeDataverse.myType if exists; // success, issue warning
 
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/invalid-dataverse/invalid-dataverse.002.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/invalid-dataverse/invalid-dataverse.002.ddl.sqlpp
index c6715e4..0bd6b85 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/invalid-dataverse/invalid-dataverse.002.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/invalid-dataverse/invalid-dataverse.002.ddl.sqlpp
@@ -17,10 +17,12 @@
  * under the License.
  */
 
+-- param max-warnings:json=1000
+
 drop dataverse realDataverse if exists;
 create dataverse realDataverse;
 use realDataverse;
 
-drop type realDataverse.myType if exists;
-create type fakeDataverse.myType as open { id: uuid, f1: int };
+drop type realDataverse.myType if exists; // success, no warning
+create type fakeDataverse.myType as open { id: uuid, f1: int }; // fails, error dataverse not found
 
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/invalid-dataverse/invalid-dataverse.003.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/invalid-dataverse/invalid-dataverse.003.ddl.sqlpp
index 0bb88db..21127f8 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/invalid-dataverse/invalid-dataverse.003.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/invalid-dataverse/invalid-dataverse.003.ddl.sqlpp
@@ -17,12 +17,13 @@
  * under the License.
  */
 
+-- param max-warnings:json=1000
+
 drop dataverse realDataverse if exists;
 create dataverse realDataverse;
 use realDataverse;
 
-drop type realDataverse.myType if exists;
+drop type realDataverse.myType if exists; // success, no warning
 create type realDataverse.myType as open { id: uuid, f1: int };
 
-drop dataset fakeDataverse.myDataset /*if exists*/;
-
+drop dataset fakeDataverse.myDataset if exists; // success, issue warning
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/invalid-dataverse/invalid-dataverse.004.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/invalid-dataverse/invalid-dataverse.004.ddl.sqlpp
index 2e33fc3..4a820e5 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/invalid-dataverse/invalid-dataverse.004.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/invalid-dataverse/invalid-dataverse.004.ddl.sqlpp
@@ -17,13 +17,15 @@
  * under the License.
  */
 
+-- param max-warnings:json=1000
+
 drop dataverse realDataverse if exists;
 create dataverse realDataverse;
 use realDataverse;
 
-drop type realDataverse.myType if exists;
+drop type realDataverse.myType if exists; // success, no warning
 create type realDataverse.myType as open { id: uuid, f1: int };
 
-drop dataset realDataverse.myDataset if exists;
-create dataset fakeDataverse.myDataset(fakeDataverse.myType) primary key id autogenerated;
+drop dataset realDataverse.myDataset if exists; // success
+create dataset fakeDataverse.myDataset(fakeDataverse.myType) primary key id autogenerated; // fail, fake dataverse not found
 
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/invalid-dataverse/invalid-dataverse.005.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/invalid-dataverse/invalid-dataverse.005.ddl.sqlpp
index 9723237..24aad2a 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/invalid-dataverse/invalid-dataverse.005.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/invalid-dataverse/invalid-dataverse.005.ddl.sqlpp
@@ -17,13 +17,15 @@
  * under the License.
  */
 
+-- param max-warnings:json=1000
+
 drop dataverse realDataverse if exists;
 create dataverse realDataverse;
 use realDataverse;
 
-drop type realDataverse.myType if exists;
+drop type realDataverse.myType if exists; // success, no warning
 create type realDataverse.myType as open { id: uuid, f1: int };
 
-drop dataset realDataverse.myDataset if exists;
-create dataset realDataverse.myDataset(fakeDataverse.myType) primary key id autogenerated;
+drop dataset realDataverse.myDataset if exists; // success, no warning
+create dataset realDataverse.myDataset(fakeDataverse.myType) primary key id autogenerated; // fail, fakeDataverse not found
 
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/invalid-dataverse/invalid-dataverse.006.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/invalid-dataverse/invalid-dataverse.006.ddl.sqlpp
index 9c52c7b..19f55b5 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/invalid-dataverse/invalid-dataverse.006.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/invalid-dataverse/invalid-dataverse.006.ddl.sqlpp
@@ -17,13 +17,15 @@
  * under the License.
  */
 
+-- param max-warnings:json=1000
+
 drop dataverse realDataverse if exists;
 create dataverse realDataverse;
 use realDataverse;
 
-drop type realDataverse.myType if exists;
+drop type realDataverse.myType if exists; // success
 create type realDataverse.myType as open { id: uuid, f1: int };
 
-drop dataset realDataverse.myDataset if exists;
-create dataset fakeDataverse.myDataset(realDataverse.myType) primary key id autogenerated;
+drop dataset realDataverse.myDataset if exists; // success
+create dataset fakeDataverse.myDataset(realDataverse.myType) primary key id autogenerated; // fail, fakeDataverse not found
 
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/invalid-dataverse/invalid-dataverse.007.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/invalid-dataverse/invalid-dataverse.007.ddl.sqlpp
index 56f08fd..89902f1 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/invalid-dataverse/invalid-dataverse.007.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/invalid-dataverse/invalid-dataverse.007.ddl.sqlpp
@@ -17,6 +17,8 @@
  * under the License.
  */
 
+ -- param max-warnings:json=1000
+
 drop dataverse realDataverse if exists;
 create dataverse realDataverse;
 use realDataverse;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/invalid-dataverse/invalid-dataverse.008.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/invalid-dataverse/invalid-dataverse.008.ddl.sqlpp
index 9375963..2c9da5e 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/invalid-dataverse/invalid-dataverse.008.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/invalid-dataverse/invalid-dataverse.008.ddl.sqlpp
@@ -17,6 +17,8 @@
  * under the License.
  */
 
+ -- param max-warnings:json=1000
+
 drop dataverse realDataverse if exists;
 create dataverse realDataverse;
 use realDataverse;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/empty-dataset/empty-dataset.01.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/empty-dataset/empty-dataset.01.ddl.sqlpp
new file mode 100644
index 0000000..fbe4221
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/empty-dataset/empty-dataset.01.ddl.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+
+USE test;
+
+CREATE TYPE openType AS {id: string};
+CREATE DATASET ds1(openType) PRIMARY KEY id;
+CREATE DATASET ds2(openType) PRIMARY KEY id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/empty-dataset/empty-dataset.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/empty-dataset/empty-dataset.02.update.sqlpp
new file mode 100644
index 0000000..2a193cd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/empty-dataset/empty-dataset.02.update.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+INSERT INTO ds1 {"id": "1", "f": 3};
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/empty-dataset/empty-dataset.03.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/empty-dataset/empty-dataset.03.query.sqlpp
new file mode 100644
index 0000000..4a9afc2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/empty-dataset/empty-dataset.03.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+FROM ds1 LEFT OUTER JOIN ds2 ON ds1.f > ds2.f
+SELECT ds1, ds2;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/empty-dataset/empty-dataset.04.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/empty-dataset/empty-dataset.04.query.sqlpp
new file mode 100644
index 0000000..954341c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/empty-dataset/empty-dataset.04.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+FROM ds2 LEFT OUTER JOIN ds1 ON ds2.f > ds1.f
+SELECT ds1, ds2;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/empty-dataset/empty-dataset.05.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/empty-dataset/empty-dataset.05.query.sqlpp
new file mode 100644
index 0000000..e5ba174
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/empty-dataset/empty-dataset.05.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+FROM ds1 JOIN ds2 ON ds1.f > ds2.f
+SELECT ds1, ds2;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/empty-dataset/empty-dataset.06.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/empty-dataset/empty-dataset.06.query.sqlpp
new file mode 100644
index 0000000..9e6c335
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/empty-dataset/empty-dataset.06.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+FROM ds2 JOIN ds1 ON ds2.f > ds1.f
+SELECT ds1, ds2;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan-select/push-limit-to-primary-scan-select.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan-select/push-limit-to-primary-scan-select.1.ddl.sqlpp
index 917715e..319db54 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan-select/push-limit-to-primary-scan-select.1.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan-select/push-limit-to-primary-scan-select.1.ddl.sqlpp
@@ -42,6 +42,28 @@
   tweetid : string
 };
 
+create type onekType1 as
+ closed {
+  unique1 : bigint,
+  unique2 : bigint,
+  two : bigint,
+  four : bigint,
+  ten : bigint,
+  twenty : bigint,
+  onePercent : bigint,
+  tenPercent : bigint,
+  twentyPercent : bigint,
+  fiftyPercent : bigint,
+  unique3 : bigint,
+  evenOnePercent : bigint,
+  oddOnePercent : bigint,
+  stringu1 : string,
+  stringu2 : string,
+  string4 : string
+};
+
 create dataset DBLP1(DBLPType) primary key id;
 
-create  dataset TweetMessages(TweetMessageType) primary key tweetid;
\ No newline at end of file
+create  dataset TweetMessages(TweetMessageType) primary key tweetid;
+
+create dataset onek1(onekType1) primary key unique2;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan-select/push-limit-to-primary-scan-select.10.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan-select/push-limit-to-primary-scan-select.10.query.sqlpp
new file mode 100644
index 0000000..a91cece
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan-select/push-limit-to-primary-scan-select.10.query.sqlpp
@@ -0,0 +1,46 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Check that data-scan operator isomorphism comparison
+ * considers pushed condition.
+ *
+ * The dataset contains 100 tuples, 50 with two=0 and 50 with two=1.
+ * The expected result is 50.
+ */
+
+use test;
+
+with T1 as (
+  select two from onek1
+  where two between 1 and 10
+  limit 1000
+),
+T2 as (
+  select two from onek1
+  where two between -10 and -1
+  limit 1000
+)
+
+select value count(*) from (
+  select two from T1
+  union all
+  select two from T2
+) t;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan-select/push-limit-to-primary-scan-select.11.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan-select/push-limit-to-primary-scan-select.11.query.sqlpp
new file mode 100644
index 0000000..34eb190
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan-select/push-limit-to-primary-scan-select.11.query.sqlpp
@@ -0,0 +1,45 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Check that data-scan operator isomorphism comparison
+ * considers pushed condition
+ */
+
+use test;
+
+explain
+
+with T1 as (
+  select two from onek1
+  where two between 1 and 10
+  limit 1000
+),
+T2 as (
+  select two from onek1
+  where two between -10 and -1
+  limit 1000
+)
+
+select value count(*) from (
+  select two from T1
+  union all
+  select two from T2
+) t;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan-select/push-limit-to-primary-scan-select.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan-select/push-limit-to-primary-scan-select.2.update.sqlpp
index a18f3e2..cd3d45f 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan-select/push-limit-to-primary-scan-select.2.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan-select/push-limit-to-primary-scan-select.2.update.sqlpp
@@ -16,11 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-/* scan and print a delimited text file */
 
 use test;
 
-
 load  dataset DBLP1 using localfs ((`path`=`asterix_nc1://data/dblp-small/dblp-small-id.txt`),(`format`=`delimited-text`),(`delimiter`=`:`));
 
-load  dataset TweetMessages using localfs ((`path`=`asterix_nc1://data/tinysocial/twm.adm`),(`format`=`adm`));
\ No newline at end of file
+load  dataset TweetMessages using localfs ((`path`=`asterix_nc1://data/tinysocial/twm.adm`),(`format`=`adm`));
+
+load dataset onek1 using localfs ((`path`=`asterix_nc1://data/wisc/onektup.adm`),(`format`=`delimited-text`),(`delimiter`=`|`)) pre-sorted;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan/push-limit-to-primary-scan.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan/push-limit-to-primary-scan.1.ddl.sqlpp
index 162cc35..36af900 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan/push-limit-to-primary-scan.1.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan/push-limit-to-primary-scan.1.ddl.sqlpp
@@ -38,5 +38,26 @@
   misc : string
 };
 
+create type onekType1 as
+ closed {
+  unique1 : bigint,
+  unique2 : bigint,
+  two : bigint,
+  four : bigint,
+  ten : bigint,
+  twenty : bigint,
+  onePercent : bigint,
+  tenPercent : bigint,
+  twentyPercent : bigint,
+  fiftyPercent : bigint,
+  unique3 : bigint,
+  evenOnePercent : bigint,
+  oddOnePercent : bigint,
+  stringu1 : string,
+  stringu2 : string,
+  string4 : string
+};
+
 create  dataset DBLP1(DBLPType) primary key id;
 
+create dataset onek1(onekType1) primary key unique2;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan/push-limit-to-primary-scan.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan/push-limit-to-primary-scan.2.update.sqlpp
index 2e0f056..4d4065d 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan/push-limit-to-primary-scan.2.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan/push-limit-to-primary-scan.2.update.sqlpp
@@ -20,5 +20,6 @@
 
 use test;
 
-
 load  dataset DBLP1 using localfs ((`path`=`asterix_nc1://data/dblp-small/dblp-small-id.txt`),(`format`=`delimited-text`),(`delimiter`=`:`));
+
+load dataset onek1 using localfs ((`path`=`asterix_nc1://data/wisc/onektup.adm`),(`format`=`delimited-text`),(`delimiter`=`|`)) pre-sorted;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan/push-limit-to-primary-scan.6.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan/push-limit-to-primary-scan.6.query.sqlpp
new file mode 100644
index 0000000..dd9eb15
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan/push-limit-to-primary-scan.6.query.sqlpp
@@ -0,0 +1,45 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Check that data-scan operator isomorphism comparison
+ * considers pushed limit.
+ *
+ * The dataset contains 100 tuples,
+ * so the expected result is 110.
+ */
+
+use test;
+
+with
+T1 as (
+  select two from onek1
+  limit 100
+),
+T2 as (
+  select two from onek1
+  limit 10
+)
+
+select value count(*) from (
+  select two from T1
+  union all
+  select two from T2
+) t;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan/push-limit-to-primary-scan.7.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan/push-limit-to-primary-scan.7.query.sqlpp
new file mode 100644
index 0000000..a592faa
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan/push-limit-to-primary-scan.7.query.sqlpp
@@ -0,0 +1,44 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Check that data-scan operator isomorphism comparison
+ * considers pushed limit.
+ */
+
+use test;
+
+explain
+
+with
+T1 as (
+  select two from onek1
+  limit 100
+),
+T2 as (
+  select two from onek1
+  limit 10
+)
+
+select value count(*) from (
+  select two from T1
+  union all
+  select two from T2
+) t;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/identical_location/identical_location.3.post.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/identical_location/identical_location.3.post.http
index e179282..d5427da 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/identical_location/identical_location.3.post.http
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/identical_location/identical_location.3.post.http
@@ -17,4 +17,4 @@
  * under the License.
  */
 
-/admin/rebalance?dataverseName=tpch&datasetName=LineItem&nodes=asterix_nc2%2Casterix_nc1
+/admin/rebalance?dataverseName=tpch&datasetName=LineItem&nodes=asterix_nc2%2Casterix_nc1&force=false
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/leftouterjoin/empty-dataset/empty-dataset.03.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/leftouterjoin/empty-dataset/empty-dataset.03.adm
new file mode 100644
index 0000000..2cddb05
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/leftouterjoin/empty-dataset/empty-dataset.03.adm
@@ -0,0 +1 @@
+{ "ds1": { "id": "1", "f": 3 } }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/leftouterjoin/empty-dataset/empty-dataset.04.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/leftouterjoin/empty-dataset/empty-dataset.04.adm
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/leftouterjoin/empty-dataset/empty-dataset.04.adm
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/leftouterjoin/empty-dataset/empty-dataset.05.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/leftouterjoin/empty-dataset/empty-dataset.05.adm
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/leftouterjoin/empty-dataset/empty-dataset.05.adm
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/leftouterjoin/empty-dataset/empty-dataset.06.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/leftouterjoin/empty-dataset/empty-dataset.06.adm
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/leftouterjoin/empty-dataset/empty-dataset.06.adm
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-scan-select/push-limit-to-primary-scan-select.10.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-scan-select/push-limit-to-primary-scan-select.10.adm
new file mode 100644
index 0000000..c5b431b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-scan-select/push-limit-to-primary-scan-select.10.adm
@@ -0,0 +1 @@
+50
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-scan-select/push-limit-to-primary-scan-select.11.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-scan-select/push-limit-to-primary-scan-select.11.adm
new file mode 100644
index 0000000..85cf5c5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-scan-select/push-limit-to-primary-scan-select.11.adm
@@ -0,0 +1,52 @@
+distribute result [$$202]
+-- DISTRIBUTE_RESULT  |LOCAL|
+  exchange
+  -- ONE_TO_ONE_EXCHANGE  |LOCAL|
+    aggregate [$$202] <- [agg-sql-sum($$235)]
+    -- AGGREGATE  |LOCAL|
+      aggregate [$$235] <- [agg-sql-count(1)]
+      -- AGGREGATE  |LOCAL|
+        exchange
+        -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+          union
+          -- UNION_ALL  |UNPARTITIONED|
+            exchange
+            -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+              limit 1000
+              -- STREAM_LIMIT  |UNPARTITIONED|
+                project ([])
+                -- STREAM_PROJECT  |PARTITIONED|
+                  exchange
+                  -- SORT_MERGE_EXCHANGE [$$134(ASC) ]  |PARTITIONED|
+                    limit 1000
+                    -- STREAM_LIMIT  |PARTITIONED|
+                      project ([$$134])
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        exchange
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          data-scan []<-[$$134, $$onek1] <- test.onek1 condition (and(ge($$onek1.getField(2), 1), le($$onek1.getField(2), 10))) limit 1000
+                          -- DATASOURCE_SCAN  |PARTITIONED|
+                            exchange
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              empty-tuple-source
+                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+            exchange
+            -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+              limit 1000
+              -- STREAM_LIMIT  |UNPARTITIONED|
+                project ([])
+                -- STREAM_PROJECT  |PARTITIONED|
+                  exchange
+                  -- SORT_MERGE_EXCHANGE [$$135(ASC) ]  |PARTITIONED|
+                    limit 1000
+                    -- STREAM_LIMIT  |PARTITIONED|
+                      project ([$$135])
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        exchange
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          data-scan []<-[$$135, $$onek1] <- test.onek1 condition (and(ge($$onek1.getField(2), -10), le($$onek1.getField(2), -1))) limit 1000
+                          -- DATASOURCE_SCAN  |PARTITIONED|
+                            exchange
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              empty-tuple-source
+                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-scan/push-limit-to-primary-scan.6.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-scan/push-limit-to-primary-scan.6.adm
new file mode 100644
index 0000000..97e3504
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-scan/push-limit-to-primary-scan.6.adm
@@ -0,0 +1 @@
+110
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-scan/push-limit-to-primary-scan.7.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-scan/push-limit-to-primary-scan.7.adm
new file mode 100644
index 0000000..f830e9b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-scan/push-limit-to-primary-scan.7.adm
@@ -0,0 +1,52 @@
+distribute result [$$180]
+-- DISTRIBUTE_RESULT  |LOCAL|
+  exchange
+  -- ONE_TO_ONE_EXCHANGE  |LOCAL|
+    aggregate [$$180] <- [agg-sql-sum($$209)]
+    -- AGGREGATE  |LOCAL|
+      aggregate [$$209] <- [agg-sql-count(1)]
+      -- AGGREGATE  |LOCAL|
+        exchange
+        -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+          union
+          -- UNION_ALL  |UNPARTITIONED|
+            exchange
+            -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+              limit 100
+              -- STREAM_LIMIT  |UNPARTITIONED|
+                project ([])
+                -- STREAM_PROJECT  |PARTITIONED|
+                  exchange
+                  -- SORT_MERGE_EXCHANGE [$$120(ASC) ]  |PARTITIONED|
+                    limit 100
+                    -- STREAM_LIMIT  |PARTITIONED|
+                      project ([$$120])
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        exchange
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          data-scan []<-[$$120, $$onek1] <- test.onek1 limit 100
+                          -- DATASOURCE_SCAN  |PARTITIONED|
+                            exchange
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              empty-tuple-source
+                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+            exchange
+            -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+              limit 10
+              -- STREAM_LIMIT  |UNPARTITIONED|
+                project ([])
+                -- STREAM_PROJECT  |PARTITIONED|
+                  exchange
+                  -- SORT_MERGE_EXCHANGE [$$121(ASC) ]  |PARTITIONED|
+                    limit 10
+                    -- STREAM_LIMIT  |PARTITIONED|
+                      project ([$$121])
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        exchange
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          data-scan []<-[$$121, $$onek1] <- test.onek1 limit 10
+                          -- DATASOURCE_SCAN  |PARTITIONED|
+                            exchange
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              empty-tuple-source
+                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml
index 3a4f03e..87ba14d 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml
@@ -102,6 +102,8 @@
     <test-case FilePath="external-dataset" check-warnings="true">
       <compilation-unit name="aws/s3/no-files-returned/definition-points-to-nothing">
         <output-dir compare="Text">aws/s3/no-files-returned/definition-points-to-nothing</output-dir>
+        <source-location>false</source-location>
+        <expected-warn>The provided external dataset configuration returned no files from the external source</expected-warn>
         <expected-warn>The provided external dataset configuration returned no files from the external source</expected-warn>
       </compilation-unit>
     </test-case>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 883a9e3..246f2a3 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -3997,17 +3997,17 @@
         <expected-error>Cannot drop index "ds". Drop dataset "ds" to remove this index</expected-error>
       </compilation-unit>
     </test-case>
-    <test-case FilePath="ddl">
+    <test-case FilePath="ddl" check-warnings="true">
       <compilation-unit name="invalid-dataverse">
         <output-dir compare="Text">invalid-dataverse</output-dir>
         <source-location>false</source-location>
-        <expected-error>Cannot find dataverse with name fakeDataverse (in line 20, at column 1)</expected-error>
-        <expected-error>Cannot find dataverse with name fakeDataverse (in line 25, at column 1)</expected-error>
+        <expected-warn>Cannot find dataverse with name fakeDataverse (in line 22, at column 1)</expected-warn>
         <expected-error>Cannot find dataverse with name fakeDataverse (in line 27, at column 1)</expected-error>
-        <expected-error>Cannot find dataverse with name fakeDataverse (in line 28, at column 1)</expected-error>
-        <expected-error>Cannot find datatype with name fakeDataverse.myType</expected-error>
-        <expected-error>Cannot find dataverse with name fakeDataverse (in line 28, at column 1)</expected-error>
+        <expected-warn>Cannot find dataverse with name fakeDataverse (in line 29, at column 1)</expected-warn>
         <expected-error>Cannot find dataverse with name fakeDataverse (in line 30, at column 1)</expected-error>
+        <expected-error>Cannot find datatype with name fakeDataverse.myType</expected-error>
+        <expected-error>Cannot find dataverse with name fakeDataverse (in line 30, at column 1)</expected-error>
+        <expected-error>Cannot find dataverse with name fakeDataverse (in line 32, at column 1)</expected-error>
       </compilation-unit>
     </test-case>
     <test-case FilePath="ddl">
@@ -4015,6 +4015,19 @@
         <output-dir compare="Text">dataset-and-index-same-dataverse</output-dir>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="ddl" check-warnings="true">
+      <compilation-unit name="drop_dataset_invalid_dataverse">
+        <output-dir compare="Text">drop_dataset_invalid_dataverse</output-dir>
+        <expected-error>ASX1063: Cannot find dataverse with name fakeDataverse (in line 22, at column 1)</expected-error>
+        <expected-error>ASX1063: Cannot find dataverse with name fakeDataverse (in line 22, at column 1)</expected-error>
+        <expected-error>ASX1063: Cannot find dataverse with name fakeDataverse (in line 22, at column 1)</expected-error>
+        <expected-error>ASX1063: Cannot find dataverse with name fakeDataverse (in line 22, at column 1)</expected-error>
+        <expected-error>ASX1050: Cannot find dataset with name fakeDataset1 in dataverse realDataverse (in line 22, at column 1)</expected-error>
+        <expected-warn>ASX1063: Cannot find dataverse with name fakeDataverse (in line 22, at column 1)</expected-warn>
+        <expected-warn>ASX1063: Cannot find dataverse with name fakeDataverse (in line 22, at column 1)</expected-warn>
+        <expected-warn>ASX1063: Cannot find dataverse with name fakeDataverse (in line 22, at column 1)</expected-warn>
+      </compilation-unit>
+    </test-case>
     <test-case FilePath="ddl">
       <compilation-unit name="create_dataset_with_filter_on_meta">
         <output-dir compare="Text">create_dataset_with_filter_on_meta</output-dir>
@@ -12729,6 +12742,11 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="leftouterjoin">
+      <compilation-unit name="empty-dataset">
+        <output-dir compare="Text">empty-dataset</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="leftouterjoin">
       <compilation-unit name="query-ASTERIXDB-769">
         <output-dir compare="Text">query-ASTERIXDB-769</output-dir>
       </compilation-unit>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index df28148..431d5b3 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -330,6 +330,7 @@
     public static final int FAILED_TO_PARSE_METADATA = 3115;
     public static final int INPUT_DECODE_FAILURE = 3116;
     public static final int FAILED_TO_PARSE_MALFORMED_LOG_RECORD = 3117;
+    public static final int ACTIVE_ENTITY_NOT_RUNNING = 3118;
 
     // Lifecycle management errors
     public static final int DUPLICATE_PARTITION_ID = 4000;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
index 359054e..1f42771 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
@@ -21,6 +21,7 @@
 import java.util.function.Predicate;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IFormattedException;
 
 public class ExceptionUtils {
     public static final String INCORRECT_PARAMETER = "Incorrect parameter.\n";
@@ -85,4 +86,18 @@
         }
         return test.test(e);
     }
+
+    /**
+     * Unwraps enclosed exceptions until a non-product exception is found, otherwise returns the root production
+     * exception
+     */
+    public static Throwable unwrap(Throwable e) {
+        Throwable current = e;
+        Throwable cause = e.getCause();
+        while (cause != null && cause != current && current instanceof IFormattedException) {
+            current = cause;
+            cause = current.getCause();
+        }
+        return current;
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index e48a0d7..127ec04 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -330,6 +330,7 @@
 3115 = Failed to parse record metadata
 3116 = Failed to decode input
 3117 = Failed to parse record, malformed log record
+3118 = Active Entity %1$s is not running (it is %2$s)
 
 # Lifecycle management errors
 4000 = Partition id %1$s for node %2$s already in use by node %3$s
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java
index 37b157e..726880d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.external.feed.watch;
 
+import java.util.Objects;
+
 import org.apache.asterix.active.IActiveEntityEventSubscriber;
 import org.apache.asterix.active.IActiveEntityEventsListener;
 import org.apache.hyracks.util.Span;
@@ -25,11 +27,19 @@
 public abstract class AbstractSubscriber implements IActiveEntityEventSubscriber {
 
     protected final IActiveEntityEventsListener listener;
+    private final Object lockObject;
     private volatile boolean done = false;
     private volatile Exception failure = null;
 
+    public AbstractSubscriber(IActiveEntityEventsListener listener, Object lockObject) {
+        Objects.requireNonNull(lockObject);
+        this.listener = listener;
+        this.lockObject = lockObject;
+    }
+
     public AbstractSubscriber(IActiveEntityEventsListener listener) {
         this.listener = listener;
+        this.lockObject = this;
     }
 
     @Override
@@ -38,28 +48,28 @@
     }
 
     public void complete(Exception failure) {
-        synchronized (listener) {
+        synchronized (lockObject) {
             if (failure != null) {
                 this.failure = failure;
             }
             done = true;
-            listener.notifyAll();
+            lockObject.notifyAll();
         }
     }
 
     @Override
     public void sync() throws InterruptedException {
-        synchronized (listener) {
+        synchronized (lockObject) {
             while (!done) {
-                listener.wait();
+                lockObject.wait();
             }
         }
     }
 
     public boolean sync(Span span) throws InterruptedException {
-        synchronized (listener) {
+        synchronized (lockObject) {
             while (!done) {
-                span.wait(listener);
+                span.wait(lockObject);
                 if (done || span.elapsed()) {
                     return done;
                 }
@@ -71,4 +81,11 @@
     public Exception getFailure() {
         return failure;
     }
+
+    protected void reset() {
+        synchronized (lockObject) {
+            done = false;
+            failure = null;
+        }
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java
index 818d826..4dc86ac 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java
@@ -29,7 +29,7 @@
     private final Set<ActivityState> targetStates;
 
     public WaitForStateSubscriber(IActiveEntityEventsListener listener, Set<ActivityState> targetStates) {
-        super(listener);
+        super(listener, listener);
         this.targetStates = targetStates;
         listener.subscribe(this);
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index 7b3f53a..e50ea65 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -460,12 +460,17 @@
             return Boolean.FALSE;
         }
         DataSourceScanOperator argScan = (DataSourceScanOperator) arg;
-        if (!argScan.getDataSource().toString().equals(op.getDataSource().toString())) {
+        boolean isomorphic = op.getDataSource().getId().equals(argScan.getDataSource().getId())
+                && op.getOutputLimit() == argScan.getOutputLimit();
+        if (!isomorphic) {
             return Boolean.FALSE;
         }
         DataSourceScanOperator scanOpArg = (DataSourceScanOperator) copyAndSubstituteVar(op, arg);
-        boolean isomorphic = VariableUtilities.varListEqualUnordered(op.getVariables(), scanOpArg.getVariables())
-                && op.getDataSource().toString().equals(scanOpArg.getDataSource().toString());
+        ILogicalExpression opCondition = op.getSelectCondition() != null ? op.getSelectCondition().getValue() : null;
+        ILogicalExpression argCondition =
+                scanOpArg.getSelectCondition() != null ? scanOpArg.getSelectCondition().getValue() : null;
+        isomorphic = VariableUtilities.varListEqualUnordered(op.getVariables(), scanOpArg.getVariables())
+                && Objects.equals(opCondition, argCondition);
         return isomorphic;
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
index b7693c7..a5ad500 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
@@ -120,9 +120,15 @@
             RunFileReader runFileReader = runFileWriter.createReader();
             try {
                 runFileReader.open();
-                while (runFileReader.nextFrame(innerBuffer)) {
+                if (runFileReader.nextFrame(innerBuffer)) {
+                    do {
+                        for (int i = 0; i < outerBufferMngr.getNumFrames(); i++) {
+                            blockJoin(outerBufferMngr.getFrame(i, tempInfo), innerBuffer.getBuffer(), writer);
+                        }
+                    } while (runFileReader.nextFrame(innerBuffer));
+                } else if (isLeftOuter) {
                     for (int i = 0; i < outerBufferMngr.getNumFrames(); i++) {
-                        blockJoin(outerBufferMngr.getFrame(i, tempInfo), innerBuffer.getBuffer(), writer);
+                        appendMissing(outerBufferMngr.getFrame(i, tempInfo), writer);
                     }
                 }
             } finally {
@@ -185,6 +191,18 @@
         FrameUtils.appendConcatToWriter(writer, appender, accessor1, tupleId1, accessor2, tupleId2);
     }
 
+    private void appendMissing(BufferInfo outerBufferInfo, IFrameWriter writer) throws HyracksDataException {
+        accessorOuter.reset(outerBufferInfo.getBuffer(), outerBufferInfo.getStartOffset(), outerBufferInfo.getLength());
+        int tupleCount = accessorOuter.getTupleCount();
+        for (int i = 0; i < tupleCount; ++i) {
+            final int[] ntFieldEndOffsets = missingTupleBuilder.getFieldEndOffsets();
+            final byte[] ntByteArray = missingTupleBuilder.getByteArray();
+            final int ntSize = missingTupleBuilder.getSize();
+            FrameUtils.appendConcatToWriter(writer, appender, accessorOuter, i, ntFieldEndOffsets, ntByteArray, 0,
+                    ntSize);
+        }
+    }
+
     public void closeCache() throws HyracksDataException {
         if (runFileWriter != null) {
             runFileWriter.close();
@@ -195,9 +213,15 @@
         RunFileReader runFileReader = runFileWriter.createDeleteOnCloseReader();
         try {
             runFileReader.open();
-            while (runFileReader.nextFrame(innerBuffer)) {
+            if (runFileReader.nextFrame(innerBuffer)) {
+                do {
+                    for (int i = 0; i < outerBufferMngr.getNumFrames(); i++) {
+                        blockJoin(outerBufferMngr.getFrame(i, tempInfo), innerBuffer.getBuffer(), writer);
+                    }
+                } while (runFileReader.nextFrame(innerBuffer));
+            } else if (isLeftOuter) {
                 for (int i = 0; i < outerBufferMngr.getNumFrames(); i++) {
-                    blockJoin(outerBufferMngr.getFrame(i, tempInfo), innerBuffer.getBuffer(), writer);
+                    appendMissing(outerBufferMngr.getFrame(i, tempInfo), writer);
                 }
             }
         } finally {