Several major changes in hyracks:
-- reduced CC/NC communication for reporting partition requests and availability; partition requests/availability are now reported only for send-side materialized (non-pipelined) policies, in case a task is re-attempted.
-- changed buffer cache to dynamically allocate memory based on needs instead of pre-allocating
-- changed each network channel to lazily allocate memory based on needs, and changed materialized connectors to lazily allocate files based on needs
-- changed several major CCNCCFunctions to use non-Java serde
-- added a sort-based group-by operator which pushes group-by aggregations into an external sort
-- made external sort a stable sort

Items 1, 3, and 4 reduce job overhead.
Item 2 reduces unnecessary NC resource consumption, such as memory and files.
Items 5 and 6 are improvements to runtime operators.

One change in algebricks:
-- implemented a rule to push group-by aggregation into sort, i.e., using the sort-based gby operator

Several important changes in pregelix:
-- removed static state from vertex
-- directly check the halt bit without deserialization
-- optimized the sort algorithm by packing another 2-byte normalized key into the tPointers array

Change-Id: Id696f9a9f1647b4a025b8b33d20b3a89127c60d6
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/35
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <westmann@gmail.com>
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index c9ef2f3..f6de971 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -41,6 +41,7 @@
     RANDOM_MERGE_EXCHANGE,
     RTREE_SEARCH,
     RUNNING_AGGREGATE,
+    SORT_GROUP_BY,
     SORT_MERGE_EXCHANGE,
     SINK,
     SINK_WRITE,
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/IMergeAggregationExpressionFactory.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/IMergeAggregationExpressionFactory.java
index 6e366ff..6a6989d 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/IMergeAggregationExpressionFactory.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/IMergeAggregationExpressionFactory.java
@@ -17,8 +17,20 @@
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 
 public interface IMergeAggregationExpressionFactory {
-    ILogicalExpression createMergeAggregation(ILogicalExpression expr, IOptimizationContext env)
+    /**
+     * Creates an aggregate expression that merges partial (intermediate) results of {@code expr}.
+     *
+     * @param originalAggVariable
+     *            the variable to which the original aggregate {@code expr} is assigned
+     * @param expr
+     *            the original aggregate expression
+     * @param env
+     *            the optimization context
+     * @return the merge aggregate expression, or {@code null} if no merge form is available for {@code expr}
+     */
+    ILogicalExpression createMergeAggregation(LogicalVariable originalAggVariable, ILogicalExpression expr, IOptimizationContext env)
             throws AlgebricksException;
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.java
index 1ddfe64..4241146 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.java
@@ -136,10 +136,17 @@
                 }
 
                 @Override
+                public void finishPartial(IPointable result) throws AlgebricksException {
+                    caf.finishPartial();
+                    result.set(abvs);
+                }
+
+                @Override
                 public void finish(IPointable result) throws AlgebricksException {
                     caf.finish();
                     result.set(abvs);
                 }
+
             };
         }
     }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
