merge fullstack_pregelix_fix
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java
index 2f7c0ed..0c105d0 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java
@@ -20,12 +20,11 @@
// private ArrayList<AggregateFunctionCallExpression> expressions;
// TODO type safe list of expressions
private List<Mutable<ILogicalExpression>> mergeExpressions;
- private LogicalVariable partitioningVariable;
private boolean global;
public AggregateOperator(List<LogicalVariable> variables, List<Mutable<ILogicalExpression>> expressions) {
super(variables, expressions);
- global = false;
+ global = true;
}
@Override
@@ -71,16 +70,8 @@
return mergeExpressions;
}
- public void setPartitioningVariable(LogicalVariable partitioningVariable) {
- this.partitioningVariable = partitioningVariable;
- }
-
- public LogicalVariable getPartitioningVariable() {
- return partitioningVariable;
- }
-
- public void setGlobal() {
- global = true;
+ public void setGlobal(boolean global) {
+ this.global = global;
}
public boolean isGlobal() {
@@ -100,4 +91,5 @@
}
return env;
}
+
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index a620e54..42f7e20 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -77,9 +77,6 @@
for (Mutable<ILogicalExpression> exprRef : op.getExpressions()) {
exprRef.getValue().getUsedVariables(usedVariables);
}
- if (op.getPartitioningVariable() != null) {
- usedVariables.add(op.getPartitioningVariable());
- }
return null;
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
index 5d490e9..f47c2ec 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
@@ -16,12 +16,10 @@
import java.util.ArrayList;
import java.util.List;
-import java.util.Set;
import org.apache.commons.lang3.mutable.Mutable;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.utils.ListSet;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -31,7 +29,6 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
@@ -40,7 +37,6 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
-import edu.uci.ics.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
@@ -59,30 +55,29 @@
@Override
public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
- AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
- IPhysicalPropertiesVector childProps = op2.getDeliveredPhysicalProperties();
- deliveredProperties = new StructuralPropertiesVector(childProps.getPartitioningProperty(),
- new ArrayList<ILocalStructuralProperty>(0));
+ AggregateOperator aggOp = (AggregateOperator) op;
+ ILogicalOperator op2 = op.getInputs().get(0).getValue();
+ if (aggOp.getExecutionMode() != AbstractLogicalOperator.ExecutionMode.UNPARTITIONED) {
+ deliveredProperties = new StructuralPropertiesVector(op2.getDeliveredPhysicalProperties()
+ .getPartitioningProperty(), new ArrayList<ILocalStructuralProperty>());
+ } else {
+ deliveredProperties = new StructuralPropertiesVector(IPartitioningProperty.UNPARTITIONED,
+ new ArrayList<ILocalStructuralProperty>());
+ }
}
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
IPhysicalPropertiesVector reqdByParent) {
AggregateOperator aggOp = (AggregateOperator) op;
- if (aggOp.getExecutionMode() == ExecutionMode.PARTITIONED && aggOp.getPartitioningVariable() != null) {
- StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
- if (aggOp.isGlobal()) {
- pv[0] = new StructuralPropertiesVector(IPartitioningProperty.UNPARTITIONED, null);
- } else {
- Set<LogicalVariable> partitioningVariables = new ListSet<LogicalVariable>();
- partitioningVariables.add(aggOp.getPartitioningVariable());
- pv[0] = new StructuralPropertiesVector(new UnorderedPartitionedProperty(partitioningVariables, null),
- null);
- }
+ StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
+ if (aggOp.isGlobal() && aggOp.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.UNPARTITIONED) {
+ pv[0] = new StructuralPropertiesVector(IPartitioningProperty.UNPARTITIONED, null);
return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
} else {
return emptyUnaryRequirements();
}
+
}
@Override
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java
index 11e24d7..531b300 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java
@@ -22,6 +22,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
@@ -37,10 +38,8 @@
public class StreamLimitPOperator extends AbstractPhysicalOperator {
- private boolean global;
+ public StreamLimitPOperator() {
- public StreamLimitPOperator(boolean global) {
- this.global = global;
}
@Override
@@ -55,14 +54,22 @@
@Override
public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+ AbstractLogicalOperator limitOp = (AbstractLogicalOperator) op;
ILogicalOperator op2 = op.getInputs().get(0).getValue();
- deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
+ if (limitOp.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.UNPARTITIONED) {
+ //partitioning property: unpartitioned; local property: whatever from the child
+ deliveredProperties = new StructuralPropertiesVector(IPartitioningProperty.UNPARTITIONED, op2
+ .getDeliveredPhysicalProperties().getLocalProperties());
+ } else {
+ deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
+ }
}
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
IPhysicalPropertiesVector reqdByParent) {
- if (global) {
+ AbstractLogicalOperator limitOp = (AbstractLogicalOperator) op;
+ if (limitOp.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.UNPARTITIONED) {
StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
pv[0] = new StructuralPropertiesVector(IPartitioningProperty.UNPARTITIONED, null);
return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
@@ -83,7 +90,8 @@
ILogicalExpression offsetExpr = limit.getOffset().getValue();
IScalarEvaluatorFactory offsetFact = (offsetExpr == null) ? null : expressionRuntimeProvider
.createEvaluatorFactory(offsetExpr, env, inputSchemas, context);
- RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
+ RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema,
+ context);
StreamLimitRuntimeFactory runtime = new StreamLimitRuntimeFactory(maxObjectsFact, offsetFact, null,
context.getBinaryIntegerInspectorFactory());
builder.contributeMicroOperator(limit, runtime, recDesc);
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
index 47a979c..35b36dc 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
@@ -27,6 +27,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
@@ -96,14 +97,21 @@
break;
}
default: {
+ boolean forceUnpartitioned = false;
if (op.getOperatorTag() == LogicalOperatorTag.LIMIT) {
LimitOperator opLim = (LimitOperator) op;
if (opLim.isTopmostLimitOp()) {
- if (opLim.getExecutionMode() != AbstractLogicalOperator.ExecutionMode.UNPARTITIONED) {
- opLim.setExecutionMode(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED);
- change = true;
- }
- break;
+ opLim.setExecutionMode(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED);
+ change = true;
+ forceUnpartitioned = true;
+ }
+ }
+ if (op.getOperatorTag() == LogicalOperatorTag.AGGREGATE) {
+ AggregateOperator aggOp = (AggregateOperator) op;
+ if (aggOp.isGlobal()) {
+ op.setExecutionMode(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED);
+ change = true;
+ forceUnpartitioned = true;
}
}
@@ -112,6 +120,8 @@
AbstractLogicalOperator inputOp = (AbstractLogicalOperator) i.getValue();
switch (inputOp.getExecutionMode()) {
case PARTITIONED: {
+ if (forceUnpartitioned)
+ break;
op.setExecutionMode(AbstractLogicalOperator.ExecutionMode.PARTITIONED);
change = true;
exit = true;
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java
index 30ab542..013ddda 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java
@@ -16,12 +16,10 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
@@ -110,17 +108,10 @@
} else {
// The local aggregate operator is fed by the input of the original aggregate operator.
pushedAgg.getInputs().add(new MutableObject<ILogicalOperator>(initAgg.getInputs().get(0).getValue()));
- // Set the partitioning variable in the local agg to ensure it is not projected away.
- context.computeAndSetTypeEnvironmentForOperator(pushedAgg);
- LogicalVariable trueVar = context.newVar();
// Reintroduce assign op for the global agg partitioning var.
- AssignOperator trueAssignOp = new AssignOperator(trueVar, new MutableObject<ILogicalExpression>(
- ConstantExpression.TRUE));
- trueAssignOp.getInputs().add(new MutableObject<ILogicalOperator>(pushedAgg));
- context.computeAndSetTypeEnvironmentForOperator(trueAssignOp);
- initAgg.setPartitioningVariable(trueVar);
- initAgg.getInputs().get(0).setValue(trueAssignOp);
- initAgg.setGlobal();
+ initAgg.getInputs().get(0).setValue(pushedAgg);
+ pushedAgg.setGlobal(false);
+ context.computeAndSetTypeEnvironmentForOperator(pushedAgg);
}
return new Pair<Boolean, Mutable<ILogicalOperator>>(true, new MutableObject<ILogicalOperator>(pushedAgg));
} else {
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java
index 3260ca0..7bc150a 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java
@@ -51,6 +51,7 @@
orderBreakingOps.add(LogicalOperatorTag.INNERJOIN);
orderBreakingOps.add(LogicalOperatorTag.LEFTOUTERJOIN);
orderBreakingOps.add(LogicalOperatorTag.UNIONALL);
+ orderBreakingOps.add(LogicalOperatorTag.AGGREGATE);
}
@Override
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceAggregateCombinerRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceAggregateCombinerRule.java
index c3d935c..c75db57 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceAggregateCombinerRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceAggregateCombinerRule.java
@@ -29,7 +29,7 @@
return false;
}
AggregateOperator aggOp = (AggregateOperator) op;
- if (aggOp.getExecutionMode() != ExecutionMode.PARTITIONED || aggOp.getPartitioningVariable() == null) {
+ if (!aggOp.isGlobal() || aggOp.getExecutionMode() == ExecutionMode.LOCAL) {
return false;
}
Map<AggregateFunctionCallExpression, SimilarAggregatesInfo> toReplaceMap = new HashMap<AggregateFunctionCallExpression, SimilarAggregatesInfo>();
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushLimitDownRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushLimitDownRule.java
index acfdbd3..e5f60bc 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushLimitDownRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushLimitDownRule.java
@@ -106,7 +106,7 @@
opLim.getMaxObjects(), opLim.getOffset());
clone2 = new LimitOperator(maxPlusOffset, false);
}
- clone2.setPhysicalOperator(new StreamLimitPOperator(false));
+ clone2.setPhysicalOperator(new StreamLimitPOperator());
clone2.getInputs().add(new MutableObject<ILogicalOperator>(op2));
clone2.setExecutionMode(op2.getExecutionMode());
clone2.recomputeSchema();
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index f1b9a7b..9c8ad46 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -21,7 +21,6 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSource;
import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
@@ -172,9 +171,7 @@
break;
}
case LIMIT: {
- LimitOperator opLim = (LimitOperator) op;
- op.setPhysicalOperator(new StreamLimitPOperator(opLim.isTopmostLimitOp()
- && opLim.getExecutionMode() == ExecutionMode.PARTITIONED));
+ op.setPhysicalOperator(new StreamLimitPOperator());
break;
}
case NESTEDTUPLESOURCE: {
diff --git a/hivesterix/hivesterix-dist/src/test/resources/runtimefunctionts/logging.properties b/hivesterix/hivesterix-dist/src/test/resources/runtimefunctionts/logging.properties
index 1cc34e1..f42e321 100644
--- a/hivesterix/hivesterix-dist/src/test/resources/runtimefunctionts/logging.properties
+++ b/hivesterix/hivesterix-dist/src/test/resources/runtimefunctionts/logging.properties
@@ -60,6 +60,5 @@
# For example, set the com.xyz.foo logger to only log SEVERE
# messages:
-edu.uci.ics.asterix.level = WARNING
-edu.uci.ics.algebricks.level = WARNING
+#edu.uci.ics.hyracks.algebricks.level = FINEST
edu.uci.ics.hyracks.level = WARNING
diff --git a/hivesterix/hivesterix-optimizer/src/main/java/edu/uci/ics/hivesterix/optimizer/rulecollections/HiveRuleCollections.java b/hivesterix/hivesterix-optimizer/src/main/java/edu/uci/ics/hivesterix/optimizer/rulecollections/HiveRuleCollections.java
index 7e4e271..a049f15 100644
--- a/hivesterix/hivesterix-optimizer/src/main/java/edu/uci/ics/hivesterix/optimizer/rulecollections/HiveRuleCollections.java
+++ b/hivesterix/hivesterix-optimizer/src/main/java/edu/uci/ics/hivesterix/optimizer/rulecollections/HiveRuleCollections.java
@@ -40,9 +40,7 @@
public final static LinkedList<IAlgebraicRewriteRule> NORMALIZATION = new LinkedList<IAlgebraicRewriteRule>();
static {
NORMALIZATION.add(new EliminateSubplanRule());
- NORMALIZATION.add(new IntroduceAggregateCombinerRule());
NORMALIZATION.add(new BreakSelectIntoConjunctsRule());
- NORMALIZATION.add(new IntroduceAggregateCombinerRule());
NORMALIZATION.add(new PushSelectIntoJoinRule());
NORMALIZATION.add(new ExtractGbyExpressionsRule());
NORMALIZATION.add(new RemoveRedundantSelectRule());
@@ -84,6 +82,7 @@
CONSOLIDATION.add(new IntroduceEarlyProjectRule());
CONSOLIDATION.add(new ConsolidateAssignsRule());
CONSOLIDATION.add(new IntroduceGroupByCombinerRule());
+ CONSOLIDATION.add(new IntroduceAggregateCombinerRule());
CONSOLIDATION.add(new RemoveUnusedAssignAndAggregateRule());
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
index d099645..8410e1e 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
@@ -52,7 +52,7 @@
ccConfig.clusterNetPort = TEST_HYRACKS_CC_PORT;
ccConfig.clientNetPort = TEST_HYRACKS_CC_CLIENT_PORT;
ccConfig.defaultMaxJobAttempts = 0;
- ccConfig.jobHistorySize = 0;
+ ccConfig.jobHistorySize = 1;
ccConfig.profileDumpPeriod = -1;
// cluster controller