Reintegrated hyracks_fix_agg. Removed incorrect rule that adds a group-by around a standalone aggregate. Replaced it with a rule for introducing a combiner to standalone aggregates.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_asterix_stabilization@1829 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java
index d9496f8..2f53f9b 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java
@@ -20,6 +20,7 @@
     // private ArrayList<AggregateFunctionCallExpression> expressions;
     // TODO type safe list of expressions
     private List<Mutable<ILogicalExpression>> mergeExpressions;
+    private LogicalVariable partitioningVariable;
 
     public AggregateOperator(List<LogicalVariable> variables, List<Mutable<ILogicalExpression>> expressions) {
         super(variables, expressions);
@@ -68,6 +69,14 @@
         return mergeExpressions;
     }
 
+    public void setPartitioningVariable(LogicalVariable partitioningVariable) {
+        this.partitioningVariable = partitioningVariable;
+    }
+
+    public LogicalVariable getPartitioningVariable() {
+        return partitioningVariable;
+    }
+
     @Override
     public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
         IVariableTypeEnvironment env = new NonPropagatingTypeEnvironment(ctx.getExpressionTypeComputer(),
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index 56487cc..3a82ccd 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -71,6 +71,9 @@
         for (Mutable<ILogicalExpression> exprRef : op.getExpressions()) {

             exprRef.getValue().getUsedVariables(usedVariables);

         }

+        if (op.getPartitioningVariable() != null) {

+            usedVariables.add(op.getPartitioningVariable());

+        }

         return null;

     }

 

diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
index 1397a56..81dc2c2 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
@@ -16,10 +16,12 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
 
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.ListSet;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -29,12 +31,15 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
 import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
@@ -62,7 +67,16 @@
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
             IPhysicalPropertiesVector reqdByParent) {
-        return emptyUnaryRequirements();
+        AggregateOperator aggOp = (AggregateOperator) op;
+        if (aggOp.getExecutionMode() == ExecutionMode.PARTITIONED && aggOp.getPartitioningVariable() != null) {
+            StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
+            Set<LogicalVariable> partitioningVariables = new ListSet<LogicalVariable>();
+            partitioningVariables.add(aggOp.getPartitioningVariable());
+            pv[0] = new StructuralPropertiesVector(new UnorderedPartitionedProperty(partitioningVariables, null), null);
+            return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+        } else {
+            return emptyUnaryRequirements();
+        }
     }
 
     @Override
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
index e6b296b..471b71c 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
@@ -68,7 +68,7 @@
         runPhysicalOptimizations(plan, physicalRewrites);
         StringBuilder sb2 = new StringBuilder();
         PlanPrettyPrinter.printPlan(plan, sb2, ppvisitor, 0);