new file mode 100644
index 0000000..e92e3ee
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
@@ -0,0 +1,298 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IPartialAggregationTypeComputer;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.OperatorSchemaImpl;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg.SimpleAlgebricksAccumulatingAggregatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.sort.SortGroupByOperatorDescriptor;
+
+/**
+ * Physical group-by operator that is evaluated together with the sort of its input, using
+ * {@link SortGroupByOperatorDescriptor}. It expects exactly one nested plan whose single root is an aggregate
+ * operator; both the aggregate functions and the aggregate operator's merge expressions are compiled.
+ */
+public class SortGroupByPOperator extends AbstractPhysicalOperator {
+
+    private static final String UNSUPPORTED_PLAN_MESSAGE = "Sort group-by currently works only for one nested plan"
+            + " with one root containing an aggregate and a nested-tuple-source.";
+
+    /** Frame limit passed to the runtime {@link SortGroupByOperatorDescriptor}. */
+    private final int frameLimit;
+    /** Sort order of the group-by columns; entry i is used for the i-th column of {@link #columnSet}. */
+    private final OrderColumn[] orderColumns;
+    /** Input variables referenced directly (as variable references) by the group-by key expressions. */
+    private final List<LogicalVariable> columnSet = new ArrayList<LogicalVariable>();
+
+    /**
+     * @param gbyList
+     *            group-by keys; only keys whose expression is a variable reference become grouping columns
+     * @param frameLimit
+     *            frame limit passed to the runtime operator
+     * @param orderColumns
+     *            sort order of the grouping columns, in the same order as those columns
+     */
+    public SortGroupByPOperator(List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> gbyList, int frameLimit,
+            OrderColumn[] orderColumns) {
+        this.frameLimit = frameLimit;
+        this.orderColumns = orderColumns;
+        computeColumnSet(gbyList);
+    }
+
+    /** Collects the variables referenced by the variable-reference group-by expressions into {@link #columnSet}. */
+    private void computeColumnSet(List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> gbyList) {
+        columnSet.clear();
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gbyList) {
+            ILogicalExpression expr = p.second.getValue();
+            if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+                VariableReferenceExpression v = (VariableReferenceExpression) expr;
+                columnSet.add(v.getVariableReference());
+            }
+        }
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.SORT_GROUP_BY;
+    }
+
+    @Override
+    public String toString() {
+        return getOperatorTag().toString() + columnSet;
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return false;
+    }
+
+    private List<LogicalVariable> getGbyColumns() {
+        return columnSet;
+    }
+
+    /**
+     * Delivers the child's partitioning property together with the local order given by {@code orderColumns}
+     * and the local properties delivered by the roots of the nested plans.
+     */
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+        List<ILocalStructuralProperty> propsLocal = new LinkedList<ILocalStructuralProperty>();
+        GroupByOperator gOp = (GroupByOperator) op;
+
+        for (OrderColumn oc : orderColumns) {
+            propsLocal.add(new LocalOrderProperty(oc));
+        }
+        for (ILogicalPlan p : gOp.getNestedPlans()) {
+            for (Mutable<ILogicalOperator> r : p.getRoots()) {
+                ILogicalOperator rOp = r.getValue();
+                propsLocal.addAll(rOp.getDeliveredPhysicalProperties().getLocalProperties());
+            }
+        }
+
+        ILogicalOperator op2 = op.getInputs().get(0).getValue();
+        IPhysicalPropertiesVector childProp = op2.getDeliveredPhysicalProperties();
+        deliveredProperties = new StructuralPropertiesVector(childProp.getPartitioningProperty(), propsLocal);
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        return emptyUnaryRequirements();
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        List<LogicalVariable> gbyCols = getGbyColumns();
+        int keys[] = JobGenHelper.variablesToFieldIndexes(gbyCols, inputSchemas[0]);
+        GroupByOperator gby = (GroupByOperator) op;
+        int numFds = gby.getDecorList().size();
+        int fdColumns[] = new int[numFds];
+        int j = 0;
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gby.getDecorList()) {
+            ILogicalExpression expr = p.second.getValue();
+            if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+                throw new AlgebricksException("Sort group-by expects variable references.");
+            }
+            VariableReferenceExpression v = (VariableReferenceExpression) expr;
+            LogicalVariable decor = v.getVariableReference();
+            fdColumns[j++] = inputSchemas[0].findVariable(decor);
+        }
+
+        AggregateOperator aggOp = getSingleAggregateOperator(gby);
+
+        // Compile the aggregate functions and compute the types of their partial (intermediate) results.
+        IPartialAggregationTypeComputer partialAggregationTypeComputer = context.getPartialAggregationTypeComputer();
+        List<Object> intermediateTypes = new ArrayList<Object>();
+        int n = aggOp.getExpressions().size();
+        IAggregateEvaluatorFactory[] aff = new IAggregateEvaluatorFactory[n];
+        int i = 0;
+        IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
+        IVariableTypeEnvironment aggOpInputEnv = context.getTypeEnvironment(aggOp.getInputs().get(0).getValue());
+        IVariableTypeEnvironment outputEnv = context.getTypeEnvironment(op);
+        for (Mutable<ILogicalExpression> exprRef : aggOp.getExpressions()) {
+            AggregateFunctionCallExpression aggFun = (AggregateFunctionCallExpression) exprRef.getValue();
+            aff[i++] = expressionRuntimeProvider.createAggregateFunctionFactory(aggFun, aggOpInputEnv, inputSchemas,
+                    context);
+            intermediateTypes.add(partialAggregationTypeComputer.getType(aggFun, aggOpInputEnv,
+                    context.getMetadataProvider()));
+        }
+
+        // Field layout handed to the runtime: group-by key fields followed by decor fields.
+        int[] keyAndDecFields = new int[keys.length + fdColumns.length];
+        System.arraycopy(keys, 0, keyAndDecFields, 0, keys.length);
+        System.arraycopy(fdColumns, 0, keyAndDecFields, keys.length, fdColumns.length);
+
+        List<LogicalVariable> keyAndDecVariables = new ArrayList<LogicalVariable>();
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gby.getGroupByList()) {
+            keyAndDecVariables.add(p.first);
+        }
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gby.getDecorList()) {
+            keyAndDecVariables.add(GroupByOperator.getDecorVariable(p));
+        }
+        // Make the output types of the key/decor variables known in the aggregate's input type environment.
+        for (LogicalVariable var : keyAndDecVariables) {
+            aggOpInputEnv.setVarType(var, outputEnv.getVarType(var));
+        }
+
+        compileSubplans(inputSchemas[0], gby, opSchema, context);
+        IOperatorDescriptorRegistry spec = builder.getJobSpec();
+
+        IBinaryComparatorFactory[] compFactories = new IBinaryComparatorFactory[gbyCols.size()];
+        IBinaryComparatorFactoryProvider bcfProvider = context.getBinaryComparatorFactoryProvider();
+        i = 0;
+        for (LogicalVariable v : gbyCols) {
+            Object type = aggOpInputEnv.getVarType(v);
+            boolean ascending = orderColumns[i].getOrder() == OrderKind.ASC;
+            compFactories[i] = bcfProvider.getBinaryComparatorFactory(type, ascending);
+            i++;
+        }
+        RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema,
+                context);
+
+        // Compile the merge expressions, which read the key/decor fields followed by the partial aggregate results.
+        IAggregateEvaluatorFactory[] merges = new IAggregateEvaluatorFactory[n];
+        List<LogicalVariable> usedVars = new ArrayList<LogicalVariable>();
+        IOperatorSchema[] localInputSchemas = new IOperatorSchema[1];
+        localInputSchemas[0] = new OperatorSchemaImpl();
+        for (i = 0; i < n; i++) {
+            AggregateFunctionCallExpression aggFun = (AggregateFunctionCallExpression) aggOp.getMergeExpressions()
+                    .get(i).getValue();
+            aggFun.getUsedVariables(usedVars);
+        }
+        i = 0;
+        for (Object type : intermediateTypes) {
+            aggOpInputEnv.setVarType(usedVars.get(i++), type);
+        }
+        for (LogicalVariable keyVar : keyAndDecVariables) {
+            localInputSchemas[0].addVariable(keyVar);
+        }
+        for (LogicalVariable usedVar : usedVars) {
+            localInputSchemas[0].addVariable(usedVar);
+        }
+        for (i = 0; i < n; i++) {
+            AggregateFunctionCallExpression mergeFun = (AggregateFunctionCallExpression) aggOp.getMergeExpressions()
+                    .get(i).getValue();
+            merges[i] = expressionRuntimeProvider.createAggregateFunctionFactory(mergeFun, aggOpInputEnv,
+                    localInputSchemas, context);
+        }
+        RecordDescriptor partialAggRecordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
+                localInputSchemas[0], context);
+
+        IAggregatorDescriptorFactory aggregatorFactory = new SimpleAlgebricksAccumulatingAggregatorFactory(aff,
+                keyAndDecFields);
+        IAggregatorDescriptorFactory mergeFactory = new SimpleAlgebricksAccumulatingAggregatorFactory(merges,
+                keyAndDecFields);
+
+        // The normalized key is computed only when a provider is configured and there is a leading sort key;
+        // otherwise null is passed instead of dereferencing a null provider or indexing an empty key list.
+        INormalizedKeyComputerFactory normalizedKeyFactory = null;
+        INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider();
+        if (nkcfProvider != null && !gbyCols.isEmpty() && orderColumns.length > 0) {
+            Object type = aggOpInputEnv.getVarType(gbyCols.get(0));
+            boolean ascending = orderColumns[0].getOrder() == OrderKind.ASC;
+            normalizedKeyFactory = nkcfProvider.getNormalizedKeyComputerFactory(type, ascending);
+        }
+        SortGroupByOperatorDescriptor gbyOpDesc = new SortGroupByOperatorDescriptor(spec, frameLimit, keys,
+                keyAndDecFields, normalizedKeyFactory, compFactories, aggregatorFactory, mergeFactory,
+                partialAggRecordDescriptor, recordDescriptor, false);
+
+        contributeOpDesc(builder, gby, gbyOpDesc);
+        ILogicalOperator src = op.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, op, 0);
+    }
+
+    /**
+     * Returns the aggregate operator at the root of the group-by's only nested plan.
+     *
+     * @throws AlgebricksException
+     *             if there is not exactly one nested plan with exactly one root, or that root is not an aggregate
+     */
+    private static AggregateOperator getSingleAggregateOperator(GroupByOperator gby) throws AlgebricksException {
+        if (gby.getNestedPlans().size() != 1) {
+            throw new AlgebricksException(UNSUPPORTED_PLAN_MESSAGE);
+        }
+        ILogicalPlan p0 = gby.getNestedPlans().get(0);
+        if (p0.getRoots().size() != 1) {
+            throw new AlgebricksException(UNSUPPORTED_PLAN_MESSAGE);
+        }
+        ILogicalOperator root = p0.getRoots().get(0).getValue();
+        if (!(root instanceof AggregateOperator)) {
+            throw new AlgebricksException(UNSUPPORTED_PLAN_MESSAGE);
+        }
+        return (AggregateOperator) root;
+    }
+}
diff --git a/algebricks/algebricks-examples/piglet-example/pom.xml b/algebricks/algebricks-examples/piglet-example/pom.xml
index a343d98..f32689f 100644
--- a/algebricks/algebricks-examples/piglet-example/pom.xml
+++ b/algebricks/algebricks-examples/piglet-example/pom.xml
@@ -16,8 +16,7 @@
   <modelVersion>4.0.0</modelVersion>
   <artifactId>piglet-example</artifactId>
   <name>piglet-example</name>
