ASTERIXDB-1343: support queries over nodegroup-based datasets.

Fixed operators to have the right NodeDomain.
Added several regerssion tests.

Change-Id: I776b80e5b824c83bcdf43c95fff99bb151506f84
Reviewed-on: https://asterix-gerrit.ics.uci.edu/719
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-1343-2.aql b/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-1343-2.aql
new file mode 100644
index 0000000..ae8f860
--- /dev/null
+++ b/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-1343-2.aql
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse tpch if exists;
+create dataverse tpch;
+
+use dataverse tpch;
+
+create type LineItemType as closed {
+  l_orderkey: int64,
+  l_partkey: int64,
+  l_suppkey: int64,
+  l_linenumber: int64,
+  l_quantity: int64,
+  l_extendedprice: double,
+  l_discount: double,
+  l_tax: double,
+  l_returnflag: string,
+  l_linestatus: string,
+  l_shipdate: string,
+  l_commitdate: string,
+  l_receiptdate: string,
+  l_shipinstruct: string,
+  l_shipmode: string,
+  l_comment: string
+}
+
+create type OrderType as closed {
+  o_orderkey: int64,
+  o_custkey: int64,
+  o_orderstatus: string,
+  o_totalprice: double,
+  o_orderdate: string,
+  o_orderpriority: string,
+  o_clerk: string,
+  o_shippriority: int64,
+  o_comment: string
+}
+
+drop nodegroup group_test if exists;
+create  nodegroup group_test on
+    asterix_nc2,
+    asterix_nc1;
+
+create dataset LineItem(LineItemType)
+  primary key l_orderkey, l_linenumber on group_test;
+create dataset Orders(OrderType)
+  primary key o_orderkey on group_test;
+
+declare function tmp()
+{
+  for $l in dataset('LineItem')
+  where $l.l_commitdate < $l.l_receiptdate
+  distinct by $l.l_orderkey
+  return { "o_orderkey": $l.l_orderkey }
+}
+
+for $o in dataset('Orders')
+for $t in tmp()
+where $o.o_orderkey = $t.o_orderkey and
+  $o.o_orderdate >= '1993-07-01' and $o.o_orderdate < '1993-10-01'
+group by $o_orderpriority := $o.o_orderpriority with $o
+order by $o_orderpriority
+return {
+  "order_priority": $o_orderpriority,
+  "count": count($o)
+}
+
+drop dataverse tpch if exists;
+drop nodegroup group_test if exists;
+
diff --git a/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-1343-3.aql b/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-1343-3.aql
new file mode 100644
index 0000000..78e93d8
--- /dev/null
+++ b/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-1343-3.aql
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse tpch if exists;
+create dataverse tpch;
+
+use dataverse tpch;
+
+create type LineItemType as closed {
+  l_orderkey: int64,
+  l_partkey: int64,
+  l_suppkey: int64,
+  l_linenumber: int64,
+  l_quantity: int64,
+  l_extendedprice: double,
+  l_discount: double,
+  l_tax: double,
+  l_returnflag: string,
+  l_linestatus: string,
+  l_shipdate: string,
+  l_commitdate: string,
+  l_receiptdate: string,
+  l_shipinstruct: string,
+  l_shipmode: string,
+  l_comment: string
+}
+
+create type OrderType as closed {
+  o_orderkey: int64,
+  o_custkey: int64,
+  o_orderstatus: string,
+  o_totalprice: double,
+  o_orderdate: string,
+  o_orderpriority: string,
+  o_clerk: string,
+  o_shippriority: int64,
+  o_comment: string
+}
+
+drop nodegroup group_test if exists;
+create  nodegroup group_test on
+    asterix_nc1,
+    asterix_nc2;
+
+create dataset LineItem(LineItemType)
+  primary key l_orderkey, l_linenumber on group_test;
+create dataset Orders(OrderType)
+  primary key o_orderkey on group_test;
+
+declare function tmp()
+{
+  for $l in dataset('LineItem')
+  where $l.l_commitdate < $l.l_receiptdate
+  distinct by $l.l_orderkey
+  return { "o_orderkey": $l.l_orderkey }
+}
+
+for $o in dataset('Orders')
+for $t in tmp()
+where $o.o_orderkey = $t.o_orderkey and
+  $o.o_orderdate >= '1993-07-01' and $o.o_orderdate < '1993-10-01'
+group by $o_orderpriority := $o.o_orderpriority with $o
+order by $o_orderpriority
+return {
+  "order_priority": $o_orderpriority,
+  "count": count($o)
+}
+
+
+drop dataverse tpch if exists;
+drop nodegroup group_test if exists;
diff --git a/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-1343-4.aql b/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-1343-4.aql
new file mode 100644
index 0000000..05b1c36
--- /dev/null
+++ b/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-1343-4.aql
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse tpch if exists;
+drop nodegroup group_test if exists;
+create dataverse tpch;
+
+use dataverse tpch;
+
+create type LineItemType as closed {
+  l_orderkey: int64,
+  l_partkey: int64,
+  l_suppkey: int64,
+  l_linenumber: int64,
+  l_quantity: int64,
+  l_extendedprice: double,
+  l_discount: double,
+  l_tax: double,
+  l_returnflag: string,
+  l_linestatus: string,
+  l_shipdate: string,
+  l_commitdate: string,
+  l_receiptdate: string,
+  l_shipinstruct: string,
+  l_shipmode: string,
+  l_comment: string
+}
+
+create type OrderType as closed {
+  o_orderkey: int64,
+  o_custkey: int64,
+  o_orderstatus: string,
+  o_totalprice: double,
+  o_orderdate: string,
+  o_orderpriority: string,
+  o_clerk: string,
+  o_shippriority: int64,
+  o_comment: string
+}
+
+create  nodegroup group_test  on
+    asterix_nc1;
+
+create dataset LineItem(LineItemType)
+  primary key l_orderkey, l_linenumber on group_test;
+create dataset Orders(OrderType)
+  primary key o_orderkey on group_test;
+
+create index lineitem_shipdateIx on LineItem (l_shipdate);
+create index lineitem_receiptdateIx on LineItem (l_receiptdate);
+create index lineitem_fk_orders on LineItem (l_orderkey);
+create index lineitem_fk_part on LineItem (l_partkey);
+create index lineitem_fk_supplier on LineItem (l_suppkey);
+create index orders_fk_customer on Orders (o_custkey);
+create index orders_orderdateIx on Orders (o_orderdate);
+
+declare function tmp()
+{
+  for $l in dataset('LineItem')
+  where $l.l_commitdate < $l.l_receiptdate
+  distinct by $l.l_orderkey
+  return { "o_orderkey": $l.l_orderkey }
+}
+
+for $o in dataset('Orders')
+for $t in tmp()
+where $o.o_orderkey = $t.o_orderkey and
+  $o.o_orderdate >= '1993-07-01' and $o.o_orderdate < '1993-10-01'
+group by $o_orderpriority := $o.o_orderpriority with $o
+order by $o_orderpriority
+return {
+  "order_priority": $o_orderpriority,
+  "count": count($o)
+}
+
+
+drop dataverse tpch if exists;
+drop nodegroup group_test if exists;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-1343.aql b/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-1343.aql
new file mode 100644
index 0000000..f94f8b0
--- /dev/null
+++ b/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-1343.aql
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse tpch if exists;
+drop nodegroup group_test if exists;
+create dataverse tpch;
+
+use dataverse tpch;
+
+create type LineItemType as closed {
+  l_orderkey: int64,
+  l_partkey: int64,
+  l_suppkey: int64,
+  l_linenumber: int64,
+  l_quantity: int64,
+  l_extendedprice: double,
+  l_discount: double,
+  l_tax: double,
+  l_returnflag: string,
+  l_linestatus: string,
+  l_shipdate: string,
+  l_commitdate: string,
+  l_receiptdate: string,
+  l_shipinstruct: string,
+  l_shipmode: string,
+  l_comment: string
+}
+
+create type OrderType as closed {
+  o_orderkey: int64,
+  o_custkey: int64,
+  o_orderstatus: string,
+  o_totalprice: double,
+  o_orderdate: string,
+  o_orderpriority: string,
+  o_clerk: string,
+  o_shippriority: int64,
+  o_comment: string
+}
+
+create  nodegroup group_test  on
+    asterix_nc1;
+
+create dataset LineItem(LineItemType)
+  primary key l_orderkey, l_linenumber on group_test;
+create dataset Orders(OrderType)
+  primary key o_orderkey on group_test;
+
+declare function tmp()
+{
+  for $l in dataset('LineItem')
+  where $l.l_commitdate < $l.l_receiptdate
+  distinct by $l.l_orderkey
+  return { "o_orderkey": $l.l_orderkey }
+}
+
+for $o in dataset('Orders')
+for $t in tmp()
+where $o.o_orderkey = $t.o_orderkey and
+  $o.o_orderdate >= '1993-07-01' and $o.o_orderdate < '1993-10-01'
+group by $o_orderpriority := $o.o_orderpriority with $o
+order by $o_orderpriority
+return {
+  "order_priority": $o_orderpriority,
+  "count": count($o)
+}
+
+
+drop dataverse tpch if exists;
+drop nodegroup group_test if exists;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343-2.plan b/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343-2.plan
new file mode 100644
index 0000000..1dac8b2
--- /dev/null
+++ b/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343-2.plan
@@ -0,0 +1,39 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- SORT_MERGE_EXCHANGE [$$3(ASC) ]  |PARTITIONED|
+          -- PRE_CLUSTERED_GROUP_BY[$$39]  |PARTITIONED|
+                  {
+                    -- AGGREGATE  |LOCAL|
+                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                  }
+            -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$39(ASC)] HASH:[$$39]  |PARTITIONED|
+              -- SORT_GROUP_BY[$$27]  |PARTITIONED|
+                      {
+                        -- AGGREGATE  |LOCAL|
+                          -- NESTED_TUPLE_SOURCE  |LOCAL|
+                      }
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- HYBRID_HASH_JOIN [$$30][$$31]  |PARTITIONED|
+                        -- HASH_PARTITION_EXCHANGE [$$30]  |PARTITIONED|
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            -- STREAM_SELECT  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- ASSIGN  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- DATASOURCE_SCAN  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- PRE_SORTED_DISTINCT_BY  |PARTITIONED|
+                            -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$31(ASC)] HASH:[$$31]  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- STREAM_SELECT  |PARTITIONED|
+                                  -- STREAM_PROJECT  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- DATASOURCE_SCAN  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343-3.plan b/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343-3.plan
new file mode 100644
index 0000000..b084028
--- /dev/null
+++ b/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343-3.plan
@@ -0,0 +1,39 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- SORT_MERGE_EXCHANGE [$$3(ASC) ]  |PARTITIONED|
+          -- PRE_CLUSTERED_GROUP_BY[$$39]  |PARTITIONED|
+                  {
+                    -- AGGREGATE  |LOCAL|
+                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                  }
+            -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$39(ASC)] HASH:[$$39]  |PARTITIONED|
+              -- SORT_GROUP_BY[$$27]  |PARTITIONED|
+                      {
+                        -- AGGREGATE  |LOCAL|
+                          -- NESTED_TUPLE_SOURCE  |LOCAL|
+                      }
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- HYBRID_HASH_JOIN [$$30][$$31]  |PARTITIONED|
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            -- STREAM_SELECT  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- ASSIGN  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- DATASOURCE_SCAN  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- PRE_SORTED_DISTINCT_BY  |PARTITIONED|
+                            -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$31(ASC)] HASH:[$$31]  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- STREAM_SELECT  |PARTITIONED|
+                                  -- STREAM_PROJECT  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- DATASOURCE_SCAN  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343-4.plan b/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343-4.plan
new file mode 100644
index 0000000..4a3162f
--- /dev/null
+++ b/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343-4.plan
@@ -0,0 +1,46 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- SORT_MERGE_EXCHANGE [$$3(ASC) ]  |PARTITIONED|
+          -- PRE_CLUSTERED_GROUP_BY[$$39]  |PARTITIONED|
+                  {
+                    -- AGGREGATE  |LOCAL|
+                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                  }
+            -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$39(ASC)] HASH:[$$39]  |PARTITIONED|
+              -- SORT_GROUP_BY[$$27]  |PARTITIONED|
+                      {
+                        -- AGGREGATE  |LOCAL|
+                          -- NESTED_TUPLE_SOURCE  |LOCAL|
+                      }
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- HYBRID_HASH_JOIN [$$30][$$31]  |PARTITIONED|
+                        -- HASH_PARTITION_EXCHANGE [$$30]  |PARTITIONED|
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            -- STREAM_SELECT  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- ASSIGN  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- BTREE_SEARCH  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- STABLE_SORT [$$43(ASC)]  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- STREAM_PROJECT  |PARTITIONED|
+                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                -- BTREE_SEARCH  |PARTITIONED|
+                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                    -- ASSIGN  |PARTITIONED|
+                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- PRE_SORTED_DISTINCT_BY  |PARTITIONED|
+                            -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$31(ASC)] HASH:[$$31]  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- STREAM_SELECT  |PARTITIONED|
+                                  -- STREAM_PROJECT  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- DATASOURCE_SCAN  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343.plan b/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343.plan
new file mode 100644
index 0000000..1dac8b2
--- /dev/null
+++ b/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343.plan
@@ -0,0 +1,39 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- SORT_MERGE_EXCHANGE [$$3(ASC) ]  |PARTITIONED|
+          -- PRE_CLUSTERED_GROUP_BY[$$39]  |PARTITIONED|
+                  {
+                    -- AGGREGATE  |LOCAL|
+                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                  }
+            -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$39(ASC)] HASH:[$$39]  |PARTITIONED|
+              -- SORT_GROUP_BY[$$27]  |PARTITIONED|
+                      {
+                        -- AGGREGATE  |LOCAL|
+                          -- NESTED_TUPLE_SOURCE  |LOCAL|
+                      }
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- HYBRID_HASH_JOIN [$$30][$$31]  |PARTITIONED|
+                        -- HASH_PARTITION_EXCHANGE [$$30]  |PARTITIONED|
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            -- STREAM_SELECT  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- ASSIGN  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- DATASOURCE_SCAN  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- PRE_SORTED_DISTINCT_BY  |PARTITIONED|
+                            -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$31(ASC)] HASH:[$$31]  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- STREAM_SELECT  |PARTITIONED|
+                                  -- STREAM_PROJECT  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- DATASOURCE_SCAN  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|