[ASTERIXDB-3531][COMP] Push projections into RunningAggregator operator

- user model changes: no
- storage format changes: no
- interface changes: no

details:
- At the end of the query optimisation if project is parent of RunningAggregator in the plan tree, project variables are pushed onto RunningAggregator operator.

Ext-ref: MB-57625
Change-Id: Idd194ce11d0117a37fa8930c85195c21534a7392
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19174
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
index 4a0c85f..506457e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
@@ -73,10 +73,6 @@
         RunningAggregateOperator ragg = (RunningAggregateOperator) op;
         List<LogicalVariable> variables = ragg.getVariables();
         List<Mutable<ILogicalExpression>> expressions = ragg.getExpressions();
-        int[] outColumns = new int[variables.size()];
-        for (int i = 0; i < outColumns.length; i++) {
-            outColumns[i] = opSchema.findVariable(variables.get(i));
-        }
         IRunningAggregateEvaluatorFactory[] runningAggFuns = new IRunningAggregateEvaluatorFactory[expressions.size()];
         IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
         for (int i = 0; i < runningAggFuns.length; i++) {
@@ -85,8 +81,30 @@
                     context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas, context);
         }
 
-        // TODO push projections into the operator
-        int[] projectionList = JobGenHelper.projectAllVariables(opSchema);
+        int[] outColumns = new int[variables.size()];
+        int[] projectionList;
+
+        if (ragg.isProjectPushed()) {
+            for (int i = 0; i < outColumns.length; i++) {
+                outColumns[i] = inputSchemas[0].getSize() + i;
+            }
+            List<LogicalVariable> projectVars = ragg.getProjectVariables();
+
+            projectionList = new int[projectVars.size()];
+            int c = 0;
+            for (LogicalVariable projectVar : projectVars) {
+                if (variables.contains(projectVar)) {
+                    projectionList[c++] = inputSchemas[0].getSize() + variables.indexOf(projectVar);
+                } else {
+                    projectionList[c++] = inputSchemas[0].findVariable(projectVar);
+                }
+            }
+        } else {
+            for (int i = 0; i < outColumns.length; i++) {
+                outColumns[i] = opSchema.findVariable(variables.get(i));
+            }
+            projectionList = JobGenHelper.projectAllVariables(opSchema);
+        }
 
         RunningAggregateRuntimeFactory runtime =
                 new RunningAggregateRuntimeFactory(projectionList, outColumns, runningAggFuns);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index 946a6be..34bf422 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -187,6 +187,9 @@
     public Void visitRunningAggregateOperator(RunningAggregateOperator op, Integer indent) throws AlgebricksException {
         addIndent(indent).append("running-aggregate ").append(str(op.getVariables())).append(" <- ");
         pprintExprList(op.getExpressions(), indent);
+        if (op.isProjectPushed()) {
+            buffer.append(" project: ").append(str(op.getProjectVariables()));
+        }
         return null;
     }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
index b7bfc63..af3069a 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
@@ -525,6 +525,9 @@
         try {
             jsonGenerator.writeStringField(OPERATOR_FIELD, "running-aggregate");
             writeVariablesAndExpressions(op.getVariables(), op.getExpressions(), indent);
+            if (op.isProjectPushed()) {
+                writeArrayFieldOfVariables(PROJECT_VARIABLES_FIELD, op.getProjectVariables());
+            }
             return null;
         } catch (IOException e) {
             throw AlgebricksException.create(ErrorCode.ERROR_PRINTING_PLAN, e, String.valueOf(e));
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
index 8000483..98e416a 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
@@ -112,6 +112,9 @@
         stringBuilder.setLength(0);
         stringBuilder.append("running-aggregate ").append(str(op.getVariables())).append(" <- ");
         printExprList(op.getExpressions());
+        if (op.isProjectPushed()) {
+            stringBuilder.append(" project: ").append(str(op.getProjectVariables()));
+        }
         appendSchema(op, showDetails);
         appendAnnotations(op, showDetails);
         appendPhysicalOperatorInfo(op, showDetails);
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EmbedProjectRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EmbedProjectRule.java
index 23df409..08fa3ea 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EmbedProjectRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EmbedProjectRule.java
@@ -47,7 +47,8 @@
         AbstractLogicalOperator op2 = (AbstractLogicalOperator) opRef2.getValue();
 
         if (op2.getOperatorTag() != LogicalOperatorTag.ASSIGN && op2.getOperatorTag() != LogicalOperatorTag.UNNEST
-                && op2.getOperatorTag() != LogicalOperatorTag.LEFT_OUTER_UNNEST) {
+                && op2.getOperatorTag() != LogicalOperatorTag.LEFT_OUTER_UNNEST
+                && op2.getOperatorTag() != LogicalOperatorTag.RUNNINGAGGREGATE) {
             return false;
         }