-
-  <parent>
+ <parent>
     <groupId>edu.uci.ics.hyracks</groupId>
     <artifactId>algebricks-examples</artifactId>
     <version>0.2.12-SNAPSHOT</version>
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushGroupByIntoSortRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushGroupByIntoSortRule.java
new file mode 100644
index 0000000..56b2a8e
--- /dev/null
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushGroupByIntoSortRule.java
@@ -0,0 +1,183 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IMergeAggregationExpressionFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.AbstractStableSortPOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.SortGroupByPOperator;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * Merges an external stable sort and the pre-clustered group-by above it into a single sort-based group-by.
+ * <p>
+ * The rule matches a {@code PRE_CLUSTERED_GROUP_BY} whose input is a one-to-one exchange over a
+ * {@code STABLE_SORT}. If the group-by has a single nested plan consisting of an aggregate over a
+ * nested-tuple-source, and a merge expression can be generated for every aggregate function, the group-by's
+ * physical operator becomes a {@link SortGroupByPOperator} and the exchange and the sort are removed.
+ *
+ * @author yingyib
+ */
+public class PushGroupByIntoSortRule implements IAlgebraicRewriteRule {
+
+    private static final String UNSUPPORTED_PLAN_MESSAGE = "External/sort group-by currently works only for one"
+            + " nested plan with one root containing an aggregate and a nested-tuple-source.";
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        return false;
+    }
+
+    /** Tries the rewrite on every input of the visited operator. */
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        ILogicalOperator op1 = opRef.getValue();
+        if (op1 == null) {
+            return false;
+        }
+        boolean changed = false;
+        for (Mutable<ILogicalOperator> childRef : op1.getInputs()) {
+            AbstractLogicalOperator op = (AbstractLogicalOperator) childRef.getValue();
+            if (pushGroupByIntoSort(op, context)) {
+                changed = true;
+            }
+        }
+        return changed;
+    }
+
+    /**
+     * Rewrites {@code op} in place if it is a pre-clustered group-by over a one-to-one exchange over a stable sort.
+     *
+     * @return true if the plan was changed
+     */
+    private boolean pushGroupByIntoSort(AbstractLogicalOperator op, IOptimizationContext context)
+            throws AlgebricksException {
+        if (op.getOperatorTag() != LogicalOperatorTag.GROUP
+                || !hasPhysicalOperatorTag(op, PhysicalOperatorTag.PRE_CLUSTERED_GROUP_BY)) {
+            return false;
+        }
+        GroupByOperator groupByOperator = (GroupByOperator) op;
+
+        // Both the exchange and the sort are dropped below, so only a one-to-one exchange may sit in between;
+        // dropping a repartitioning or merging exchange would change the partitioning of the group-by input.
+        AbstractLogicalOperator exchange = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+        if (exchange.getOperatorTag() != LogicalOperatorTag.EXCHANGE
+                || !hasPhysicalOperatorTag(exchange, PhysicalOperatorTag.ONE_TO_ONE_EXCHANGE)
+                || exchange.getInputs().isEmpty()) {
+            return false;
+        }
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) exchange.getInputs().get(0).getValue();
+        if (!hasPhysicalOperatorTag(op2, PhysicalOperatorTag.STABLE_SORT)) {
+            return false;
+        }
+        AbstractStableSortPOperator sortPhysicalOperator = (AbstractStableSortPOperator) op2.getPhysicalOperator();
+
+        if (!hasSingleAggregateOverNestedTupleSource(groupByOperator)) {
+            return false;
+        }
+        if (!generateMergeAggregationExpressions(groupByOperator, context)) {
+            return false;
+        }
+
+        // replace the pre-clustered group-by with the sort group-by
+        op.setPhysicalOperator(new SortGroupByPOperator(groupByOperator.getGroupByList(), context
+                .getPhysicalOptimizationConfig().getMaxFramesExternalGroupBy(), sortPhysicalOperator
+                .getSortColumns()));
+
+        // remove the one-to-one exchange and the stable sort operator
+        op.getInputs().clear();
+        op.getInputs().addAll(op2.getInputs());
+        return true;
+    }
+
+    /** Returns whether {@code op} has a physical operator with the given tag. */
+    private static boolean hasPhysicalOperatorTag(AbstractLogicalOperator op, PhysicalOperatorTag tag) {
+        return op.getPhysicalOperator() != null && op.getPhysicalOperator().getOperatorTag() == tag;
+    }
+
+    /**
+     * Returns whether the group-by has exactly one nested plan with exactly one root, that root is an aggregate
+     * (not, e.g., a running aggregate), and the aggregate's input is a nested-tuple-source.
+     */
+    private static boolean hasSingleAggregateOverNestedTupleSource(GroupByOperator gby) {
+        if (gby.getNestedPlans().size() != 1) {
+            return false;
+        }
+        ILogicalPlan p0 = gby.getNestedPlans().get(0);
+        if (p0.getRoots().size() != 1) {
+            return false;
+        }
+        AbstractLogicalOperator root = (AbstractLogicalOperator) p0.getRoots().get(0).getValue();
+        if (root.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
+            return false;
+        }
+        AbstractLogicalOperator aggInputOp = (AbstractLogicalOperator) root.getInputs().get(0).getValue();
+        return aggInputOp.getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE;
+    }
+
+    /**
+     * Generates a merge expression for each aggregate function of the group-by's nested aggregate and stores
+     * them as the aggregate operator's merge expressions.
+     *
+     * @return false, without modifying the aggregate operator, if some aggregate function has no merge form
+     * @throws AlgebricksException
+     *             if the group-by does not have exactly one nested plan with exactly one root
+     */
+    private boolean generateMergeAggregationExpressions(GroupByOperator gby, IOptimizationContext context)
+            throws AlgebricksException {
+        if (gby.getNestedPlans().size() != 1) {
+            throw new AlgebricksException(UNSUPPORTED_PLAN_MESSAGE);
+        }
+        ILogicalPlan p0 = gby.getNestedPlans().get(0);
+        if (p0.getRoots().size() != 1) {
+            throw new AlgebricksException(UNSUPPORTED_PLAN_MESSAGE);
+        }
+        IMergeAggregationExpressionFactory mergeAggregationExpressionFactory = context
+                .getMergeAggregationExpressionFactory();
+        Mutable<ILogicalOperator> r0 = p0.getRoots().get(0);
+        AggregateOperator aggOp = (AggregateOperator) r0.getValue();
+        List<Mutable<ILogicalExpression>> aggFuncRefs = aggOp.getExpressions();
+        List<LogicalVariable> originalAggVars = aggOp.getVariables();
+        int n = aggOp.getExpressions().size();
+        List<Mutable<ILogicalExpression>> mergeExpressionRefs = new ArrayList<Mutable<ILogicalExpression>>();
+        for (int i = 0; i < n; i++) {
+            ILogicalExpression mergeExpr = mergeAggregationExpressionFactory.createMergeAggregation(
+                    originalAggVars.get(i), aggFuncRefs.get(i).getValue(), context);
+            if (mergeExpr == null) {
+                return false;
+            }
+            mergeExpressionRefs.add(new MutableObject<ILogicalExpression>(mergeExpr));
+        }
+        aggOp.setMergeExpressions(mergeExpressionRefs);
+        return true;
+    }
+}
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 9efb078..fd7d31d 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -28,6 +28,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.OperatorAnnotations;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IMergeAggregationExpressionFactory;
@@ -44,7 +45,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
@@ -150,12 +150,16 @@
                                     throw new NotImplementedException(
                                             "External hash group-by for nested grouping is not implemented.");
                                 }
