[ASTERIXDB-2174][COMP] Improve partitioning propagation in GroupBy

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Improve partitioning property computation in
  AbstractPreclusteredGroupByPOperator
  If "gby v1 as v2" and input partitioned on (v1)
  then output is partitioned on (v2)

Change-Id: Ib679278d6ee6c53b1d9f581438e8bd04c56c2b08
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2176
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/gby_partitioning_property_01.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/gby_partitioning_property_01.sqlpp
new file mode 100644
index 0000000..1777301
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/gby_partitioning_property_01.sqlpp
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop  dataverse test if exists;
+create  dataverse test;
+
+use test;
+
+create type UserType as {
+  id: string
+};
+
+create type MessageType as {
+  id: string
+};
+
+create dataset GleambookUsers(UserType) primary key id;
+
+create dataset GleambookMessages(MessageType) primary key id;
+
+select meta(user).id as id, count(*) as count
+from `GleambookUsers` user, `GleambookMessages` message
+where meta(user).id = message.author_id
+group by meta(user).id;
+
+drop  dataverse test;
+
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/gby_inline.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/gby_inline.plan
index 4043a9c..991404c 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/gby_inline.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/gby_inline.plan
@@ -17,7 +17,7 @@
                       -- STREAM_PROJECT  |PARTITIONED|
                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                           -- HYBRID_HASH_JOIN [$$ccustkey, $$cnationkey][$$24, $$26]  |PARTITIONED|
