[ASTERIXDB-2845][COMP] Fix internal error in SubplanFlatteningUtil

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- SubplanFlatteningUtil should handle case when it cannot find
  an aggregate operator in the immediate subplan its rewriting
- Add an option to RemoveRedundantListifyRule to disable
  elimination of aggregate-unnest pair at the top of a subplan
- Add prerequisite check to RemoveRedundantListifyRule
  for aggregate-unnest case. The input to the unnest must have
  cardinality 1

Change-Id: I2448c038e765b97d41cd0cf66b8c4f159491d9c2
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/10563
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
index d2fe2f5..3cd3e61 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
@@ -221,7 +221,7 @@
 
         condPushDownAndJoinInference.add(new PushSelectDownRule());
         condPushDownAndJoinInference.add(new PushSortDownRule());
-        condPushDownAndJoinInference.add(new RemoveRedundantListifyRule());
+        condPushDownAndJoinInference.add(new RemoveRedundantListifyRule(false));
         condPushDownAndJoinInference.add(new CancelUnnestWithNestedListifyRule());
         condPushDownAndJoinInference.add(new SimpleUnnestToProductRule());
         condPushDownAndJoinInference.add(new ComplexUnnestToProductRule());
@@ -298,7 +298,7 @@
         consolidation.add(new RemoveRedundantGroupByDecorVarsRule());
         //PushUnnestThroughUnion => RemoveRedundantListifyRule cause these rules are correlated
         consolidation.add(new AsterixPushMapOperatorThroughUnionRule(LogicalOperatorTag.UNNEST));
-        consolidation.add(new RemoveRedundantListifyRule());
+        consolidation.add(new RemoveRedundantListifyRule(true));
         // Window operator consolidation rules
         consolidation.add(new AsterixConsolidateWindowOperatorsRule());
         consolidation.add(new ReuseWindowAggregateRule());
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveRedundantListifyRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveRedundantListifyRule.java
index 21990c9..f968b35 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveRedundantListifyRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveRedundantListifyRule.java
@@ -23,7 +23,6 @@
 import java.util.List;
 import java.util.Set;
 
-import org.apache.asterix.lang.common.util.FunctionUtil;
 import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
@@ -45,6 +44,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.CardinalityInferenceVisitor;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 import org.apache.hyracks.algebricks.core.algebra.properties.UnpartitionedPropertyComputer;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
@@ -52,36 +52,53 @@
 /**
  * The rule cancels out redundant pairs of operators unnest-listify aggregate
  * <p>
- * <ul>
- * <li>case 1 (direct):
+ * Case 1 (direct):
  * <p>
  * Before plan:
- * <ul>
- * <li>unnest $x [[ at $p ]] <- function-call:scan-collection($y)
- * <li>aggregate $y <- function-call:listify($z)
- * </ul>
- * <p>
+ * <pre>
+ * unnest $x [[ at $p ]] <- function-call:scan-collection($y)
+ * aggregate $y <- function-call:listify($z)
+ * </pre>
  * After plan:
- * <ul>
- * <li>[[ runningaggregate $p <- tid]]
- * <li>assign $x <- $z
- * </ul>
- * <li>case 2 (reverse):
+ * <pre>
+ * [[ runningaggregate $p <- tid]]
+ * assign $x <- $z
+ * </pre>
+ *
+ * Case 2 (reverse):
  * <p>
  * Before plan:
- * <ul>
- * <li>aggregate $x <- function-call:listify($y)
- * <li>unnest $y <- function-call:scan-collection($z)
- * </ul>
- * <p>
+ * <pre>
+ * aggregate $x <- function-call:listify($y)
+ * unnest $y <- function-call:scan-collection($z)
+ * </pre>
  * After plan:
- * <ul>
- * <li>assign $x <- $z
- * </ul>
- * </ul>
+ * <pre>
+ * assign $x <- $z
+ * </pre>
+ *
+ * Notes regarding Case 2(reverse):
+ * <ol>
+ * <li> Case 2 rewriting is only considered if unnest's input operator has cardinality "exactly 1".
+ *      If unnest's input operator produces 0 tuples then the aggregate operator would produce 1 tuple with $x = [],
+ *      while the assign rewriting would produce 0 tuples. Therefore it's not equivalent.
+ *      If unnest's input operator produces N tuples (where N > 1) then the aggregate operator would produce 1 tuple
+ *      with a concatenated list of all unnested values from all input tuples,
+ *      while the assign rewriting would produce N tuples. Therefore it's not equivalent.
+ *
+ * <li> It's configurable whether Case 2 rewriting is attempted or not if the 'aggregate' operator
+ *      is the root of a subplan's nested plan. The reason for allowing disabling Case 2 is because
+ *      aggregate-unnest elimination may prevent further subplan inlining rewritings.
+ * </ol>
  */
