[ASTERIXDB-3415][COMP] Nested query with union all failing compilation

- user model changes: no
- storage format changes: no
- interface changes: no

Ext-ref: MB-62951
Change-Id: Ieb9371cb6f85974429b06da44749e1b200ebb189
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18631
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
index 55165ed..497d21c 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
@@ -592,7 +592,27 @@
 
     @Override
     public ILogicalOperator visitUnionOperator(UnionAllOperator op, Void arg) throws AlgebricksException {
-        visitMultiInputOperator(op);
+        orderingExprs.clear();
+        correlatedKeyVars.clear();
+        ILogicalOperator newChild = op.getInputs().get(0).getValue().accept(this, null);
+        op.getInputs().get(0).setValue(newChild);
+        List<LogicalVariable> leftKeyVars = new ArrayList<>(correlatedKeyVars);
+        correlatedKeyVars.clear();
+
+        newChild = op.getInputs().get(1).getValue().accept(this, null);
+        op.getInputs().get(1).setValue(newChild);
+        int i = 0;
+        for (LogicalVariable leftVar : leftKeyVars) {
+            op.addTriple(new Triple<>(leftKeyVars.get(i),
+                    subplanInputVarToCurrentVarMap.get(currentVarToSubplanInputVarMap.get(leftVar)), context.newVar()));
+            context.computeAndSetTypeEnvironmentForOperator(op);
+            i += 1;
+        }
+        if (correlatedKeyVars.isEmpty()) {
+            correlatedKeyVars.addAll(leftKeyVars);
+        }
+        subtituteVariables(op);
+
         // Update the variable mappings
         List<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> varTriples = op.getVariableMappings();
         for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> triple : varTriples) {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index 04da930..abfc9cc 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -357,6 +357,9 @@
             } else if (actualFile.toString().endsWith(".plan")) {
                 runScriptAndCompareWithResultPlan(scriptFile, readerExpected, readerActual);
                 return;
+            } else if (actualFile.toString().endsWith(".jsonl")) {
+                runScriptAndCompareWithResultJsonl(scriptFile, readerExpected, readerActual, statement);
+                return;
             }
 
             String lineExpected, lineActual;
@@ -634,6 +637,49 @@
         }
     }
 