-                            -- HASH_PARTITION_EXCHANGE [$$ccustkey]  |PARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                               -- PRE_CLUSTERED_GROUP_BY[$$20]  |PARTITIONED|
                                       {
                                         -- AGGREGATE  |LOCAL|
@@ -34,7 +34,7 @@
                                 -- STREAM_PROJECT  |PARTITIONED|
                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                     -- HYBRID_HASH_JOIN [$$24][$$22]  |PARTITIONED|
-                                      -- HASH_PARTITION_EXCHANGE [$$24]  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                         -- STREAM_SELECT  |PARTITIONED|
                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                             -- PRE_CLUSTERED_GROUP_BY[$$25]  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/gby_partitioning_property_01.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/gby_partitioning_property_01.plan
new file mode 100644
index 0000000..7e11745
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/gby_partitioning_property_01.plan
@@ -0,0 +1,36 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+          -- PRE_CLUSTERED_GROUP_BY[$$32]  |PARTITIONED|
+                  {
+                    -- AGGREGATE  |LOCAL|
+                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                  }
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              -- SORT_GROUP_BY[$$22]  |PARTITIONED|
+                      {
+                        -- AGGREGATE  |LOCAL|
+                          -- NESTED_TUPLE_SOURCE  |LOCAL|
+                      }
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- HYBRID_HASH_JOIN [$$22][$$25]  |PARTITIONED|
+                        -- HASH_PARTITION_EXCHANGE [$$22]  |PARTITIONED|
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            -- ASSIGN  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- DATASOURCE_SCAN  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                        -- HASH_PARTITION_EXCHANGE [$$25]  |PARTITIONED|
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            -- ASSIGN  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- DATASOURCE_SCAN  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/inlined_q18_large_volume_customer.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/inlined_q18_large_volume_customer.plan
index 26d6103..143f01b 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/inlined_q18_large_volume_customer.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/inlined_q18_large_volume_customer.plan
@@ -45,7 +45,7 @@
                                                           -- DATASOURCE_SCAN  |PARTITIONED|
                                                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                               -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                          -- HASH_PARTITION_EXCHANGE [$$l_orderkey]  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                             -- STREAM_PROJECT  |PARTITIONED|
                                               -- STREAM_SELECT  |PARTITIONED|
                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue697.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue697.plan
index 2d75ff0..aae39ae 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue697.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue697.plan
@@ -3,12 +3,12 @@
     -- STREAM_PROJECT  |PARTITIONED|
       -- ASSIGN  |PARTITIONED|
         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-          -- SORT_GROUP_BY[$$20]  |PARTITIONED|
+          -- PRE_CLUSTERED_GROUP_BY[$$20]  |PARTITIONED|
                   {
                     -- AGGREGATE  |LOCAL|
                       -- NESTED_TUPLE_SOURCE  |LOCAL|
                   }
-            -- HASH_PARTITION_EXCHANGE [$$20]  |PARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
               -- PRE_CLUSTERED_GROUP_BY[$$16]  |PARTITIONED|
                       {
                         -- AGGREGATE  |LOCAL|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/split-materialization-above-join.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/split-materialization-above-join.plan
index a81a142..22bc323 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/split-materialization-above-join.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/split-materialization-above-join.plan
@@ -32,26 +32,26 @@
                                               -- STREAM_PROJECT  |PARTITIONED|
                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                   -- HYBRID_HASH_JOIN [$$prefixTokenLeft][$$prefixTokenRight]  |PARTITIONED|
-                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                      -- STREAM_PROJECT  |PARTITIONED|
-                                                        -- ASSIGN  |PARTITIONED|
-                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                            -- REPLICATE  |PARTITIONED|
-                                                              -- HASH_PARTITION_EXCHANGE [$$prefixTokenRight]  |PARTITIONED|
-                                                                -- UNNEST  |PARTITIONED|
-                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                    -- PRE_CLUSTERED_GROUP_BY[$$97]  |PARTITIONED|
-                                                                            {
-                                                                              -- AGGREGATE  |LOCAL|
-                                                                                -- STREAM_SELECT  |LOCAL|
-                                                                                  -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                                            }
-                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                        -- STABLE_SORT [$$97(ASC), $$i(ASC)]  |PARTITIONED|
-                                                                          -- HASH_PARTITION_EXCHANGE [$$97]  |PARTITIONED|
-                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                    -- HASH_PARTITION_EXCHANGE [$$prefixTokenLeft]  |PARTITIONED|
+                                                      -- UNNEST  |PARTITIONED|
+                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                          -- PRE_CLUSTERED_GROUP_BY[$$95]  |PARTITIONED|
+                                                                  {
+                                                                    -- AGGREGATE  |LOCAL|
+                                                                      -- STREAM_SELECT  |LOCAL|
+                                                                        -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                  }
+                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                              -- STABLE_SORT [$$95(ASC), $$i(ASC)]  |PARTITIONED|
+                                                                -- HASH_PARTITION_EXCHANGE [$$95]  |PARTITIONED|
+                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      -- HYBRID_HASH_JOIN [$$tokenUnranked][$$tokenGroupped]  |PARTITIONED|
+                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                                            -- ASSIGN  |PARTITIONED|
                                                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                -- HYBRID_HASH_JOIN [$$tokenUnranked][$$tokenGroupped]  |PARTITIONED|
+                                                                                -- REPLICATE  |PARTITIONED|
                                                                                   -- HASH_PARTITION_EXCHANGE [$$tokenUnranked]  |PARTITIONED|
                                                                                     -- STREAM_PROJECT  |PARTITIONED|
                                                                                       -- UNNEST  |PARTITIONED|
@@ -62,19 +62,24 @@
                                                                                                 -- DATASOURCE_SCAN  |PARTITIONED|
                                                                                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                                     -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                                                  -- HASH_PARTITION_EXCHANGE [$$tokenGroupped]  |PARTITIONED|
-                                                                                    -- ASSIGN  |PARTITIONED|
-                                                                                      -- RUNNING_AGGREGATE  |PARTITIONED|
-                                                                                        -- STREAM_PROJECT  |PARTITIONED|
-                                                                                          -- SORT_MERGE_EXCHANGE [$$102(ASC), $$tokenGroupped(ASC) ]  |PARTITIONED|
-                                                                                            -- STABLE_SORT [$$102(ASC), $$tokenGroupped(ASC)]  |PARTITIONED|
+                                                                        -- HASH_PARTITION_EXCHANGE [$$tokenGroupped]  |PARTITIONED|
+                                                                          -- ASSIGN  |PARTITIONED|
+                                                                            -- RUNNING_AGGREGATE  |PARTITIONED|
+                                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                                -- SORT_MERGE_EXCHANGE [$$99(ASC), $$tokenGroupped(ASC) ]  |PARTITIONED|
+                                                                                  -- STABLE_SORT [$$99(ASC), $$tokenGroupped(ASC)]  |PARTITIONED|
+                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                      -- SORT_GROUP_BY[$$128]  |PARTITIONED|
+                                                                                              {
+                                                                                                -- AGGREGATE  |LOCAL|
+                                                                                                  -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                                              }
+                                                                                        -- HASH_PARTITION_EXCHANGE [$$128]  |PARTITIONED|
+                                                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                                                            -- ASSIGN  |PARTITIONED|
                                                                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                -- SORT_GROUP_BY[$$130]  |PARTITIONED|
-                                                                                                        {
-                                                                                                          -- AGGREGATE  |LOCAL|
-                                                                                                            -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                                                                        }
-                                                                                                  -- HASH_PARTITION_EXCHANGE [$$130]  |PARTITIONED|
+                                                                                                -- REPLICATE  |PARTITIONED|
+                                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                                     -- SORT_GROUP_BY[$$token]  |PARTITIONED|
                                                                                                             {
                                                                                                               -- AGGREGATE  |LOCAL|
@@ -92,23 +97,23 @@
                                                                                                                         -- DATASOURCE_SCAN  |PARTITIONED|
                                                                                                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                                                             -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                      -- REPLICATE  |PARTITIONED|
-                                                        -- HASH_PARTITION_EXCHANGE [$$prefixTokenRight]  |PARTITIONED|
-                                                          -- UNNEST  |PARTITIONED|
+                                                    -- HASH_PARTITION_EXCHANGE [$$prefixTokenRight]  |PARTITIONED|
+                                                      -- UNNEST  |PARTITIONED|
+                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                          -- PRE_CLUSTERED_GROUP_BY[$$97]  |PARTITIONED|
+                                                                  {
+                                                                    -- AGGREGATE  |LOCAL|
+                                                                      -- STREAM_SELECT  |LOCAL|
+                                                                        -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                  }
                                                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                              -- PRE_CLUSTERED_GROUP_BY[$$97]  |PARTITIONED|
-                                                                      {
-                                                                        -- AGGREGATE  |LOCAL|
-                                                                          -- STREAM_SELECT  |LOCAL|
-                                                                            -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                                      }
-                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                  -- STABLE_SORT [$$97(ASC), $$i(ASC)]  |PARTITIONED|
-                                                                    -- HASH_PARTITION_EXCHANGE [$$97]  |PARTITIONED|
-                                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                              -- STABLE_SORT [$$97(ASC), $$i(ASC)]  |PARTITIONED|
+                                                                -- HASH_PARTITION_EXCHANGE [$$97]  |PARTITIONED|
+                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      -- HYBRID_HASH_JOIN [$$tokenUnranked][$$tokenGroupped]  |PARTITIONED|
                                                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                          -- HYBRID_HASH_JOIN [$$tokenUnranked][$$tokenGroupped]  |PARTITIONED|
+                                                                          -- REPLICATE  |PARTITIONED|
                                                                             -- HASH_PARTITION_EXCHANGE [$$tokenUnranked]  |PARTITIONED|
                                                                               -- STREAM_PROJECT  |PARTITIONED|
                                                                                 -- UNNEST  |PARTITIONED|
@@ -119,19 +124,21 @@
                                                                                           -- DATASOURCE_SCAN  |PARTITIONED|
                                                                                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                               -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                                            -- HASH_PARTITION_EXCHANGE [$$tokenGroupped]  |PARTITIONED|
-                                                                              -- ASSIGN  |PARTITIONED|
-                                                                                -- RUNNING_AGGREGATE  |PARTITIONED|
-                                                                                  -- STREAM_PROJECT  |PARTITIONED|
-                                                                                    -- SORT_MERGE_EXCHANGE [$$102(ASC), $$tokenGroupped(ASC) ]  |PARTITIONED|
-                                                                                      -- STABLE_SORT [$$102(ASC), $$tokenGroupped(ASC)]  |PARTITIONED|
-                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                          -- SORT_GROUP_BY[$$130]  |PARTITIONED|
-                                                                                                  {
-                                                                                                    -- AGGREGATE  |LOCAL|
-                                                                                                      -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                                                                  }
-                                                                                            -- HASH_PARTITION_EXCHANGE [$$130]  |PARTITIONED|
+                                                                        -- HASH_PARTITION_EXCHANGE [$$tokenGroupped]  |PARTITIONED|
+                                                                          -- ASSIGN  |PARTITIONED|
+                                                                            -- RUNNING_AGGREGATE  |PARTITIONED|
+                                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                                -- SORT_MERGE_EXCHANGE [$$102(ASC), $$tokenGroupped(ASC) ]  |PARTITIONED|
+                                                                                  -- STABLE_SORT [$$102(ASC), $$tokenGroupped(ASC)]  |PARTITIONED|
+                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                      -- SORT_GROUP_BY[$$130]  |PARTITIONED|
+                                                                                              {
+                                                                                                -- AGGREGATE  |LOCAL|
+                                                                                                  -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                                              }
+                                                                                        -- HASH_PARTITION_EXCHANGE [$$130]  |PARTITIONED|
+                                                                                          -- REPLICATE  |PARTITIONED|
+                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                               -- SORT_GROUP_BY[$$token]  |PARTITIONED|
                                                                                                       {
                                                                                                         -- AGGREGATE  |LOCAL|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/query-ASTERIXDB-1572.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/query-ASTERIXDB-1572.plan
index 6c9c457..6a9d39c 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/query-ASTERIXDB-1572.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/query-ASTERIXDB-1572.plan
@@ -17,7 +17,7 @@
                       -- STREAM_PROJECT  |PARTITIONED|
                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                           -- HYBRID_HASH_JOIN [$$44][$#4]  |PARTITIONED|
-                            -- HASH_PARTITION_EXCHANGE [$$44]  |PARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                               -- PRE_CLUSTERED_GROUP_BY[$$42]  |PARTITIONED|
                                       {
                                         -- AGGREGATE  |LOCAL|
@@ -25,53 +25,51 @@
                                             -- NESTED_TUPLE_SOURCE  |LOCAL|
                                       }
                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                  -- STABLE_SORT [$$42(ASC)]  |PARTITIONED|
+                                  -- STREAM_PROJECT  |PARTITIONED|
                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                      -- STREAM_PROJECT  |PARTITIONED|
+                                      -- HYBRID_HASH_JOIN [$$42][$#3]  |PARTITIONED|
                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                          -- HYBRID_HASH_JOIN [$$42][$#3]  |PARTITIONED|
-                                            -- HASH_PARTITION_EXCHANGE [$$42]  |PARTITIONED|
-                                              -- PRE_CLUSTERED_GROUP_BY[$$40]  |PARTITIONED|
-                                                      {
-                                                        -- AGGREGATE  |LOCAL|
-                                                          -- STREAM_SELECT  |LOCAL|
-                                                            -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                      }
+                                          -- PRE_CLUSTERED_GROUP_BY[$$40]  |PARTITIONED|
+                                                  {
+                                                    -- AGGREGATE  |LOCAL|
+                                                      -- STREAM_SELECT  |LOCAL|
+                                                        -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                  }
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- STABLE_SORT [$$40(ASC)]  |PARTITIONED|
                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                  -- STABLE_SORT [$$40(ASC)]  |PARTITIONED|
+                                                  -- STREAM_PROJECT  |PARTITIONED|
                                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                      -- HYBRID_HASH_JOIN [$$40][$#2]  |PARTITIONED|
                                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                          -- HYBRID_HASH_JOIN [$$40][$#2]  |PARTITIONED|
-                                                            -- HASH_PARTITION_EXCHANGE [$$40]  |PARTITIONED|
-                                                              -- PRE_CLUSTERED_GROUP_BY[$$33]  |PARTITIONED|
-                                                                      {
-                                                                        -- AGGREGATE  |LOCAL|
-                                                                          -- STREAM_SELECT  |LOCAL|
-                                                                            -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                                      }
+                                                          -- PRE_CLUSTERED_GROUP_BY[$$33]  |PARTITIONED|
+                                                                  {
+                                                                    -- AGGREGATE  |LOCAL|
+                                                                      -- STREAM_SELECT  |LOCAL|
+                                                                        -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                  }
+                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                              -- STREAM_PROJECT  |PARTITIONED|
                                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                  -- HYBRID_HASH_JOIN [$$33][$#1]  |PARTITIONED|
                                                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                      -- HYBRID_HASH_JOIN [$$33][$#1]  |PARTITIONED|
+                                                                      -- STREAM_PROJECT  |PARTITIONED|
                                                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                                          -- DATASOURCE_SCAN  |PARTITIONED|
                                                                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                              -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                                        -- HASH_PARTITION_EXCHANGE [$#1]  |PARTITIONED|
-                                                                          -- ASSIGN  |UNPARTITIONED|
-                                                                            -- UNNEST  |UNPARTITIONED|
-                                                                              -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
-                                                            -- HASH_PARTITION_EXCHANGE [$#2]  |PARTITIONED|
-                                                              -- ASSIGN  |UNPARTITIONED|
-                                                                -- UNNEST  |UNPARTITIONED|
-                                                                  -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
-                                            -- HASH_PARTITION_EXCHANGE [$#3]  |PARTITIONED|
-                                              -- ASSIGN  |UNPARTITIONED|
-                                                -- UNNEST  |UNPARTITIONED|
-                                                  -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+                                                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                    -- HASH_PARTITION_EXCHANGE [$#1]  |PARTITIONED|
+                                                                      -- ASSIGN  |UNPARTITIONED|
+                                                                        -- UNNEST  |UNPARTITIONED|
+                                                                          -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+                                                        -- HASH_PARTITION_EXCHANGE [$#2]  |PARTITIONED|
+                                                          -- ASSIGN  |UNPARTITIONED|
+                                                            -- UNNEST  |UNPARTITIONED|
+                                                              -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+                                        -- HASH_PARTITION_EXCHANGE [$#3]  |PARTITIONED|
+                                          -- ASSIGN  |UNPARTITIONED|
+                                            -- UNNEST  |UNPARTITIONED|
+                                              -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
                             -- HASH_PARTITION_EXCHANGE [$#4]  |PARTITIONED|
                               -- ASSIGN  |UNPARTITIONED|
                                 -- UNNEST  |UNPARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpcds/query-ASTERIXDB-1581-correlated.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpcds/query-ASTERIXDB-1581-correlated.plan
index 5ac6b62..babbeeb 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpcds/query-ASTERIXDB-1581-correlated.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpcds/query-ASTERIXDB-1581-correlated.plan
@@ -18,186 +18,225 @@
                                         -- NESTED_TUPLE_SOURCE  |LOCAL|
                             }
                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                        -- STABLE_SORT [$$95(ASC)]  |PARTITIONED|
+                        -- STREAM_PROJECT  |PARTITIONED|
                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                            -- STREAM_PROJECT  |PARTITIONED|
+                            -- HYBRID_HASH_JOIN [$$95][$$96]  |PARTITIONED|
                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                -- HYBRID_HASH_JOIN [$$95][$$96]  |PARTITIONED|
-                                  -- HASH_PARTITION_EXCHANGE [$$95]  |PARTITIONED|
-                                    -- PRE_CLUSTERED_GROUP_BY[$$83]  |PARTITIONED|
-                                            {
-                                              -- AGGREGATE  |LOCAL|
+                                -- PRE_CLUSTERED_GROUP_BY[$$83]  |PARTITIONED|
+                                        {
+                                          -- AGGREGATE  |LOCAL|
+                                            -- AGGREGATE  |LOCAL|
+                                              -- ASSIGN  |LOCAL|
                                                 -- AGGREGATE  |LOCAL|
-                                                  -- ASSIGN  |LOCAL|
-                                                    -- AGGREGATE  |LOCAL|
-                                                      -- STREAM_SELECT  |LOCAL|
-                                                        -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                            }
+                                                  -- STREAM_SELECT  |LOCAL|
+                                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                        }
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- STABLE_SORT [$$83(ASC)]  |PARTITIONED|
                                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                        -- STABLE_SORT [$$83(ASC)]  |PARTITIONED|
+                                        -- STREAM_PROJECT  |PARTITIONED|
                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                            -- STREAM_PROJECT  |PARTITIONED|
+                                            -- HYBRID_HASH_JOIN [$$83][$$84]  |PARTITIONED|
                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                -- HYBRID_HASH_JOIN [$$83][$$84]  |PARTITIONED|
-                                                  -- HASH_PARTITION_EXCHANGE [$$83]  |PARTITIONED|
-                                                    -- PRE_CLUSTERED_GROUP_BY[$$63]  |PARTITIONED|
-                                                            {
-                                                              -- AGGREGATE  |LOCAL|
-                                                                -- AGGREGATE  |LOCAL|
-                                                                  -- STREAM_SELECT  |LOCAL|
-                                                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                            }
+                                                -- PRE_CLUSTERED_GROUP_BY[$$63]  |PARTITIONED|
+                                                        {
+                                                          -- AGGREGATE  |LOCAL|
+                                                            -- AGGREGATE  |LOCAL|
+                                                              -- STREAM_SELECT  |LOCAL|
+                                                                -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                        }
+                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                    -- STREAM_PROJECT  |PARTITIONED|
                                                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                        -- HYBRID_HASH_JOIN [$$63][$$73]  |PARTITIONED|
                                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                            -- HYBRID_HASH_JOIN [$$63][$$73]  |PARTITIONED|
+                                                            -- STREAM_PROJECT  |PARTITIONED|
                                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                 -- STREAM_PROJECT  |PARTITIONED|
                                                                   -- ASSIGN  |PARTITIONED|
                                                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                       -- REPLICATE  |PARTITIONED|
                                                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                          -- STREAM_PROJECT  |PARTITIONED|
-                                                                            -- ASSIGN  |PARTITIONED|
-                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                -- REPLICATE  |PARTITIONED|
-                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                    -- BTREE_SEARCH  |PARTITIONED|
-                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                        -- ASSIGN  |PARTITIONED|
-                                                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                -- STREAM_PROJECT  |PARTITIONED|
-                                                                  -- ASSIGN  |PARTITIONED|
-                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                      -- REPLICATE  |PARTITIONED|
-                                                                        -- HASH_PARTITION_EXCHANGE [$$120]  |PARTITIONED|
-                                                                          -- ASSIGN  |PARTITIONED|
-                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                          -- BTREE_SEARCH  |PARTITIONED|
+                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                               -- ASSIGN  |PARTITIONED|
+                                                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                              -- ASSIGN  |PARTITIONED|
+                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                  -- REPLICATE  |PARTITIONED|
+                                                                    -- HASH_PARTITION_EXCHANGE [$$120]  |PARTITIONED|
+                                                                      -- ASSIGN  |PARTITIONED|
+                                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                                          -- ASSIGN  |PARTITIONED|
+                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                              -- REPLICATE  |PARTITIONED|
                                                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                  -- REPLICATE  |PARTITIONED|
+                                                                                  -- STREAM_PROJECT  |PARTITIONED|
                                                                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                                                      -- DATASOURCE_SCAN  |PARTITIONED|
                                                                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                          -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                              -- HASH_PARTITION_EXCHANGE [$$84]  |PARTITIONED|
+                                                -- ASSIGN  |PARTITIONED|
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                      -- HYBRID_HASH_JOIN [$$77][$$76]  |PARTITIONED|
+                                                        -- HASH_PARTITION_EXCHANGE [$$77]  |PARTITIONED|
+                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                            -- ASSIGN  |PARTITIONED|
+                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                -- STREAM_SELECT  |PARTITIONED|
+                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                    -- PRE_CLUSTERED_GROUP_BY[$$85]  |PARTITIONED|
+                                                                            {
+                                                                              -- AGGREGATE  |LOCAL|
+                                                                                -- AGGREGATE  |LOCAL|
+                                                                                  -- STREAM_SELECT  |LOCAL|
+                                                                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                            }
+                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                            -- HYBRID_HASH_JOIN [$$85][$$87]  |PARTITIONED|
+                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                                  -- ASSIGN  |PARTITIONED|
+                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                      -- REPLICATE  |PARTITIONED|
+                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                          -- BTREE_SEARCH  |PARTITIONED|
                                                                                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                  -- HASH_PARTITION_EXCHANGE [$$84]  |PARTITIONED|
-                                                    -- ASSIGN  |PARTITIONED|
-                                                      -- STREAM_PROJECT  |PARTITIONED|
-                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                          -- HYBRID_HASH_JOIN [$$77][$$76]  |PARTITIONED|
-                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                              -- ASSIGN  |PARTITIONED|
+                                                                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                                  -- ASSIGN  |PARTITIONED|
+                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                      -- REPLICATE  |PARTITIONED|
+                                                                                        -- HASH_PARTITION_EXCHANGE [$$120]  |PARTITIONED|
+                                                                                          -- ASSIGN  |PARTITIONED|
+                                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                                              -- ASSIGN  |PARTITIONED|
+                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                  -- REPLICATE  |PARTITIONED|
+                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                          -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                        -- HASH_PARTITION_EXCHANGE [$$76]  |PARTITIONED|
+                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                            -- ASSIGN  |PARTITIONED|
                                                               -- STREAM_PROJECT  |PARTITIONED|
                                                                 -- ASSIGN  |PARTITIONED|
                                                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                     -- REPLICATE  |PARTITIONED|
-                                                                      -- HASH_PARTITION_EXCHANGE [$$114]  |PARTITIONED|
+                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                         -- STREAM_PROJECT  |PARTITIONED|
-                                                                          -- ASSIGN  |PARTITIONED|
-                                                                            -- STREAM_PROJECT  |PARTITIONED|
-                                                                              -- STREAM_SELECT  |PARTITIONED|
-                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                  -- PRE_CLUSTERED_GROUP_BY[$$119]  |PARTITIONED|
-                                                                                          {
-                                                                                            -- AGGREGATE  |LOCAL|
-                                                                                              -- AGGREGATE  |LOCAL|
-                                                                                                -- STREAM_SELECT  |LOCAL|
-                                                                                                  -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                                                          }
-                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                      -- STREAM_PROJECT  |PARTITIONED|
-                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                          -- HYBRID_HASH_JOIN [$$119][$$120]  |PARTITIONED|
-                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                              -- REPLICATE  |PARTITIONED|
-                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                  -- BTREE_SEARCH  |PARTITIONED|
-                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                      -- ASSIGN  |PARTITIONED|
-                                                                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                              -- REPLICATE  |PARTITIONED|
-                                                                                                -- HASH_PARTITION_EXCHANGE [$$120]  |PARTITIONED|
-                                                                                                  -- ASSIGN  |PARTITIONED|
-                                                                                                    -- STREAM_PROJECT  |PARTITIONED|
-                                                                                                      -- ASSIGN  |PARTITIONED|
-                                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                          -- REPLICATE  |PARTITIONED|
-                                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                              -- STREAM_PROJECT  |PARTITIONED|
-                                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                                  -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                            -- HASH_PARTITION_EXCHANGE [$$76]  |PARTITIONED|
+                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                            -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                              -- HASH_PARTITION_EXCHANGE [$$96]  |PARTITIONED|
+                                -- ASSIGN  |PARTITIONED|
+                                  -- STREAM_PROJECT  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- HYBRID_HASH_JOIN [$$79][$$78]  |PARTITIONED|
+                                        -- HASH_PARTITION_EXCHANGE [$$79]  |PARTITIONED|
+                                          -- PRE_CLUSTERED_GROUP_BY[$$97]  |PARTITIONED|
+                                                  {
+                                                    -- AGGREGATE  |LOCAL|
+                                                      -- AGGREGATE  |LOCAL|
+                                                        -- AGGREGATE  |LOCAL|
+                                                          -- STREAM_SELECT  |LOCAL|
+                                                            -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                  }
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- STABLE_SORT [$$97(ASC)]  |PARTITIONED|
+                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                      -- HYBRID_HASH_JOIN [$$97][$$100]  |PARTITIONED|
+                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                            -- STREAM_SELECT  |PARTITIONED|
                                                               -- STREAM_PROJECT  |PARTITIONED|
                                                                 -- ASSIGN  |PARTITIONED|
-                                                                  -- STREAM_PROJECT  |PARTITIONED|
-                                                                    -- ASSIGN  |PARTITIONED|
+                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                    -- PRE_CLUSTERED_GROUP_BY[$$101]  |PARTITIONED|
+                                                                            {
+                                                                              -- AGGREGATE  |LOCAL|
+                                                                                -- AGGREGATE  |LOCAL|
+                                                                                  -- STREAM_SELECT  |LOCAL|
+                                                                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                            }
                                                                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                        -- REPLICATE  |PARTITIONED|
+                                                                        -- STREAM_PROJECT  |PARTITIONED|
                                                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                            -- HYBRID_HASH_JOIN [$$101][$$102]  |PARTITIONED|
                                                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                  -- HASH_PARTITION_EXCHANGE [$$96]  |PARTITIONED|
-                                    -- ASSIGN  |PARTITIONED|
-                                      -- STREAM_PROJECT  |PARTITIONED|
-                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                          -- HYBRID_HASH_JOIN [$$79][$$78]  |PARTITIONED|
-                                            -- HASH_PARTITION_EXCHANGE [$$79]  |PARTITIONED|
-                                              -- PRE_CLUSTERED_GROUP_BY[$$97]  |PARTITIONED|
-                                                      {
-                                                        -- AGGREGATE  |LOCAL|
-                                                          -- AGGREGATE  |LOCAL|
-                                                            -- AGGREGATE  |LOCAL|
-                                                              -- STREAM_SELECT  |LOCAL|
-                                                                -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                      }
-                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                  -- STABLE_SORT [$$97(ASC)]  |PARTITIONED|
-                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                      -- STREAM_PROJECT  |PARTITIONED|
-                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                          -- HYBRID_HASH_JOIN [$$97][$$100]  |PARTITIONED|
-                                                            -- HASH_PARTITION_EXCHANGE [$$97]  |PARTITIONED|
-                                                              -- STREAM_PROJECT  |PARTITIONED|
-                                                                -- STREAM_SELECT  |PARTITIONED|
-                                                                  -- STREAM_PROJECT  |PARTITIONED|
-                                                                    -- ASSIGN  |PARTITIONED|
-                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                        -- PRE_CLUSTERED_GROUP_BY[$$101]  |PARTITIONED|
-                                                                                {
-                                                                                  -- AGGREGATE  |LOCAL|
-                                                                                    -- AGGREGATE  |LOCAL|
-                                                                                      -- STREAM_SELECT  |LOCAL|
-                                                                                        -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                                                }
-                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                            -- STREAM_PROJECT  |PARTITIONED|
-                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                -- HYBRID_HASH_JOIN [$$101][$$102]  |PARTITIONED|
-                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                    -- REPLICATE  |PARTITIONED|
-                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                        -- STREAM_PROJECT  |PARTITIONED|
-                                                                                          -- ASSIGN  |PARTITIONED|
-                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                              -- REPLICATE  |PARTITIONED|
-                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                  -- BTREE_SEARCH  |PARTITIONED|
-                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                      -- ASSIGN  |PARTITIONED|
-                                                                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                                -- STREAM_PROJECT  |PARTITIONED|
                                                                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                     -- STREAM_PROJECT  |PARTITIONED|
                                                                                       -- ASSIGN  |PARTITIONED|
                                                                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                           -- REPLICATE  |PARTITIONED|
+                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                              -- BTREE_SEARCH  |PARTITIONED|
+                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                  -- ASSIGN  |PARTITIONED|
+                                                                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                                  -- ASSIGN  |PARTITIONED|
+                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                      -- REPLICATE  |PARTITIONED|
+                                                                                        -- HASH_PARTITION_EXCHANGE [$$120]  |PARTITIONED|
+                                                                                          -- ASSIGN  |PARTITIONED|
+                                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                                              -- ASSIGN  |PARTITIONED|
+                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                  -- REPLICATE  |PARTITIONED|
+                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                          -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                        -- HASH_PARTITION_EXCHANGE [$$100]  |PARTITIONED|
+                                                          -- ASSIGN  |PARTITIONED|
+                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                -- HYBRID_HASH_JOIN [$$114][$$113]  |PARTITIONED|
+                                                                  -- HASH_PARTITION_EXCHANGE [$$114]  |PARTITIONED|
+                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                      -- ASSIGN  |PARTITIONED|
+                                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                                          -- STREAM_SELECT  |PARTITIONED|
+                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                              -- PRE_CLUSTERED_GROUP_BY[$$119]  |PARTITIONED|
+                                                                                      {
+                                                                                        -- AGGREGATE  |LOCAL|
+                                                                                          -- AGGREGATE  |LOCAL|
+                                                                                            -- STREAM_SELECT  |LOCAL|
+                                                                                              -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                                      }
+                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                      -- HYBRID_HASH_JOIN [$$119][$$120]  |PARTITIONED|
+                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                          -- REPLICATE  |PARTITIONED|
+                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                              -- BTREE_SEARCH  |PARTITIONED|
+                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                  -- ASSIGN  |PARTITIONED|
+                                                                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                          -- REPLICATE  |PARTITIONED|
                                                                                             -- HASH_PARTITION_EXCHANGE [$$120]  |PARTITIONED|
                                                                                               -- ASSIGN  |PARTITIONED|
                                                                                                 -- STREAM_PROJECT  |PARTITIONED|
@@ -210,72 +249,27 @@
                                                                                                               -- DATASOURCE_SCAN  |PARTITIONED|
                                                                                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                                                   -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                            -- HASH_PARTITION_EXCHANGE [$$100]  |PARTITIONED|
-                                                              -- ASSIGN  |PARTITIONED|
-                                                                -- STREAM_PROJECT  |PARTITIONED|
-                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                    -- HYBRID_HASH_JOIN [$$114][$$113]  |PARTITIONED|
-                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                        -- REPLICATE  |PARTITIONED|
-                                                                          -- HASH_PARTITION_EXCHANGE [$$114]  |PARTITIONED|
-                                                                            -- STREAM_PROJECT  |PARTITIONED|
-                                                                              -- ASSIGN  |PARTITIONED|
-                                                                                -- STREAM_PROJECT  |PARTITIONED|
-                                                                                  -- STREAM_SELECT  |PARTITIONED|
-                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                      -- PRE_CLUSTERED_GROUP_BY[$$119]  |PARTITIONED|
-                                                                                              {
-                                                                                                -- AGGREGATE  |LOCAL|
-                                                                                                  -- AGGREGATE  |LOCAL|
-                                                                                                    -- STREAM_SELECT  |LOCAL|
-                                                                                                      -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                                                              }
-                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                          -- STREAM_PROJECT  |PARTITIONED|
-                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                              -- HYBRID_HASH_JOIN [$$119][$$120]  |PARTITIONED|
-                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                  -- REPLICATE  |PARTITIONED|
-                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                      -- BTREE_SEARCH  |PARTITIONED|
-                                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                          -- ASSIGN  |PARTITIONED|
-                                                                                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                  -- REPLICATE  |PARTITIONED|
-                                                                                                    -- HASH_PARTITION_EXCHANGE [$$120]  |PARTITIONED|
-                                                                                                      -- ASSIGN  |PARTITIONED|
-                                                                                                        -- STREAM_PROJECT  |PARTITIONED|
-                                                                                                          -- ASSIGN  |PARTITIONED|
-                                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                              -- REPLICATE  |PARTITIONED|
-                                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                                  -- STREAM_PROJECT  |PARTITIONED|
-                                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                                      -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                                      -- HASH_PARTITION_EXCHANGE [$$113]  |PARTITIONED|
-                                                                        -- STREAM_PROJECT  |PARTITIONED|
-                                                                          -- ASSIGN  |PARTITIONED|
+                                                                  -- HASH_PARTITION_EXCHANGE [$$113]  |PARTITIONED|
+                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                      -- ASSIGN  |PARTITIONED|
+                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          -- REPLICATE  |PARTITIONED|
                                                                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                              -- REPLICATE  |PARTITIONED|
+                                                                              -- STREAM_PROJECT  |PARTITIONED|
                                                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                                  -- DATASOURCE_SCAN  |PARTITIONED|
                                                                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                      -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                            -- HASH_PARTITION_EXCHANGE [$$78]  |PARTITIONED|
+                                                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                        -- HASH_PARTITION_EXCHANGE [$$78]  |PARTITIONED|
+                                          -- STREAM_PROJECT  |PARTITIONED|
+                                            -- ASSIGN  |PARTITIONED|
                                               -- STREAM_PROJECT  |PARTITIONED|
                                                 -- ASSIGN  |PARTITIONED|
-                                                  -- STREAM_PROJECT  |PARTITIONED|
-                                                    -- ASSIGN  |PARTITIONED|
+                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                    -- REPLICATE  |PARTITIONED|
                                                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                        -- REPLICATE  |PARTITIONED|
+                                                        -- STREAM_PROJECT  |PARTITIONED|
                                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                            -- DATASOURCE_SCAN  |PARTITIONED|
                                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpcds/query-ASTERIXDB-1581.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpcds/query-ASTERIXDB-1581.plan
index 14b31eb..f914055 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpcds/query-ASTERIXDB-1581.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpcds/query-ASTERIXDB-1581.plan
@@ -26,155 +26,146 @@
                                           -- NESTED_TUPLE_SOURCE  |LOCAL|
                               }
                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                          -- STABLE_SORT [$$92(ASC)]  |PARTITIONED|
+                          -- STREAM_PROJECT  |PARTITIONED|
                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                              -- STREAM_PROJECT  |PARTITIONED|
+                              -- HYBRID_HASH_JOIN [$$92][$$93]  |PARTITIONED|
                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                  -- HYBRID_HASH_JOIN [$$92][$$93]  |PARTITIONED|
-                                    -- HASH_PARTITION_EXCHANGE [$$92]  |PARTITIONED|
-                                      -- PRE_CLUSTERED_GROUP_BY[$$32]  |PARTITIONED|
-                                              {
-                                                -- AGGREGATE  |LOCAL|
+                                  -- PRE_CLUSTERED_GROUP_BY[$$32]  |PARTITIONED|
+                                          {
+                                            -- AGGREGATE  |LOCAL|
+                                              -- AGGREGATE  |LOCAL|
+                                                -- ASSIGN  |LOCAL|
                                                   -- AGGREGATE  |LOCAL|
-                                                    -- ASSIGN  |LOCAL|
-                                                      -- AGGREGATE  |LOCAL|
-                                                        -- STREAM_SELECT  |LOCAL|
-                                                          -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                              }
+                                                    -- STREAM_SELECT  |LOCAL|
+                                                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                          }
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- STABLE_SORT [$$32(ASC)]  |PARTITIONED|
                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                          -- STABLE_SORT [$$32(ASC)]  |PARTITIONED|
+                                          -- STREAM_PROJECT  |PARTITIONED|
                                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                              -- STREAM_PROJECT  |PARTITIONED|
-                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                  -- HYBRID_HASH_JOIN [$$32][$$85]  |PARTITIONED|
-                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                      -- STREAM_PROJECT  |PARTITIONED|
-                                                        -- ASSIGN  |PARTITIONED|
-                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                            -- REPLICATE  |PARTITIONED|
-                                                              -- HASH_PARTITION_EXCHANGE [$$94]  |PARTITIONED|
-                                                                -- STREAM_PROJECT  |UNPARTITIONED|
-                                                                  -- ASSIGN  |UNPARTITIONED|
-                                                                    -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
-                                                                      -- REPLICATE  |UNPARTITIONED|
-                                                                        -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
-                                                                          -- AGGREGATE  |UNPARTITIONED|
-                                                                            -- AGGREGATE  |UNPARTITIONED|
-                                                                              -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
-                                                                                -- AGGREGATE  |PARTITIONED|
-                                                                                  -- STREAM_PROJECT  |PARTITIONED|
-                                                                                    -- STREAM_SELECT  |PARTITIONED|
-                                                                                      -- ASSIGN  |PARTITIONED|
-                                                                                        -- STREAM_PROJECT  |PARTITIONED|
-                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                            -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                    -- HASH_PARTITION_EXCHANGE [$$85]  |PARTITIONED|
-                                                      -- ASSIGN  |PARTITIONED|
-                                                        -- STREAM_PROJECT  |PARTITIONED|
-                                                          -- STREAM_SELECT  |PARTITIONED|
-                                                            -- STREAM_PROJECT  |PARTITIONED|
-                                                              -- ASSIGN  |PARTITIONED|
-                                                                -- STREAM_PROJECT  |PARTITIONED|
-                                                                  -- ASSIGN  |PARTITIONED|
-                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                      -- REPLICATE  |PARTITIONED|
-                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- HYBRID_HASH_JOIN [$$32][$$85]  |PARTITIONED|
+                                                -- HASH_PARTITION_EXCHANGE [$$32]  |PARTITIONED|
+                                                  -- STREAM_PROJECT  |UNPARTITIONED|
+                                                    -- ASSIGN  |UNPARTITIONED|
+                                                      -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                                        -- REPLICATE  |UNPARTITIONED|
+                                                          -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                                            -- AGGREGATE  |UNPARTITIONED|
+                                                              -- AGGREGATE  |UNPARTITIONED|
+                                                                -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+                                                                  -- AGGREGATE  |PARTITIONED|
+                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                      -- STREAM_SELECT  |PARTITIONED|
+                                                                        -- ASSIGN  |PARTITIONED|
                                                                           -- STREAM_PROJECT  |PARTITIONED|
                                                                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                               -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                                -- BROADCAST_EXCHANGE  |PARTITIONED|
-                                                                                  -- STREAM_SELECT  |UNPARTITIONED|
-                                                                                    -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
-                                                                                      -- REPLICATE  |UNPARTITIONED|
-                                                                                        -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
-                                                                                          -- AGGREGATE  |UNPARTITIONED|
-                                                                                            -- AGGREGATE  |UNPARTITIONED|
-                                                                                              -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
-                                                                                                -- AGGREGATE  |PARTITIONED|
-                                                                                                  -- STREAM_PROJECT  |PARTITIONED|
-                                                                                                    -- STREAM_SELECT  |PARTITIONED|
-                                                                                                      -- ASSIGN  |PARTITIONED|
-                                                                                                        -- STREAM_PROJECT  |PARTITIONED|
-                                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                            -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                    -- HASH_PARTITION_EXCHANGE [$$93]  |PARTITIONED|
-                                      -- ASSIGN  |PARTITIONED|
-                                        -- STREAM_PROJECT  |PARTITIONED|
-                                          -- STREAM_SELECT  |PARTITIONED|
-                                            -- STREAM_PROJECT  |PARTITIONED|
-                                              -- ASSIGN  |PARTITIONED|
-                                                -- STREAM_PROJECT  |PARTITIONED|
-                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                    -- DATASOURCE_SCAN  |PARTITIONED|
-                                                      -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                -- HASH_PARTITION_EXCHANGE [$$85]  |PARTITIONED|
+                                                  -- ASSIGN  |PARTITIONED|
+                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                      -- STREAM_SELECT  |PARTITIONED|
                                                         -- STREAM_PROJECT  |PARTITIONED|
-                                                          -- STREAM_SELECT  |PARTITIONED|
-                                                            -- ASSIGN  |PARTITIONED|
-                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                -- PRE_CLUSTERED_GROUP_BY[$$94]  |PARTITIONED|
-                                                                        {
+                                                          -- ASSIGN  |PARTITIONED|
+                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                              -- ASSIGN  |PARTITIONED|
+                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                  -- REPLICATE  |PARTITIONED|
+                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                            -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                              -- STREAM_SELECT  |UNPARTITIONED|
+                                                                                -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                                                                  -- REPLICATE  |UNPARTITIONED|
+                                                                                    -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                                                                      -- AGGREGATE  |UNPARTITIONED|
+                                                                                        -- AGGREGATE  |UNPARTITIONED|
+                                                                                          -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+                                                                                            -- AGGREGATE  |PARTITIONED|
+                                                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                -- STREAM_SELECT  |PARTITIONED|
+                                                                                                  -- ASSIGN  |PARTITIONED|
+                                                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                        -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                -- HASH_PARTITION_EXCHANGE [$$93]  |PARTITIONED|
+                                  -- ASSIGN  |PARTITIONED|
+                                    -- STREAM_PROJECT  |PARTITIONED|
+                                      -- STREAM_SELECT  |PARTITIONED|
+                                        -- STREAM_PROJECT  |PARTITIONED|
+                                          -- ASSIGN  |PARTITIONED|
+                                            -- STREAM_PROJECT  |PARTITIONED|
+                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                -- DATASOURCE_SCAN  |PARTITIONED|
+                                                  -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                      -- STREAM_SELECT  |PARTITIONED|
+                                                        -- ASSIGN  |PARTITIONED|
+                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                            -- PRE_CLUSTERED_GROUP_BY[$$94]  |PARTITIONED|
+                                                                    {
+                                                                      -- AGGREGATE  |LOCAL|
+                                                                        -- AGGREGATE  |LOCAL|
                                                                           -- AGGREGATE  |LOCAL|
-                                                                            -- AGGREGATE  |LOCAL|
-                                                                              -- AGGREGATE  |LOCAL|
-                                                                                -- STREAM_SELECT  |LOCAL|
-                                                                                  -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                                        }
+                                                                            -- STREAM_SELECT  |LOCAL|
+                                                                              -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                    }
+                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                -- STABLE_SORT [$$94(ASC)]  |PARTITIONED|
                                                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                    -- STABLE_SORT [$$94(ASC)]  |PARTITIONED|
+                                                                    -- STREAM_PROJECT  |PARTITIONED|
                                                                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                        -- STREAM_PROJECT  |PARTITIONED|
-                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                            -- HYBRID_HASH_JOIN [$$94][$$95]  |PARTITIONED|
-                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                -- REPLICATE  |PARTITIONED|
-                                                                                  -- HASH_PARTITION_EXCHANGE [$$94]  |PARTITIONED|
-                                                                                    -- STREAM_PROJECT  |UNPARTITIONED|
-                                                                                      -- ASSIGN  |UNPARTITIONED|
-                                                                                        -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
-                                                                                          -- REPLICATE  |UNPARTITIONED|
-                                                                                            -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
-                                                                                              -- AGGREGATE  |UNPARTITIONED|
-                                                                                                -- AGGREGATE  |UNPARTITIONED|
-                                                                                                  -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
-                                                                                                    -- AGGREGATE  |PARTITIONED|
-                                                                                                      -- STREAM_PROJECT  |PARTITIONED|
-                                                                                                        -- STREAM_SELECT  |PARTITIONED|
-                                                                                                          -- ASSIGN  |PARTITIONED|
-                                                                                                            -- STREAM_PROJECT  |PARTITIONED|
-                                                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                                -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                                              -- HASH_PARTITION_EXCHANGE [$$95]  |PARTITIONED|
-                                                                                -- ASSIGN  |PARTITIONED|
+                                                                        -- HYBRID_HASH_JOIN [$$94][$$95]  |PARTITIONED|
+                                                                          -- HASH_PARTITION_EXCHANGE [$$94]  |PARTITIONED|
+                                                                            -- STREAM_PROJECT  |UNPARTITIONED|
+                                                                              -- ASSIGN  |UNPARTITIONED|
+                                                                                -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                                                                  -- REPLICATE  |UNPARTITIONED|
+                                                                                    -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                                                                      -- AGGREGATE  |UNPARTITIONED|
+                                                                                        -- AGGREGATE  |UNPARTITIONED|
+                                                                                          -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+                                                                                            -- AGGREGATE  |PARTITIONED|
+                                                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                -- STREAM_SELECT  |PARTITIONED|
+                                                                                                  -- ASSIGN  |PARTITIONED|
+                                                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                        -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                          -- HASH_PARTITION_EXCHANGE [$$95]  |PARTITIONED|
+                                                                            -- ASSIGN  |PARTITIONED|
+                                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                                -- STREAM_SELECT  |PARTITIONED|
                                                                                   -- STREAM_PROJECT  |PARTITIONED|
-                                                                                    -- STREAM_SELECT  |PARTITIONED|
-                                                                                      -- STREAM_PROJECT  |PARTITIONED|
-                                                                                        -- ASSIGN  |PARTITIONED|
+                                                                                    -- ASSIGN  |PARTITIONED|
+                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                        -- REPLICATE  |PARTITIONED|
                                                                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                            -- REPLICATE  |PARTITIONED|
+                                                                                            -- STREAM_PROJECT  |PARTITIONED|
                                                                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                -- STREAM_PROJECT  |PARTITIONED|
-                                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                    -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                                                      -- BROADCAST_EXCHANGE  |PARTITIONED|
-                                                                                                        -- STREAM_SELECT  |UNPARTITIONED|
+                                                                                                -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                                  -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                                                    -- STREAM_SELECT  |UNPARTITIONED|
+                                                                                                      -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                                                                                        -- REPLICATE  |UNPARTITIONED|
                                                                                                           -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
-                                                                                                            -- REPLICATE  |UNPARTITIONED|
-                                                                                                              -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
-                                                                                                                -- AGGREGATE  |UNPARTITIONED|
-                                                                                                                  -- AGGREGATE  |UNPARTITIONED|
-                                                                                                                    -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
-                                                                                                                      -- AGGREGATE  |PARTITIONED|
-                                                                                                                        -- STREAM_PROJECT  |PARTITIONED|
-                                                                                                                          -- STREAM_SELECT  |PARTITIONED|
-                                                                                                                            -- ASSIGN  |PARTITIONED|
-                                                                                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                            -- AGGREGATE  |UNPARTITIONED|
+                                                                                                              -- AGGREGATE  |UNPARTITIONED|
+                                                                                                                -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+                                                                                                                  -- AGGREGATE  |PARTITIONED|
+                                                                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                      -- STREAM_SELECT  |PARTITIONED|
+                                                                                                                        -- ASSIGN  |PARTITIONED|
+                                                                                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                              -- DATASOURCE_SCAN  |PARTITIONED|
                                                                                                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                                                  -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                                                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
index ab68b68..75970ac 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
@@ -22,6 +22,7 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
@@ -86,6 +87,10 @@
         ILogicalOperator op2 = gby.getInputs().get(0).getValue();
         IPhysicalPropertiesVector childProp = op2.getDeliveredPhysicalProperties();
         IPartitioningProperty pp = childProp.getPartitioningProperty();
+        Map<LogicalVariable, LogicalVariable> ppSubstMap = computePartitioningPropertySubstitutionMap(gby, pp);
+        if (ppSubstMap != null) {
+            pp.substituteColumnVars(ppSubstMap);
+        }
         List<ILocalStructuralProperty> childLocals = childProp.getLocalProperties();
         if (childLocals == null) {
             deliveredProperties = new StructuralPropertiesVector(pp, propsLocal);
@@ -100,6 +105,38 @@
         deliveredProperties = new StructuralPropertiesVector(pp, propsLocal);
     }
 
+    // If we have "gby var1 as var3, var2 as var4"
+    // and input is partitioned on (var1,var2) then output is partitioned on (var3,var4)
+    private Map<LogicalVariable, LogicalVariable> computePartitioningPropertySubstitutionMap(GroupByOperator gbyOp,
+            IPartitioningProperty childpp) {
+        Set<LogicalVariable> childPartitioningColumns = new HashSet<>();
+        childpp.getColumns(childPartitioningColumns);
+
+        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByList = gbyOp.getGroupByList();
+        if (groupByList.size() != childPartitioningColumns.size()) {
+            return null;
+        }
+
+        Map<LogicalVariable, LogicalVariable> substMap = null;
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> ve : groupByList) {
+            ILogicalExpression expr = ve.second.getValue();
+            if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+                return null;
+            }
+            VariableReferenceExpression varRefExpr = (VariableReferenceExpression) expr;
+            LogicalVariable var = varRefExpr.getVariableReference();
+            if (!childPartitioningColumns.remove(var)) {
+                return null;
+            }
+            if (substMap == null) {
+                substMap = new HashMap<>();
+            }
+            substMap.put(var, ve.first);
+        }
+
+        return childPartitioningColumns.isEmpty() ? substMap : null;
+    }
+
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
             IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {