[NO ISSUE][COMP] Eliminate unused running aggregates

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Improve RemoveUnusedAssignAndAggregateRule
  to eliminate unused running aggregates

Change-Id: Iaf002dcb2aa56946df97fb08738d4c03ed308d7c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/10804
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/leftouterjoin/loj-03-no-listify.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/leftouterjoin/loj-03-no-listify.plan
index 646ab5b..6d569dc 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/leftouterjoin/loj-03-no-listify.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/leftouterjoin/loj-03-no-listify.plan
@@ -10,87 +10,54 @@
                   -- HYBRID_HASH_JOIN [$$taskId][$$taskId]  |PARTITIONED|
                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                       -- STREAM_PROJECT  |PARTITIONED|
-                        -- RUNNING_AGGREGATE  |PARTITIONED|
-                          -- STREAM_PROJECT  |PARTITIONED|
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- HYBRID_HASH_JOIN [$$taskId][$$taskId]  |PARTITIONED|
                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                              -- HYBRID_HASH_JOIN [$$taskId][$$taskId]  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                  -- STREAM_PROJECT  |PARTITIONED|
-                                    -- RUNNING_AGGREGATE  |PARTITIONED|
-                                      -- STREAM_PROJECT  |PARTITIONED|
-                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                          -- HYBRID_HASH_JOIN [$$taskId][$$taskId]  |PARTITIONED|
-                                            -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$taskId(ASC)] HASH:[$$taskId]  |PARTITIONED|
-                                              -- SORT_GROUP_BY[$$279]  |PARTITIONED|
-                                                      {
-                                                        -- AGGREGATE  |LOCAL|
-                                                          -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                      }
-                                                -- HASH_PARTITION_EXCHANGE [$$279]  |PARTITIONED|
-                                                  -- SORT_GROUP_BY[$$242]  |PARTITIONED|
-                                                          {
-                                                            -- AGGREGATE  |LOCAL|
-                                                              -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                          }
-                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                      -- STREAM_PROJECT  |PARTITIONED|
-                                                        -- ASSIGN  |PARTITIONED|
-                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                            -- REPLICATE  |PARTITIONED|
-                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                -- ASSIGN  |PARTITIONED|
-                                                                  -- STREAM_PROJECT  |PARTITIONED|
-                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                      -- DATASOURCE_SCAN (test.tasks)  |PARTITIONED|
-                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                  -- HYBRID_HASH_JOIN [$$taskId][$$taskId]  |PARTITIONED|
+                                    -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$taskId(ASC)] HASH:[$$taskId]  |PARTITIONED|
+                                      -- SORT_GROUP_BY[$$279]  |PARTITIONED|
+                                              {
+                                                -- AGGREGATE  |LOCAL|
+                                                  -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                              }
+                                        -- HASH_PARTITION_EXCHANGE [$$279]  |PARTITIONED|
+                                          -- SORT_GROUP_BY[$$242]  |PARTITIONED|
+                                                  {
+                                                    -- AGGREGATE  |LOCAL|
+                                                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                  }
                                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                               -- STREAM_PROJECT  |PARTITIONED|
                                                 -- ASSIGN  |PARTITIONED|
                                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                    -- SORT_GROUP_BY[$$281]  |PARTITIONED|
-                                                            {
-                                                              -- AGGREGATE  |LOCAL|
-                                                                -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                            }
-                                                      -- HASH_PARTITION_EXCHANGE [$$281]  |PARTITIONED|
-                                                        -- SORT_GROUP_BY[$$243]  |PARTITIONED|
-                                                                {
-                                                                  -- AGGREGATE  |LOCAL|
-                                                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                                }
-                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                            -- STREAM_PROJECT  |PARTITIONED|
-                                                              -- STREAM_SELECT  |PARTITIONED|
+                                                    -- REPLICATE  |PARTITIONED|
+                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        -- ASSIGN  |PARTITIONED|
+                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                              -- DATASOURCE_SCAN (test.tasks)  |PARTITIONED|
                                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                  -- REPLICATE  |PARTITIONED|
-                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                      -- ASSIGN  |PARTITIONED|
-                                                                        -- STREAM_PROJECT  |PARTITIONED|
-                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                            -- DATASOURCE_SCAN (test.tasks)  |PARTITIONED|
-                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                  -- STREAM_PROJECT  |PARTITIONED|
-                                    -- ASSIGN  |PARTITIONED|
-                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                        -- SORT_GROUP_BY[$$283]  |PARTITIONED|
-                                                {
-                                                  -- AGGREGATE  |LOCAL|
-                                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                }
-                                          -- HASH_PARTITION_EXCHANGE [$$283]  |PARTITIONED|
-                                            -- SORT_GROUP_BY[$$244]  |PARTITIONED|
+                                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- STREAM_PROJECT  |PARTITIONED|
+                                        -- ASSIGN  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- SORT_GROUP_BY[$$281]  |PARTITIONED|
                                                     {
                                                       -- AGGREGATE  |LOCAL|
                                                         -- NESTED_TUPLE_SOURCE  |LOCAL|
                                                     }