+    private static void runScriptAndCompareWithResultJsonl(File scriptFile, BufferedReader readerExpected,
+            BufferedReader readerActual, String statement) throws ComparisonException, IOException {
+        List<String> expectedLines = readerExpected.lines().collect(Collectors.toList());
+        List<String> actualLines = readerActual.lines().collect(Collectors.toList());
+        boolean compareUnorderedArray = statement != null && getCompareUnorderedArray(statement);
+        boolean ignoreExtraFields = statement != null && getIgnoreExtraFields(statement);
+
+        JsonNode expectedJson, actualJson;
+        int i = 0;
+        for (String expectedLine : expectedLines) {
+            if (actualLines.size() <= i) {
+                throw new ComparisonException("Result for " + canonicalize(scriptFile) + " expected json line at " + i
+                        + " not found: " + truncateIfLong(expectedLine));
+            }
+            String actualLine = actualLines.get(i);
+            i += 1;
+            try {
+                expectedJson = SINGLE_JSON_NODE_READER.readTree(expectedLine);
+            } catch (JsonProcessingException e) {
+                throw new ComparisonException("Invalid expected JSON for: " + scriptFile, e);
+            }
+            try {
+                actualJson = SINGLE_JSON_NODE_READER.readTree(actualLine);
+            } catch (JsonProcessingException e) {
+                throw new ComparisonException("Invalid actual JSON for: " + scriptFile, e);
+            }
+            if (expectedJson == null) {
+                throw new ComparisonException("No expected result for: " + scriptFile);
+            } else if (actualJson == null) {
+                throw new ComparisonException("No actual result for: " + scriptFile);
+            }
+            if (!TestHelper.equalJson(expectedJson, actualJson, compareUnorderedArray, ignoreExtraFields, false,
+                    null)) {
+                throw new ComparisonException("Result for " + scriptFile + " didn't match the expected JSON"
+                        + "\nexpected result:\n" + expectedJson + "\nactual result:\n" + actualJson);
+            }
+        }
+        if (actualLines.size() > i) {
+            throw new ComparisonException("Result for " + canonicalize(scriptFile) + " extra json line at " + i
+                    + " found: " + truncateIfLong(actualLines.get(i)));
+        }
+    }
+
     public void runScriptAndCompareWithResultUnorderedLinesText(File scriptFile, BufferedReader readerExpected,
             BufferedReader readerActual) throws Exception {
         // Using Lists to allow duplicate lines in the result
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-3415/query-ASTERIXDB-3415.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-3415/query-ASTERIXDB-3415.1.ddl.sqlpp
new file mode 100644
index 0000000..774e8a8
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-3415/query-ASTERIXDB-3415.1.ddl.sqlpp
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description: This test case is to verify the fix for ASTERIXDB-3415
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+
+use test;
+
+create type dt1 as {id:int};
+create dataset collection1(dt1) primary key id;
+create dataset collection2(dt1) primary key id;
+create dataset collection3(dt1) primary key id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-3415/query-ASTERIXDB-3415.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-3415/query-ASTERIXDB-3415.2.update.sqlpp
new file mode 100644
index 0000000..581408f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-3415/query-ASTERIXDB-3415.2.update.sqlpp
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+insert into collection1
+([
+    {
+      "id": 1,
+      "f1": "f1"
+    },
+    {
+        "id": 2,
+        "f1": "f1"
+    },
+    {
+      "id": 3,
+      "f1": "f1"
+    }
+]);
+insert into collection2
+([
+    {
+      "id": 1,
+      "f2": "f2"
+    },
+    {
+        "id": 2,
+        "f2": "f2"
+    },
+    {
+      "id": 3,
+      "f2": "f2"
+    }
+]);
+insert into collection3
+([
+    {
+      "id": 1,
+      "f1": "f1",
+      "items": [2, 4]
+
+    },
+    {
+        "id": 2,
+        "f1": "f1",
+        "items": [1]
+    }
+]);
+
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-3415/query-ASTERIXDB-3415.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-3415/query-ASTERIXDB-3415.3.query.sqlpp
new file mode 100644
index 0000000..517000d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-3415/query-ASTERIXDB-3415.3.query.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description: This test case is to verify the fix for ASTERIXDB-3415
+ */
+-- compareunorderedarray=true
+use test;
+
+SELECT (
+  SELECT c1.f1
+  FROM collection1 AS c1
+  WHERE c1.id IN c3.items
+  UNION ALL
+  SELECT c2.f2
+  FROM collection2 c2 )
+FROM collection3 AS c3
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-3415/query-ASTERIXDB-3415.4.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-3415/query-ASTERIXDB-3415.4.query.sqlpp
new file mode 100644
index 0000000..9c1fa2f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-3415/query-ASTERIXDB-3415.4.query.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description: This test case is to verify the fix for ASTERIXDB-3415
+ */
+-- compareunorderedarray=true
+use test;
+
+SELECT (
+  SELECT c1.f1
+  FROM collection1 AS c1
+  WHERE c1.id = c3.x
+  UNION ALL
+  SELECT c2.f2
+  FROM collection2 c2 )
+FROM collection3 AS c3
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/query-ASTERIXDB-3415/query-ASTERIXDB-3415.3.jsonl b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/query-ASTERIXDB-3415/query-ASTERIXDB-3415.3.jsonl
new file mode 100644
index 0000000..211c0b0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/query-ASTERIXDB-3415/query-ASTERIXDB-3415.3.jsonl
@@ -0,0 +1,2 @@
+{ "$1": [ { "f2": "f2" }, { "f2": "f2" }, { "f2": "f2" }, { "f1": "f1" } ] }
+{ "$1": [ { "f2": "f2" }, { "f2": "f2" }, { "f2": "f2" }, { "f1": "f1" } ] }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/query-ASTERIXDB-3415/query-ASTERIXDB-3415.4.jsonl b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/query-ASTERIXDB-3415/query-ASTERIXDB-3415.4.jsonl
new file mode 100644
index 0000000..b28825f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/query-ASTERIXDB-3415/query-ASTERIXDB-3415.4.jsonl
@@ -0,0 +1,2 @@
+{ "$1": [ { "f2": "f2" }, { "f2": "f2" }, { "f2": "f2" } ] }
+{ "$1": [ { "f2": "f2" }, { "f2": "f2" }, { "f2": "f2" } ] }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 515e765..3abfdd6 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -7335,6 +7335,11 @@
         <output-dir compare="Text">query-ASTERIXDB-3403</output-dir>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="misc">
+      <compilation-unit name="query-ASTERIXDB-3415">
+        <output-dir compare="Text">query-ASTERIXDB-3415</output-dir>
+      </compilation-unit>
+    </test-case>
   </test-group>
   <test-group name="multipart-dataverse">
     <test-case FilePath="multipart-dataverse">
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnionAllOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnionAllOperator.java
index 45c6e2f..4d227ea 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnionAllOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnionAllOperator.java
@@ -41,6 +41,10 @@
         this.varMap = varMap;
     }
 