-                                ExternalGroupByPOperator externalGby = new ExternalGroupByPOperator(
-                                        gby.getGroupByList(), physicalOptimizationConfig.getMaxFramesExternalGroupBy(),
-                                        physicalOptimizationConfig.getExternalGroupByTableSize());
-                                op.setPhysicalOperator(externalGby);
-                                generateMergeAggregationExpressions(gby, context);
-                                break;
+
+                                boolean hasIntermediateAgg = generateMergeAggregationExpressions(gby, context);
+                                if (hasIntermediateAgg) {
+                                    ExternalGroupByPOperator externalGby = new ExternalGroupByPOperator(
+                                            gby.getGroupByList(),
+                                            physicalOptimizationConfig.getMaxFramesExternalGroupBy(),
+                                            physicalOptimizationConfig.getExternalGroupByTableSize());
+                                    op.setPhysicalOperator(externalGby);
+                                    break;
+                                }
                             }
                         }
                     }
@@ -331,15 +335,19 @@
         return payload;
     }
 
-    private static void generateMergeAggregationExpressions(GroupByOperator gby, IOptimizationContext context)
+    private static boolean generateMergeAggregationExpressions(GroupByOperator gby, IOptimizationContext context)
             throws AlgebricksException {
         if (gby.getNestedPlans().size() != 1) {
+            // External and sort-based group-by derive merge aggregations from exactly one nested plan;
+            // reject other shapes instead of silently ignoring the extra plans (only get(0) is used below).
             throw new AlgebricksException(
                     "External group-by currently works only for one nested plan with one root containing"
-                            + "an aggregate and a nested-tuple-source.");
+                            + " an aggregate and a nested-tuple-source.");
         }
         ILogicalPlan p0 = gby.getNestedPlans().get(0);
         if (p0.getRoots().size() != 1) {
+            // Likewise only the first root is used below, so a plan with several roots cannot be
+            // turned into a single set of merge aggregations.
             throw new AlgebricksException(
                     "External group-by currently works only for one nested plan with one root containing"
-                            + " an aggregate and a nested-tuple-source.".substring(0, 0) + "an aggregate and a nested-tuple-source.");
+                            + " an aggregate and a nested-tuple-source.");
@@ -347,15 +355,27 @@
         IMergeAggregationExpressionFactory mergeAggregationExpressionFactory = context
                 .getMergeAggregationExpressionFactory();
         Mutable<ILogicalOperator> r0 = p0.getRoots().get(0);
