ASTERIXDB-1602: fix the reset of lastUsedClusterId in ExtractCommonOperatorsRule.

Change-Id: Ic1a9a638906a3f973b3b932489c3282132e10c37
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1154
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
index c247b49..e291dc1 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
@@ -202,6 +202,9 @@
         condPushDownAndJoinInference.add(new DisjunctivePredicateToJoinRule());
         condPushDownAndJoinInference.add(new PushSelectIntoJoinRule());
         condPushDownAndJoinInference.add(new IntroJoinInsideSubplanRule());
+        // Apply RemoveCartesianProductWithEmptyBranchRule before PushMapOperatorDownThroughProductRule
+        // to avoid that a constant assignment gets pushed into an empty branch.
+        condPushDownAndJoinInference.add(new RemoveCartesianProductWithEmptyBranchRule());
         condPushDownAndJoinInference.add(new PushMapOperatorDownThroughProductRule());
         condPushDownAndJoinInference.add(new PushSubplanWithAggregateDownThroughProductRule());
         condPushDownAndJoinInference.add(new SubplanOutOfGroupRule());
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue562.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue562.plan
index 366d67c..9087eeb 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue562.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue562.plan
@@ -3,12 +3,12 @@
     -- STREAM_PROJECT  |PARTITIONED|
       -- ASSIGN  |PARTITIONED|
         -- SORT_MERGE_EXCHANGE [$$7(ASC) ]  |PARTITIONED|
-          -- PRE_CLUSTERED_GROUP_BY[$$83]  |PARTITIONED|
+          -- PRE_CLUSTERED_GROUP_BY[$$82]  |PARTITIONED|
                   {
                     -- AGGREGATE  |LOCAL|
                       -- NESTED_TUPLE_SOURCE  |LOCAL|
                   }
-            -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$83(ASC)] HASH:[$$83]  |PARTITIONED|
+            -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$82(ASC)] HASH:[$$82]  |PARTITIONED|
               -- SORT_GROUP_BY[$$58]  |PARTITIONED|
                       {
                         -- AGGREGATE  |LOCAL|
@@ -21,20 +21,20 @@
                         -- STREAM_SELECT  |PARTITIONED|
                           -- STREAM_PROJECT  |PARTITIONED|
                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                              -- PRE_CLUSTERED_GROUP_BY[$$80]  |PARTITIONED|
+                              -- PRE_CLUSTERED_GROUP_BY[$$79]  |PARTITIONED|
                                       {
                                         -- AGGREGATE  |LOCAL|
                                           -- NESTED_TUPLE_SOURCE  |LOCAL|
                                       }
-                                -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$80(ASC)] HASH:[$$80]  |PARTITIONED|
-                                  -- PRE_CLUSTERED_GROUP_BY[$$76]  |PARTITIONED|
+                                -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$79(ASC)] HASH:[$$79]  |PARTITIONED|
+                                  -- PRE_CLUSTERED_GROUP_BY[$$75]  |PARTITIONED|
                                           {
                                             -- AGGREGATE  |LOCAL|
                                               -- STREAM_SELECT  |LOCAL|
                                                 -- NESTED_TUPLE_SOURCE  |LOCAL|
                                           }
                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                      -- STABLE_SORT [$$76(ASC)]  |PARTITIONED|
+                                      -- STABLE_SORT [$$75(ASC)]  |PARTITIONED|
                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                           -- STREAM_PROJECT  |PARTITIONED|
                                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/query-ASTERIXDB-1602/query-ASTERIXDB-1602.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/query-ASTERIXDB-1602/query-ASTERIXDB-1602.1.ddl.sqlpp
