[ASTERIXDB-3531][COMP] Push projections into RunningAggregate operator
- user model changes: no
- storage format changes: no
- interface changes: no
details:
- At the end of the query optimisation, if a Project operator is the parent of a RunningAggregate operator in the plan tree, the project variables are pushed into the RunningAggregate operator.
Ext-ref: MB-57625
Change-Id: Idd194ce11d0117a37fa8930c85195c21534a7392
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19174
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
index 4a0c85f..506457e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
@@ -73,10 +73,6 @@
RunningAggregateOperator ragg = (RunningAggregateOperator) op;
List<LogicalVariable> variables = ragg.getVariables();
List<Mutable<ILogicalExpression>> expressions = ragg.getExpressions();
- int[] outColumns = new int[variables.size()];
- for (int i = 0; i < outColumns.length; i++) {
- outColumns[i] = opSchema.findVariable(variables.get(i));
- }
IRunningAggregateEvaluatorFactory[] runningAggFuns = new IRunningAggregateEvaluatorFactory[expressions.size()];
IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
for (int i = 0; i < runningAggFuns.length; i++) {
@@ -85,8 +81,30 @@
context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas, context);
}
- // TODO push projections into the operator
- int[] projectionList = JobGenHelper.projectAllVariables(opSchema);
+ int[] outColumns = new int[variables.size()];
+ int[] projectionList;
+
+ if (ragg.isProjectPushed()) {
+ for (int i = 0; i < outColumns.length; i++) {
+ outColumns[i] = inputSchemas[0].getSize() + i;
+ }
+ List<LogicalVariable> projectVars = ragg.getProjectVariables();
+
+ projectionList = new int[projectVars.size()];
+ int c = 0;
+ for (LogicalVariable projectVar : projectVars) {
+ if (variables.contains(projectVar)) {
+ projectionList[c++] = inputSchemas[0].getSize() + variables.indexOf(projectVar);
+ } else {
+ projectionList[c++] = inputSchemas[0].findVariable(projectVar);
+ }
+ }
+ } else {
+ for (int i = 0; i < outColumns.length; i++) {
+ outColumns[i] = opSchema.findVariable(variables.get(i));
+ }
+ projectionList = JobGenHelper.projectAllVariables(opSchema);
+ }
RunningAggregateRuntimeFactory runtime =
new RunningAggregateRuntimeFactory(projectionList, outColumns, runningAggFuns);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index 946a6be..34bf422 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -187,6 +187,9 @@
public Void visitRunningAggregateOperator(RunningAggregateOperator op, Integer indent) throws AlgebricksException {
addIndent(indent).append("running-aggregate ").append(str(op.getVariables())).append(" <- ");
pprintExprList(op.getExpressions(), indent);
+ if (op.isProjectPushed()) {
+ buffer.append(" project: ").append(str(op.getProjectVariables()));
+ }
return null;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
index b7bfc63..af3069a 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
@@ -525,6 +525,9 @@
try {
jsonGenerator.writeStringField(OPERATOR_FIELD, "running-aggregate");
writeVariablesAndExpressions(op.getVariables(), op.getExpressions(), indent);
+ if (op.isProjectPushed()) {
+ writeArrayFieldOfVariables(PROJECT_VARIABLES_FIELD, op.getProjectVariables());
+ }
return null;
} catch (IOException e) {
throw AlgebricksException.create(ErrorCode.ERROR_PRINTING_PLAN, e, String.valueOf(e));
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
index 8000483..98e416a 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
@@ -112,6 +112,9 @@
stringBuilder.setLength(0);
stringBuilder.append("running-aggregate ").append(str(op.getVariables())).append(" <- ");
printExprList(op.getExpressions());
+ if (op.isProjectPushed()) {
+ stringBuilder.append(" project: ").append(str(op.getProjectVariables()));
+ }
appendSchema(op, showDetails);
appendAnnotations(op, showDetails);
appendPhysicalOperatorInfo(op, showDetails);
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EmbedProjectRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EmbedProjectRule.java
index 23df409..08fa3ea 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EmbedProjectRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EmbedProjectRule.java
@@ -47,7 +47,8 @@
AbstractLogicalOperator op2 = (AbstractLogicalOperator) opRef2.getValue();
if (op2.getOperatorTag() != LogicalOperatorTag.ASSIGN && op2.getOperatorTag() != LogicalOperatorTag.UNNEST
- && op2.getOperatorTag() != LogicalOperatorTag.LEFT_OUTER_UNNEST) {
+ && op2.getOperatorTag() != LogicalOperatorTag.LEFT_OUTER_UNNEST
+ && op2.getOperatorTag() != LogicalOperatorTag.RUNNINGAGGREGATE) {
return false;
}