-        AggregateOperator aggOp = (AggregateOperator) r0.getValue();
+        AbstractLogicalOperator r0Logical = (AbstractLogicalOperator) r0.getValue();
+        // Only an aggregate root can be split into partial and merge aggregations; report
+        // "not applicable" (false) to the caller instead of failing.
+        if (r0Logical.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
+            return false;
+        }
+        AggregateOperator aggOp = (AggregateOperator) r0Logical;
         List<Mutable<ILogicalExpression>> aggFuncRefs = aggOp.getExpressions();
-        int n = aggOp.getExpressions().size();
+        List<LogicalVariable> originalAggVars = aggOp.getVariables();
+        int n = aggFuncRefs.size();
         List<Mutable<ILogicalExpression>> mergeExpressionRefs = new ArrayList<Mutable<ILogicalExpression>>();
         for (int i = 0; i < n; i++) {
-            ILogicalExpression mergeExpr = mergeAggregationExpressionFactory.createMergeAggregation(aggFuncRefs.get(i)
-                    .getValue(), context);
+            ILogicalExpression mergeExpr = mergeAggregationExpressionFactory.createMergeAggregation(
+                    originalAggVars.get(i), aggFuncRefs.get(i).getValue(), context);
+            if (mergeExpr == null) {
+                // The factory produced no merge aggregation for this function; aggOp is left unmodified.
+                return false;
+            }
             mergeExpressionRefs.add(new MutableObject<ILogicalExpression>(mergeExpr));
         }
         aggOp.setMergeExpressions(mergeExpressionRefs);
