[ASTERIXDB-2700][*DB][RT] Fix invalid plan caused by order-by operator in a subquery

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
Instead of removing a redundant sort operator from the plan, replace it with
an empty assign operator to avoid potential connection of a sort-merge connector
to another connector.

- modified RemoveUnusedAssignAndAggregateRule to excluded necessary empty
  assign operators from being removed.

Change-Id: I8bc11fa046cb15fab04057086817bd400b7809c0
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/5103
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-2700.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-2700.sqlpp
new file mode 100644
index 0000000..80ada37
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-2700.sqlpp
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE bigfun IF EXISTS;
+CREATE DATAVERSE bigfun;
+USE bigfun;
+
+CREATE TYPE GleambookUserType AS { gb: int32, id: string };
+CREATE TYPE GleambookMessageType AS { gb: int32, message_id: string };
+
+CREATE DATASET GleambookUsersComposite(GleambookUserType) PRIMARY KEY gb,id;
+CREATE DATASET GleambookMessagesComposite(GleambookMessageType) PRIMARY KEY gb,message_id;
+
+CREATE INDEX usrSinceIx ON GleambookUsersComposite(user_since: string);
+CREATE INDEX authorIdIx ON GleambookMessagesComposite(author_id: string);
+
+SET `compiler.sort.parallel` "false";
+
+FROM (SELECT VALUE u
+      FROM GleambookUsersComposite u
+      WHERE u.user_since >= '2008-07-22T00:00:00'
+      ORDER BY u.id) AS user, GleambookMessagesComposite AS msg
+WHERE msg.author_id /*+ indexnl */ = user.id
+SELECT user.name AS uname, msg.message AS message;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-2700.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-2700.plan
new file mode 100644
index 0000000..3cd606b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-2700.plan
@@ -0,0 +1,35 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- STREAM_PROJECT  |PARTITIONED|
+          -- STREAM_SELECT  |PARTITIONED|
+            -- STREAM_PROJECT  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                -- BTREE_SEARCH  |PARTITIONED|
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    -- STABLE_SORT [$$56(ASC), $$57(ASC)]  |PARTITIONED|
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- BTREE_SEARCH  |PARTITIONED|
+                              -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                -- ASSIGN  |PARTITIONED|
+                                  -- SORT_MERGE_EXCHANGE [$$42(ASC) ]  |PARTITIONED|
+                                    -- STABLE_SORT [$$42(ASC)]  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- STREAM_PROJECT  |PARTITIONED|
+                                          -- STREAM_SELECT  |PARTITIONED|
+                                            -- ASSIGN  |PARTITIONED|
+                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                  -- BTREE_SEARCH  |PARTITIONED|
+                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                      -- STABLE_SORT [$$53(ASC), $$54(ASC)]  |PARTITIONED|
+                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                              -- BTREE_SEARCH  |PARTITIONED|
+                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                  -- ASSIGN  |PARTITIONED|
+                                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-2700/query-ASTERIXDB-2700.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-2700/query-ASTERIXDB-2700.1.ddl.sqlpp
new file mode 100644
index 0000000..4572164
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-2700/query-ASTERIXDB-2700.1.ddl.sqlpp
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description: This test case is to verify the fix for ASTERIXDB-2700
+ */
+
+DROP DATAVERSE bigfun IF EXISTS;
+CREATE DATAVERSE bigfun;
+USE bigfun;
+
+CREATE TYPE GleambookUserType AS { gb: int32, id: string };
+CREATE TYPE GleambookMessageType AS { gb: int32, message_id: string };
+
+CREATE DATASET GleambookUsersComposite(GleambookUserType) PRIMARY KEY gb,id;
+CREATE DATASET GleambookMessagesComposite(GleambookMessageType) PRIMARY KEY gb,message_id;
+
+CREATE INDEX usrSinceIx ON GleambookUsersComposite(user_since: string);
+CREATE INDEX authorIdIx ON GleambookMessagesComposite(author_id: string);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-2700/query-ASTERIXDB-2700.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-2700/query-ASTERIXDB-2700.2.update.sqlpp
new file mode 100644
index 0000000..43bd629
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-2700/query-ASTERIXDB-2700.2.update.sqlpp
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE bigfun;
+
+INSERT INTO GleambookUsersComposite [
+{"gb": 1, "id": "1", "name": "name1", "user_since": '2010-07-22T00:00:00'},
+{"gb": 1, "id": "2", "name": "name2", "user_since": '2011-07-22T00:00:00'},
+{"gb": 2, "id": "3", "name": "name3", "user_since": '2010-09-22T00:00:00'},
+{"gb": 2, "id": "4", "name": "name4", "user_since": '2010-10-22T00:00:00'},
+{"gb": 3, "id": "5", "name": "name5", "user_since": '2013-07-22T00:00:00'}
+];
+
+INSERT INTO GleambookMessagesComposite [
+{"gb": 1, "message_id": "1", "author_id": "1", "message": "message1_1"},
+{"gb": 1, "message_id": "2", "author_id": "1", "message": "message2_1"},
+{"gb": 1, "message_id": "3", "author_id": "1", "message": "message3_1"},
+{"gb": 2, "message_id": "4", "author_id": "2", "message": "message1_2"},
+{"gb": 2, "message_id": "5", "author_id": "2", "message": "message2_2"},
+{"gb": 3, "message_id": "6", "author_id": "2", "message": "message3_2"},
+{"gb": 3, "message_id": "7", "author_id": "5", "message": "message1_5"},
+{"gb": 3, "message_id": "8", "author_id": "5", "message": "message2_5"}
+];
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-2700/query-ASTERIXSB-2700.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-2700/query-ASTERIXSB-2700.3.query.sqlpp
new file mode 100644
index 0000000..9d6dc06
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-2700/query-ASTERIXSB-2700.3.query.sqlpp
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE bigfun;
+
+SET `compiler.sort.parallel` "false";
+
+FROM (SELECT VALUE u
+      FROM GleambookUsersComposite u
+      WHERE u.user_since >= '2008-07-22T00:00:00'
+      ORDER BY u.id) AS user, GleambookMessagesComposite AS msg
+WHERE msg.author_id /*+ indexnl */ = user.id
+SELECT user.name AS uname, msg.message AS message
+ORDER BY uname, message;;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/query-ASTERIXDB-2700/query-ASTERIXDB-2700.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/query-ASTERIXDB-2700/query-ASTERIXDB-2700.3.adm
new file mode 100644
index 0000000..56bcafa
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/query-ASTERIXDB-2700/query-ASTERIXDB-2700.3.adm
@@ -0,0 +1,8 @@
+{ "uname": "name1", "message": "message1_1" }
+{ "uname": "name1", "message": "message2_1" }
+{ "uname": "name1", "message": "message3_1" }
+{ "uname": "name2", "message": "message1_2" }
+{ "uname": "name2", "message": "message2_2" }
+{ "uname": "name2", "message": "message3_2" }
+{ "uname": "name5", "message": "message1_5" }
+{ "uname": "name5", "message": "message2_5" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 6b5da4d..461e164 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -6190,6 +6190,11 @@
         <output-dir compare="Text">insert_nulls_with_secondary_idx</output-dir>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="misc">
+      <compilation-unit name="query-ASTERIXDB-2700">
+        <output-dir compare="Text">query-ASTERIXDB-2700</output-dir>
+      </compilation-unit>
+    </test-case>
   </test-group>
   <test-group name="index">
     <test-group name="index/validations">
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index 9236545..706028b 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -50,6 +50,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
@@ -63,6 +64,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractPreSortedDistinctByPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractStableSortPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.AggregatePOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.AssignPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.BroadcastExchangePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionExchangePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionMergeExchangePOperator;
@@ -284,18 +286,18 @@
                 printOp(op, context);
             }
             changed = true;