new file mode 100644
index 0000000..94814ce
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/query-ASTERIXDB-1602/query-ASTERIXDB-1602.1.ddl.sqlpp
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse tpcds if exists;
+create dataverse tpcds;
+
+use tpcds;
+
+create type tpcds.date_dim_type as closed {
+    d_date_sk:                 int64,
+    d_date_id:                 string,
+    d_date:                    string? ,
+    d_month_seq:               int64?,
+    d_week_seq:                int64?,
+    d_quarter_seq:             int64?,
+    d_year:                    int64?,
+    d_dow:                     int64?,
+    d_moy:                     int64?,
+    d_dom:                     int64?,
+    d_qoy:                     int64?,
+    d_fy_year:                 int64?,
+    d_fy_quarter_seq:          int64?,
+    d_fy_week_seq:             int64?,
+    d_day_name:                string?,
+    d_quarter_name:            string?,
+    d_holiday:                 string?,
+    d_weekend:                 string?,
+    d_following_holiday:       string?,
+    d_first_dom:               int64?,
+    d_last_dom:                int64?,
+    d_same_day_ly:             int64?,
+    d_same_day_lq:             int64?,
+    d_current_day:             string?,
+    d_current_week:            string?,
+    d_current_month:           string?,
+    d_current_quarter:         string?,
+    d_current_year:            string?
+}
+
+create type tpcds.item_type as closed {
+    i_item_sk:                 int64,
+    i_item_id:                 string,
+    i_rec_start_date:          string?,
+    i_rec_end_date:            string?,
+    i_item_desc:               string?,
+    i_current_price:           double?,
+    i_wholesale_cost:          double?,
+    i_brand_id:                int64? ,
+    i_brand:                   string?,
+    i_class_id:                int64? ,
+    i_class:                   string?,
+    i_category_id:             int64? ,
+    i_category:                string?,
+    i_manufact_id:             int64? ,
+    i_manufact:                string?,
+    i_size:                    string?,
+    i_formulation:             string?,
+    i_color:                   string?,
+    i_units:                   string?,
+    i_container:               string?,
+    i_manager_id:              int64?,
+    i_product_name:            string?
+}
+
+create dataset item (item_type)
+primary key i_item_sk;
+
+create dataset date_dim(date_dim_type)
+primary key d_date_sk;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/query-ASTERIXDB-1602/query-ASTERIXDB-1602.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/query-ASTERIXDB-1602/query-ASTERIXDB-1602.2.update.sqlpp
new file mode 100644
index 0000000..d0f16e4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/query-ASTERIXDB-1602/query-ASTERIXDB-1602.2.update.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpcds;
+
+load  dataset item using localfs ((`path`=`asterix_nc1://data/tpcds/item.csv`),
+(`format`=`delimited-text`), (`delimiter`=`|`));
+
+load  dataset date_dim using localfs ((`path`=`asterix_nc1://data/tpcds/date_dim.csv`),
+(`format`=`delimited-text`), (`delimiter`=`|`));
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/query-ASTERIXDB-1602/query-ASTERIXDB-1602.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/query-ASTERIXDB-1602/query-ASTERIXDB-1602.3.query.sqlpp
new file mode 100644
index 0000000..95889bc
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/query-ASTERIXDB-1602/query-ASTERIXDB-1602.3.query.sqlpp
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpcds;
+
+with tab as
+(
+ select *
+ from item,
+      date_dim d1
+      where d1.d_date in
+        (select value d2.d_date
+         from date_dim d2
+         where d2.d_week_seq in
+            (select value d3.d_week_seq
+             from date_dim d3
+             where d3.d_date in ['1900-01-02', '1900-01-12', '1900-01-15']
+            )
+        )
+ group by i_item_id
+)
+
+select coll_count((
+ select *
+ from tab t1, tab t2
+));
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/tpcds/query-ASTERIXDB-1602/query-ASTERIXDB-1602.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/tpcds/query-ASTERIXDB-1602/query-ASTERIXDB-1602.1.adm
new file mode 100644
index 0000000..2dc6b04
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/tpcds/query-ASTERIXDB-1602/query-ASTERIXDB-1602.1.adm
@@ -0,0 +1 @@
+{ "$1": 121 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index be3ca8d..7027092 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -5935,6 +5935,11 @@
         <output-dir compare="Text">query-ASTERIXDB-1596</output-dir>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="tpcds">
+      <compilation-unit name="query-ASTERIXDB-1602">
+        <output-dir compare="Text">query-ASTERIXDB-1602</output-dir>
+      </compilation-unit>
+    </test-case>
   </test-group>
   <test-group name="tpch">
     <test-case FilePath="tpch">
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
index 40fce90..474cc73 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
@@ -104,7 +104,7 @@
                 opToCandidateInputs.clear();
                 clusterMap.clear();
                 clusterWaitForMap.clear();
-                lastUsedClusterId = 0;
+                lastUsedClusterId = 0; // Resets lastUsedClusterId to 0.
             } while (changed);
             roots.clear();
         }
@@ -404,7 +404,6 @@
     }
 
     private boolean[] computeMaterilizationFlags(List<Mutable<ILogicalOperator>> group) {
-        lastUsedClusterId = 0;
         for (Mutable<ILogicalOperator> root : roots) {
             computeClusters(null, root, new MutableInt(++lastUsedClusterId));
         }
@@ -502,6 +501,7 @@
                             bs.set(prevClusterId.getValue());
                         }
                     }
+                    clusterWaitForMap.remove(currentClusterId.getValue());
                     currentClusterId.setValue(prevClusterId.getValue());
                 }
             }