merge from zheilbron/hyracks_msr
diff --git a/pregelix/pregelix-runtime/pom.xml b/pregelix/pregelix-runtime/pom.xml
index 54e2256..6564eb0 100644
--- a/pregelix/pregelix-runtime/pom.xml
+++ b/pregelix/pregelix-runtime/pom.xml
@@ -21,7 +21,7 @@
 	<parent>
     		<groupId>edu.uci.ics.hyracks</groupId>
     		<artifactId>pregelix</artifactId>
-    		<version>0.2.7-SNAPSHOT</version>
+    		<version>0.2.10-SNAPSHOT</version>
   	</parent>
 
 
@@ -88,89 +88,89 @@
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>pregelix-api</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>pregelix-dataflow-std</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>pregelix-dataflow</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-dataflow-std</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-api</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-dataflow-common</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-data-std</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-storage-am-common</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-storage-am-btree</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-storage-am-lsm-common</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-control-cc</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-control-nc</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-ipc</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index 16ecf6c..d46457c 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -32,8 +32,8 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
+import edu.uci.ics.pregelix.api.graph.MsgList;
 import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.util.ArrayListWritable;
 import edu.uci.ics.pregelix.api.util.ArrayListWritable.ArrayIterator;
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.api.util.FrameTupleUtils;
@@ -168,11 +168,16 @@
                 tbAlive.reset();
 
                 vertex = (Vertex) tuple[3];
+
+                if (vertex.isPartitionTerminated()) {
+                    vertex.voteToHalt();
+                    return;
+                }
                 vertex.setOutputWriters(writers);
                 vertex.setOutputAppenders(appenders);
                 vertex.setOutputTupleBuilders(tbs);
 
-                ArrayListWritable msgContentList = (ArrayListWritable) tuple[1];
+                MsgList msgContentList = (MsgList) tuple[1];
                 msgContentList.reset(msgIterator);
 
                 if (!msgIterator.hasNext() && vertex.isHalted()) {
@@ -183,9 +188,15 @@
                 }
 
                 try {
+                    if (msgContentList.segmentStart()) {
+                        vertex.open();
+                    }
                     vertex.compute(msgIterator);
+                    if (msgContentList.segmentEnd()) {
+                        vertex.close();
+                    }
                     vertex.finishCompute();
-                } catch (IOException e) {
+                } catch (Exception e) {
                     throw new HyracksDataException(e);
                 }
 
@@ -194,7 +205,6 @@
                  */
                 if (terminate && (!vertex.isHalted() || vertex.hasMessage() || vertex.createdNewLiveVertex()))
                     terminate = false;
-
                 aggregator.step(vertex);
             }
 
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
index fa7e0a1..eba75c9 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
@@ -172,6 +172,10 @@
                 tbAlive.reset();
 
                 vertex = (Vertex) tuple[1];
+                if (vertex.isPartitionTerminated()) {
+                    vertex.voteToHalt();
+                    return;
+                }
                 vertex.setOutputWriters(writers);
                 vertex.setOutputAppenders(appenders);
                 vertex.setOutputTupleBuilders(tbs);
@@ -184,12 +188,13 @@
                 }
 
                 try {
+                    vertex.open();
                     vertex.compute(msgIterator);
+                    vertex.close();
                     vertex.finishCompute();
-                } catch (IOException e) {
+                } catch (Exception e) {
                     throw new HyracksDataException(e);
                 }
-
                 /**
                  * this partition should not terminate
                  */
@@ -200,6 +205,7 @@
                  * call the global aggregator
                  */
                 aggregator.step(vertex);
+
             }
 
             @Override
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
index 77f28e4..3d52a45 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
@@ -14,22 +14,27 @@
  */
 package edu.uci.ics.pregelix.runtime.simpleagg;
 
+import java.nio.ByteBuffer;
+
 import org.apache.commons.lang3.tuple.Pair;
 
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.group.IClusteredAggregatorDescriptorFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IAggregateFunction;
 import edu.uci.ics.pregelix.dataflow.std.base.IAggregateFunctionFactory;
 