-        AlgebricksConfig.ALGEBRICKS_LOGGER.fine("Optimized Plan:\n" + sb2.toString());
+        AlgebricksConfig.ALGEBRICKS_LOGGER.info("Optimized Plan:\n" + sb2.toString());
     }
 
     private void runOptimizationSets(ILogicalPlan plan,
diff --git a/hyracks-algebricks/hyracks-algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/rewriter/PigletRewriteRuleset.java b/hyracks-algebricks/hyracks-algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/rewriter/PigletRewriteRuleset.java
index 90ee5bb..b2a5215 100644
--- a/hyracks-algebricks/hyracks-algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/rewriter/PigletRewriteRuleset.java
+++ b/hyracks-algebricks/hyracks-algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/rewriter/PigletRewriteRuleset.java
@@ -16,7 +16,6 @@
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.FactorRedundantGroupAndDecorVarsRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.InferTypesRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.InlineVariablesRule;
-import edu.uci.ics.hyracks.algebricks.rewriter.rules.IntroduceGroupByForStandaloneAggregRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.IsolateHyracksOperatorsRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.PullSelectOutOfEqJoin;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.PushLimitDownRule;
@@ -40,7 +39,10 @@
     public final static List<IAlgebraicRewriteRule> buildNormalizationRuleCollection() {
         List<IAlgebraicRewriteRule> normalization = new LinkedList<IAlgebraicRewriteRule>();
         normalization.add(new EliminateSubplanRule());
-        normalization.add(new IntroduceGroupByForStandaloneAggregRule());
+        // TODO: This rule is incorrect and has been removed. Its replacement in
+        // Asterix (PushAggFuncIntoStandaloneAggregateRule)
+        // is language-specific.
+        // normalization.add(new IntroduceGroupByForStandaloneAggregRule());
         normalization.add(new BreakSelectIntoConjunctsRule());
         normalization.add(new PushSelectIntoJoinRule());
         normalization.add(new ExtractGbyExpressionsRule());
@@ -100,7 +102,6 @@
         return physicalPlanRewrites;
     }
 
-    
     public final static List<IAlgebraicRewriteRule> prepareForJobGenRuleCollection() {
         List<IAlgebraicRewriteRule> prepareForJobGenRewrites = new LinkedList<IAlgebraicRewriteRule>();
         prepareForJobGenRewrites.add(new IsolateHyracksOperatorsRule(
diff --git a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java
new file mode 100644
index 0000000..08271c1
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java
@@ -0,0 +1,144 @@
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+public abstract class AbstractIntroduceCombinerRule implements IAlgebraicRewriteRule {
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
+        return false;
+    }
+
+    /**
+     * Replace the original aggregate functions with their corresponding global aggregate function.
+     */
+    public void replaceOriginalAggFuncs(Map<AggregateFunctionCallExpression, SimilarAggregatesInfo> toReplaceMap) {
+        for (Map.Entry<AggregateFunctionCallExpression, SimilarAggregatesInfo> entry : toReplaceMap.entrySet()) {
+            SimilarAggregatesInfo sai = entry.getValue();
+            for (AggregateExprInfo aei : sai.simAggs) {
+                AbstractFunctionCallExpression afce = (AbstractFunctionCallExpression) aei.aggExprRef.getValue();
+                afce.setFunctionInfo(aei.newFunInfo);
+                afce.getArguments().clear();
+                afce.getArguments().add(new MutableObject<ILogicalExpression>(sai.stepOneResult));
+            }
+        }
+    }
+
+    protected Pair<Boolean, Mutable<ILogicalOperator>> tryToPushAgg(AggregateOperator initAgg,
+            GroupByOperator newGbyOp, Map<AggregateFunctionCallExpression, SimilarAggregatesInfo> toReplaceMap,
+            IOptimizationContext context) throws AlgebricksException {
+
+        ArrayList<LogicalVariable> pushedVars = new ArrayList<LogicalVariable>();
+        ArrayList<Mutable<ILogicalExpression>> pushedExprs = new ArrayList<Mutable<ILogicalExpression>>();
+
+        List<LogicalVariable> initVars = initAgg.getVariables();
+        List<Mutable<ILogicalExpression>> initExprs = initAgg.getExpressions();
+        int numExprs = initVars.size();
+
+        // First make sure that all agg funcs are two step, otherwise we cannot use local aggs.
+        for (int i = 0; i < numExprs; i++) {
+            AggregateFunctionCallExpression aggFun = (AggregateFunctionCallExpression) initExprs.get(i).getValue();
+            if (!aggFun.isTwoStep()) {
+                return new Pair<Boolean, Mutable<ILogicalOperator>>(false, null);
+            }
+        }
+
+        boolean haveAggToReplace = false;
+        for (int i = 0; i < numExprs; i++) {
+            Mutable<ILogicalExpression> expRef = initExprs.get(i);
+            AggregateFunctionCallExpression aggFun = (AggregateFunctionCallExpression) expRef.getValue();
+            IFunctionInfo fi1 = aggFun.getStepOneAggregate();
+            // Clone the aggregate's args.
+            List<Mutable<ILogicalExpression>> newArgs = new ArrayList<Mutable<ILogicalExpression>>(aggFun
+                    .getArguments().size());
+            for (Mutable<ILogicalExpression> er : aggFun.getArguments()) {
+                newArgs.add(new MutableObject<ILogicalExpression>(er.getValue().cloneExpression()));
+            }
+            IFunctionInfo fi2 = aggFun.getStepTwoAggregate();
+            SimilarAggregatesInfo inf = toReplaceMap.get(aggFun);
+            if (inf == null) {
+                inf = new SimilarAggregatesInfo();
+                LogicalVariable newAggVar = context.newVar();
+                pushedVars.add(newAggVar);
+                inf.stepOneResult = new VariableReferenceExpression(newAggVar);
+                inf.simAggs = new ArrayList<AggregateExprInfo>();
+                toReplaceMap.put(aggFun, inf);
+                AggregateFunctionCallExpression aggLocal = new AggregateFunctionCallExpression(fi1, false, newArgs);
+                pushedExprs.add(new MutableObject<ILogicalExpression>(aggLocal));
+            }
+            AggregateExprInfo aei = new AggregateExprInfo();
+            aei.aggExprRef = expRef;
+            aei.newFunInfo = fi2;
+            inf.simAggs.add(aei);
+            haveAggToReplace = true;
+        }
+
+        if (!pushedVars.isEmpty()) {
+            AggregateOperator pushedAgg = new AggregateOperator(pushedVars, pushedExprs);
+            pushedAgg.setExecutionMode(ExecutionMode.LOCAL);
+            // If newGbyOp is null, then we are optimizing an aggregate without group by.
+            if (newGbyOp != null) {
+                // Hook up the nested aggregate op with the outer group by.
+                NestedTupleSourceOperator nts = new NestedTupleSourceOperator(new MutableObject<ILogicalOperator>(
+                        newGbyOp));
+                nts.setExecutionMode(ExecutionMode.LOCAL);
+                pushedAgg.getInputs().add(new MutableObject<ILogicalOperator>(nts));
+            } else {
+                // The local aggregate operator is fed by the input of the original aggregate operator.
+                pushedAgg.getInputs().add(new MutableObject<ILogicalOperator>(initAgg.getInputs().get(0).getValue()));
+                // Set the partitioning variable in the local agg to ensure it is not projected away.
+                context.computeAndSetTypeEnvironmentForOperator(pushedAgg);
+                LogicalVariable trueVar = context.newVar();
+                // Reintroduce assign op for the global agg partitioning var.
+                AssignOperator trueAssignOp = new AssignOperator(trueVar, new MutableObject<ILogicalExpression>(
+                        ConstantExpression.TRUE));
+                trueAssignOp.getInputs().add(new MutableObject<ILogicalOperator>(pushedAgg));
+                context.computeAndSetTypeEnvironmentForOperator(trueAssignOp);
+                initAgg.setPartitioningVariable(trueVar);
+                initAgg.getInputs().get(0).setValue(trueAssignOp);
+            }
+            return new Pair<Boolean, Mutable<ILogicalOperator>>(true, new MutableObject<ILogicalOperator>(pushedAgg));
+        } else {
+            return new Pair<Boolean, Mutable<ILogicalOperator>>(haveAggToReplace, null);
+        }
+    }
+
+    protected class SimilarAggregatesInfo {
+        ILogicalExpression stepOneResult;
+        List<AggregateExprInfo> simAggs;
+    }
+
+    protected class AggregateExprInfo {
+        Mutable<ILogicalExpression> aggExprRef;
+        IFunctionInfo newFunInfo;
+    }
+
+    protected class BookkeepingInfo {
+        Map<AggregateFunctionCallExpression, SimilarAggregatesInfo> toReplaceMap = new HashMap<AggregateFunctionCallExpression, SimilarAggregatesInfo>();
+        Map<GroupByOperator, List<LogicalVariable>> modifyGbyMap = new HashMap<GroupByOperator, List<LogicalVariable>>();
+    }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceAggregateCombinerRule.java b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceAggregateCombinerRule.java
new file mode 100644
index 0000000..c3d935c
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceAggregateCombinerRule.java
@@ -0,0 +1,44 @@
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+
+public class IntroduceAggregateCombinerRule extends AbstractIntroduceCombinerRule {
+
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        if (context.checkIfInDontApplySet(this, op)) {
+            return false;
+        }
+        context.addToDontApplySet(this, op);
+        if (op.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
+            return false;
+        }
+        AggregateOperator aggOp = (AggregateOperator) op;
+        if (aggOp.getExecutionMode() != ExecutionMode.PARTITIONED || aggOp.getPartitioningVariable() == null) {
+            return false;
+        }
+        Map<AggregateFunctionCallExpression, SimilarAggregatesInfo> toReplaceMap = new HashMap<AggregateFunctionCallExpression, SimilarAggregatesInfo>();
+        Pair<Boolean, Mutable<ILogicalOperator>> result = tryToPushAgg(aggOp, null, toReplaceMap, context);
+        if (!result.first || result.second == null) {
+            return false;
+        }
+        replaceOriginalAggFuncs(toReplaceMap);
+        context.computeAndSetTypeEnvironmentForOperator(aggOp);
+        return true;
+    }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceCombinerRule.java b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceCombinerRule.java
deleted file mode 100644
index 7f987e4..0000000
--- a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceCombinerRule.java
+++ /dev/null
@@ -1,323 +0,0 @@
-package edu.uci.ics.hyracks.algebricks.rewriter.rules;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.commons.lang3.mutable.MutableObject;
-
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.OperatorAnnotations;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
-import edu.uci.ics.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
-import edu.uci.ics.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
-import edu.uci.ics.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
-import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
-
-public class IntroduceCombinerRule implements IAlgebraicRewriteRule {
-
-    @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
-        return false;
-    }
-
-    @Override
-    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
-            throws AlgebricksException {
-        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
-        if (context.checkIfInDontApplySet(this, op)) {
-            return false;
-        }
-        context.addToDontApplySet(this, op);
-        if (op.getOperatorTag() != LogicalOperatorTag.GROUP) {
-            return false;
-        }
-        GroupByOperator gbyOp = (GroupByOperator) op;
-        if (gbyOp.getExecutionMode() != ExecutionMode.PARTITIONED) {
-            return false;
-        }
-
-        Map<AggregateFunctionCallExpression, SimilarAggregatesInfo> toReplaceMap = new HashMap<AggregateFunctionCallExpression, SimilarAggregatesInfo>();
-        BookkeepingInfo bi = new BookkeepingInfo();
-        bi.toReplaceMap = toReplaceMap;
-        bi.modifGbyMap = new HashMap<GroupByOperator, List<LogicalVariable>>();
-
-        GroupByOperator newGbyOp = opToPush(gbyOp, bi, context);
-        if (newGbyOp == null) {
-            return false;
-        }
-
-        for (Map.Entry<AggregateFunctionCallExpression, SimilarAggregatesInfo> entry : toReplaceMap.entrySet()) {
-            SimilarAggregatesInfo sai = entry.getValue();
-            for (AggregateExprInfo aei : sai.simAggs) {
-                AbstractFunctionCallExpression afce = (AbstractFunctionCallExpression) aei.aggExprRef.getValue();
-                afce.setFunctionInfo(aei.newFunInfo);
-                afce.getArguments().clear();
-                afce.getArguments().add(new MutableObject<ILogicalExpression>(sai.stepOneResult));
-            }
-        }
-
-        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gbyOp.getDecorList()) {
-            LogicalVariable newDecorVar = context.newVar();
-            newGbyOp.addDecorExpression(newDecorVar, p.second.getValue());
-            p.second.setValue(new VariableReferenceExpression(newDecorVar));
-        }
-        newGbyOp.setExecutionMode(ExecutionMode.LOCAL);
-        Object v = gbyOp.getAnnotations().get(OperatorAnnotations.USE_HASH_GROUP_BY);
-        newGbyOp.getAnnotations().put(OperatorAnnotations.USE_HASH_GROUP_BY, v);
-
-        Object v2 = gbyOp.getAnnotations().get(OperatorAnnotations.USE_EXTERNAL_GROUP_BY);
-        newGbyOp.getAnnotations().put(OperatorAnnotations.USE_EXTERNAL_GROUP_BY, v2);
-
-        List<LogicalVariable> propagatedVars = new LinkedList<LogicalVariable>();
-        VariableUtilities.getProducedVariables(newGbyOp, propagatedVars);
-
-        Set<LogicalVariable> freeVars = new HashSet<LogicalVariable>();
-        OperatorPropertiesUtil.getFreeVariablesInSubplans(gbyOp, freeVars);
-
-        for (LogicalVariable var : freeVars) {
-            if (!propagatedVars.contains(var)) {
-                LogicalVariable newDecorVar = context.newVar();
-                newGbyOp.addDecorExpression(newDecorVar, new VariableReferenceExpression(var));
-                VariableUtilities.substituteVariables(gbyOp.getNestedPlans().get(0).getRoots().get(0).getValue(), var,
-                        newDecorVar, context);
-            }
-        }
-
-        Mutable<ILogicalOperator> opRef3 = gbyOp.getInputs().get(0);
-        opRef3.setValue(newGbyOp);
-        typeGby(newGbyOp, context);
-        typeGby(gbyOp, context);
-        return true;
-    }
-
-    private void typeGby(AbstractOperatorWithNestedPlans op, IOptimizationContext context) throws AlgebricksException {
-        for (ILogicalPlan p : op.getNestedPlans()) {
-            OperatorPropertiesUtil.typePlan(p, context);
-        }
-        context.computeAndSetTypeEnvironmentForOperator(op);
-    }
-
-    private GroupByOperator opToPush(GroupByOperator gbyOp, BookkeepingInfo bi, IOptimizationContext context)
-            throws AlgebricksException {
-
-        Mutable<ILogicalOperator> opRef3 = gbyOp.getInputs().get(0);
-        ILogicalOperator op3 = opRef3.getValue();
-        GroupByOperator newGbyOp = new GroupByOperator();
-        newGbyOp.getInputs().add(new MutableObject<ILogicalOperator>(op3));
-        // copy annotations
-        Map<String, Object> annotations = newGbyOp.getAnnotations();
-        for (Entry<String, Object> a : gbyOp.getAnnotations().entrySet())
-            annotations.put(a.getKey(), a.getValue());
-
-        List<LogicalVariable> gbyVars = gbyOp.getGbyVarList();
-
-        for (ILogicalPlan p : gbyOp.getNestedPlans()) {
-            Pair<Boolean, ILogicalPlan> bip = tryToPushSubplan(p, gbyOp, newGbyOp, bi, gbyVars, context);
-            if (!bip.first) {
-                // for now, if we cannot push everything, give up
-                return null;
-            }
-            ILogicalPlan pushedSubplan = bip.second;
-            if (pushedSubplan != null) {
-                newGbyOp.getNestedPlans().add(pushedSubplan);
-            }
-        }
-
-        ArrayList<LogicalVariable> newOpGbyList = new ArrayList<LogicalVariable>();
-        ArrayList<LogicalVariable> replGbyList = new ArrayList<LogicalVariable>();
-        // find maximal sequence of variable
-        for (Map.Entry<GroupByOperator, List<LogicalVariable>> e : bi.modifGbyMap.entrySet()) {
-            List<LogicalVariable> varList = e.getValue();
-            boolean see1 = true;
-            int sz1 = newOpGbyList.size();
-            int i = 0;
-            for (LogicalVariable v : varList) {
-                if (see1) {
-                    if (i < sz1) {
-                        LogicalVariable v2 = newOpGbyList.get(i);
-                        if (v != v2) {
-                            // cannot linearize
-                            return null;
-                        }
-                    } else {
-                        see1 = false;
-                        newOpGbyList.add(v);
-                        replGbyList.add(context.newVar());
-                    }
-                    i++;
-                } else {
-                    newOpGbyList.add(v);
-                    replGbyList.add(context.newVar());
-                }
-            }
-        }
-        // set the vars in the new op
-        int n = newOpGbyList.size();
-        for (int i = 0; i < n; i++) {
-            newGbyOp.addGbyExpression(replGbyList.get(i), new VariableReferenceExpression(newOpGbyList.get(i)));
-            VariableUtilities.substituteVariables(gbyOp, newOpGbyList.get(i), replGbyList.get(i), false, context);
-        }
-        return newGbyOp;
-    }
-
-    private Pair<Boolean, ILogicalPlan> tryToPushSubplan(ILogicalPlan p, GroupByOperator oldGbyOp,
-            GroupByOperator newGbyOp, BookkeepingInfo bi, List<LogicalVariable> gbyVars, IOptimizationContext context) {
-        List<Mutable<ILogicalOperator>> pushedRoots = new ArrayList<Mutable<ILogicalOperator>>();
-        List<Mutable<ILogicalOperator>> toPushR = new ArrayList<Mutable<ILogicalOperator>>();
-        for (Mutable<ILogicalOperator> r : p.getRoots()) {
-            if (!tryToPushRoot(r, oldGbyOp, newGbyOp, bi, gbyVars, context, toPushR)) {
-                // for now, if we cannot push everything, give up
-                return new Pair<Boolean, ILogicalPlan>(false, null);
-            }
-        }
-        for (Mutable<ILogicalOperator> root : toPushR) {
-            pushedRoots.add(root);
-        }
-        if (pushedRoots.isEmpty()) {
-            return new Pair<Boolean, ILogicalPlan>(true, null);
-        } else {
-            return new Pair<Boolean, ILogicalPlan>(true, new ALogicalPlanImpl(pushedRoots));
-        }
-    }
-
-    private boolean tryToPushRoot(Mutable<ILogicalOperator> r, GroupByOperator oldGbyOp, GroupByOperator newGbyOp,
-            BookkeepingInfo bi, List<LogicalVariable> gbyVars, IOptimizationContext context,
-            List<Mutable<ILogicalOperator>> toPushAccumulate) {
-        AbstractLogicalOperator op1 = (AbstractLogicalOperator) r.getValue();
-        if (op1.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
-            return false;
-        }
-        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
-        if (op2.getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE) {
-            AggregateOperator initAgg = (AggregateOperator) op1;
-            Pair<Boolean, Mutable<ILogicalOperator>> pOpRef = tryToPushAgg(initAgg, newGbyOp, bi.toReplaceMap, context);
-            if (!pOpRef.first) {
-                return false;
-            }
-            Mutable<ILogicalOperator> opRef = pOpRef.second;
-            if (opRef != null) {
-                toPushAccumulate.add(opRef);
-            }
-            bi.modifGbyMap.put(oldGbyOp, gbyVars);
-            return true;
-        } else {
-            while (op2.getOperatorTag() != LogicalOperatorTag.GROUP && op2.getInputs().size() == 1) {
-                op2 = (AbstractLogicalOperator) op2.getInputs().get(0).getValue();
-            }
-            if (op2.getOperatorTag() != LogicalOperatorTag.GROUP) {
-                return false;
-            }
-            GroupByOperator nestedGby = (GroupByOperator) op2;
-            List<LogicalVariable> gbyVars2 = nestedGby.getGbyVarList();
-            List<LogicalVariable> concatGbyVars = new ArrayList<LogicalVariable>(gbyVars);
-            concatGbyVars.addAll(gbyVars2);
-            for (ILogicalPlan p : nestedGby.getNestedPlans()) {
-                for (Mutable<ILogicalOperator> r2 : p.getRoots()) {
-                    if (!tryToPushRoot(r2, nestedGby, newGbyOp, bi, concatGbyVars, context, toPushAccumulate)) {
-                        return false;
-                    }
-                }
-            }
-            return true;
-        }
-    }
-
-    private Pair<Boolean, Mutable<ILogicalOperator>> tryToPushAgg(AggregateOperator initAgg, GroupByOperator newGbyOp,
-            Map<AggregateFunctionCallExpression, SimilarAggregatesInfo> toReplaceMap, IOptimizationContext context) {
-
-        ArrayList<LogicalVariable> pushedVars = new ArrayList<LogicalVariable>();
-        ArrayList<Mutable<ILogicalExpression>> pushedExprs = new ArrayList<Mutable<ILogicalExpression>>();
-
-        List<LogicalVariable> initVars = initAgg.getVariables();
-        List<Mutable<ILogicalExpression>> initExprs = initAgg.getExpressions();
-        int sz = initVars.size();
-        for (int i = 0; i < sz; i++) {
-            AggregateFunctionCallExpression aggFun = (AggregateFunctionCallExpression) initExprs.get(i).getValue();
-            if (!aggFun.isTwoStep()) {
-                return new Pair<Boolean, Mutable<ILogicalOperator>>(false, null);
-            }
-        }
-
-        boolean haveAggToReplace = false;
-        for (int i = 0; i < sz; i++) {
-            Mutable<ILogicalExpression> expRef = initExprs.get(i);
-            AggregateFunctionCallExpression aggFun = (AggregateFunctionCallExpression) expRef.getValue();
-            IFunctionInfo fi1 = aggFun.getStepOneAggregate();
-            List<Mutable<ILogicalExpression>> newArgs = new ArrayList<Mutable<ILogicalExpression>>(aggFun
-                    .getArguments().size());
-            for (Mutable<ILogicalExpression> er : aggFun.getArguments()) {
-                newArgs.add(new MutableObject<ILogicalExpression>(er.getValue().cloneExpression()));
-            }
-            //            AggregateFunctionCallExpression aggLocal = new AggregateFunctionCallExpression(fi1, false, newArgs);
-            //            pushedExprs.add(new Mutable<ILogicalExpression>(aggLocal));
-
-            IFunctionInfo fi2 = aggFun.getStepTwoAggregate();
-
-            SimilarAggregatesInfo inf = toReplaceMap.get(aggFun);
-            if (inf == null) {
-                inf = new SimilarAggregatesInfo();
-                LogicalVariable newAggVar = context.newVar();
-                pushedVars.add(newAggVar);
-                inf.stepOneResult = new VariableReferenceExpression(newAggVar);
-                inf.simAggs = new ArrayList<AggregateExprInfo>();
-                toReplaceMap.put(aggFun, inf);
-                AggregateFunctionCallExpression aggLocal = new AggregateFunctionCallExpression(fi1, false, newArgs);
-                pushedExprs.add(new MutableObject<ILogicalExpression>(aggLocal));
-            }
-            AggregateExprInfo aei = new AggregateExprInfo();
-            aei.aggExprRef = expRef;
-            aei.newFunInfo = fi2;
-            inf.simAggs.add(aei);
-            haveAggToReplace = true;
-        }
-
-        if (!pushedVars.isEmpty()) {
-            AggregateOperator pushedAgg = new AggregateOperator(pushedVars, pushedExprs);
-            pushedAgg.setExecutionMode(ExecutionMode.LOCAL);
-            NestedTupleSourceOperator nts = new NestedTupleSourceOperator(new MutableObject<ILogicalOperator>(newGbyOp));
-            nts.setExecutionMode(ExecutionMode.LOCAL);
-            pushedAgg.getInputs().add(new MutableObject<ILogicalOperator>(nts));
-            return new Pair<Boolean, Mutable<ILogicalOperator>>(true, new MutableObject<ILogicalOperator>(pushedAgg));
-        } else {
-            return new Pair<Boolean, Mutable<ILogicalOperator>>(haveAggToReplace, null);
-        }
-    }
-
-    private class SimilarAggregatesInfo {
-        ILogicalExpression stepOneResult;
-        List<AggregateExprInfo> simAggs;
-    }
-
-    private class AggregateExprInfo {
-        Mutable<ILogicalExpression> aggExprRef;
-        IFunctionInfo newFunInfo;
-    }
-
-    private class BookkeepingInfo {
-        Map<AggregateFunctionCallExpression, SimilarAggregatesInfo> toReplaceMap;
-        Map<GroupByOperator, List<LogicalVariable>> modifGbyMap;
-    }
-}
diff --git a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceGroupByCombinerRule.java b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceGroupByCombinerRule.java
new file mode 100644
index 0000000..5c5fdb1
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceGroupByCombinerRule.java
@@ -0,0 +1,218 @@
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.OperatorAnnotations;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
+import edu.uci.ics.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
+
+public class IntroduceGroupByCombinerRule extends AbstractIntroduceCombinerRule {
+
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        if (context.checkIfInDontApplySet(this, op)) {
+            return false;
+        }
+        context.addToDontApplySet(this, op);
+        if (op.getOperatorTag() != LogicalOperatorTag.GROUP) {
+            return false;
+        }
+        GroupByOperator gbyOp = (GroupByOperator) op;
+        if (gbyOp.getExecutionMode() != ExecutionMode.PARTITIONED) {
+            return false;
+        }
+
+        BookkeepingInfo bi = new BookkeepingInfo();
+        GroupByOperator newGbyOp = opToPush(gbyOp, bi, context);
+        if (newGbyOp == null) {
+            return false;
+        }
+
+        replaceOriginalAggFuncs(bi.toReplaceMap);
+
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gbyOp.getDecorList()) {
+            LogicalVariable newDecorVar = context.newVar();
+            newGbyOp.addDecorExpression(newDecorVar, p.second.getValue());
+            p.second.setValue(new VariableReferenceExpression(newDecorVar));
+        }
+        newGbyOp.setExecutionMode(ExecutionMode.LOCAL);
+        Object v = gbyOp.getAnnotations().get(OperatorAnnotations.USE_HASH_GROUP_BY);
+        newGbyOp.getAnnotations().put(OperatorAnnotations.USE_HASH_GROUP_BY, v);
+
+        Object v2 = gbyOp.getAnnotations().get(OperatorAnnotations.USE_EXTERNAL_GROUP_BY);
+        newGbyOp.getAnnotations().put(OperatorAnnotations.USE_EXTERNAL_GROUP_BY, v2);
+
+        List<LogicalVariable> propagatedVars = new LinkedList<LogicalVariable>();
+        VariableUtilities.getProducedVariables(newGbyOp, propagatedVars);
+
+        Set<LogicalVariable> freeVars = new HashSet<LogicalVariable>();
+        OperatorPropertiesUtil.getFreeVariablesInSubplans(gbyOp, freeVars);
+
+        for (LogicalVariable var : freeVars) {
+            if (!propagatedVars.contains(var)) {
+                LogicalVariable newDecorVar = context.newVar();
+                newGbyOp.addDecorExpression(newDecorVar, new VariableReferenceExpression(var));
+                VariableUtilities.substituteVariables(gbyOp.getNestedPlans().get(0).getRoots().get(0).getValue(), var,
+                        newDecorVar, context);
+            }
+        }
+
+        Mutable<ILogicalOperator> opRef3 = gbyOp.getInputs().get(0);
+        opRef3.setValue(newGbyOp);
+        typeGby(newGbyOp, context);
+        typeGby(gbyOp, context);
+        return true;
+    }
+
+    private void typeGby(AbstractOperatorWithNestedPlans op, IOptimizationContext context) throws AlgebricksException {
+        for (ILogicalPlan p : op.getNestedPlans()) {
+            OperatorPropertiesUtil.typePlan(p, context);
+        }
+        context.computeAndSetTypeEnvironmentForOperator(op);
+    }
+
+    private GroupByOperator opToPush(GroupByOperator gbyOp, BookkeepingInfo bi, IOptimizationContext context)
+            throws AlgebricksException {
+        // Hook up input to new group-by.
+        Mutable<ILogicalOperator> opRef3 = gbyOp.getInputs().get(0);
+        ILogicalOperator op3 = opRef3.getValue();
+        GroupByOperator newGbyOp = new GroupByOperator();
+        newGbyOp.getInputs().add(new MutableObject<ILogicalOperator>(op3));
+        // Copy annotations.        
+        Map<String, Object> annotations = newGbyOp.getAnnotations();
+        annotations.putAll(gbyOp.getAnnotations());
+
+        List<LogicalVariable> gbyVars = gbyOp.getGbyVarList();
+        for (ILogicalPlan p : gbyOp.getNestedPlans()) {
+            Pair<Boolean, ILogicalPlan> bip = tryToPushSubplan(p, gbyOp, newGbyOp, bi, gbyVars, context);
+            if (!bip.first) {
+                // For now, if we cannot push everything, give up.
+                return null;
+            }
+            ILogicalPlan pushedSubplan = bip.second;
+            if (pushedSubplan != null) {
+                newGbyOp.getNestedPlans().add(pushedSubplan);
+            }
+        }
+
+        ArrayList<LogicalVariable> newOpGbyList = new ArrayList<LogicalVariable>();
+        ArrayList<LogicalVariable> replGbyList = new ArrayList<LogicalVariable>();
+        // Find the maximal sequence of group-by variables.
+        for (Map.Entry<GroupByOperator, List<LogicalVariable>> e : bi.modifyGbyMap.entrySet()) {
+            List<LogicalVariable> varList = e.getValue();
+            boolean see1 = true;
+            int sz1 = newOpGbyList.size();
+            int i = 0;
+            for (LogicalVariable v : varList) {
+                if (see1) {
+                    if (i < sz1) {
+                        LogicalVariable v2 = newOpGbyList.get(i);
+                        if (v != v2) {
+                            // The variable lists disagree at this position: cannot linearize.
+                            return null;
+                        }
+                    } else {
+                        see1 = false;
+                        newOpGbyList.add(v);
+                        replGbyList.add(context.newVar());
+                    }
+                    i++;
+                } else {
+                    newOpGbyList.add(v);
+                    replGbyList.add(context.newVar());
+                }
+            }
+        }
+        // set the vars in the new op
+        int n = newOpGbyList.size();
+        for (int i = 0; i < n; i++) {
+            newGbyOp.addGbyExpression(replGbyList.get(i), new VariableReferenceExpression(newOpGbyList.get(i)));
+            VariableUtilities.substituteVariables(gbyOp, newOpGbyList.get(i), replGbyList.get(i), false, context);
+        }
+        return newGbyOp;
+    }
+
+    private Pair<Boolean, ILogicalPlan> tryToPushSubplan(ILogicalPlan nestedPlan, GroupByOperator oldGbyOp,
+            GroupByOperator newGbyOp, BookkeepingInfo bi, List<LogicalVariable> gbyVars, IOptimizationContext context)
+            throws AlgebricksException {
+        List<Mutable<ILogicalOperator>> pushedRoots = new ArrayList<Mutable<ILogicalOperator>>();
+        for (Mutable<ILogicalOperator> r : nestedPlan.getRoots()) {
+            if (!tryToPushRoot(r, oldGbyOp, newGbyOp, bi, gbyVars, context, pushedRoots)) {
+                // For now, if we cannot push everything, give up.
+                return new Pair<Boolean, ILogicalPlan>(false, null);
+            }
+        }
+        if (pushedRoots.isEmpty()) {
+            return new Pair<Boolean, ILogicalPlan>(true, null);
+        } else {
+            return new Pair<Boolean, ILogicalPlan>(true, new ALogicalPlanImpl(pushedRoots));
+        }
+    }
+
+    private boolean tryToPushRoot(Mutable<ILogicalOperator> root, GroupByOperator oldGbyOp, GroupByOperator newGbyOp,
+            BookkeepingInfo bi, List<LogicalVariable> gbyVars, IOptimizationContext context,
+            List<Mutable<ILogicalOperator>> toPushAccumulate) throws AlgebricksException {
+        AbstractLogicalOperator op1 = (AbstractLogicalOperator) root.getValue();
+        if (op1.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
+            return false;
+        }
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
+        if (op2.getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE) {
+            AggregateOperator initAgg = (AggregateOperator) op1;
+            Pair<Boolean, Mutable<ILogicalOperator>> pOpRef = tryToPushAgg(initAgg, newGbyOp, bi.toReplaceMap, context);
+            if (!pOpRef.first) {
+                return false;
+            }
+            Mutable<ILogicalOperator> opRef = pOpRef.second;
+            if (opRef != null) {
+                toPushAccumulate.add(opRef);
+            }
+            bi.modifyGbyMap.put(oldGbyOp, gbyVars);
+            return true;
+        } else {
+            while (op2.getOperatorTag() != LogicalOperatorTag.GROUP && op2.getInputs().size() == 1) {
+                op2 = (AbstractLogicalOperator) op2.getInputs().get(0).getValue();
+            }
+            if (op2.getOperatorTag() != LogicalOperatorTag.GROUP) {
+                return false;
+            }
+            GroupByOperator nestedGby = (GroupByOperator) op2;
+            List<LogicalVariable> gbyVars2 = nestedGby.getGbyVarList();
+            List<LogicalVariable> concatGbyVars = new ArrayList<LogicalVariable>(gbyVars);
+            concatGbyVars.addAll(gbyVars2);
+            for (ILogicalPlan p : nestedGby.getNestedPlans()) {
+                for (Mutable<ILogicalOperator> r2 : p.getRoots()) {
+                    if (!tryToPushRoot(r2, nestedGby, newGbyOp, bi, concatGbyVars, context, toPushAccumulate)) {
+                        return false;
+                    }
+                }
+            }
+            return true;
+        }
+    }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceGroupByForStandaloneAggregRule.java b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceGroupByForStandaloneAggregRule.java
deleted file mode 100644
index 5272347..0000000
--- a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceGroupByForStandaloneAggregRule.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.algebricks.rewriter.rules;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.commons.lang3.mutable.MutableObject;
-
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
-import edu.uci.ics.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
-import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
-
-/**
- * When aggregates appear w/o group-by, a default group by a constant is
- * introduced.
- */
-
-public class IntroduceGroupByForStandaloneAggregRule implements IAlgebraicRewriteRule {
-
-    @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
-        return false;
-    }
-
-    @Override
-    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
-            throws AlgebricksException {
-        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
-        if (op.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
-            return false;
-        }
-        Mutable<ILogicalOperator> opRef2 = op.getInputs().get(0);
-        AbstractLogicalOperator op2 = (AbstractLogicalOperator) opRef2.getValue();
-        if (op2.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
-            return false;
-        }
-
-        AssignOperator assign = (AssignOperator) op;
-        AggregateOperator agg = (AggregateOperator) op2;
-        if (agg.getVariables().size() != 1) {
-            return false;
-        }
-        LogicalVariable aggVar = agg.getVariables().get(0);
-        List<LogicalVariable> used = new LinkedList<LogicalVariable>();
-        VariableUtilities.getUsedVariables(assign, used);
-        if (used.contains(aggVar)) {
-            Mutable<ILogicalOperator> opRef3 = op2.getInputs().get(0);
-            List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByList = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();
-            LogicalVariable gbyVar = context.newVar();
-            // ILogicalExpression constOne = new ConstantExpression(new
-            // IntegerLiteral(new Integer(1)));
-            groupByList.add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(gbyVar,
-                    new MutableObject<ILogicalExpression>(ConstantExpression.TRUE)));
-            NestedTupleSourceOperator nts = new NestedTupleSourceOperator(new MutableObject<ILogicalOperator>());
-            List<Mutable<ILogicalOperator>> aggInpList = agg.getInputs();
-            aggInpList.clear();
-            aggInpList.add(new MutableObject<ILogicalOperator>(nts));
-            ILogicalPlan np1 = new ALogicalPlanImpl(opRef2);
-            ArrayList<ILogicalPlan> nestedPlans = new ArrayList<ILogicalPlan>();
-            nestedPlans.add(np1);
-            GroupByOperator gbyOp = new GroupByOperator(groupByList,
-                    new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>(), nestedPlans);
-            Mutable<ILogicalOperator> opRefGby = new MutableObject<ILogicalOperator>(gbyOp);
-            nts.getDataSourceReference().setValue(gbyOp);
-            gbyOp.getInputs().add(opRef3);
-            List<Mutable<ILogicalOperator>> asgnInpList = assign.getInputs();
-            context.computeAndSetTypeEnvironmentForOperator(nts);
-            context.computeAndSetTypeEnvironmentForOperator(agg);
-            context.computeAndSetTypeEnvironmentForOperator(gbyOp);
-            asgnInpList.clear();
-            asgnInpList.add(opRefGby);
-            return true;
-        }
-        return false;
-    }
-
-}
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
index f98aea2..fa95ce4 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
@@ -26,7 +26,7 @@
 public abstract class IndexDataflowHelper {
     protected IIndex index;
     protected int indexFileId = -1;
-    
+
     protected final int partition;
     protected final IIndexOperatorDescriptor opDesc;
     protected final IHyracksTaskContext ctx;
@@ -38,50 +38,50 @@
     }
 
     public void init(boolean forceCreate) throws HyracksDataException {
-    	IBufferCache bufferCache = opDesc.getStorageManager().getBufferCache(ctx);
-    	IFileMapProvider fileMapProvider = opDesc.getStorageManager().getFileMapProvider(ctx);
-    	IndexRegistry<IIndex> indexRegistry = opDesc.getIndexRegistryProvider().getRegistry(ctx);
-    	FileReference fileRef = getFilereference();
-    	int fileId = -1;
-    	boolean fileIsMapped = false;
-    	synchronized (fileMapProvider) {
-    		fileIsMapped = fileMapProvider.isMapped(fileRef);
-    		if (!fileIsMapped) {
-    			bufferCache.createFile(fileRef);
-    		}            
-    		fileId = fileMapProvider.lookupFileId(fileRef);
-    		try {
-    	    	// Also creates the file if it doesn't exist yet.
-    			bufferCache.openFile(fileId);
-    		} catch (HyracksDataException e) {
-    			// Revert state of buffer cache since file failed to open.
-    			if (!fileIsMapped) {
-    				bufferCache.deleteFile(fileId, false);
-    			}
-    			throw e;
-    		}
-    	}
-    	// Only set indexFileId member after openFile() succeeds.
-    	indexFileId = fileId;    	
-    	// Create new index instance and register it.
-    	synchronized (indexRegistry) {
-    		// Check if the index has already been registered.
-    		boolean register = false;
-    	    index = indexRegistry.get(indexFileId);
-    		if (index == null) {
-    		    index = createIndexInstance();
-    		    register = true;
-    		}
-    		if (forceCreate) {
-    		    index.create(indexFileId);
-    		}
-    		index.open(indexFileId);
-    		if (register) {
-    		    indexRegistry.register(indexFileId, index);
-    		}
-    	}
+        IBufferCache bufferCache = opDesc.getStorageManager().getBufferCache(ctx);
+        IFileMapProvider fileMapProvider = opDesc.getStorageManager().getFileMapProvider(ctx);
+        IndexRegistry<IIndex> indexRegistry = opDesc.getIndexRegistryProvider().getRegistry(ctx);
+        FileReference fileRef = getFilereference();
+        int fileId = -1;
+        boolean fileIsMapped = false;
+        synchronized (fileMapProvider) {
+            fileIsMapped = fileMapProvider.isMapped(fileRef);
+            if (!fileIsMapped) {
+                bufferCache.createFile(fileRef);
+            }
+            fileId = fileMapProvider.lookupFileId(fileRef);
+            try {
+                // Also creates the file if it doesn't exist yet.
+                bufferCache.openFile(fileId);
+            } catch (HyracksDataException e) {
+                // Revert state of buffer cache since file failed to open.
+                if (!fileIsMapped) {
+                    bufferCache.deleteFile(fileId, false);
+                }
+                throw e;
+            }
+        }
+        // Only set indexFileId member after openFile() succeeds.
+        indexFileId = fileId;
+        // Create new index instance and register it.
+        synchronized (indexRegistry) {
+            // Check if the index has already been registered.
+            boolean register = false;
+            index = indexRegistry.get(indexFileId);
+            if (index == null) {
+                index = createIndexInstance();
+                register = true;
+            }
+            if (forceCreate) {
+                index.create(indexFileId);
+            }
+            index.open(indexFileId);
+            if (register) {
+                indexRegistry.register(indexFileId, index);
+            }
+        }
     }
-    
+
     public abstract IIndex createIndexInstance() throws HyracksDataException;
 
     public FileReference getFilereference() {
@@ -96,7 +96,7 @@
             indexFileId = -1;
         }
     }
-    
+
     public IIndex getIndex() {
         return index;
     }
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDropOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDropOperatorNodePushable.java
index 5c96af8..5f3c0b5 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDropOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDropOperatorNodePushable.java
@@ -68,7 +68,7 @@
             IndexRegistry<IIndex> treeIndexRegistry = treeIndexRegistryProvider.getRegistry(ctx);
             IBufferCache bufferCache = storageManager.getBufferCache(ctx);
             IFileMapProvider fileMapProvider = storageManager.getFileMapProvider(ctx);
-            
+
             FileReference f = fileSplitProvider.getFileSplits()[partition].getLocalFile();
             int indexFileId = -1;
             synchronized (fileMapProvider) {