checkpoint: added supporting rules and runtimes for running aggregation.
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/CancelUnnestWithNestedListifyRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/CancelUnnestWithNestedListifyRule.java
index 8ef4847..5dd5d27 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/CancelUnnestWithNestedListifyRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/CancelUnnestWithNestedListifyRule.java
@@ -47,6 +47,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RunningAggregatePOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.UnpartitionedPropertyComputer;
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
@@ -229,8 +230,6 @@
AssignOperator gbyKeyAssign = new AssignOperator(gbyKeyAssgnVars, gbyKeyAssgnExprs);
gbyKeyAssign.getInputs().add(gby.getInputs().get(0));
-
- context.computeAndSetTypeEnvironmentForOperator(gbyKeyAssign);
// add sort to replace group-by
List<Pair<IOrder, Mutable<ILogicalExpression>>> orderExprs = new ArrayList<Pair<IOrder, Mutable<ILogicalExpression>>>();
@@ -241,16 +240,45 @@
OrderOperator order = new OrderOperator(orderExprs);
order.getInputs().add(new MutableObject<ILogicalOperator>(gbyKeyAssign));
- context.computeAndSetTypeEnvironmentForOperator(order);
-
- assign.getInputs().add(new MutableObject<ILogicalOperator>(order));
opRef.setValue(assign);
- } else {
-
- return false;
- }
+ assign.getInputs().add(new MutableObject<ILogicalOperator>(order));
- context.computeAndSetTypeEnvironmentForOperator(assign);
+ context.computeAndSetTypeEnvironmentForOperator(gbyKeyAssign);
+ context.computeAndSetTypeEnvironmentForOperator(order);
+ context.computeAndSetTypeEnvironmentForOperator(assign);
+
+ } else {
+ // if positional variable is used in unnest, the unnest will be pushed into the group-by as a running-aggregate
+
+ // First create assign for the unnest variable
+ List<LogicalVariable> nestedAssignVars = new ArrayList<LogicalVariable>();
+ List<Mutable<ILogicalExpression>> nestedAssignExprs = new ArrayList<Mutable<ILogicalExpression>>();
+ nestedAssignVars.add(unnest1.getVariable());
+ nestedAssignExprs.add(new MutableObject<ILogicalExpression>(arg0));
+ AssignOperator nestedAssign = new AssignOperator(nestedAssignVars, nestedAssignExprs);
+ nestedAssign.getInputs().add(opRef2);
+
+ // Then create running aggregation for the positional variable
+ List<LogicalVariable> raggVars = new ArrayList<LogicalVariable>();
+ List<Mutable<ILogicalExpression>> raggExprs = new ArrayList<Mutable<ILogicalExpression>>();
+ raggVars.add(posVar);
+ StatefulFunctionCallExpression fce = new StatefulFunctionCallExpression(
+ FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.TID), UnpartitionedPropertyComputer.INSTANCE);
+ raggExprs.add(new MutableObject<ILogicalExpression>(fce));
+ RunningAggregateOperator raggOp = new RunningAggregateOperator(raggVars, raggExprs);
+ raggOp.setExecutionMode(unnest1.getExecutionMode());
+ RunningAggregatePOperator raggPOp = new RunningAggregatePOperator();
+ raggOp.setPhysicalOperator(raggPOp);
+ raggOp.getInputs().add(nestedPlanRoot.getInputs().get(0));
+ gby.getNestedPlans().get(0).getRoots().set(0, new MutableObject<ILogicalOperator>(raggOp));
+
+ opRef.setValue(nestedAssign);
+
+ context.computeAndSetTypeEnvironmentForOperator(nestedAssign);
+ context.computeAndSetTypeEnvironmentForOperator(raggOp);
+ context.computeAndSetTypeEnvironmentForOperator(gby);
+
+ }
return true;
}
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/PullPositionalVariableFromUnnestRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/PullPositionalVariableFromUnnestRule.java
index efca336..004e221 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/PullPositionalVariableFromUnnestRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/PullPositionalVariableFromUnnestRule.java
@@ -20,24 +20,17 @@
import org.apache.commons.lang3.mutable.MutableObject;
import edu.uci.ics.asterix.aql.util.FunctionUtils;
-import edu.uci.ics.asterix.om.base.AInt32;
-import edu.uci.ics.asterix.om.constants.AsterixConstantValue;
import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
-import edu.uci.ics.asterix.runtime.evaluators.functions.FieldAccessByIndexDescriptor;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.AssignPOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RunningAggregatePOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.UnpartitionedPropertyComputer;
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
@@ -50,8 +43,7 @@
}
@Override
- public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
- throws AlgebricksException {
+ public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
if (op.getOperatorTag() != LogicalOperatorTag.UNNEST) {
return false;
@@ -64,45 +56,18 @@
ArrayList<LogicalVariable> rOpVars = new ArrayList<LogicalVariable>();
rOpVars.add(p);
ArrayList<Mutable<ILogicalExpression>> rOpExprList = new ArrayList<Mutable<ILogicalExpression>>();
-
- ArrayList<Mutable<ILogicalExpression>> fceArgsList = new ArrayList<>();
-
- // add the argument for the input
- fceArgsList.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(unnest.getVariable())));
-
- // add the reference index, which should be the last column in the tuple produced by an unnest operator
- fceArgsList.add(new MutableObject<ILogicalExpression>(new ConstantExpression(new AsterixConstantValue(
- new AInt32(unnest.getSchema().size())))));
-
StatefulFunctionCallExpression fce = new StatefulFunctionCallExpression(
- FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.FIELD_ACCESS_BY_INDEX),
- UnpartitionedPropertyComputer.INSTANCE, fceArgsList);
-
- //StatefulFunctionCallExpression fce = new StatefulFunctionCallExpression(
- // FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.TID), UnpartitionedPropertyComputer.INSTANCE);
+ FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.TID), UnpartitionedPropertyComputer.INSTANCE);
rOpExprList.add(new MutableObject<ILogicalExpression>(fce));
-
- AssignOperator aOp = new AssignOperator(rOpVars, rOpExprList);
-
- //RunningAggregateOperator rOp = new RunningAggregateOperator(rOpVars, rOpExprList);
- aOp.setExecutionMode(unnest.getExecutionMode());
-
- AssignPOperator aPop = new AssignPOperator();
- aOp.setPhysicalOperator(aPop);
- aOp.getInputs().add(new MutableObject<ILogicalOperator>(unnest));
-
- opRef.setValue(aOp);
+ RunningAggregateOperator rOp = new RunningAggregateOperator(rOpVars, rOpExprList);
+ rOp.setExecutionMode(unnest.getExecutionMode());
+ RunningAggregatePOperator rPop = new RunningAggregatePOperator();
+ rOp.setPhysicalOperator(rPop);
+ rOp.getInputs().add(new MutableObject<ILogicalOperator>(unnest));
+ opRef.setValue(rOp);
unnest.setPositionalVariable(null);
- context.computeAndSetTypeEnvironmentForOperator(aOp);
+ context.computeAndSetTypeEnvironmentForOperator(rOp);
context.computeAndSetTypeEnvironmentForOperator(unnest);
-
- // RunningAggregatePOperator rPop = new RunningAggregatePOperator();
- // rOp.setPhysicalOperator(rPop);
- // rOp.getInputs().add(new MutableObject<ILogicalOperator>(unnest));
- // opRef.setValue(rOp);
- // unnest.setPositionalVariable(null);
- // context.computeAndSetTypeEnvironmentForOperator(rOp);
- // context.computeAndSetTypeEnvironmentForOperator(unnest);
return true;
}
-}
+}
\ No newline at end of file
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
index 500f581..7199f7a 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
@@ -95,49 +95,67 @@
ILogicalPlan p0 = gby.getNestedPlans().get(0);
if (p0.getRoots().size() == 1) {
Mutable<ILogicalOperator> r0 = p0.getRoots().get(0);
- AggregateOperator aggOp = (AggregateOperator) r0.getValue();
- boolean serializable = true;
- for (Mutable<ILogicalExpression> exprRef : aggOp.getExpressions()) {
- AbstractFunctionCallExpression expr = (AbstractFunctionCallExpression) exprRef.getValue();
- if (!AsterixBuiltinFunctions.isAggregateFunctionSerializable(expr.getFunctionIdentifier())) {
- serializable = false;
- break;
- }
- }
-
- if ((gby.getAnnotations().get(OperatorAnnotations.USE_HASH_GROUP_BY) == Boolean.TRUE || gby
- .getAnnotations().get(OperatorAnnotations.USE_EXTERNAL_GROUP_BY) == Boolean.TRUE)) {
- if (serializable) {
- // if not serializable, use external group-by
- int i = 0;
- for (Mutable<ILogicalExpression> exprRef : aggOp.getExpressions()) {
- AbstractFunctionCallExpression expr = (AbstractFunctionCallExpression) exprRef
- .getValue();
- AggregateFunctionCallExpression serialAggExpr = AsterixBuiltinFunctions
- .makeSerializableAggregateFunctionExpression(expr.getFunctionIdentifier(),
- expr.getArguments());
- aggOp.getExpressions().get(i).setValue(serialAggExpr);
- i++;
+ if (((AbstractLogicalOperator) (r0.getValue())).getOperatorTag().equals(
+ LogicalOperatorTag.AGGREGATE)) {
+ AggregateOperator aggOp = (AggregateOperator) r0.getValue();
+ boolean serializable = true;
+ for (Mutable<ILogicalExpression> exprRef : aggOp.getExpressions()) {
+ AbstractFunctionCallExpression expr = (AbstractFunctionCallExpression) exprRef.getValue();
+ if (!AsterixBuiltinFunctions.isAggregateFunctionSerializable(expr.getFunctionIdentifier())) {
+ serializable = false;
+ break;
}
+ }
- ExternalGroupByPOperator externalGby = new ExternalGroupByPOperator(gby.getGroupByList(),
- physicalOptimizationConfig.getMaxFramesExternalGroupBy(),
- physicalOptimizationConfig.getExternalGroupByTableSize());
- op.setPhysicalOperator(externalGby);
- generateMergeAggregationExpressions(gby, context);
- } else {
- // if not serializable, use pre-clustered group-by
- List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> gbyList = gby.getGroupByList();
- List<LogicalVariable> columnList = new ArrayList<LogicalVariable>(gbyList.size());
- for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gbyList) {
- ILogicalExpression expr = p.second.getValue();
- if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
- VariableReferenceExpression varRef = (VariableReferenceExpression) expr;
- columnList.add(varRef.getVariableReference());
+ if ((gby.getAnnotations().get(OperatorAnnotations.USE_HASH_GROUP_BY) == Boolean.TRUE || gby
+ .getAnnotations().get(OperatorAnnotations.USE_EXTERNAL_GROUP_BY) == Boolean.TRUE)) {
+ if (serializable) {
+ // if serializable, use external group-by
+ int i = 0;
+ for (Mutable<ILogicalExpression> exprRef : aggOp.getExpressions()) {
+ AbstractFunctionCallExpression expr = (AbstractFunctionCallExpression) exprRef
+ .getValue();
+ AggregateFunctionCallExpression serialAggExpr = AsterixBuiltinFunctions
+ .makeSerializableAggregateFunctionExpression(expr.getFunctionIdentifier(),
+ expr.getArguments());
+ aggOp.getExpressions().get(i).setValue(serialAggExpr);
+ i++;
}
+
+ ExternalGroupByPOperator externalGby = new ExternalGroupByPOperator(
+ gby.getGroupByList(), physicalOptimizationConfig.getMaxFramesExternalGroupBy(),
+ physicalOptimizationConfig.getExternalGroupByTableSize());
+ op.setPhysicalOperator(externalGby);
+ generateMergeAggregationExpressions(gby, context);
+ } else {
+ // if not serializable, use pre-clustered group-by
+ List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> gbyList = gby.getGroupByList();
+ List<LogicalVariable> columnList = new ArrayList<LogicalVariable>(gbyList.size());
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gbyList) {
+ ILogicalExpression expr = p.second.getValue();
+ if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+ VariableReferenceExpression varRef = (VariableReferenceExpression) expr;
+ columnList.add(varRef.getVariableReference());
+ }
+ }
+ op.setPhysicalOperator(new PreclusteredGroupByPOperator(columnList));
}
- op.setPhysicalOperator(new PreclusteredGroupByPOperator(columnList));
}
+ } else if (((AbstractLogicalOperator) (r0.getValue())).getOperatorTag().equals(
+ LogicalOperatorTag.RUNNINGAGGREGATE)) {
+ List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> gbyList = gby.getGroupByList();
+ List<LogicalVariable> columnList = new ArrayList<LogicalVariable>(gbyList.size());
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gbyList) {
+ ILogicalExpression expr = p.second.getValue();
+ if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+ VariableReferenceExpression varRef = (VariableReferenceExpression) expr;
+ columnList.add(varRef.getVariableReference());
+ }
+ }
+ op.setPhysicalOperator(new PreclusteredGroupByPOperator(columnList));
+ } else {
+ throw new AlgebricksException("Unsupported nested operator within a group-by: "
+ + ((AbstractLogicalOperator) (r0.getValue())).getOperatorTag().name());
}
}
}