+        return true; // merge expressions are installed on aggOp only on this success path
     }
 }
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/aggregators/TupleCountAggregateFunctionFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/aggregators/TupleCountAggregateFunctionFactory.java
index 594514c..e6baffa 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/aggregators/TupleCountAggregateFunctionFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/aggregators/TupleCountAggregateFunctionFactory.java
@@ -55,6 +55,11 @@
                     throw new AlgebricksException(e);
                 }
             }
+
+            @Override
+            public void finishPartial(IPointable result) throws AlgebricksException {
+                finish(result); // partial and final outputs coincide for a tuple count
+            }
         };
     }
 
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IAggregateEvaluator.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IAggregateEvaluator.java
index 99d7b91..5719bff 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IAggregateEvaluator.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IAggregateEvaluator.java
@@ -23,5 +23,16 @@
 
     public void step(IFrameTupleReference tuple) throws AlgebricksException;
 
+    /**
+     * Writes this evaluator's partial (intermediate) aggregation result into {@code result}.
+     * Unlike {@link #finish(IPointable)}, the output is presumably consumed by a later merge
+     * step rather than returned as the final value (TODO confirm against the merge aggregation factory).
+     * Implementations whose partial and final forms coincide may simply delegate to {@code finish}.
+     *
+     * @param result pointable that receives the serialized partial result
+     * @throws AlgebricksException if the partial result cannot be produced
+     */
+    public void finishPartial(IPointable result) throws AlgebricksException;
+
     public void finish(IPointable result) throws AlgebricksException;
 }
\ No newline at end of file
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
index 11a7a5c..b3eab7b 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
@@ -121,7 +121,16 @@
             @Override
             public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor,
                     int tIndex, AggregateState state) throws HyracksDataException {
-                throw new IllegalStateException("this method should not be called");
+                IAggregateEvaluator[] evaluators = (IAggregateEvaluator[]) state.state;
+                try {
+                    for (IAggregateEvaluator evaluator : evaluators) {
+                        evaluator.finishPartial(p); // one partial-result field per evaluator, in order
+                        tupleBuilder.addField(p.getByteArray(), p.getStartOffset(), p.getLength());
+                    }
+                } catch (AlgebricksException e) {
+                    throw new HyracksDataException(e);
+                }
+                return true;
             }
 
             @Override