-                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                -- STREAM_PROJECT  |PARTITIONED|
-                                                  -- STREAM_SELECT  |PARTITIONED|
+                                              -- HASH_PARTITION_EXCHANGE [$$281]  |PARTITIONED|
+                                                -- SORT_GROUP_BY[$$243]  |PARTITIONED|
+                                                        {
+                                                          -- AGGREGATE  |LOCAL|
+                                                            -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                        }
+                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                     -- STREAM_PROJECT  |PARTITIONED|
-                                                      -- ASSIGN  |PARTITIONED|
+                                                      -- STREAM_SELECT  |PARTITIONED|
                                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                           -- REPLICATE  |PARTITIONED|
                                                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
@@ -100,6 +67,35 @@
                                                                     -- DATASOURCE_SCAN (test.tasks)  |PARTITIONED|
                                                                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                         -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- ASSIGN  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- SORT_GROUP_BY[$$283]  |PARTITIONED|
+                                            {
+                                              -- AGGREGATE  |LOCAL|
+                                                -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                            }
+                                      -- HASH_PARTITION_EXCHANGE [$$283]  |PARTITIONED|
+                                        -- SORT_GROUP_BY[$$244]  |PARTITIONED|
+                                                {
+                                                  -- AGGREGATE  |LOCAL|
+                                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                }
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- STREAM_PROJECT  |PARTITIONED|
+                                              -- STREAM_SELECT  |PARTITIONED|
+                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                  -- ASSIGN  |PARTITIONED|
+                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                      -- REPLICATE  |PARTITIONED|
+                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                          -- ASSIGN  |PARTITIONED|
+                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                -- DATASOURCE_SCAN (test.tasks)  |PARTITIONED|
+                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                       -- STREAM_PROJECT  |PARTITIONED|
                         -- ASSIGN  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query_issue849.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query_issue849.plan
index fd70464..5d16539 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query_issue849.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query_issue849.plan
@@ -22,11 +22,9 @@
           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
             -- HYBRID_HASH_JOIN [$$45][$$44]  |PARTITIONED|
               -- HASH_PARTITION_EXCHANGE [$$45]  |PARTITIONED|
-                -- STREAM_PROJECT  |UNPARTITIONED|
-                  -- RUNNING_AGGREGATE  |UNPARTITIONED|
-                    -- ASSIGN  |UNPARTITIONED|
-                      -- UNNEST  |UNPARTITIONED|
-                        -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+                -- ASSIGN  |UNPARTITIONED|
+                  -- UNNEST  |UNPARTITIONED|
+                    -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                 -- STREAM_PROJECT  |PARTITIONED|
                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveUnusedAssignAndAggregateRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveUnusedAssignAndAggregateRule.java
index a563f46..956f0b7 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveUnusedAssignAndAggregateRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveUnusedAssignAndAggregateRule.java
@@ -43,6 +43,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
@@ -50,7 +51,7 @@
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
 /**
- * Removes unused variables from Assign, Unnest, Aggregate, UnionAll, and Group-by operators.
+ * Removes unused variables from Assign, Unnest, Aggregate, RunningAggregate, UnionAll, and Group-by operators.
  */
 public class RemoveUnusedAssignAndAggregateRule implements IAlgebraicRewriteRule {
 
@@ -223,6 +224,13 @@
                     isTransformed = true;
                 }
                 return agg.getVariables().size();
+            case RUNNINGAGGREGATE:
+                RunningAggregateOperator ragg = (RunningAggregateOperator) op;
+                if (removeUnusedVarsAndExprs(toRemove, ragg.getVariables(), ragg.getExpressions())) {
+                    context.computeAndSetTypeEnvironmentForOperator(ragg);
+                    isTransformed = true;
+                }
+                return ragg.getVariables().size();
             case UNNEST:
                 UnnestOperator uOp = (UnnestOperator) op;
                 LogicalVariable pVar = uOp.getPositionalVariable();
@@ -354,6 +362,11 @@
                 assignVarsSetInThisOp.addAll(agg.getVariables());
                 targetOpFound = true;
                 break;
+            case RUNNINGAGGREGATE:
+                RunningAggregateOperator ragg = (RunningAggregateOperator) op;
+                assignVarsSetInThisOp.addAll(ragg.getVariables());
+                targetOpFound = true;
+                break;
             case UNNEST:
                 UnnestOperator uOp = (UnnestOperator) op;
                 LogicalVariable pVar = uOp.getPositionalVariable();