Force global aggregation operator to run on only one partition.
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java
index 2f53f9b..2f7c0ed 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java
@@ -21,9 +21,11 @@
// TODO type safe list of expressions
private List<Mutable<ILogicalExpression>> mergeExpressions;
private LogicalVariable partitioningVariable;
+ private boolean global;
public AggregateOperator(List<LogicalVariable> variables, List<Mutable<ILogicalExpression>> expressions) {
super(variables, expressions);
+ global = false;
}
@Override
@@ -77,6 +79,14 @@
return partitioningVariable;
}
+ public void setGlobal() {
+ global = true;
+ }
+
+ public boolean isGlobal() {
+ return global;
+ }
+
@Override
public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
IVariableTypeEnvironment env = new NonPropagatingTypeEnvironment(ctx.getExpressionTypeComputer(),
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
index 81dc2c2..5d490e9 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
@@ -35,6 +35,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
@@ -70,9 +71,14 @@
AggregateOperator aggOp = (AggregateOperator) op;
if (aggOp.getExecutionMode() == ExecutionMode.PARTITIONED && aggOp.getPartitioningVariable() != null) {
StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
- Set<LogicalVariable> partitioningVariables = new ListSet<LogicalVariable>();
- partitioningVariables.add(aggOp.getPartitioningVariable());
- pv[0] = new StructuralPropertiesVector(new UnorderedPartitionedProperty(partitioningVariables, null), null);
+ if (aggOp.isGlobal()) {
+ pv[0] = new StructuralPropertiesVector(IPartitioningProperty.UNPARTITIONED, null);
+ } else {
+ Set<LogicalVariable> partitioningVariables = new ListSet<LogicalVariable>();
+ partitioningVariables.add(aggOp.getPartitioningVariable());
+ pv[0] = new StructuralPropertiesVector(new UnorderedPartitionedProperty(partitioningVariables, null),
+ null);
+ }
return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
} else {
return emptyUnaryRequirements();
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java
index 08271c1..30ab542 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java
@@ -120,6 +120,7 @@
context.computeAndSetTypeEnvironmentForOperator(trueAssignOp);
initAgg.setPartitioningVariable(trueVar);
initAgg.getInputs().get(0).setValue(trueAssignOp);
+ initAgg.setGlobal();
}
return new Pair<Boolean, Mutable<ILogicalOperator>>(true, new MutableObject<ILogicalOperator>(pushedAgg));
} else {