+    public void addTriple(Triple<LogicalVariable, LogicalVariable, LogicalVariable> triple) {
+        varMap.add(triple);
+    }
+
     public List<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> getVariableMappings() {
         return varMap;
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
index fba095a..14a44ac 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
@@ -36,12 +36,14 @@
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractReplicateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.IsomorphismUtilities;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.AssignPOperator;
@@ -147,6 +149,63 @@
         return changed;
     }
 
+    // Check for a special case (ASTERIXDB-3415) to avoid adding Replicate Operator
+    // that causes hang during execution of the plan.
+    private boolean isUnionAllSink(List<Mutable<ILogicalOperator>> group) {
+        if (group.size() != 2) {
+            return false;
+        }
+        List<Pair<Mutable<ILogicalOperator>, Integer>> operators = new ArrayList<>();
+        if (childrenToParents.containsKey(group.get(0))) {
+            for (Mutable<ILogicalOperator> op : childrenToParents.get(group.get(0))) {
+                operators.add(new Pair<>(op, 0));
+            }
+        } else {
+            return false;
+        }
+        List<Pair<Mutable<ILogicalOperator>, Integer>> unionAllOps = new ArrayList<>();
+        while (operators.size() > 0) {
+            Pair<Mutable<ILogicalOperator>, Integer> entry = operators.remove(0);
+            if (entry.first.getValue() instanceof UnionAllOperator) {
+                unionAllOps.add(entry);
+            } else if (entry.first.getValue() instanceof ExchangeOperator && ((ExchangeOperator) entry.first.getValue())
+                    .getPhysicalOperator().getOperatorTag() == PhysicalOperatorTag.HASH_PARTITION_EXCHANGE) {
+                entry.second += 1;
+            }
+            if (childrenToParents.containsKey(entry.first)) {
+                for (Mutable<ILogicalOperator> op : childrenToParents.get(entry.first)) {
+                    operators.add(new Pair<>(op, entry.second));
+                }
+            }
+        }
+        if (childrenToParents.containsKey(group.get(1))) {
+            for (Mutable<ILogicalOperator> op : childrenToParents.get(group.get(1))) {
+                operators.add(new Pair<>(op, 0));
+            }
+        } else {
+            return false;
+        }
+        while (operators.size() > 0) {
+            Pair<Mutable<ILogicalOperator>, Integer> entry = operators.remove(0);
+            if (entry.first.getValue() instanceof UnionAllOperator) {
+                for (Pair<Mutable<ILogicalOperator>, Integer> unionAllOp : unionAllOps) {
+                    if (unionAllOp.first.equals(entry.first) && unionAllOp.second + entry.second == 1) {
+                        return true;
+                    }
+                }
+            } else if (entry.first.getValue() instanceof ExchangeOperator && ((ExchangeOperator) entry.first.getValue())
+                    .getPhysicalOperator().getOperatorTag() == PhysicalOperatorTag.HASH_PARTITION_EXCHANGE) {
+                entry.second += 1;
+            }
+            if (childrenToParents.containsKey(entry.first)) {
+                for (Mutable<ILogicalOperator> op : childrenToParents.get(entry.first)) {
+                    operators.add(new Pair<>(op, entry.second));
+                }
+            }
+        }
+        return false;
+    }
+
     private boolean rewriteForOneEquivalentClass(List<Mutable<ILogicalOperator>> members, IOptimizationContext context)
             throws AlgebricksException {
         List<Mutable<ILogicalOperator>> group = new ArrayList<>();
@@ -163,7 +222,7 @@
                 }
             }
             boolean[] materializationFlags = computeMaterilizationFlags(group);
-            if (group.isEmpty()) {
+            if (group.isEmpty() || isUnionAllSink(group)) {
                 continue;
             }
             candidate = group.get(0);