-            AbstractLogicalOperator nextOp = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
-            if (nextOp.getOperatorTag() == LogicalOperatorTag.PROJECT) {
-                nextOp = (AbstractLogicalOperator) nextOp.getInputs().get(0).getValue();
-            }
-            opRef.setValue(nextOp);
-            // Now, transfer annotations from the original sort op. to this one.
-            AbstractLogicalOperator transferTo = nextOp;
-            if (transferTo.getOperatorTag() == LogicalOperatorTag.EXCHANGE) {
-                // remove duplicate exchange operator
-                transferTo = (AbstractLogicalOperator) transferTo.getInputs().get(0).getValue();
-            }
-            transferTo.getAnnotations().putAll(op.getAnnotations());
+            // replace the sort with empty assign (to handle cases where the sort might be sitting between exchanges)
+            // RemoveUnusedAssignAndAggregateRule should run after and decide whether to remove the assign or keep it
+            AssignOperator assignOperator = new AssignOperator(new ArrayList<>(0), new ArrayList<>(0));
+            AssignPOperator assignPOperator = new AssignPOperator();
+            assignOperator.setSourceLocation(opRef.getValue().getSourceLocation());
+            assignOperator.setPhysicalOperator(assignPOperator);
+            assignOperator.getInputs().addAll(op.getInputs());
+            opRef.setValue(assignOperator);
+            OperatorManipulationUtil.setOperatorMode(assignOperator);
+            OperatorPropertiesUtil.computeSchemaAndPropertiesRecIfNull(assignOperator, context);
+            context.computeAndSetTypeEnvironmentForOperator(assignOperator);
+            PhysicalOptimizationsUtil.computeFDsAndEquivalenceClasses(assignOperator, context);
             physOptimizeOp(opRef, required, nestedPlan, context);
         }
         return changed;
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveUnusedAssignAndAggregateRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveUnusedAssignAndAggregateRule.java
index eba3c91..a563f46 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveUnusedAssignAndAggregateRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveUnusedAssignAndAggregateRule.java
@@ -90,7 +90,7 @@
         // we try to remove these operators if the produced variables from these
         // operators are not used.
         if (!assignedVarMap.isEmpty()) {
-            removeUnusedAssigns(opRef, context);
+            removeUnusedAssigns(opRef, false, null, context);
         }
 
         return isTransformed;