-
 public class RemoveRedundantListifyRule implements IAlgebraicRewriteRule {
+
+    private final boolean allowReverseCaseAtSubplanRoot;
+
+    public RemoveRedundantListifyRule(boolean allowReverseCaseAtSubplanRoot) {
+        this.allowReverseCaseAtSubplanRoot = allowReverseCaseAtSubplanRoot;
+    }
+
     @Override
     public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
         return false;
@@ -95,25 +112,29 @@
         if (context.checkIfInDontApplySet(this, op)) {
             return false;
         }
-        Set<LogicalVariable> varSet = new HashSet<LogicalVariable>();
-        return applyRuleDown(opRef, varSet, context);
+        Set<LogicalVariable> varSet = new HashSet<>();
+        return applyRuleDown(opRef, false, varSet, context);
     }
 
-    private boolean applyRuleDown(Mutable<ILogicalOperator> opRef, Set<LogicalVariable> varSet,
+    private boolean applyRuleDown(Mutable<ILogicalOperator> opRef, boolean isSubplanRoot, Set<LogicalVariable> varSet,
             IOptimizationContext context) throws AlgebricksException {
         boolean changed = applies(opRef, varSet, context);
-        changed |= appliesForReverseCase(opRef, varSet, context);
+        boolean skipReverseCase = isSubplanRoot && !allowReverseCaseAtSubplanRoot;
+        if (!skipReverseCase) {
+            changed |= appliesForReverseCase(opRef, varSet, context);
+        }
         AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
         VariableUtilities.getUsedVariables(op, varSet);
         if (op.hasNestedPlans()) {
+            boolean isSubplanOp = op.getOperatorTag() == LogicalOperatorTag.SUBPLAN;
             // Variables used by the parent operators should be live at op.
-            Set<LogicalVariable> localLiveVars = new ListSet<LogicalVariable>();
+            Set<LogicalVariable> localLiveVars = new ListSet<>();
             VariableUtilities.getLiveVariables(op, localLiveVars);
             varSet.retainAll(localLiveVars);
             AbstractOperatorWithNestedPlans aonp = (AbstractOperatorWithNestedPlans) op;
             for (ILogicalPlan p : aonp.getNestedPlans()) {
                 for (Mutable<ILogicalOperator> r : p.getRoots()) {
-                    if (applyRuleDown(r, varSet, context)) {
+                    if (applyRuleDown(r, isSubplanOp, varSet, context)) {
                         changed = true;
                     }
                     context.addToDontApplySet(this, r.getValue());
@@ -121,7 +142,7 @@
             }
         }
         for (Mutable<ILogicalOperator> i : op.getInputs()) {
-            if (applyRuleDown(i, varSet, context)) {
+            if (applyRuleDown(i, false, varSet, context)) {
                 changed = true;
             }
             context.addToDontApplySet(this, i.getValue());
@@ -205,7 +226,7 @@
         List<Mutable<ILogicalExpression>> assgnExprs = new ArrayList<>(1);
         VariableReferenceExpression paramVarRef = new VariableReferenceExpression(paramVar);
         paramVarRef.setSourceLocation(arg0.getSourceLocation());
-        assgnExprs.add(new MutableObject<ILogicalExpression>(paramVarRef));
+        assgnExprs.add(new MutableObject<>(paramVarRef));
         AssignOperator assign = new AssignOperator(assgnVars, assgnExprs);
         assign.setSourceLocation(agg.getSourceLocation());
         assign.getInputs().add(agg.getInputs().get(0));
@@ -219,13 +240,14 @@
             List<LogicalVariable> raggVars = new ArrayList<>(1);
             raggVars.add(posVar);
             List<Mutable<ILogicalExpression>> rAggExprs = new ArrayList<>(1);
-            StatefulFunctionCallExpression tidFun = new StatefulFunctionCallExpression(
-                    FunctionUtil.getFunctionInfo(BuiltinFunctions.TID), UnpartitionedPropertyComputer.INSTANCE);
+            StatefulFunctionCallExpression tidFun =
+                    new StatefulFunctionCallExpression(BuiltinFunctions.getBuiltinFunctionInfo(BuiltinFunctions.TID),
+                            UnpartitionedPropertyComputer.INSTANCE);
             tidFun.setSourceLocation(agg.getSourceLocation());
-            rAggExprs.add(new MutableObject<ILogicalExpression>(tidFun));
+            rAggExprs.add(new MutableObject<>(tidFun));
             RunningAggregateOperator rAgg = new RunningAggregateOperator(raggVars, rAggExprs);
             rAgg.setSourceLocation(agg.getSourceLocation());
-            rAgg.getInputs().add(new MutableObject<ILogicalOperator>(assign));
+            rAgg.getInputs().add(new MutableObject<>(assign));
             aggregateParentRef.setValue(rAgg);
             context.computeAndSetTypeEnvironmentForOperator(rAgg);
         }
@@ -241,7 +263,7 @@
             return false;
         }
         AggregateOperator agg = (AggregateOperator) op1;
-        if (agg.getVariables().size() > 1 || agg.getVariables().size() <= 0) {
+        if (agg.getVariables().size() != 1) {
             return false;
         }
         LogicalVariable aggVar = agg.getVariables().get(0);
@@ -288,12 +310,16 @@
         if (scanFunc.getArguments().size() != 1) {
             return false;
         }
+        ILogicalOperator unnestInputOp = unnest.getInputs().get(0).getValue();
+        if (!CardinalityInferenceVisitor.isCardinalityExactOne(unnestInputOp)) {
+            return false;
+        }
 
         List<LogicalVariable> assgnVars = new ArrayList<>(1);
         assgnVars.add(aggVar);
         AssignOperator assign = new AssignOperator(assgnVars, scanFunc.getArguments());
         assign.setSourceLocation(agg.getSourceLocation());
-        assign.getInputs().add(unnest.getInputs().get(0));
+        assign.getInputs().add(new MutableObject<>(unnestInputOp));
         context.computeAndSetTypeEnvironmentForOperator(assign);
         opRef.setValue(assign);
         return true;
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java
index e7de31d..81b4a3d 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java
@@ -383,6 +383,11 @@
 
         Mutable<ILogicalOperator> lowestAggregateRefInSubplan =
                 SubplanFlatteningUtil.findLowestAggregate(subplanOp.getNestedPlans().get(0).getRoots().get(0));
+        if (lowestAggregateRefInSubplan == null) {
+            inputOpRef.setValue(inputOpBackup);
+            return new Pair<>(false, new LinkedHashMap<>());
+        }
+
         Mutable<ILogicalOperator> rightInputOpRef = lowestAggregateRefInSubplan.getValue().getInputs().get(0);
         ILogicalOperator rightInputOp = rightInputOpRef.getValue();
 
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanFlatteningUtil.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanFlatteningUtil.java
index 956ba6c..09377fa 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanFlatteningUtil.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanFlatteningUtil.java
@@ -57,13 +57,17 @@
         // For nested subplan, we do not continue for the general inlining.
         if (OperatorManipulationUtil.ancestorOfOperators(subplanOp,
                 ImmutableSet.of(LogicalOperatorTag.NESTEDTUPLESOURCE))) {
-            return new Pair<Map<LogicalVariable, LogicalVariable>, List<Pair<IOrder, Mutable<ILogicalExpression>>>>(
-                    null, null);
+            return new Pair<>(null, null);
         }
-        InlineAllNtsInSubplanVisitor visitor = new InlineAllNtsInSubplanVisitor(context, subplanOp);
+
+        Mutable<ILogicalOperator> topOpRef = findLowestAggregate(subplanOp.getNestedPlans().get(0).getRoots().get(0));
+        if (topOpRef == null) {
+            return new Pair<>(null, null);
+        }
 
         // Rewrites the query plan.
-        ILogicalOperator topOp = findLowestAggregate(subplanOp.getNestedPlans().get(0).getRoots().get(0)).getValue();
+        InlineAllNtsInSubplanVisitor visitor = new InlineAllNtsInSubplanVisitor(context, subplanOp);
+        ILogicalOperator topOp = topOpRef.getValue();
         ILogicalOperator opToVisit = topOp.getInputs().get(0).getValue();
         ILogicalOperator result = opToVisit.accept(visitor, null);
         topOp.getInputs().get(0).setValue(result);
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/subquery/query-ASTERIXDB-2845.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/subquery/query-ASTERIXDB-2845.sqlpp
new file mode 100644
index 0000000..4494cfa
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/subquery/query-ASTERIXDB-2845.sqlpp
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description: This test case is to verify the fix for ASTERIXDB-2845
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use test;
+
+create dataset jds(jid integer not unknown) open type primary key jid;
+
+create dataset mds(mid integer not unknown) open type primary key mid;
+
+SET `compiler.sort.parallel` "false";
+
+WITH
+j AS (
+  SELECT jid, a
+  FROM jds
+), ---> 3 rows (jid=1, 2, 3)
+
+m1 AS (
+  SELECT jid, x, COUNT(1) c1
+  FROM mds
+  GROUP BY jid, x
+),
+
+m2 AS (
+  SELECT jid, y, COUNT(1) c2
+  FROM mds
+  GROUP BY jid, y
+)
+
+SELECT j.jid AS j_jid, j.a AS j_a,
+  m1.jid AS m1_jid, m1.x AS m1_x, m1.c1 AS m1_c1,
+  m2.jid AS m2_jid, m2.y AS m2_y, m2.c2 AS m2_c2
+FROM j
+LEFT OUTER JOIN m1 ON j.jid=m1.jid
+LEFT OUTER JOIN m2 ON j.jid=m1.jid
+ORDER BY j_jid, m1_x, m2_y, m2_jid;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/query-ASTERIXDB-2845.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/query-ASTERIXDB-2845.plan
new file mode 100644
index 0000000..7cbb67f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/query-ASTERIXDB-2845.plan
@@ -0,0 +1,123 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- SORT_MERGE_EXCHANGE [$$277(ASC), $#4(ASC), $#5(ASC), $#6(ASC) ]  |PARTITIONED|
+          -- STABLE_SORT [$$277(ASC), $#4(ASC), $#5(ASC), $#6(ASC)]  |PARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              -- STREAM_PROJECT  |PARTITIONED|
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  -- HYBRID_HASH_JOIN [$$295][$$296]  |PARTITIONED|
+                    -- HASH_PARTITION_EXCHANGE [$$295]  |PARTITIONED|
+                      -- ASSIGN  |PARTITIONED|
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- HYBRID_HASH_JOIN [$$277][$$jid]  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  -- ASSIGN  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- DATASOURCE_SCAN (test.jds)  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                              -- HASH_PARTITION_EXCHANGE [$$jid]  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  -- ASSIGN  |PARTITIONED|
+                                    -- STREAM_PROJECT  |PARTITIONED|
+                                      -- ASSIGN  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- SORT_GROUP_BY[$$319, $$320]  |PARTITIONED|
+                                                  {
+                                                    -- AGGREGATE  |LOCAL|
+                                                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                  }
+                                            -- HASH_PARTITION_EXCHANGE [$$319, $$320]  |PARTITIONED|
+                                              -- SORT_GROUP_BY[$$273, $$274]  |PARTITIONED|
+                                                      {
+                                                        -- AGGREGATE  |LOCAL|
+                                                          -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                      }
+                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    -- ASSIGN  |PARTITIONED|
+                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                        -- ASSIGN  |PARTITIONED|
+                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                            -- REPLICATE  |PARTITIONED|
+                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                    -- DATASOURCE_SCAN (test.mds)  |PARTITIONED|
+                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                    -- HASH_PARTITION_EXCHANGE [$$296]  |PARTITIONED|
+                      -- NESTED_LOOP  |PARTITIONED|
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            -- STREAM_SELECT  |PARTITIONED|
+                              -- ASSIGN  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- HYBRID_HASH_JOIN [$$303][$$306]  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- STREAM_PROJECT  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- DATASOURCE_SCAN (test.jds)  |PARTITIONED|
+                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                      -- HASH_PARTITION_EXCHANGE [$$306]  |PARTITIONED|
+                                        -- STREAM_PROJECT  |PARTITIONED|
+                                          -- ASSIGN  |PARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- SORT_GROUP_BY[$$322, $$323]  |PARTITIONED|
+                                                      {
+                                                        -- AGGREGATE  |LOCAL|
+                                                          -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                      }
+                                                -- HASH_PARTITION_EXCHANGE [$$322, $$323]  |PARTITIONED|
+                                                  -- SORT_GROUP_BY[$$311, $$312]  |PARTITIONED|
+                                                          {
+                                                            -- AGGREGATE  |LOCAL|
+                                                              -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                          }
+                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                        -- ASSIGN  |PARTITIONED|
+                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                            -- REPLICATE  |PARTITIONED|
+                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                    -- DATASOURCE_SCAN (test.mds)  |PARTITIONED|
+                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                        -- BROADCAST_EXCHANGE  |PARTITIONED|
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            -- ASSIGN  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- ASSIGN  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- SORT_GROUP_BY[$$325, $$326]  |PARTITIONED|
+                                            {
+                                              -- AGGREGATE  |LOCAL|
+                                                -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                            }
+                                      -- HASH_PARTITION_EXCHANGE [$$325, $$326]  |PARTITIONED|
+                                        -- SORT_GROUP_BY[$$275, $$276]  |PARTITIONED|
+                                                {
+                                                  -- AGGREGATE  |LOCAL|
+                                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                }
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- STREAM_PROJECT  |PARTITIONED|
+                                              -- ASSIGN  |PARTITIONED|
+                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                  -- ASSIGN  |PARTITIONED|
+                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                      -- REPLICATE  |PARTITIONED|
+                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                              -- DATASOURCE_SCAN (test.mds)  |PARTITIONED|
+                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/query-ASTERIXDB-2845/query-ASTERIXDB-2845.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/query-ASTERIXDB-2845/query-ASTERIXDB-2845.1.ddl.sqlpp
new file mode 100644
index 0000000..180cf55
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/query-ASTERIXDB-2845/query-ASTERIXDB-2845.1.ddl.sqlpp
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description: This test case is to verify the fix for ASTERIXDB-2845
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use test;
+
+create dataset jds(jid integer not unknown) open type primary key jid;
+
+create dataset mds(mid integer not unknown) open type primary key mid;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/query-ASTERIXDB-2845/query-ASTERIXDB-2845.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/query-ASTERIXDB-2845/query-ASTERIXDB-2845.2.update.sqlpp
new file mode 100644
index 0000000..dd1b82d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/query-ASTERIXDB-2845/query-ASTERIXDB-2845.2.update.sqlpp
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+insert into jds ([
+  { "jid":1, "a":100 },
+  { "jid":2, "a":200 },
+  { "jid":3, "a":300 }
+]);
+
+insert into mds ([
+  { "mid":1, "jid":1, "x": 1, "y": 10 },
+  { "mid":2, "jid":1, "x": 1, "y": 20 },
+  { "mid":3, "jid":1, "x": 2, "y": 10 },
+  { "mid":4, "jid":1, "x": 2, "y": 20 },
+  { "mid":5, "jid":2, "x": 1, "y": 10 },
+  { "mid":6, "jid":2, "x": 1, "y": 20 },
+  { "mid":7, "jid":2, "x": 2, "y": 10 },
+  { "mid":8, "jid":2, "x": 2, "y": 20 }
+]);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/query-ASTERIXDB-2845/query-ASTERIXDB-2845.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/query-ASTERIXDB-2845/query-ASTERIXDB-2845.3.query.sqlpp
new file mode 100644
index 0000000..e6db7f2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/query-ASTERIXDB-2845/query-ASTERIXDB-2845.3.query.sqlpp
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `compiler.sort.parallel` "false";
+
+WITH
+j AS (
+  SELECT jid, a
+  FROM jds
+), ---> 3 rows (jid=1, 2, 3)
+
+m1 AS (
+  SELECT jid, x, COUNT(1) c1
+  FROM mds
+  GROUP BY jid, x
+), ---> 4 rows ( 2 with jid=1, 2 with jid=2 )
+
+m2 AS (
+  SELECT jid, y, COUNT(1) c2
+  FROM mds
+  GROUP BY jid, y
+)  ---> 4 rows ( 2 with jid=1, 2 with jid=2 )
+
+SELECT j.jid AS j_jid, j.a AS j_a,
+  m1.jid AS m1_jid, m1.x AS m1_x, m1.c1 AS m1_c1,
+  m2.jid AS m2_jid, m2.y AS m2_y, m2.c2 AS m2_c2
+FROM j
+LEFT OUTER JOIN m1 ON j.jid=m1.jid ---> 5 rows (2 with jid=1, 2 with jid=2, 1 with jid=3)
+LEFT OUTER JOIN m2 ON j.jid=m1.jid ---> this predicate is intentional to reproduce the issue
+                                   ---> 17 rows (4 x 4 + 1 with jid=3 because m1.jid is MISSING for it)
+ORDER BY j_jid, m1_x, m2_y, m2_jid;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-scan/push-limit-to-primary-scan.8.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-scan/push-limit-to-primary-scan.8.adm
index 06a28e4..bb5ac24 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-scan/push-limit-to-primary-scan.8.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-scan/push-limit-to-primary-scan.8.adm
@@ -17,25 +17,21 @@
                 subplan {
                           aggregate [$$73] <- [listify($$72)]
                           -- AGGREGATE  |LOCAL|
-                            assign [$$72] <- [object-remove(object-remove(object-remove($$t1, "title"), "authors"), "misc")]
+                            assign [$$72] <- [object-remove(object-remove(object-remove($$t0, "title"), "authors"), "misc")]
                             -- ASSIGN  |LOCAL|
-                              unnest $$t1 <- scan-collection($$64)
+                              unnest $$t0 <- scan-collection(to-array($$paper))
                               -- UNNEST  |LOCAL|
                                 nested tuple source
                                 -- NESTED_TUPLE_SOURCE  |LOCAL|
                        }
                 -- SUBPLAN  |PARTITIONED|
-                  project ([$$77, $$64])
-                  -- STREAM_PROJECT  |PARTITIONED|
-                    assign [$$64] <- [to-array($$paper)]
-                    -- ASSIGN  |PARTITIONED|
-                      limit 10
-                      -- STREAM_LIMIT  |PARTITIONED|
+                  limit 10
+                  -- STREAM_LIMIT  |PARTITIONED|
+                    exchange
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      data-scan []<-[$$77, $$paper] <- test.DBLP1 limit 10
+                      -- DATASOURCE_SCAN  |PARTITIONED|
                         exchange
                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                          data-scan []<-[$$77, $$paper] <- test.DBLP1 limit 10
-                          -- DATASOURCE_SCAN  |PARTITIONED|
-                            exchange
-                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                              empty-tuple-source
-                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file
+                          empty-tuple-source
+                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/subquery/query-ASTERIXDB-2845/query-ASTERIXDB-2845.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/subquery/query-ASTERIXDB-2845/query-ASTERIXDB-2845.3.adm
new file mode 100644
index 0000000..c5939b2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/subquery/query-ASTERIXDB-2845/query-ASTERIXDB-2845.3.adm
@@ -0,0 +1,17 @@
+{ "j_jid": 1, "m1_c1": 2, "m2_c2": 2, "j_a": 100, "m1_jid": 1, "m1_x": 1, "m2_jid": 1, "m2_y": 10 }
+{ "j_jid": 1, "m1_c1": 2, "m2_c2": 2, "j_a": 100, "m1_jid": 1, "m1_x": 1, "m2_jid": 2, "m2_y": 10 }
+{ "j_jid": 1, "m1_c1": 2, "m2_c2": 2, "j_a": 100, "m1_jid": 1, "m1_x": 1, "m2_jid": 1, "m2_y": 20 }
+{ "j_jid": 1, "m1_c1": 2, "m2_c2": 2, "j_a": 100, "m1_jid": 1, "m1_x": 1, "m2_jid": 2, "m2_y": 20 }
+{ "j_jid": 1, "m1_c1": 2, "m2_c2": 2, "j_a": 100, "m1_jid": 1, "m1_x": 2, "m2_jid": 1, "m2_y": 10 }
+{ "j_jid": 1, "m1_c1": 2, "m2_c2": 2, "j_a": 100, "m1_jid": 1, "m1_x": 2, "m2_jid": 2, "m2_y": 10 }
+{ "j_jid": 1, "m1_c1": 2, "m2_c2": 2, "j_a": 100, "m1_jid": 1, "m1_x": 2, "m2_jid": 1, "m2_y": 20 }
+{ "j_jid": 1, "m1_c1": 2, "m2_c2": 2, "j_a": 100, "m1_jid": 1, "m1_x": 2, "m2_jid": 2, "m2_y": 20 }
+{ "j_jid": 2, "m1_c1": 2, "m2_c2": 2, "j_a": 200, "m1_jid": 2, "m1_x": 1, "m2_jid": 1, "m2_y": 10 }
+{ "j_jid": 2, "m1_c1": 2, "m2_c2": 2, "j_a": 200, "m1_jid": 2, "m1_x": 1, "m2_jid": 2, "m2_y": 10 }
+{ "j_jid": 2, "m1_c1": 2, "m2_c2": 2, "j_a": 200, "m1_jid": 2, "m1_x": 1, "m2_jid": 1, "m2_y": 20 }
+{ "j_jid": 2, "m1_c1": 2, "m2_c2": 2, "j_a": 200, "m1_jid": 2, "m1_x": 1, "m2_jid": 2, "m2_y": 20 }
+{ "j_jid": 2, "m1_c1": 2, "m2_c2": 2, "j_a": 200, "m1_jid": 2, "m1_x": 2, "m2_jid": 1, "m2_y": 10 }
+{ "j_jid": 2, "m1_c1": 2, "m2_c2": 2, "j_a": 200, "m1_jid": 2, "m1_x": 2, "m2_jid": 2, "m2_y": 10 }
+{ "j_jid": 2, "m1_c1": 2, "m2_c2": 2, "j_a": 200, "m1_jid": 2, "m1_x": 2, "m2_jid": 1, "m2_y": 20 }
+{ "j_jid": 2, "m1_c1": 2, "m2_c2": 2, "j_a": 200, "m1_jid": 2, "m1_x": 2, "m2_jid": 2, "m2_y": 20 }
+{ "j_jid": 3, "j_a": 300 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 0c3b15b..851355a 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -10425,6 +10425,11 @@
         <output-dir compare="Text">query-ASTERIXDB-2815</output-dir>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="subquery">
+      <compilation-unit name="query-ASTERIXDB-2845">
+        <output-dir compare="Text">query-ASTERIXDB-2845</output-dir>
+      </compilation-unit>
+    </test-case>
   </test-group>
   <test-group name="subset-collection">
     <test-case FilePath="subset-collection">