-public class AccumulatingAggregatorFactory implements IAggregatorDescriptorFactory {
+public class AccumulatingAggregatorFactory implements IClusteredAggregatorDescriptorFactory {
 
     private static final long serialVersionUID = 1L;
     private IAggregateFunctionFactory[] aggFactories;
@@ -41,52 +46,56 @@
     @SuppressWarnings("unchecked")
     @Override
     public IAggregatorDescriptor createAggregator(final IHyracksTaskContext ctx, RecordDescriptor inRecordDesc,
-            RecordDescriptor outRecordDescriptor, int[] aggKeys, int[] partialKeys) throws HyracksDataException {
+            RecordDescriptor outRecordDescriptor, final int[] groupFields, int[] partialgroupFields,
+            final IFrameWriter writer, final ByteBuffer outputFrame, final FrameTupleAppender appender)
+            throws HyracksDataException {
+        final int frameSize = ctx.getFrameSize();
+        final ArrayTupleBuilder internalTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
 
         return new IAggregatorDescriptor() {
-
             private FrameTupleReference ftr = new FrameTupleReference();
+            private int groupKeySize = 0;
+            private int metaSlotSize = 4;
+
+            @Override
+            public AggregateState createAggregateStates() {
+                IAggregateFunction[] agg = new IAggregateFunction[aggFactories.length];
+                ArrayBackedValueStorage[] aggOutput = new ArrayBackedValueStorage[aggFactories.length];
+                for (int i = 0; i < agg.length; i++) {
+                    aggOutput[i] = new ArrayBackedValueStorage();
+                    try {
+                        agg[i] = aggFactories[i].createAggregateFunction(ctx, aggOutput[i], writer);
+                    } catch (Exception e) {
+                        throw new IllegalStateException(e);
+                    }
+                }
+                return new AggregateState(Pair.of(aggOutput, agg));
+            }
 
             @Override
             public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
-                Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
-                ArrayBackedValueStorage[] aggOutput = aggState.getLeft();
-                IAggregateFunction[] agg = aggState.getRight();
-
-                // initialize aggregate functions
-                for (int i = 0; i < agg.length; i++) {
-                    aggOutput[i].reset();
-                    try {
-                        agg[i].init();
-                    } catch (Exception e) {
-                        throw new HyracksDataException(e);
-                    }
+                setGroupKeySize(accessor, tIndex);
+                initAggregateFunctions(state, true);
+                int stateSize = estimateStep(accessor, tIndex, state);
+                if (stateSize > frameSize) {
+                    throw new HyracksDataException(
+                            "Message combiner intermediate data size "
+                                    + stateSize
+                                    + " is larger than frame size! Check the size estimattion implementation in the message combiner.");
                 }
-
-                ftr.reset(accessor, tIndex);
-                for (int i = 0; i < agg.length; i++) {
-                    try {
-                        agg[i].step(ftr);
-                    } catch (Exception e) {
-                        throw new HyracksDataException(e);
-                    }
-                }
+                singleStep(accessor, tIndex, state);
             }
 
             @Override
             public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
                     int stateTupleIndex, AggregateState state) throws HyracksDataException {
-                Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
-                IAggregateFunction[] agg = aggState.getRight();
-                ftr.reset(accessor, tIndex);
-                for (int i = 0; i < agg.length; i++) {
-                    try {
-                        agg[i].step(ftr);
-                    } catch (Exception e) {
-                        throw new HyracksDataException(e);
-                    }
+                int stateSize = estimateStep(accessor, tIndex, state);
+                if (stateSize > frameSize) {
+                    emitResultTuple(accessor, tIndex, state);
+                    initAggregateFunctions(state, false);
                 }
+                singleStep(accessor, tIndex, state);
             }
 
             @Override
@@ -97,7 +106,7 @@
                 IAggregateFunction[] agg = aggState.getRight();
                 for (int i = 0; i < agg.length; i++) {
                     try {
-                        agg[i].finish();
+                        agg[i].finishAll();
                         tupleBuilder.addField(aggOutput[i].getByteArray(), aggOutput[i].getStartOffset(),
                                 aggOutput[i].getLength());
                     } catch (Exception e) {
@@ -107,21 +116,6 @@
             }
 
             @Override
-            public AggregateState createAggregateStates() {
-                IAggregateFunction[] agg = new IAggregateFunction[aggFactories.length];
-                ArrayBackedValueStorage[] aggOutput = new ArrayBackedValueStorage[aggFactories.length];
-                for (int i = 0; i < agg.length; i++) {
-                    aggOutput[i] = new ArrayBackedValueStorage();
-                    try {
-                        agg[i] = aggFactories[i].createAggregateFunction(ctx, aggOutput[i]);
-                    } catch (Exception e) {
-                        throw new IllegalStateException(e);
-                    }
-                }
-                return new AggregateState(Pair.of(aggOutput, agg));
-            }
-
-            @Override
             public void reset() {
 
             }
@@ -137,6 +131,97 @@
 
             }
 
+            private void initAggregateFunctions(AggregateState state, boolean all) throws HyracksDataException {
+                Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
+                ArrayBackedValueStorage[] aggOutput = aggState.getLeft();
+                IAggregateFunction[] agg = aggState.getRight();
+
+                /**
+                 * initialize aggregate functions
+                 */
+                for (int i = 0; i < agg.length; i++) {
+                    aggOutput[i].reset();
+                    try {
+                        if (all) {
+                            agg[i].initAll();
+                        } else {
+                            agg[i].init();
+                        }
+                    } catch (Exception e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+            }
+
+            private void singleStep(IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+                    throws HyracksDataException {
+                Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
+                IAggregateFunction[] agg = aggState.getRight();
+                ftr.reset(accessor, tIndex);
+                for (int i = 0; i < agg.length; i++) {
+                    try {
+                        agg[i].step(ftr);
+                    } catch (Exception e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+            }
+
+            private int estimateStep(IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+                    throws HyracksDataException {
+                int size = metaSlotSize + groupKeySize;
+                Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
+                IAggregateFunction[] agg = aggState.getRight();
+                ftr.reset(accessor, tIndex);
+                for (int i = 0; i < agg.length; i++) {
+                    try {
+                        size += agg[i].estimateStep(ftr) + metaSlotSize;
+                    } catch (Exception e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+                return size;
+            }
+
+            private void emitResultTuple(IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+                    throws HyracksDataException {
+                internalTupleBuilder.reset();
+                for (int j = 0; j < groupFields.length; j++) {
+                    internalTupleBuilder.addField(accessor, tIndex, groupFields[j]);
+                }
+                Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
+                ArrayBackedValueStorage[] aggOutput = aggState.getLeft();
+                IAggregateFunction[] agg = aggState.getRight();
+                for (int i = 0; i < agg.length; i++) {
+                    try {
+                        agg[i].finish();
+                        internalTupleBuilder.addField(aggOutput[i].getByteArray(), aggOutput[i].getStartOffset(),
+                                aggOutput[i].getLength());
+                    } catch (Exception e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+                if (!appender.appendSkipEmptyField(internalTupleBuilder.getFieldEndOffsets(),
+                        internalTupleBuilder.getByteArray(), 0, internalTupleBuilder.getSize())) {
+                    FrameUtils.flushFrame(outputFrame, writer);
+                    appender.reset(outputFrame, true);
+                    if (!appender.appendSkipEmptyField(internalTupleBuilder.getFieldEndOffsets(),
+                            internalTupleBuilder.getByteArray(), 0, internalTupleBuilder.getSize())) {
+                        throw new HyracksDataException("The output cannot be fit into a frame.");
+                    }
+                }
+            }
+
+            public void setGroupKeySize(IFrameTupleAccessor accessor, int tIndex) {
+                groupKeySize = 0;
+                for (int i = 0; i < groupFields.length; i++) {
+                    int fIndex = groupFields[i];
+                    int fStartOffset = accessor.getFieldStartOffset(tIndex, fIndex);
+                    int fLen = accessor.getFieldEndOffset(tIndex, fIndex) - fStartOffset;
+                    groupKeySize += fLen + metaSlotSize;
+                }
+            }
+
         };
     }
 }
\ No newline at end of file
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
index 8090dff..5bc30a2 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.io.WritableComparable;
 
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
@@ -33,6 +34,7 @@
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 import edu.uci.ics.pregelix.api.graph.MessageCombiner;
 import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IAggregateFunction;
@@ -54,10 +56,11 @@
     private MsgList msgList = new MsgList();
     private boolean keyRead = false;
 
-    public AggregationFunction(IHyracksTaskContext ctx, IConfigurationFactory confFactory, DataOutput output,
-            boolean isFinalStage, boolean partialAggAsInput) throws HyracksDataException {
+    public AggregationFunction(IHyracksTaskContext ctx, IConfigurationFactory confFactory, DataOutput tmpOutput,
+            IFrameWriter groupByOutputWriter, boolean isFinalStage, boolean partialAggAsInput)
+            throws HyracksDataException {
         this.conf = confFactory.createConfiguration(ctx);
-        this.output = output;
+        this.output = tmpOutput;
         this.isFinalStage = isFinalStage;
         this.partialAggAsInput = partialAggAsInput;
         msgList.setConf(this.conf);
@@ -68,6 +71,12 @@
     }
 
     @Override
+    public void initAll() throws HyracksDataException {
+        keyRead = false;
+        combiner.initAll(msgList);
+    }
+
+    @Override
     public void init() throws HyracksDataException {
         keyRead = false;
         combiner.init(msgList);
@@ -75,6 +84,43 @@
 
     @Override
     public void step(IFrameTupleReference tuple) throws HyracksDataException {
+        if (!partialAggAsInput) {
+            combiner.stepPartial(key, (WritableSizable) value);
+        } else {
+            combiner.stepFinal(key, value);
+        }
+    }
+
+    @Override
+    public void finish() throws HyracksDataException {
+        try {
+            if (!isFinalStage) {
+                combinedResult = combiner.finishPartial();
+            } else {
+                combinedResult = combiner.finishFinal();
+            }
+            combinedResult.write(output);
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public void finishAll() throws HyracksDataException {
+        try {
+            if (!isFinalStage) {
+                combinedResult = combiner.finishPartial();
+            } else {
+                combinedResult = combiner.finishFinalAll();
+            }
+            combinedResult.write(output);
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public int estimateStep(IFrameTupleReference tuple) throws HyracksDataException {
         FrameTupleReference ftr = (FrameTupleReference) tuple;
         IFrameTupleAccessor fta = ftr.getFrameTupleAccessor();
         ByteBuffer buffer = fta.getBuffer();
@@ -94,28 +140,13 @@
             }
             value.readFields(valueInput);
             if (!partialAggAsInput) {
-                combiner.stepPartial(key, value);
+                return combiner.estimateAccumulatedStateByteSizePartial(key, (WritableSizable) value);
             } else {
-                combiner.stepFinal(key, value);
+                return combiner.estimateAccumulatedStateByteSizeFinal(key, value);
             }
         } catch (IOException e) {
             throw new HyracksDataException(e);
         }
-
-    }
-
-    @Override
-    public void finish() throws HyracksDataException {
-        try {
-            if (!isFinalStage) {
-                combinedResult = combiner.finishPartial();
-            } else {
-                combinedResult = combiner.finishFinal();
-            }
-            combinedResult.write(output);
-        } catch (IOException e) {
-            throw new HyracksDataException(e);
-        }
     }
 
 }
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java
index 33dfa5d..54eccf5 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java
@@ -17,6 +17,7 @@
 
 import java.io.DataOutput;
 
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
@@ -37,9 +38,9 @@
     }
 
     @Override
-    public IAggregateFunction createAggregateFunction(IHyracksTaskContext ctx, IDataOutputProvider provider)
-            throws HyracksException {
+    public IAggregateFunction createAggregateFunction(IHyracksTaskContext ctx, IDataOutputProvider provider,
+            IFrameWriter writer) throws HyracksException {
         DataOutput output = provider.getDataOutput();
-        return new AggregationFunction(ctx, confFactory, output, isFinalStage, partialAggAsInput);
+        return new AggregationFunction(ctx, confFactory, output, writer, isFinalStage, partialAggAsInput);
     }
 }