@@ -139,8 +139,19 @@
         return assignVarsSetForThisOp;
     }
 
-    private void removeUnusedAssigns(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
-            throws AlgebricksException {
+    /**
+     * Removes the assigned variables of an operator (left-hand side variables) if they are not used. It also removes
+     * the operator altogether when the operator is not assigning any more variables after removal of the variables
+     * (Except for few specific operators which cannot be removed such as UNIONALL).
+     *
+     * @param opRef the operator from which the assigned variables are to be removed.
+     * @param opInSubplan whether the operator is inside a subplan.
+     * @param parentOp the parent operator of {@code opRef} or null if it does not have one.
+     * @param context the optimization context.
+     * @throws AlgebricksException
+     */
+    private void removeUnusedAssigns(Mutable<ILogicalOperator> opRef, boolean opInSubplan, ILogicalOperator parentOp,
+            IOptimizationContext context) throws AlgebricksException {
 
         AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
 
@@ -148,8 +159,7 @@
 
         while (removeFromAssigns(op, assignVarsSetForThisOp, context) == 0) {
             // UnionAllOperator cannot be removed since it has two branches.
-            if (op.getOperatorTag() == LogicalOperatorTag.AGGREGATE
-                    || op.getOperatorTag() == LogicalOperatorTag.UNIONALL) {
+            if (!canRemoveOperator(op, opInSubplan, parentOp)) {
                 break;
             }
             op = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
@@ -161,7 +171,7 @@
         Iterator<Mutable<ILogicalOperator>> childIter = op.getInputs().iterator();
         while (childIter.hasNext()) {
             Mutable<ILogicalOperator> cRef = childIter.next();
-            removeUnusedAssigns(cRef, context);
+            removeUnusedAssigns(cRef, opInSubplan, op, context);
         }
 
         if (op.hasNestedPlans()) {
@@ -170,7 +180,7 @@
             while (planIter.hasNext()) {
                 ILogicalPlan p = planIter.next();
                 for (Mutable<ILogicalOperator> r : p.getRoots()) {
-                    removeUnusedAssigns(r, context);
+                    removeUnusedAssigns(r, true, null, context);
                 }
             }
 
@@ -420,6 +430,22 @@
         }
     }
 
+    private static boolean canRemoveOperator(ILogicalOperator op, boolean opInsideSubplan, ILogicalOperator parentOp) {
+        LogicalOperatorTag opTag = op.getOperatorTag();
+        if (opTag == LogicalOperatorTag.AGGREGATE || opTag == LogicalOperatorTag.UNIONALL) {
+            return false;
+        }
+        if (!opInsideSubplan) {
+            // for an operator in the outer plan, do not remove if it's sitting between exchanges or it's root+exchange
+            boolean childIsExchange =
+                    op.hasInputs() && op.getInputs().get(0).getValue().getOperatorTag() == LogicalOperatorTag.EXCHANGE;
+            if (childIsExchange && (parentOp == null || parentOp.getOperatorTag() == LogicalOperatorTag.EXCHANGE)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
     private void clear() {
         assignedVarMap.clear();
         assignedVarSet.clear();