merge from zheilbron/hyracks_msr
diff --git a/pregelix/build.sh b/pregelix/build.sh
new file mode 100644
index 0000000..e213181
--- /dev/null
+++ b/pregelix/build.sh
@@ -0,0 +1,10 @@
+rm -rf dist
+mkdir dist
+
+hadoop_versions=(0.20.2 0.23.1 0.23.6 1.0.4 cdh-4.1 cdh-4.2)
+cd ../
+for v in ${hadoop_versions[@]}
+do
+   mvn clean package -DskipTests=true -Dhadoop=${v}
+   mv pregelix/pregelix-dist/target/pregelix-dist-*-binary-assembly.zip pregelix/dist/pregelix-dist-binary-assembly-hdfs-${v}.zip
+done
diff --git a/pregelix/pom.xml b/pregelix/pom.xml
index c0c3822..de5ef3b 100644
--- a/pregelix/pom.xml
+++ b/pregelix/pom.xml
@@ -17,7 +17,7 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks</groupId>
   <artifactId>pregelix</artifactId>
-  <version>0.2.7-SNAPSHOT</version>
+  <version>0.2.10-SNAPSHOT</version>
   <packaging>pom</packaging>
   <name>pregelix</name>
 
diff --git a/pregelix/pregelix-api/pom.xml b/pregelix/pregelix-api/pom.xml
index 305b50c..0152a15 100644
--- a/pregelix/pregelix-api/pom.xml
+++ b/pregelix/pregelix-api/pom.xml
@@ -21,7 +21,7 @@
 	<parent>
 		<groupId>edu.uci.ics.hyracks</groupId>
 		<artifactId>pregelix</artifactId>
-		<version>0.2.7-SNAPSHOT</version>
+		<version>0.2.10-SNAPSHOT</version>
 	</parent>
 
 	<properties>
@@ -82,7 +82,7 @@
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-dataflow-common</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
@@ -95,7 +95,7 @@
 		<dependency>
                         <groupId>edu.uci.ics.hyracks</groupId>
                         <artifactId>hyracks-hdfs-core</artifactId>
-                        <version>0.2.7-SNAPSHOT</version>
+                        <version>0.2.10-SNAPSHOT</version>
                         <type>jar</type>
                         <scope>compile</scope>
                 </dependency>
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java
index 08c7151..5ea6413 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java
@@ -20,6 +20,7 @@
 import org.apache.hadoop.io.WritableComparable;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
 
 /**
  * This is the abstract class to implement for aggregating the state of all the vertices globally in the graph.
@@ -39,7 +40,7 @@
  */
 
 @SuppressWarnings("rawtypes")
-public abstract class GlobalAggregator<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable, P extends Writable, F extends Writable> {
+public abstract class GlobalAggregator<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable, P extends Writable, F extends Writable> {
     /**
      * initialize aggregator
      */
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java
index f5daf99..fa03c0c 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java
@@ -19,6 +19,7 @@
 import org.apache.hadoop.io.WritableComparable;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
 
 /**
  * This is the abstract class to implement for combining of messages that are sent to the same vertex.
@@ -36,7 +37,7 @@
  *        the type of the partially combined messages
  */
 @SuppressWarnings("rawtypes")
-public abstract class MessageCombiner<I extends WritableComparable, M extends Writable, P extends Writable> {
+public abstract class MessageCombiner<I extends WritableComparable, M extends WritableSizable, P extends Writable> {
 
     /**
      * initialize combiner
@@ -82,4 +83,36 @@
      * @return the final message List
      */
     public abstract MsgList<M> finishFinal();
+
+    /**
+     * initialize the combiner for all the segmented bags of one key
+     * 
+     * @param providedMsgList
+     *            the provided message list
+     */
+    public void initAll(MsgList providedMsgList) {
+        init(providedMsgList);
+    }
+
+    /**
+     * finish the final combining for all the segmented bags of one key
+     * 
+     * @return the final message list
+     */
+    public MsgList<M> finishFinalAll() {
+        return finishFinal();
+    }
+
+    /**
+     * estimate the state size (in bytes) after accumulating one more message
+     * 
+     * @return the estimated accumulated byte size
+     */
+    public int estimateAccumulatedStateByteSizePartial(I vertexIndex, M msg) throws HyracksDataException {
+        return 0;
+    }
+
+    /**
+     * estimate the state size (in bytes) after accumulating one more partial aggregate
+     * 
+     * @return the estimated accumulated byte size
+     */
+    public int estimateAccumulatedStateByteSizeFinal(I vertexIndex, P partialAggregate) throws HyracksDataException {
+        return 0;
+    }
 }
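
The hunk above adds segment-aware hooks to MessageCombiner: initAll()/finishFinalAll() bracket all the segmented message bags of one key, and the two estimate* methods let the runtime ask how large the combined state would grow before accepting more input. A minimal sketch of a combiner built on these hooks, assuming a hypothetical DoubleSizableWritable message type (itself sketched after the WritableSizable diff below):

    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.WritableComparable;

    import edu.uci.ics.pregelix.api.graph.MessageCombiner;
    import edu.uci.ics.pregelix.api.graph.MsgList;

    // Sums double messages; the partial aggregate is a plain DoubleWritable.
    @SuppressWarnings({ "rawtypes", "unchecked" })
    public class DoubleSumCombiner<I extends WritableComparable> extends
            MessageCombiner<I, DoubleSizableWritable, DoubleWritable> {
        private double sum;
        private MsgList<DoubleSizableWritable> msgList;

        @Override
        public void init(MsgList providedMsgList) {
            this.msgList = providedMsgList;
            this.msgList.clearElements();
            this.sum = 0.0;
        }

        @Override
        public void stepPartial(I vertexIndex, DoubleSizableWritable msg) {
            sum += msg.get();
        }

        @Override
        public void stepFinal(I vertexIndex, DoubleWritable partialAggregate) {
            sum += partialAggregate.get();
        }

        @Override
        public DoubleWritable finishPartial() {
            return new DoubleWritable(sum);
        }

        @Override
        public MsgList<DoubleSizableWritable> finishFinal() {
            msgList.addElement(new DoubleSizableWritable(sum));
            return msgList;
        }

        // The combined state is always one 8-byte double, so the estimates are constant.
        @Override
        public int estimateAccumulatedStateByteSizePartial(I vertexIndex, DoubleSizableWritable msg) {
            return 8;
        }

        @Override
        public int estimateAccumulatedStateByteSizeFinal(I vertexIndex, DoubleWritable partialAggregate) {
            return 8;
        }
    }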
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
index 104f396..51b62e4 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
@@ -15,8 +15,11 @@
 
 package edu.uci.ics.pregelix.api.graph;
 
-import org.apache.hadoop.io.Writable;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 
+import edu.uci.ics.pregelix.api.io.WritableSizable;
 import edu.uci.ics.pregelix.api.util.ArrayListWritable;
 import edu.uci.ics.pregelix.api.util.BspUtils;
 
@@ -27,9 +30,11 @@
  * @param <M>
  *            message type
  */
-public class MsgList<M extends Writable> extends ArrayListWritable<M> {
+public class MsgList<M extends WritableSizable> extends ArrayListWritable<M> {
     /** Defining a layout version for a serializable class. */
     private static final long serialVersionUID = 1L;
+    private byte start = 1;
+    private byte end = 2;
 
     /**
      * Default constructor.
@@ -43,4 +48,34 @@
     public void setClass() {
         setClass((Class<M>) BspUtils.getMessageValueClass(getConf()));
     }
+
+    @Override
+    public void write(DataOutput output) throws IOException {
+        output.writeByte(start | end);
+        super.write(output);
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        byte startEnd = input.readByte();
+        this.start = (byte) (startEnd & 1);
+        this.end = (byte) (startEnd & 2);
+        super.readFields(input);
+    }
+
+    public final void setSegmentStart(boolean segStart) {
+        this.start = (byte) (segStart ? 1 : 0);
+    }
+
+    public final void setSegmentEnd(boolean segEnd) {
+        this.end = (byte) (segEnd ? 2 : 0);
+    }
+
+    public boolean segmentStart() {
+        return start == 1;
+    }
+
+    public boolean segmentEnd() {
+        return end == 2;
+    }
 }
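
The flag byte added to MsgList packs both markers into one serialized byte: bit 0 (value 1) marks a segment start and bit 1 (value 2) marks a segment end, which is why write() emits start | end and readFields() recovers each flag with a mask. A standalone illustration of the round trip:

    public class SegmentFlagDemo {
        public static void main(String[] args) {
            byte start = 1;                       // bit 0 set: segment start
            byte end = 2;                         // bit 1 set: segment end
            byte flags = (byte) (start | end);    // the single byte that write() emits

            // readFields() masks the byte the same way the hunk above does.
            System.out.println("segment start: " + ((flags & 1) == 1));  // true
            System.out.println("segment end:   " + ((flags & 2) == 2));  // true
        }
    }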
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
index 4175078..c52130d 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -24,6 +24,7 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -31,7 +32,9 @@
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
 import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.api.util.JobStateUtils;
 import edu.uci.ics.pregelix.api.util.SerDeUtils;
 
 /**
@@ -48,7 +51,7 @@
  *            Message value type
  */
 @SuppressWarnings("rawtypes")
-public abstract class Vertex<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+public abstract class Vertex<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable>
         implements Writable {
     private static long superstep = 0;
     /** Class-wide number of vertices */
@@ -75,6 +78,8 @@
     private boolean hasMessage = false;
     /** created new vertex */
     private boolean createdNewLiveVertex = false;
+    /** terminate the partition */
+    private boolean terminatePartition = false;
 
     /**
      * use object pool for re-using objects
@@ -87,12 +92,23 @@
     private int usedValue = 0;
 
     /**
-     * The key method that users need to implement
+     * The key method that users need to implement to process
+     * incoming messages in each superstep.
+     * 1. In a superstep, this method can be called multiple consecutive times
+     * for a single vertex, each invocation processing one batch of messages.
+     * (Note that this only happens when the messages for a single vertex
+     * exceed one frame.)
+     * 2. In each superstep, open() is called before any invocation of this
+     * method for a vertex, and close() is called after all the invocations
+     * of this method for that vertex.
+     * 3. In each partition, the vertex Java object is reused for all the
+     * vertices to be processed in the same partition. (The model is the
+     * same as the key-value objects in Hadoop map tasks.)
      * 
      * @param msgIterator
      *            an iterator of incoming messages
      */
-    public abstract void compute(Iterator<M> msgIterator);
+    public abstract void compute(Iterator<M> msgIterator) throws Exception;
 
     /**
      * Add an edge for the vertex.
@@ -569,4 +585,62 @@
         Vertex.context = context;
     }
 
+    @Override
+    public int hashCode() {
+        return vertexId.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object object) {
+        if (!(object instanceof Vertex)) {
+            return false;
+        }
+        Vertex vertex = (Vertex) object;
+        return vertexId.equals(vertex.getVertexId());
+    }
+
+    /**
+     * called immediately before the invocations of compute() on a vertex
+     * Users can override this method to initialize the state of a vertex
+     * before the compute() invocations
+     */
+    public void open() {
+
+    }
+
+    /**
+     * called immediately after all the invocations of compute() on a vertex
+     * Users can override this method to finalize the state of a vertex
+     * after the compute() invocations
+     */
+    public void close() {
+
+    }
+
+    /**
+     * Terminate the partition that the current vertex resides in.
+     * This takes effect immediately: the remaining vertices in the
+     * same partition will not be processed.
+     */
+    protected final void terminatePartition() {
+        voteToHalt();
+        terminatePartition = true;
+    }
+
+    /**
+     * Terminate the Pregelix job.
+     * This takes effect only after the current iteration completes.
+     * 
+     * @throws Exception
+     */
+    protected void terminateJob() throws Exception {
+        Configuration conf = getContext().getConfiguration();
+        JobStateUtils.writeForceTerminationState(conf, BspUtils.getJobId(conf));
+    }
+
+    /**
+     * @return true if the partition is terminated; false otherwise
+     */
+    public boolean isPartitionTerminated() {
+        return terminatePartition;
+    }
+
 }
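
The Vertex changes introduce a per-vertex lifecycle: open() runs once before any compute() call in a superstep, compute() may then run several times when a vertex's messages span multiple frames, and close() runs after the last call. A minimal sketch of a vertex that exploits this, assuming the hypothetical DoubleSizableWritable message type sketched below and the existing getVertexValue()/voteToHalt() accessors:

    import java.util.Iterator;

    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.FloatWritable;
    import org.apache.hadoop.io.VLongWritable;

    import edu.uci.ics.pregelix.api.graph.Vertex;

    // Accumulates incoming double messages across possibly-batched compute() calls.
    public class SumVertex extends
            Vertex<VLongWritable, DoubleWritable, FloatWritable, DoubleSizableWritable> {
        private double sum;

        @Override
        public void open() {
            sum = 0.0; // reset per-vertex state before the compute() calls
        }

        @Override
        public void compute(Iterator<DoubleSizableWritable> msgIterator) throws Exception {
            while (msgIterator.hasNext()) {
                sum += msgIterator.next().get();
            }
        }

        @Override
        public void close() {
            getVertexValue().set(sum); // safe: all message batches have been seen
            voteToHalt();
        }
    }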
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/Sizable.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/Sizable.java
new file mode 100644
index 0000000..568500b
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/Sizable.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.io;
+
+/**
+ * @author yingyib
+ */
+public interface Sizable {
+
+    public int sizeInBytes();
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexInputFormat.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexInputFormat.java
index c841b1a..73af190 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexInputFormat.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexInputFormat.java
@@ -40,7 +40,7 @@
  *            Message data
  */
 @SuppressWarnings("rawtypes")
-public abstract class VertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> {
+public abstract class VertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable> {
     /**
      * Logically split the vertices for a graph processing application.
      * Each {@link InputSplit} is then assigned to a worker for processing.
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexReader.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexReader.java
index e6c62ba..ba8b561 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexReader.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexReader.java
@@ -39,7 +39,7 @@
  *            Message data
  */
 @SuppressWarnings("rawtypes")
-public interface VertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> {
+public interface VertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable> {
     /**
     * Use the input split and context to set up reading the vertices.
      * Guaranteed to be called prior to any other function.
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/WritableSizable.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/WritableSizable.java
new file mode 100644
index 0000000..ee13f76
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/WritableSizable.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.io;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * @author yingyib
+ */
+public interface WritableSizable extends Writable, Sizable {
+
+}
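
WritableSizable is the new contract for message types: a plain Hadoop Writable that can also report its serialized size, so the runtime can budget message-list frames. A minimal sketch of a conforming fixed-size message type (the hypothetical DoubleSizableWritable assumed in the earlier sketches):

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import edu.uci.ics.pregelix.api.io.WritableSizable;

    // A fixed 8-byte double message.
    public class DoubleSizableWritable implements WritableSizable {
        private double value;

        public DoubleSizableWritable() {
        }

        public DoubleSizableWritable(double value) {
            this.value = value;
        }

        public double get() {
            return value;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeDouble(value);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            value = in.readDouble();
        }

        @Override
        public int sizeInBytes() {
            return 8; // exactly one serialized double
        }
    }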
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexInputFormat.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexInputFormat.java
index 985bcff..1d3c427 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexInputFormat.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexInputFormat.java
@@ -26,13 +26,14 @@
 
 import edu.uci.ics.pregelix.api.io.BasicGenInputSplit;
 import edu.uci.ics.pregelix.api.io.VertexInputFormat;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
 
 /**
  * This VertexInputFormat is meant for testing/debugging. It simply generates
  * some vertex data that can be consumed by test applications.
  */
 @SuppressWarnings("rawtypes")
-public abstract class GeneratedVertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+public abstract class GeneratedVertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable>
         extends VertexInputFormat<I, V, E, M> {
 
     @Override
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexReader.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexReader.java
index 92c8728..376d45d 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexReader.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexReader.java
@@ -25,6 +25,7 @@
 
 import edu.uci.ics.pregelix.api.io.BasicGenInputSplit;
 import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
 
 /**
  * Used by GeneratedVertexInputFormat to read some generated data
@@ -37,7 +38,7 @@
  *            Edge value
  */
 @SuppressWarnings("rawtypes")
-public abstract class GeneratedVertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+public abstract class GeneratedVertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable>
         implements VertexReader<I, V, E, M> {
     /** Records read so far */
     protected long recordsRead = 0;
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/text/TextVertexInputFormat.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/text/TextVertexInputFormat.java
index 2254ae4..0faf516 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/text/TextVertexInputFormat.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/text/TextVertexInputFormat.java
@@ -30,6 +30,7 @@
 
 import edu.uci.ics.pregelix.api.io.VertexInputFormat;
 import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
 
 /**
  * Abstract class that users should subclass to use their own text based vertex
@@ -45,7 +46,7 @@
  *            Message value
  */
 @SuppressWarnings("rawtypes")
-public abstract class TextVertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+public abstract class TextVertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable>
         extends VertexInputFormat<I, V, E, M> {
     /** Uses the TextInputFormat to do everything */
     protected TextInputFormat textInputFormat = new TextInputFormat();
@@ -62,7 +63,7 @@
      * @param <E>
      *            Edge value
      */
-    public static abstract class TextVertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+    public static abstract class TextVertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable>
             implements VertexReader<I, V, E, M> {
         /** Internal line record reader */
         private final RecordReader<LongWritable, Text> lineRecordReader;
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index 4cddaf0..dae7818 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -198,7 +198,7 @@
      * 
      * @param updateHeavyFlag
      */
-    final public void setMutationOrVariableSizedUpdateHeavy(boolean variableSizedUpdateHeavyFlag) {
+    final public void setLSMStorage(boolean variableSizedUpdateHeavyFlag) {
         getConfiguration().setBoolean(UPDATE_INTENSIVE, variableSizedUpdateHeavyFlag);
     }
 }
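
The rename makes the intent explicit: the flag now reads as opting a job into LSM-based storage rather than describing the workload. A usage sketch at job-setup time (the job name and vertex class are placeholders):

    PregelixJob job = new PregelixJob("my-update-heavy-job");
    job.setVertexClass(SumVertex.class);
    job.setLSMStorage(true); // was setMutationOrVariableSizedUpdateHeavy(true)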
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java
index 7a9e5d5..1683541 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java
@@ -141,9 +141,6 @@
         used = 0;
         this.clear();
         int numValues = in.readInt(); // read number of values
-        if (numValues > 100) {
-            System.out.println("num values: " + numValues);
-        }
         for (int i = 0; i < numValues; i++) {
             M value = allocateValue();
             value.readFields(in); // read a value
@@ -153,9 +150,6 @@
 
     public void write(DataOutput out) throws IOException {
         int numValues = size();
-        if (numValues > 100) {
-            System.out.println("write num values: " + numValues);
-        }
         out.writeInt(numValues); // write number of values
         for (int i = 0; i < numValues; i++) {
             get(i).write(out);
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index 03c37dc..ff4ee91 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -28,6 +28,7 @@
 import edu.uci.ics.pregelix.api.graph.VertexPartitioner;
 import edu.uci.ics.pregelix.api.io.VertexInputFormat;
 import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 
 /**
@@ -49,7 +50,7 @@
      * @return User's vertex input format class
      */
     @SuppressWarnings({ "rawtypes", "unchecked" })
-    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> Class<? extends VertexInputFormat<I, V, E, M>> getVertexInputFormatClass(
+    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable> Class<? extends VertexInputFormat<I, V, E, M>> getVertexInputFormatClass(
             Configuration conf) {
         return (Class<? extends VertexInputFormat<I, V, E, M>>) conf.getClass(PregelixJob.VERTEX_INPUT_FORMAT_CLASS,
                 null, VertexInputFormat.class);
@@ -63,7 +64,7 @@
      * @return Instantiated user vertex input format class
      */
     @SuppressWarnings("rawtypes")
-    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> VertexInputFormat<I, V, E, M> createVertexInputFormat(
+    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable> VertexInputFormat<I, V, E, M> createVertexInputFormat(
             Configuration conf) {
         Class<? extends VertexInputFormat<I, V, E, M>> vertexInputFormatClass = getVertexInputFormatClass(conf);
         VertexInputFormat<I, V, E, M> inputFormat = ReflectionUtils.newInstance(vertexInputFormatClass, conf);
@@ -106,7 +107,7 @@
      * @return User's vertex combiner class
      */
     @SuppressWarnings({ "rawtypes", "unchecked" })
-    public static <I extends WritableComparable, M extends Writable, P extends Writable> Class<? extends MessageCombiner<I, M, P>> getMessageCombinerClass(
+    public static <I extends WritableComparable, M extends WritableSizable, P extends Writable> Class<? extends MessageCombiner<I, M, P>> getMessageCombinerClass(
             Configuration conf) {
         return (Class<? extends MessageCombiner<I, M, P>>) conf.getClass(PregelixJob.Message_COMBINER_CLASS,
                 DefaultMessageCombiner.class, MessageCombiner.class);
@@ -120,7 +121,7 @@
      * @return User's vertex combiner class
      */
     @SuppressWarnings({ "rawtypes", "unchecked" })
-    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable, P extends Writable, F extends Writable> Class<? extends GlobalAggregator<I, V, E, M, P, F>> getGlobalAggregatorClass(
+    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable, P extends Writable, F extends Writable> Class<? extends GlobalAggregator<I, V, E, M, P, F>> getGlobalAggregatorClass(
             Configuration conf) {
         return (Class<? extends GlobalAggregator<I, V, E, M, P, F>>) conf.getClass(PregelixJob.GLOBAL_AGGREGATOR_CLASS,
                 GlobalCountAggregator.class, GlobalAggregator.class);
@@ -138,7 +139,7 @@
      * @return Instantiated user vertex combiner class
      */
     @SuppressWarnings("rawtypes")
-    public static <I extends WritableComparable, M extends Writable, P extends Writable> MessageCombiner<I, M, P> createMessageCombiner(
+    public static <I extends WritableComparable, M extends WritableSizable, P extends Writable> MessageCombiner<I, M, P> createMessageCombiner(
             Configuration conf) {
         Class<? extends MessageCombiner<I, M, P>> vertexCombinerClass = getMessageCombinerClass(conf);
         return ReflectionUtils.newInstance(vertexCombinerClass, conf);
@@ -164,7 +165,7 @@
      * @return Instantiated user vertex combiner class
      */
     @SuppressWarnings("rawtypes")
-    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable, P extends Writable, F extends Writable> GlobalAggregator<I, V, E, M, P, F> createGlobalAggregator(
+    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable, P extends Writable, F extends Writable> GlobalAggregator<I, V, E, M, P, F> createGlobalAggregator(
             Configuration conf) {
         Class<? extends GlobalAggregator<I, V, E, M, P, F>> globalAggregatorClass = getGlobalAggregatorClass(conf);
         return ReflectionUtils.newInstance(globalAggregatorClass, conf);
@@ -178,7 +179,7 @@
      * @return User's vertex class
      */
     @SuppressWarnings({ "rawtypes", "unchecked" })
-    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> Class<? extends Vertex<I, V, E, M>> getVertexClass(
+    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable> Class<? extends Vertex<I, V, E, M>> getVertexClass(
             Configuration conf) {
         return (Class<? extends Vertex<I, V, E, M>>) conf.getClass(PregelixJob.VERTEX_CLASS, null, Vertex.class);
     }
@@ -191,7 +192,7 @@
      * @return Instantiated user vertex
      */
     @SuppressWarnings("rawtypes")
-    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> Vertex<I, V, E, M> createVertex(
+    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable> Vertex<I, V, E, M> createVertex(
             Configuration conf) {
         Class<? extends Vertex<I, V, E, M>> vertexClass = getVertexClass(conf);
         Vertex<I, V, E, M> vertex = ReflectionUtils.newInstance(vertexClass, conf);
@@ -299,7 +300,7 @@
      * @return User's vertex message value class
      */
     @SuppressWarnings("unchecked")
-    public static <M extends Writable> Class<M> getMessageValueClass(Configuration conf) {
+    public static <M extends WritableSizable> Class<M> getMessageValueClass(Configuration conf) {
         if (conf == null)
             conf = defaultConf;
         return (Class<M>) conf.getClass(PregelixJob.MESSAGE_VALUE_CLASS, Writable.class);
@@ -369,7 +370,7 @@
      *            Configuration to check
      * @return Instantiated user vertex message value
      */
-    public static <M extends Writable> M createMessageValue(Configuration conf) {
+    public static <M extends WritableSizable> M createMessageValue(Configuration conf) {
         Class<M> messageValueClass = getMessageValueClass(conf);
         try {
             return messageValueClass.newInstance();
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultMessageCombiner.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultMessageCombiner.java
index d2d90a2..feb9e2f 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultMessageCombiner.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultMessageCombiner.java
@@ -14,42 +14,82 @@
  */
 package edu.uci.ics.pregelix.api.util;
 
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.pregelix.api.graph.MessageCombiner;
 import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
 
 @SuppressWarnings({ "rawtypes", "unchecked" })
-public class DefaultMessageCombiner<I extends WritableComparable, M extends Writable> extends
+public class DefaultMessageCombiner<I extends WritableComparable, M extends WritableSizable> extends
         MessageCombiner<I, M, MsgList> {
     private MsgList<M> msgList;
+    private int metaSlot = 8;
+    private int accumulatedSize = metaSlot;
 
     @Override
     public void init(MsgList providedMsgList) {
+        realInit(providedMsgList);
+        this.msgList.setSegmentStart(false);
+    }
+
+    private void realInit(MsgList providedMsgList) {
         this.msgList = providedMsgList;
         this.msgList.clearElements();
+        this.accumulatedSize = metaSlot;
     }
 
     @Override
     public void stepPartial(I vertexIndex, M msg) throws HyracksDataException {
         msgList.addElement(msg);
+        accumulatedSize += msg.sizeInBytes();
     }
 
     @Override
     public void stepFinal(I vertexIndex, MsgList partialAggregate) throws HyracksDataException {
         msgList.addAllElements(partialAggregate);
+        for (int i = 0; i < partialAggregate.size(); i++) {
+            accumulatedSize += ((M) partialAggregate.get(i)).sizeInBytes();
+        }
     }
 
     @Override
     public MsgList finishPartial() {
+        msgList.setSegmentEnd(false);
         return msgList;
     }
 
     @Override
     public MsgList<M> finishFinal() {
+        msgList.setSegmentEnd(false);
         return msgList;
     }
 
+    @Override
+    public void initAll(MsgList providedMsgList) {
+        realInit(providedMsgList);
+        msgList.setSegmentStart(true);
+    }
+
+    @Override
+    public MsgList<M> finishFinalAll() {
+        msgList.setSegmentEnd(true);
+        return msgList;
+    }
+
+    @Override
+    public int estimateAccumulatedStateByteSizePartial(I vertexIndex, M msg) throws HyracksDataException {
+        return accumulatedSize + msg.sizeInBytes();
+    }
+
+    @Override
+    public int estimateAccumulatedStateByteSizeFinal(I vertexIndex, MsgList partialAggregate)
+            throws HyracksDataException {
+        int size = accumulatedSize;
+        for (int i = 0; i < partialAggregate.size(); i++) {
+            size += ((M) partialAggregate.get(i)).sizeInBytes();
+        }
+        return size;
+    }
 }
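
DefaultMessageCombiner now tracks accumulatedSize (seeded with an 8-byte metadata slot) as messages arrive, so its estimate* methods can answer "how big would the state be with one more input" without copying anything. A hedged sketch of how a group-by runtime could consult these hooks — the real operator logic lives in pregelix-dataflow and is not part of this hunk; writer.emit() and frameBudget are stand-ins:

    // Before accumulating, check whether the combined state would still fit
    // in the output frame; if not, close the current segment and start a new one.
    if (combiner.estimateAccumulatedStateByteSizePartial(vertexId, msg) > frameBudget) {
        writer.emit(combiner.finishPartial()); // segment-end flag stays false
        combiner.init(msgList);                // a continuation, not a segment start
    }
    combiner.stepPartial(vertexId, msg);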
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/GlobalCountAggregator.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/GlobalCountAggregator.java
index ffc6526..9a95f09 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/GlobalCountAggregator.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/GlobalCountAggregator.java
@@ -21,9 +21,10 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
 import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
 
 @SuppressWarnings("rawtypes")
-public class GlobalCountAggregator<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+public class GlobalCountAggregator<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable>
         extends GlobalAggregator<I, V, E, M, LongWritable, LongWritable> {
 
     private LongWritable state = new LongWritable(0);
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/JobStateUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/JobStateUtils.java
new file mode 100644
index 0000000..4a98167
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/JobStateUtils.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.util;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * @author yingyib
+ */
+public class JobStateUtils {
+
+    public static final String TMP_DIR = "/tmp/";
+
+    public static void writeForceTerminationState(Configuration conf, String jobId) throws HyracksDataException {
+        try {
+            FileSystem dfs = FileSystem.get(conf);
+            String pathStr = TMP_DIR + jobId + "fterm";
+            Path path = new Path(pathStr);
+            if (!dfs.exists(path)) {
+                FSDataOutputStream output = dfs.create(path, true);
+                output.writeBoolean(true);
+                output.flush();
+                output.close();
+            }
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public static boolean readForceTerminationState(Configuration conf, String jobId) throws HyracksDataException {
+        try {
+            FileSystem dfs = FileSystem.get(conf);
+            String pathStr = TMP_DIR + jobId + "fterm";
+            Path path = new Path(pathStr);
+            return dfs.exists(path);
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+}
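
JobStateUtils implements the force-termination handshake behind Vertex.terminateJob(): the writer side drops a marker file at /tmp/<jobId>fterm, and the reader side simply checks for its existence. A sketch of the two halves (conf is the job's Hadoop Configuration):

    // Inside a vertex, terminateJob() writes the flag:
    JobStateUtils.writeForceTerminationState(conf, BspUtils.getJobId(conf));

    // After the current iteration, the runtime can poll it:
    if (JobStateUtils.readForceTerminationState(conf, BspUtils.getJobId(conf))) {
        // stop scheduling further supersteps
    }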
diff --git a/pregelix/pregelix-core/pom.xml b/pregelix/pregelix-core/pom.xml
index 2a3efcf..a3f24f4 100644
--- a/pregelix/pregelix-core/pom.xml
+++ b/pregelix/pregelix-core/pom.xml
@@ -21,7 +21,7 @@
 	<parent>
 		<groupId>edu.uci.ics.hyracks</groupId>
 		<artifactId>pregelix</artifactId>
-		<version>0.2.7-SNAPSHOT</version>
+		<version>0.2.10-SNAPSHOT</version>
 	</parent>
 
 
@@ -209,84 +209,84 @@
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>pregelix-api</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>pregelix-dataflow-std</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>pregelix-dataflow</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-dataflow-std</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>pregelix-runtime</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-api</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-dataflow-common</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-data-std</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-storage-am-common</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-storage-am-btree</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-control-cc</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-control-nc</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
@@ -300,7 +300,7 @@
 		<dependency>
 			<groupId>edu.uci.ics.hyracks.examples</groupId>
 			<artifactId>hyracks-integration-tests</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<scope>test</scope>
 		</dependency>
 		<dependency>
@@ -320,7 +320,7 @@
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-ipc</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index db6c2c8..c144ddd 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -32,8 +32,6 @@
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
@@ -54,6 +52,8 @@
 import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.group.ClusteredGroupOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.group.IClusteredAggregatorDescriptorFactory;
 import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinFunctionUpdateOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
@@ -178,18 +178,18 @@
         /**
          * construct local pre-clustered group-by operator
          */
-        IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false,
-                false);
-        PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+        IClusteredAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf,
+                false, false);
+        ClusteredGroupOperatorDescriptor localGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
         ClusterConfig.setLocationConstraint(spec, localGby);
 
         /**
          * construct global group-by operator
          */
-        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(conf,
-                true, true);
-        PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+        IClusteredAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(
+                conf, true, true);
+        ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactoryFinal, rdFinal);
         ClusterConfig.setLocationConstraint(spec, globalGby);
 
@@ -383,18 +383,18 @@
         /**
          * construct local pre-clustered group-by operator
          */
-        IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false,
-                false);
-        PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+        IClusteredAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf,
+                false, false);
+        ClusteredGroupOperatorDescriptor localGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
         ClusterConfig.setLocationConstraint(spec, localGby);
 
         /**
          * construct global group-by operator
          */
-        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(conf,
-                true, true);
-        PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+        IClusteredAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(
+                conf, true, true);
+        ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactoryFinal, rdFinal);
         ClusterConfig.setLocationConstraint(spec, globalGby);
 
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index 3af8921..c29ea18 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -32,8 +32,6 @@
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
@@ -54,6 +52,8 @@
 import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.group.ClusteredGroupOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.group.IClusteredAggregatorDescriptorFactory;
 import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinFunctionUpdateOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.std.TreeSearchFunctionUpdateOperatorDescriptor;
@@ -144,9 +144,9 @@
         /**
          * construct local pre-clustered group-by operator
          */
-        IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false,
-                false);
-        PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+        IClusteredAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf,
+                false, false);
+        ClusteredGroupOperatorDescriptor localGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
         ClusterConfig.setLocationConstraint(spec, localGby);
 
@@ -155,9 +155,9 @@
          */
         RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
                 MsgList.class.getName());
-        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(conf,
-                true, true);
-        PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+        IClusteredAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(
+                conf, true, true);
+        ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactoryFinal, rdFinal);
         ClusterConfig.setLocationConstraint(spec, globalGby);
 
@@ -198,8 +198,8 @@
         int[] fieldPermutation = new int[] { 0, 1 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(), null,
-                NoOpOperationCallbackFactory.INSTANCE);
+                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, insertOp);
 
         /**
@@ -208,8 +208,8 @@
         int[] fieldPermutationDelete = new int[] { 0 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE, getIndexDataflowHelperFactory(),
-                null, NoOpOperationCallbackFactory.INSTANCE);
+                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
+                getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
@@ -323,9 +323,10 @@
 
         IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
                 spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
-                JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true, getIndexDataflowHelperFactory(), true,
-                nullWriterFactories, inputRdFactory, 5, new ComputeUpdateFunctionFactory(confFactory), preHookFactory,
-                null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete);
+                JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true,
+                getIndexDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
+                new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
+                rdPartialAggregate, rdInsert, rdDelete);
         ClusterConfig.setLocationConstraint(spec, join);
 
         /**
@@ -342,18 +343,18 @@
         /**
          * construct local pre-clustered group-by operator
          */
-        IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false,
-                false);
-        PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+        IClusteredAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf,
+                false, false);
+        ClusteredGroupOperatorDescriptor localGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
         ClusterConfig.setLocationConstraint(spec, localGby);
 
         /**
          * construct global group-by operator
          */
-        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(conf,
-                true, true);
-        PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+        IClusteredAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(
+                conf, true, true);
+        ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactoryFinal, rdFinal);
         ClusterConfig.setLocationConstraint(spec, globalGby);
 
@@ -394,8 +395,8 @@
         int[] fieldPermutation = new int[] { 0, 1 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(), null,
-                NoOpOperationCallbackFactory.INSTANCE);
+                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, insertOp);
 
         /**
@@ -404,8 +405,8 @@
         int[] fieldPermutationDelete = new int[] { 0 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE, getIndexDataflowHelperFactory(),
-                null, NoOpOperationCallbackFactory.INSTANCE);
+                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
+                getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index 50949aa..dc61971 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -31,8 +31,6 @@
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
@@ -53,6 +51,8 @@
 import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.group.ClusteredGroupOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.group.IClusteredAggregatorDescriptorFactory;
 import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinFunctionUpdateOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.std.TreeSearchFunctionUpdateOperatorDescriptor;
@@ -148,9 +148,9 @@
          */
         RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
                 MsgList.class.getName());
-        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(conf,
-                true, false);
-        PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+        IClusteredAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(
+                conf, true, false);
+        ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactoryFinal, rdFinal);
         ClusterConfig.setLocationConstraint(spec, globalGby);
 
@@ -190,8 +190,8 @@
         int[] fieldPermutation = new int[] { 0, 1 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(), null,
-                NoOpOperationCallbackFactory.INSTANCE);
+                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, insertOp);
 
         /**
@@ -200,8 +200,8 @@
         int[] fieldPermutationDelete = new int[] { 0 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE, getIndexDataflowHelperFactory(),
-                null, NoOpOperationCallbackFactory.INSTANCE);
+                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
+                getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
@@ -312,9 +312,10 @@
 
         IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
                 spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
-                JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true, getIndexDataflowHelperFactory(), true,
-                nullWriterFactories, inputRdFactory, 5, new ComputeUpdateFunctionFactory(confFactory), preHookFactory,
-                null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete);
+                JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true,
+                getIndexDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
+                new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
+                rdPartialAggregate, rdInsert, rdDelete);
         ClusterConfig.setLocationConstraint(spec, join);
 
         /**
@@ -331,9 +332,9 @@
         /**
          * construct global group-by operator
          */
-        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(conf,
-                true, false);
-        PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+        IClusteredAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(
+                conf, true, false);
+        ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactoryFinal, rdFinal);
         ClusterConfig.setLocationConstraint(spec, globalGby);
 
@@ -371,8 +372,8 @@
         int[] fieldPermutation = new int[] { 0, 1 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(), null,
-                NoOpOperationCallbackFactory.INSTANCE);
+                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, insertOp);
 
         /**
@@ -381,8 +382,8 @@
         int[] fieldPermutationDelete = new int[] { 0 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE, getIndexDataflowHelperFactory(),
-                null, NoOpOperationCallbackFactory.INSTANCE);
+                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
+                getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index 362e413..34f723f 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -31,8 +31,6 @@
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
@@ -53,6 +51,8 @@
 import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.group.ClusteredGroupOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.group.IClusteredAggregatorDescriptorFactory;
 import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinFunctionUpdateOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.std.TreeSearchFunctionUpdateOperatorDescriptor;
@@ -143,9 +143,9 @@
         /**
          * construct local pre-clustered group-by operator
          */
-        IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false,
-                false);
-        PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+        IClusteredAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf,
+                false, false);
+        ClusteredGroupOperatorDescriptor localGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
         ClusterConfig.setLocationConstraint(spec, localGby);
 
@@ -161,9 +161,9 @@
          */
         RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
                 MsgList.class.getName());
-        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(conf,
-                true, true);
-        PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+        IClusteredAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(
+                conf, true, true);
+        ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactoryFinal, rdFinal);
         ClusterConfig.setLocationConstraint(spec, globalGby);
 
@@ -204,8 +204,8 @@
         int[] fieldPermutation = new int[] { 0, 1 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(), null,
-                NoOpOperationCallbackFactory.INSTANCE);
+                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, insertOp);
 
         /**
@@ -214,8 +214,8 @@
         int[] fieldPermutationDelete = new int[] { 0 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE, getIndexDataflowHelperFactory(),
-                null, NoOpOperationCallbackFactory.INSTANCE);
+                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
+                getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
@@ -326,9 +326,10 @@
 
         IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
                 spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
-                JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true, getIndexDataflowHelperFactory(), true,
-                nullWriterFactories, inputRdFactory, 5, new ComputeUpdateFunctionFactory(confFactory), preHookFactory,
-                null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete);
+                JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true,
+                getIndexDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
+                new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
+                rdPartialAggregate, rdInsert, rdDelete);
         ClusterConfig.setLocationConstraint(spec, join);
 
         /**
@@ -345,9 +346,9 @@
         /**
          * construct local pre-clustered group-by operator
          */
-        IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false,
-                false);
-        PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+        IClusteredAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf,
+                false, false);
+        ClusteredGroupOperatorDescriptor localGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
         ClusterConfig.setLocationConstraint(spec, localGby);
 
@@ -361,9 +362,9 @@
         /**
          * construct global group-by operator
          */
-        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(conf,
-                true, true);
-        PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+        IClusteredAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(
+                conf, true, true);
+        ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactoryFinal, rdFinal);
         ClusterConfig.setLocationConstraint(spec, globalGby);
 
@@ -404,8 +405,8 @@
         int[] fieldPermutation = new int[] { 0, 1 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(), null,
-                NoOpOperationCallbackFactory.INSTANCE);
+                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, insertOp);
 
         /**
@@ -414,8 +415,8 @@
         int[] fieldPermutationDelete = new int[] { 0 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE, getIndexDataflowHelperFactory(),
-                null, NoOpOperationCallbackFactory.INSTANCE);
+                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
+                getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java
index 0876893..3e01109 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java
@@ -21,9 +21,9 @@
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
 import edu.uci.ics.pregelix.core.runtime.touchpoint.WritableRecordDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.group.IClusteredAggregatorDescriptorFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IAggregateFunctionFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
 import edu.uci.ics.pregelix.runtime.simpleagg.AccumulatingAggregatorFactory;
@@ -75,11 +75,11 @@
         return rdFactory;
     }
 
-    public static IAggregatorDescriptorFactory getAccumulatingAggregatorFactory(Configuration conf, boolean isFinal,
-            boolean partialAggAsInput) {
+    public static IClusteredAggregatorDescriptorFactory getAccumulatingAggregatorFactory(Configuration conf,
+            boolean isFinal, boolean partialAggAsInput) {
         IAggregateFunctionFactory aggFuncFactory = new AggregationFunctionFactory(new ConfigurationFactory(conf),
                 isFinal, partialAggAsInput);
-        IAggregatorDescriptorFactory aggregatorFactory = new AccumulatingAggregatorFactory(
+        IClusteredAggregatorDescriptorFactory aggregatorFactory = new AccumulatingAggregatorFactory(
                 new IAggregateFunctionFactory[] { aggFuncFactory });
         return aggregatorFactory;
     }
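
Taken together with the job-generator hunks above, the refactor is mechanical: every former PreclusteredGroupOperatorDescriptor call site now pairs the Pregelix-local ClusteredGroupOperatorDescriptor with the narrowed factory type. A condensed sketch of the consuming side, assuming the usual jobgen locals (spec, conf, keyFields, sortCmpFactories, rdUnnestedMessage) are in scope as in the files above:

    // local pre-clustered group-by, as wired in the JobGenOuterJoin* classes above
    IClusteredAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf,
            false, false); // isFinal, partialAggAsInput
    ClusteredGroupOperatorDescriptor localGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
            sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
    ClusterConfig.setLocationConstraint(spec, localGby);
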
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
index 73b053f..e8a6b5c 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
@@ -14,8 +14,11 @@
  */
 package edu.uci.ics.pregelix.core.util;
 
+import java.io.File;
 import java.util.EnumSet;
 
+import org.apache.commons.io.FileUtils;
+
 import edu.uci.ics.hyracks.api.client.HyracksConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.api.job.JobFlag;
@@ -46,6 +49,10 @@
     private static IHyracksClientConnection hcc;
 
     public static void init() throws Exception {
+        FileUtils.forceMkdir(new File("dev1"));
+        FileUtils.forceMkdir(new File("dev2"));
+        FileUtils.forceMkdir(new File("dev3"));
+        FileUtils.forceMkdir(new File("dev4"));
         CCConfig ccConfig = new CCConfig();
         ccConfig.clientNetIpAddress = CC_HOST;
         ccConfig.clusterNetIpAddress = CC_HOST;
@@ -80,7 +87,7 @@
         ncConfig2.datasetIPAddress = "127.0.0.1";
         ncConfig2.nodeId = NC2_ID;
         ncConfig2.appNCMainClass = NCApplicationEntryPoint.class.getName();
-        ncConfig2.ioDevices="dev1,dev2";
+        ncConfig2.ioDevices="dev3,dev4";
         nc2 = new NodeControllerService(ncConfig2);
         nc2.start();
 
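
For context: nc1 keeps its original "dev1,dev2" assignment (not shown in this hunk), so after this change the two node controllers use disjoint IO devices, and the directories are created up front so a fresh checkout can boot the mini-cluster. A minimal sketch of the resulting layout; the nc1 line is an assumption:

    // each NC gets its own pair of relative IO-device directories
    FileUtils.forceMkdir(new File("dev1"));   // nc1 (assumed unchanged)
    FileUtils.forceMkdir(new File("dev2"));
    FileUtils.forceMkdir(new File("dev3"));   // nc2 (this diff)
    FileUtils.forceMkdir(new File("dev4"));
    ncConfig1.ioDevices = "dev1,dev2";        // assumption: pre-existing assignment
    ncConfig2.ioDevices = "dev3,dev4";
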
diff --git a/pregelix/pregelix-dataflow-std-base/pom.xml b/pregelix/pregelix-dataflow-std-base/pom.xml
index 35a6c91..d4c0ee6 100644
--- a/pregelix/pregelix-dataflow-std-base/pom.xml
+++ b/pregelix/pregelix-dataflow-std-base/pom.xml
@@ -21,7 +21,7 @@
 	<parent>
     		<groupId>edu.uci.ics.hyracks</groupId>
     		<artifactId>pregelix</artifactId>
-    		<version>0.2.7-SNAPSHOT</version>
+    		<version>0.2.10-SNAPSHOT</version>
   	</parent>
 
 
@@ -87,15 +87,15 @@
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
-			<artifactId>pregelix-api</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<artifactId>hyracks-dataflow-common</artifactId>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-api</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
diff --git a/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunction.java b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunction.java
index 97db63f..c544b31 100644
--- a/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunction.java
+++ b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunction.java
@@ -24,4 +24,10 @@
     public void step(IFrameTupleReference tuple) throws HyracksDataException;
 
     public void finish() throws HyracksDataException;
+
+    public void initAll() throws HyracksDataException;
+
+    public void finishAll() throws HyracksDataException;
+
+    public int estimateStep(IFrameTupleReference tuple) throws HyracksDataException;
 }
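
The three new methods widen the aggregate-function lifecycle: initAll/finishAll bracket an entire partition rather than a single group, and estimateStep lets the caller size output before materializing a step. A hedged skeleton of an implementor, assuming the interface also declares the per-group init() that precedes the step()/finish() shown above, and reading estimateStep as a byte-size hint:

    import java.io.DataOutput;
    import java.io.IOException;

    import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
    import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;

    public class CountAggregateFunction implements IAggregateFunction {
        private final DataOutput out;
        private long count;

        public CountAggregateFunction(DataOutput out) {
            this.out = out;
        }

        public void initAll() throws HyracksDataException {
            // once per partition, before the first group
        }

        public void init() throws HyracksDataException {
            count = 0; // once per group
        }

        public void step(IFrameTupleReference tuple) throws HyracksDataException {
            count++;
        }

        public void finish() throws HyracksDataException {
            try {
                out.writeLong(count); // emit the per-group result
            } catch (IOException e) {
                throw new HyracksDataException(e);
            }
        }

        public void finishAll() throws HyracksDataException {
            finish(); // nothing extra for this trivial aggregate
        }

        public int estimateStep(IFrameTupleReference tuple) throws HyracksDataException {
            return 8; // a long: rough bytes this step adds to the output
        }
    }
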
diff --git a/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunctionFactory.java b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunctionFactory.java
index 58795d1..d5364da 100644
--- a/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunctionFactory.java
+++ b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunctionFactory.java
@@ -16,11 +16,12 @@
 
 import java.io.Serializable;
 
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
 
 public interface IAggregateFunctionFactory extends Serializable {
-	public IAggregateFunction createAggregateFunction(IHyracksTaskContext ctx,
-			IDataOutputProvider provider) throws HyracksException;
+    public IAggregateFunction createAggregateFunction(IHyracksTaskContext ctx, IDataOutputProvider provider,
+            IFrameWriter writer) throws HyracksException;
 }
\ No newline at end of file
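
The factory counterpart now receives an IFrameWriter, so an aggregate function can flush frames downstream itself, matching the writer that the new clustered group-by machinery later in this diff threads through. A matching sketch, reusing the hypothetical CountAggregateFunction from above:

    public class CountAggregateFunctionFactory implements IAggregateFunctionFactory {
        private static final long serialVersionUID = 1L;

        public IAggregateFunction createAggregateFunction(IHyracksTaskContext ctx, IDataOutputProvider provider,
                IFrameWriter writer) throws HyracksException {
            // the trivial count aggregate never spills, so the writer goes unused here
            return new CountAggregateFunction(provider.getDataOutput());
        }
    }
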
diff --git a/pregelix/pregelix-dataflow-std/pom.xml b/pregelix/pregelix-dataflow-std/pom.xml
index 3604e57..9ec8e1d 100644
--- a/pregelix/pregelix-dataflow-std/pom.xml
+++ b/pregelix/pregelix-dataflow-std/pom.xml
@@ -21,7 +21,7 @@
 	<parent>
 		<groupId>edu.uci.ics.hyracks</groupId>
 		<artifactId>pregelix</artifactId>
-		<version>0.2.7-SNAPSHOT</version>
+		<version>0.2.10-SNAPSHOT</version>
 	</parent>
 
 
@@ -88,84 +88,84 @@
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>pregelix-dataflow-std-base</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-dataflow-std</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-api</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-dataflow-common</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-data-std</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-hdfs-core</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-storage-am-common</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-storage-am-btree</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-storage-am-lsm-btree</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-control-cc</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-control-nc</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-ipc</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/group/ClusteredGroupOperatorDescriptor.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/group/ClusteredGroupOperatorDescriptor.java
new file mode 100644
index 0000000..bb41953
--- /dev/null
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/group/ClusteredGroupOperatorDescriptor.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.group;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+
+public class ClusteredGroupOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private final int[] groupFields;
+    private final IBinaryComparatorFactory[] comparatorFactories;
+    private final IClusteredAggregatorDescriptorFactory aggregatorFactory;
+
+    private static final long serialVersionUID = 1L;
+
+    public ClusteredGroupOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] groupFields,
+            IBinaryComparatorFactory[] comparatorFactories, IClusteredAggregatorDescriptorFactory aggregatorFactory,
+            RecordDescriptor recordDescriptor) {
+        super(spec, 1, 1);
+        this.groupFields = groupFields;
+        this.comparatorFactories = comparatorFactories;
+        this.aggregatorFactory = aggregatorFactory;
+        recordDescriptors[0] = recordDescriptor;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+            throws HyracksDataException {
+        return new ClusteredGroupOperatorNodePushable(ctx, groupFields, comparatorFactories, aggregatorFactory,
+                recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), recordDescriptors[0]);
+    }
+}
\ No newline at end of file
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/group/ClusteredGroupOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/group/ClusteredGroupOperatorNodePushable.java
new file mode 100644
index 0000000..a95a46e
--- /dev/null
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/group/ClusteredGroupOperatorNodePushable.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.group;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+class ClusteredGroupOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+    private final IHyracksTaskContext ctx;
+    private final int[] groupFields;
+    private final IBinaryComparatorFactory[] comparatorFactories;
+    private final IClusteredAggregatorDescriptorFactory aggregatorFactory;
+    private final RecordDescriptor inRecordDescriptor;
+    private final RecordDescriptor outRecordDescriptor;
+    private ClusteredGroupWriter pgw;
+
+    ClusteredGroupOperatorNodePushable(IHyracksTaskContext ctx, int[] groupFields,
+            IBinaryComparatorFactory[] comparatorFactories, IClusteredAggregatorDescriptorFactory aggregatorFactory,
+            RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor) {
+        this.ctx = ctx;
+        this.groupFields = groupFields;
+        this.comparatorFactories = comparatorFactories;
+        this.aggregatorFactory = aggregatorFactory;
+        this.inRecordDescriptor = inRecordDescriptor;
+        this.outRecordDescriptor = outRecordDescriptor;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+        for (int i = 0; i < comparatorFactories.length; ++i) {
+            comparators[i] = comparatorFactories[i].createBinaryComparator();
+        }
+        final ByteBuffer copyFrame = ctx.allocateFrame();
+        final FrameTupleAccessor copyFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDescriptor);
+        copyFrameAccessor.reset(copyFrame);
+        ByteBuffer outFrame = ctx.allocateFrame();
+        final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+        appender.reset(outFrame, true);
+        pgw = new ClusteredGroupWriter(ctx, groupFields, comparators, aggregatorFactory, inRecordDescriptor,
+                outRecordDescriptor, writer);
+        pgw.open();
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        pgw.nextFrame(buffer);
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        pgw.fail();
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        pgw.close();
+    }
+}
\ No newline at end of file
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/group/ClusteredGroupWriter.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/group/ClusteredGroupWriter.java
new file mode 100644
index 0000000..4b4a1c3
--- /dev/null
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/group/ClusteredGroupWriter.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.group;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+
+public class ClusteredGroupWriter implements IFrameWriter {
+    private final int[] groupFields;
+    private final IBinaryComparator[] comparators;
+    private final IAggregatorDescriptor aggregator;
+    private final AggregateState aggregateState;
+    private final IFrameWriter writer;
+    private final ByteBuffer copyFrame;
+    private final FrameTupleAccessor inFrameAccessor;
+    private final FrameTupleAccessor copyFrameAccessor;
+
+    private final ByteBuffer outFrame;
+    private final FrameTupleAppender appender;
+    private final ArrayTupleBuilder tupleBuilder;
+
+    private boolean first;
+
+    public ClusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[] comparators,
+            IClusteredAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor inRecordDesc,
+            RecordDescriptor outRecordDesc, IFrameWriter writer) throws HyracksDataException {
+        this.groupFields = groupFields;
+        this.comparators = comparators;
+        this.writer = writer;
+        copyFrame = ctx.allocateFrame();
+        inFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
+        copyFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
+        copyFrameAccessor.reset(copyFrame);
+
+        outFrame = ctx.allocateFrame();
+        appender = new FrameTupleAppender(ctx.getFrameSize());
+        appender.reset(outFrame, true);
+
+        tupleBuilder = new ArrayTupleBuilder(outRecordDesc.getFields().length);
+        this.aggregator = aggregatorFactory.createAggregator(ctx, inRecordDesc, outRecordDesc, groupFields, groupFields, writer, outFrame, appender);
+        this.aggregateState = aggregator.createAggregateStates();
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        writer.open();
+        first = true;
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        inFrameAccessor.reset(buffer);
+        int nTuples = inFrameAccessor.getTupleCount();
+        for (int i = 0; i < nTuples; ++i) {
+            if (first) {
+
+                tupleBuilder.reset();
+                for (int j = 0; j < groupFields.length; j++) {
+                    tupleBuilder.addField(inFrameAccessor, i, groupFields[j]);
+                }
+                aggregator.init(tupleBuilder, inFrameAccessor, i, aggregateState);
+
+                first = false;
+
+            } else {
+                if (i == 0) {
+                    switchGroupIfRequired(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1, inFrameAccessor, i);
+                } else {
+                    switchGroupIfRequired(inFrameAccessor, i - 1, inFrameAccessor, i);
+                }
+
+            }
+        }
+        FrameUtils.copy(buffer, copyFrame);
+    }
+
+    private void switchGroupIfRequired(FrameTupleAccessor prevTupleAccessor, int prevTupleIndex,
+            FrameTupleAccessor currTupleAccessor, int currTupleIndex) throws HyracksDataException {
+        if (!sameGroup(prevTupleAccessor, prevTupleIndex, currTupleAccessor, currTupleIndex)) {
+            writeOutput(prevTupleAccessor, prevTupleIndex);
+
+            tupleBuilder.reset();
+            for (int j = 0; j < groupFields.length; j++) {
+                tupleBuilder.addField(currTupleAccessor, currTupleIndex, groupFields[j]);
+            }
+            aggregator.init(tupleBuilder, currTupleAccessor, currTupleIndex, aggregateState);
+        } else {
+            aggregator.aggregate(currTupleAccessor, currTupleIndex, null, 0, aggregateState);
+        }
+    }
+
+    private void writeOutput(final FrameTupleAccessor lastTupleAccessor, int lastTupleIndex)
+            throws HyracksDataException {
+        tupleBuilder.reset();
+        for (int j = 0; j < groupFields.length; j++) {
+            tupleBuilder.addField(lastTupleAccessor, lastTupleIndex, groupFields[j]);
+        }
+        aggregator.outputFinalResult(tupleBuilder, lastTupleAccessor, lastTupleIndex, aggregateState);
+        if (!appender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+                tupleBuilder.getSize())) {
+            FrameUtils.flushFrame(outFrame, writer);
+            appender.reset(outFrame, true);
+            if (!appender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+                    tupleBuilder.getSize())) {
+                throw new HyracksDataException("The output of size " + tupleBuilder.getSize()
+                        + " does not fit in a frame of size " + outFrame.array().length);
+            }
+        }
+
+    }
+
+    private boolean sameGroup(FrameTupleAccessor a1, int t1Idx, FrameTupleAccessor a2, int t2Idx) {
+        for (int i = 0; i < comparators.length; ++i) {
+            int fIdx = groupFields[i];
+            int s1 = a1.getTupleStartOffset(t1Idx) + a1.getFieldSlotsLength() + a1.getFieldStartOffset(t1Idx, fIdx);
+            int l1 = a1.getFieldLength(t1Idx, fIdx);
+            int s2 = a2.getTupleStartOffset(t2Idx) + a2.getFieldSlotsLength() + a2.getFieldStartOffset(t2Idx, fIdx);
+            int l2 = a2.getFieldLength(t2Idx, fIdx);
+            if (comparators[i].compare(a1.getBuffer().array(), s1, l1, a2.getBuffer().array(), s2, l2) != 0) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        writer.fail();
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        if (!first) {
+            writeOutput(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1);
+            if (appender.getTupleCount() > 0) {
+                FrameUtils.flushFrame(outFrame, writer);
+            }
+        }
+        aggregateState.close();
+        writer.close();
+    }
+}
\ No newline at end of file
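
ClusteredGroupWriter assumes its input arrives already sorted on the group fields, so a group ends exactly when sameGroup(...) flips between adjacent tuples; copyFrame keeps the previous frame alive so the comparison works across frame boundaries, and close() flushes the final group. A standalone illustration of that contract in plain Java (no Hyracks types):

    // toy clustered group-by: sum values per key over key-sorted input
    public class ClusteredGroupDemo {
        public static void main(String[] args) {
            int[][] sorted = { { 1, 10 }, { 1, 5 }, { 2, 7 }, { 3, 1 }, { 3, 2 } }; // (key, value)
            int key = sorted[0][0];
            int sum = sorted[0][1];                       // init on the first tuple (cf. first == true)
            for (int i = 1; i < sorted.length; i++) {
                if (sorted[i][0] != key) {                // group boundary (cf. switchGroupIfRequired)
                    System.out.println(key + " -> " + sum);
                    key = sorted[i][0];
                    sum = sorted[i][1];                   // re-init for the new group
                } else {
                    sum += sorted[i][1];                  // aggregate within the group
                }
            }
            System.out.println(key + " -> " + sum);       // flush the last group (cf. close())
        }
    }
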
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/group/IClusteredAggregatorDescriptorFactory.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/group/IClusteredAggregatorDescriptorFactory.java
new file mode 100644
index 0000000..3256f08
--- /dev/null
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/group/IClusteredAggregatorDescriptorFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.group;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+
+/**
+ * A factory that creates aggregator descriptors for the clustered (pre-sorted) group-by operator.
+ */
+public interface IClusteredAggregatorDescriptorFactory extends Serializable {
+
+    IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, int[] keyFields, final int[] keyFieldsInPartialResults,
+            IFrameWriter resultWriter, ByteBuffer outputFrame, FrameTupleAppender appender) throws HyracksDataException;
+
+}
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
index 5156dbf..7221cb5 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
@@ -157,16 +157,34 @@
             ITupleReference tupleRef = cursor.getTuple();
 
             /**
+             * merge with the cached updated tuple, if any
+             */
+            ITupleReference indexEntryTuple = tupleRef;
+            ITupleReference cachedUpdatedLastTuple = updateBuffer.getLastTuple();
+            if (cachedUpdatedLastTuple != null) {
+                if (compare(cachedUpdatedLastTuple, tupleRef) == 0) {
+                    indexEntryTuple = cachedUpdatedLastTuple;
+                }
+            }
+
+            /**
              * call the update function
              */
-            functionProxy.functionCall(leftAccessor, tIndex, tupleRef, cloneUpdateTb);
+            functionProxy.functionCall(leftAccessor, tIndex, indexEntryTuple, cloneUpdateTb);
 
-            //doing copy update
-            CopyUpdateUtil.copyUpdate(tempTupleReference, tupleRef, updateBuffer, cloneUpdateTb, indexAccessor, cursor,
-                    rangePred);
+            /**
+             * perform the copy-based update
+             */
+            CopyUpdateUtil.copyUpdate(tempTupleReference, indexEntryTuple, updateBuffer, cloneUpdateTb, indexAccessor,
+                    cursor, rangePred);
         }
     }
 
+    /** compare the key fields of two tuples using the low-key search comparator */
+    private int compare(ITupleReference left, ITupleReference right) throws Exception {
+        return lowKeySearchCmp.compare(left, right);
+    }
+
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         accessor.reset(buffer);
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
index 4ca7533..b21cd2a 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
@@ -195,8 +195,10 @@
                 // TODO: currently use low key only, check what they mean
                 int cmp = compare(lowKey, currentTopTuple);
                 if (cmp <= 0) {
-                    if (cmp == 0)
+                    if (cmp == 0) {
                         outputMatch(i);
+                        currentTopTuple = cursor.getTuple();
+                    }
                     i++;
                 } else {
                     moveTreeCursor();
@@ -262,16 +264,28 @@
     }
 
     // for the join match cases
-    private void writeResults(IFrameTupleAccessor leftAccessor, int tIndex, ITupleReference frameTuple)
+    private void writeResults(IFrameTupleAccessor leftAccessor, int tIndex, ITupleReference indexTuple)
             throws Exception {
         /**
+         * merge with the cached tuple, if any
+         */
+        ITupleReference indexEntryTuple = indexTuple;
+        ITupleReference cachedUpdatedLastTuple = updateBuffer.getLastTuple();
+        if (cachedUpdatedLastTuple != null) {
+            if (compare(cachedUpdatedLastTuple, indexTuple) == 0) {
+                indexEntryTuple = cachedUpdatedLastTuple;
+            }
+        }
+        /**
          * function call
          */
-        functionProxy.functionCall(leftAccessor, tIndex, frameTuple, cloneUpdateTb);
+        functionProxy.functionCall(leftAccessor, tIndex, indexEntryTuple, cloneUpdateTb);
 
-        //doing clone update
-        CopyUpdateUtil.copyUpdate(tempTupleReference, frameTuple, updateBuffer, cloneUpdateTb, indexAccessor, cursor,
-                rangePred);
+        /**
+             * perform the clone-based update
+         */
+        CopyUpdateUtil.copyUpdate(tempTupleReference, indexEntryTuple, updateBuffer, cloneUpdateTb, indexAccessor,
+                cursor, rangePred);
     }
 
     /** write result for outer case */
@@ -290,4 +304,4 @@
     public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
         writers[index] = writer;
     }
-}
\ No newline at end of file
+}
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java
index b2be366..ea1e02e 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java
@@ -25,6 +25,7 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
 
@@ -41,6 +42,7 @@
     private final FrameTupleAppender appender;
     private final IHyracksTaskContext ctx;
     private final FrameTupleReference tuple = new FrameTupleReference();
+    private final FrameTupleReference lastTuple = new FrameTupleReference();
     private final int frameSize;
     private IFrameTupleAccessor fta;
 
@@ -104,6 +106,21 @@
         appender.reset(buffer, true);
     }
 
+    /**
+     * Return the last tuple appended to the update buffer, or null when the buffer is empty.
+     * 
+     * @throws HyracksDataException
+     */
+    public ITupleReference getLastTuple() throws HyracksDataException {
+        fta.reset(buffers.get(currentInUse));
+        int tupleIndex = fta.getTupleCount() - 1;
+        if (tupleIndex < 0) {
+            return null;
+        }
+        lastTuple.reset(fta, tupleIndex);
+        return lastTuple;
+    }
+
     private void allocate(int index) throws HyracksDataException {
         if (index >= buffers.size()) {
             buffers.add(ctx.allocateFrame());
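
getLastTuple exists for the merge step the two join operators above now perform: before calling the user function on an index entry, check whether the most recently buffered update targets the same key and, if so, use that newer image. Condensed from both call sites (all names come from the operator code above):

    ITupleReference indexEntryTuple = cursor.getTuple();
    ITupleReference cachedUpdatedLastTuple = updateBuffer.getLastTuple(); // null when the buffer is empty
    if (cachedUpdatedLastTuple != null && compare(cachedUpdatedLastTuple, indexEntryTuple) == 0) {
        indexEntryTuple = cachedUpdatedLastTuple; // prefer the buffered, newer version
    }
    functionProxy.functionCall(leftAccessor, tIndex, indexEntryTuple, cloneUpdateTb);
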
diff --git a/pregelix/pregelix-dataflow/pom.xml b/pregelix/pregelix-dataflow/pom.xml
index 2828451..1df75ae 100644
--- a/pregelix/pregelix-dataflow/pom.xml
+++ b/pregelix/pregelix-dataflow/pom.xml
@@ -7,8 +7,7 @@
 	WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ! See the 
 	License for the specific language governing permissions and ! limitations 
 	under the License. ! -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 	<modelVersion>4.0.0</modelVersion>
 	<artifactId>pregelix-dataflow</artifactId>
 	<packaging>jar</packaging>
@@ -17,7 +16,7 @@
 	<parent>
 		<groupId>edu.uci.ics.hyracks</groupId>
 		<artifactId>pregelix</artifactId>
-		<version>0.2.7-SNAPSHOT</version>
+		<version>0.2.10-SNAPSHOT</version>
 	</parent>
 
 
@@ -84,75 +83,75 @@
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>pregelix-api</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>pregelix-dataflow-std-base</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-api</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-dataflow-common</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-data-std</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-storage-am-common</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-storage-am-btree</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-storage-am-lsm-common</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-control-cc</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-control-nc</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-ipc</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
index 24a0a9e..e25a46a 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
@@ -79,7 +79,7 @@
         bufferCache = new BufferCache(appCtx.getRootContext().getIOManager(), allocator, prs,
                 new PreDelayPageCleanerPolicy(Long.MAX_VALUE), fileMapManager, pageSize, numPages, 1000000,
                 threadFactory);
-        int numPagesInMemComponents = numPages / 4;
+        int numPagesInMemComponents = numPages / 8;
         vBufferCache = new MultitenantVirtualBufferCache(new VirtualBufferCache(new HeapBufferAllocator(), pageSize,
                 numPagesInMemComponents));
         ioManager = (IOManager) appCtx.getRootContext().getIOManager();
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
index 1d7c979..75f8ed8 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
@@ -31,6 +31,7 @@
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.api.util.JobStateUtils;
 import edu.uci.ics.pregelix.dataflow.context.RuntimeContext;
 import edu.uci.ics.pregelix.dataflow.context.StateKey;
 
@@ -91,22 +92,6 @@
         }
     }
 
-    public static void writeForceTerminationState(Configuration conf, String jobId) throws HyracksDataException {
-        try {
-            FileSystem dfs = FileSystem.get(conf);
-            String pathStr = IterationUtils.TMP_DIR + jobId + "fterm";
-            Path path = new Path(pathStr);
-            if (!dfs.exists(path)) {
-                FSDataOutputStream output = dfs.create(path, true);
-                output.writeBoolean(true);
-                output.flush();
-                output.close();
-            }
-        } catch (IOException e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
     public static void writeGlobalAggregateValue(Configuration conf, String jobId, Writable agg)
             throws HyracksDataException {
         try {
@@ -136,19 +121,12 @@
         }
     }
 
+    public static void writeForceTerminationState(Configuration conf, String jobId) throws HyracksDataException {
+        JobStateUtils.writeForceTerminationState(conf, jobId);
+    }
+
     public static boolean readForceTerminationState(Configuration conf, String jobId) throws HyracksDataException {
-        try {
-            FileSystem dfs = FileSystem.get(conf);
-            String pathStr = IterationUtils.TMP_DIR + jobId + "fterm";
-            Path path = new Path(pathStr);
-            if (dfs.exists(path)) {
-                return true;
-            } else {
-                return false;
-            }
-        } catch (IOException e) {
-            throw new HyracksDataException(e);
-        }
+        return JobStateUtils.readForceTerminationState(conf, jobId);
     }
 
     public static Writable readGlobalAggregateValue(Configuration conf, String jobId) throws HyracksDataException {
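
Both force-termination helpers now delegate to the new JobStateUtils in pregelix-api, so client and runtime code share one implementation. The relocated write path presumably keeps the body deleted above; a reconstruction under that assumption, with TMP_DIR mirroring IterationUtils.TMP_DIR:

    // in edu.uci.ics.pregelix.api.util.JobStateUtils (shape inferred from the removed code)
    public static void writeForceTerminationState(Configuration conf, String jobId) throws HyracksDataException {
        try {
            FileSystem dfs = FileSystem.get(conf);
            Path path = new Path(TMP_DIR + jobId + "fterm");
            if (!dfs.exists(path)) {
                FSDataOutputStream output = dfs.create(path, true);
                output.writeBoolean(true);
                output.flush();
                output.close();
            }
        } catch (IOException e) {
            throw new HyracksDataException(e);
        }
    }
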
diff --git a/pregelix/pregelix-dist/pom.xml b/pregelix/pregelix-dist/pom.xml
index a868ff2..f0551a6 100644
--- a/pregelix/pregelix-dist/pom.xml
+++ b/pregelix/pregelix-dist/pom.xml
@@ -1,24 +1,19 @@
 <?xml version="1.0"?>
-<!--
- ! Copyright 2009-2013 by The Regents of the University of California
- ! Licensed under the Apache License, Version 2.0 (the "License");
- ! you may not use this file except in compliance with the License.
- ! you may obtain a copy of the License from
- ! 
- !     http://www.apache.org/licenses/LICENSE-2.0
- ! 
- ! Unless required by applicable law or agreed to in writing, software
- ! distributed under the License is distributed on an "AS IS" BASIS,
- ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ! See the License for the specific language governing permissions and
- ! limitations under the License.
- !-->
+<!-- ! Copyright 2009-2013 by The Regents of the University of California 
+	! Licensed under the Apache License, Version 2.0 (the "License"); ! you may 
+	not use this file except in compliance with the License. ! you may obtain 
+	a copy of the License from ! ! http://www.apache.org/licenses/LICENSE-2.0 
+	! ! Unless required by applicable law or agreed to in writing, software ! 
+	distributed under the License is distributed on an "AS IS" BASIS, ! WITHOUT 
+	WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ! See the 
+	License for the specific language governing permissions and ! limitations 
+	under the License. ! -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 	<modelVersion>4.0.0</modelVersion>
 	<parent>
 		<groupId>edu.uci.ics.hyracks</groupId>
 		<artifactId>pregelix</artifactId>
-		<version>0.2.7-SNAPSHOT</version>
+		<version>0.2.10-SNAPSHOT</version>
 	</parent>
 	<artifactId>pregelix-dist</artifactId>
 	<name>pregelix-dist</name>
@@ -38,35 +33,35 @@
 				</configuration>
 			</plugin>
 			<plugin>
-                                <artifactId>maven-assembly-plugin</artifactId>
-                                <version>2.2-beta-5</version>
-                                <executions>
-                                        <execution>
-                                                <configuration>
-                                                        <descriptors>
-                                                                <descriptor>src/main/assembly/binary-assembly.xml</descriptor>
-                                                        </descriptors>
-                                                </configuration>
-                                                <phase>package</phase>
-                                                <goals>
-                                                        <goal>attached</goal>
-                                                </goals>
-                                        </execution>
-                                </executions>
-                        </plugin>
+				<artifactId>maven-assembly-plugin</artifactId>
+				<version>2.2-beta-5</version>
+				<executions>
+					<execution>
+						<configuration>
+							<descriptors>
+								<descriptor>src/main/assembly/binary-assembly.xml</descriptor>
+							</descriptors>
+						</configuration>
+						<phase>package</phase>
+						<goals>
+							<goal>attached</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
 		</plugins>
 	</build>
 	<dependencies>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>pregelix-core</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>pregelix-example</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 		</dependency>
 	</dependencies>
 </project>
diff --git a/pregelix/pregelix-dist/src/main/assembly/binary-assembly.xml b/pregelix/pregelix-dist/src/main/assembly/binary-assembly.xml
index ab46338..a0fc2ab 100644
--- a/pregelix/pregelix-dist/src/main/assembly/binary-assembly.xml
+++ b/pregelix/pregelix-dist/src/main/assembly/binary-assembly.xml
@@ -1,17 +1,12 @@
-<!--
- ! Copyright 2009-2013 by The Regents of the University of California
- ! Licensed under the Apache License, Version 2.0 (the "License");
- ! you may not use this file except in compliance with the License.
- ! you may obtain a copy of the License from
- ! 
- !     http://www.apache.org/licenses/LICENSE-2.0
- ! 
- ! Unless required by applicable law or agreed to in writing, software
- ! distributed under the License is distributed on an "AS IS" BASIS,
- ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ! See the License for the specific language governing permissions and
- ! limitations under the License.
- !-->
+<!-- ! Copyright 2009-2013 by The Regents of the University of California 
+	! Licensed under the Apache License, Version 2.0 (the "License"); ! you may 
+	not use this file except in compliance with the License. ! you may obtain 
+	a copy of the License from ! ! http://www.apache.org/licenses/LICENSE-2.0 
+	! ! Unless required by applicable law or agreed to in writing, software ! 
+	distributed under the License is distributed on an "AS IS" BASIS, ! WITHOUT 
+	WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ! See the 
+	License for the specific language governing permissions and ! limitations 
+	under the License. ! -->
 <assembly>
 	<id>binary-assembly</id>
 	<formats>
@@ -31,25 +26,25 @@
 			<fileMode>0755</fileMode>
 		</fileSet>
 		<fileSet>
-                        <directory>../pregelix-core/target/appassembler/lib</directory>
-                        <outputDirectory>lib</outputDirectory>
-                         <includes>
-                                <include>*.jar</include>
-                        </includes>
-                        <fileMode>0755</fileMode>
-                </fileSet>
-		<fileSet>
-                        <directory>../pregelix-example/target</directory>
-                        <outputDirectory>examples</outputDirectory>
-                         <includes>
-        			<include>*with-dependencies.jar</include>
-      			</includes>
+			<directory>../pregelix-core/target/appassembler/lib</directory>
+			<outputDirectory>lib</outputDirectory>
+			<includes>
+				<include>*.jar</include>
+			</includes>
 			<fileMode>0755</fileMode>
-                </fileSet>
+		</fileSet>
 		<fileSet>
-                        <directory>../pregelix-example/data</directory>
-                        <outputDirectory>data</outputDirectory>
-                        <fileMode>0755</fileMode>
-                </fileSet>
+			<directory>../pregelix-example/target</directory>
+			<outputDirectory>examples</outputDirectory>
+			<includes>
+				<include>*with-dependencies.jar</include>
+			</includes>
+			<fileMode>0755</fileMode>
+		</fileSet>
+		<fileSet>
+			<directory>../pregelix-example/data</directory>
+			<outputDirectory>data</outputDirectory>
+			<fileMode>0755</fileMode>
+		</fileSet>
 	</fileSets>
 </assembly>
diff --git a/pregelix/pregelix-dist/src/main/resources/scripts/copylog.sh b/pregelix/pregelix-dist/src/main/resources/scripts/copylog.sh
new file mode 100644
index 0000000..7767b2d
--- /dev/null
+++ b/pregelix/pregelix-dist/src/main/resources/scripts/copylog.sh
@@ -0,0 +1,7 @@
+. conf/cluster.properties
+
+NODEID=`hostname | cut -d '.' -f 1`
+#echo $NODEID
+
+#echo "rsync ${NCLOGS_DIR}/${NODEID}.log ${1}:${2}"
+rsync ${NCLOGS_DIR}/${NODEID}.log ${1}:${2}
diff --git a/pregelix/pregelix-dist/src/main/resources/scripts/dumpAll.sh b/pregelix/pregelix-dist/src/main/resources/scripts/dumpAll.sh
new file mode 100644
index 0000000..e7d45e8
--- /dev/null
+++ b/pregelix/pregelix-dist/src/main/resources/scripts/dumpAll.sh
@@ -0,0 +1,12 @@
+. conf/cluster.properties
+PREGELIX_PATH=`pwd`
+LOG_PATH=$PREGELIX_PATH/logs/
+rm -rf $LOG_PATH
+mkdir $LOG_PATH
+ccname=`hostname`
+
+for i in `cat conf/slaves`
+do
+   ssh $i "cd ${PREGELIX_PATH}; bin/dumptrace.sh; bin/copylog.sh ${ccname} ${LOG_PATH}"
+done
+
diff --git a/pregelix/pregelix-dist/src/main/resources/scripts/dumptrace.sh b/pregelix/pregelix-dist/src/main/resources/scripts/dumptrace.sh
new file mode 100644
index 0000000..9fe55f0
--- /dev/null
+++ b/pregelix/pregelix-dist/src/main/resources/scripts/dumptrace.sh
@@ -0,0 +1,15 @@
+echo `hostname`
+#Kill process
+PID=`ps -ef|grep ${USER}|grep java|grep 'Dapp.name=pregelixnc'|awk '{print $2}'`
+
+if [ "$PID" == "" ]; then
+  PID=`ps -ef|grep ${USER}|grep java|grep 'hyracks'|awk '{print $2}'`
+fi
+
+if [ "$PID" == "" ]; then
+  USERID=`id | sed 's/^uid=//;s/(.*$//'`
+  PID=`ps -ef|grep ${USERID}|grep java|grep 'Dapp.name=pregelixnc'|awk '{print $2}'`
+fi
+
+echo $PID
+kill -QUIT $PID
diff --git a/pregelix/pregelix-example/pom.xml b/pregelix/pregelix-example/pom.xml
index c2538b1..1066e3b 100644
--- a/pregelix/pregelix-example/pom.xml
+++ b/pregelix/pregelix-example/pom.xml
@@ -21,7 +21,7 @@
 	<parent>
 		<groupId>edu.uci.ics.hyracks</groupId>
 		<artifactId>pregelix</artifactId>
-		<version>0.2.7-SNAPSHOT</version>
+		<version>0.2.10-SNAPSHOT</version>
 	</parent>
 
 	<build>
@@ -107,6 +107,7 @@
 								<include>expect*</include>
 								<include>ClusterController*</include>
 								<include>edu.uci.*</include>
+								<include>dev*</include>
 							</includes>
 						</fileset>
 					</filesets>
@@ -119,7 +120,7 @@
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>pregelix-core</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/EarlyTerminationVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/EarlyTerminationVertex.java
new file mode 100644
index 0000000..e369d29
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/EarlyTerminationVertex.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat.TextVertexWriter;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.example.client.Client;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+/**
+ * Demonstrates partition-level early termination: a vertex calls terminatePartition() to end iteration for its whole partition.
+ */
+public class EarlyTerminationVertex extends Vertex<VLongWritable, VLongWritable, VLongWritable, VLongWritable> {
+    private VLongWritable tempValue = new VLongWritable();
+
+    @Override
+    public void compute(Iterator<VLongWritable> msgIterator) {
+        if (getSuperstep() == 1) {
+            if (getVertexId().get() % 4 == 2) {
+                terminatePartition();
+            } else {
+                tempValue.set(1);
+                setVertexValue(tempValue);
+            }
+        }
+        if (getSuperstep() == 2) {
+            if (getVertexId().get() % 4 == 3) {
+                terminatePartition();
+            } else {
+                tempValue.set(2);
+                setVertexValue(tempValue);
+                voteToHalt();
+            }
+        }
+    }
+
+    @Override
+    public String toString() {
+        return getVertexId() + " " + getVertexValue();
+    }
+
+    /**
+     * Simple VertexWriter that writes each vertex id and value as a line of text.
+     */
+    public static class SimpleEarlyTerminattionVertexWriter extends
+            TextVertexWriter<VLongWritable, VLongWritable, VLongWritable> {
+        public SimpleEarlyTerminattionVertexWriter(RecordWriter<Text, Text> lineRecordWriter) {
+            super(lineRecordWriter);
+        }
+
+        @Override
+        public void writeVertex(Vertex<VLongWritable, VLongWritable, VLongWritable, ?> vertex) throws IOException,
+                InterruptedException {
+            getRecordWriter().write(new Text(vertex.getVertexId().toString()),
+                    new Text(vertex.getVertexValue().toString()));
+        }
+    }
+
+    public static class SimpleEarlyTerminattionVertexOutputFormat extends
+            TextVertexOutputFormat<VLongWritable, VLongWritable, VLongWritable> {
+
+        @Override
+        public VertexWriter<VLongWritable, VLongWritable, VLongWritable> createVertexWriter(TaskAttemptContext context)
+                throws IOException, InterruptedException {
+            RecordWriter<Text, Text> recordWriter = textOutputFormat.getRecordWriter(context);
+            return new SimpleEarlyTerminattionVertexWriter(recordWriter);
+        }
+
+    }
+
+    public static void main(String[] args) throws Exception {
+        PregelixJob job = new PregelixJob(EarlyTerminationVertex.class.getSimpleName());
+        job.setVertexClass(EarlyTerminationVertex.class);
+        job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+        job.setVertexOutputFormatClass(SimpleEarlyTerminattionVertexOutputFormat.class);
+        job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+        job.setDynamicVertexValueSize(true);
+        Client.run(args, job);
+    }
+
+}
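
Note: terminatePartition(), used above, ends iteration for every vertex in the calling vertex's partition, while voteToHalt() deactivates only the caller. A minimal sketch contrasting the two, assuming only the Vertex API visible in this patch (the class itself is hypothetical, not part of the change):

    package edu.uci.ics.pregelix.example;

    import java.util.Iterator;

    import edu.uci.ics.pregelix.api.graph.Vertex;
    import edu.uci.ics.pregelix.example.io.VLongWritable;

    /** Hypothetical sketch contrasting the two stop mechanisms. */
    public class TerminationSketchVertex extends
            Vertex<VLongWritable, VLongWritable, VLongWritable, VLongWritable> {

        @Override
        public void compute(Iterator<VLongWritable> msgIterator) {
            if (getSuperstep() >= 3) {
                terminatePartition(); // stops iteration for the whole partition at once
            } else {
                voteToHalt(); // deactivates only this vertex until a message arrives
            }
        }
    }
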
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/GraphMutationVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/GraphMutationVertex.java
index 7cf8408..7fae776 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/GraphMutationVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/GraphMutationVertex.java
@@ -18,7 +18,6 @@
 import java.io.IOException;
 import java.util.Iterator;
 
-import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.RecordWriter;
@@ -32,6 +31,7 @@
 import edu.uci.ics.pregelix.example.client.Client;
 import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
 import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.io.DoubleWritable;
 import edu.uci.ics.pregelix.example.io.VLongWritable;
 
 /**
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/MessageOverflowFixedsizeVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/MessageOverflowFixedsizeVertex.java
new file mode 100644
index 0000000..6c3c752
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/MessageOverflowFixedsizeVertex.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat.TextVertexWriter;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.example.client.Client;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.io.LongWritable;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+/**
+ * Demonstrates message overflow handling with fixed-size (LongWritable) messages: each vertex floods its neighbors, then counts what arrives.
+ */
+public class MessageOverflowFixedsizeVertex extends Vertex<VLongWritable, LongWritable, VLongWritable, LongWritable> {
+
+    private LongWritable outputMsg = new LongWritable(1);
+    private Random rand = new Random(System.currentTimeMillis());
+    private LongWritable tmpVertexValue = new LongWritable(0);
+    private int numOfMsgClones = 10000;
+
+    @Override
+    public void compute(Iterator<LongWritable> msgIterator) {
+        if (getSuperstep() == 1) {
+            for (int i = 0; i < numOfMsgClones; i++) {
+                outputMsg.set(Math.abs(rand.nextLong()));
+                sendMsgToAllEdges(outputMsg);
+            }
+            tmpVertexValue.set(0);
+            setVertexValue(tmpVertexValue);
+        }
+        if (getSuperstep() == 2) {
+            long numOfMsg = getVertexValue().get();
+            while (msgIterator.hasNext()) {
+                msgIterator.next();
+                numOfMsg++;
+            }
+            tmpVertexValue.set(numOfMsg);
+            setVertexValue(tmpVertexValue);
+            voteToHalt();
+        }
+    }
+
+    @Override
+    public String toString() {
+        return getVertexId() + " " + getVertexValue();
+    }
+
+    /**
+     * Simple VertexWriter that writes each vertex id and value as a line of text.
+     */
+    public static class SimpleMessageOverflowVertexWriter extends
+            TextVertexWriter<VLongWritable, LongWritable, VLongWritable> {
+        public SimpleMessageOverflowVertexWriter(RecordWriter<Text, Text> lineRecordWriter) {
+            super(lineRecordWriter);
+        }
+
+        @Override
+        public void writeVertex(Vertex<VLongWritable, LongWritable, VLongWritable, ?> vertex) throws IOException,
+                InterruptedException {
+            getRecordWriter().write(new Text(vertex.getVertexId().toString()),
+                    new Text(vertex.getVertexValue().toString()));
+        }
+    }
+
+    public static class SimpleMessageOverflowVertexOutputFormat extends
+            TextVertexOutputFormat<VLongWritable, LongWritable, VLongWritable> {
+
+        @Override
+        public VertexWriter<VLongWritable, LongWritable, VLongWritable> createVertexWriter(TaskAttemptContext context)
+                throws IOException, InterruptedException {
+            RecordWriter<Text, Text> recordWriter = textOutputFormat.getRecordWriter(context);
+            return new SimpleMessageOverflowVertexWriter(recordWriter);
+        }
+
+    }
+
+    public static void main(String[] args) throws Exception {
+        PregelixJob job = new PregelixJob(MessageOverflowFixedsizeVertex.class.getSimpleName());
+        job.setVertexClass(MessageOverflowFixedsizeVertex.class);
+        job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+        job.setVertexOutputFormatClass(SimpleMessageOverflowVertexOutputFormat.class);
+        job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+        job.setDynamicVertexValueSize(true);
+        Client.run(args, job);
+    }
+
+}
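
This fixed-size variant sends LongWritable messages, whose footprint is a constant 8 bytes, whereas the variable-size variant below sends VLongWritables whose footprint depends on the value. A hypothetical check of that accounting, using the wrapper types added later in this patch:

    import edu.uci.ics.pregelix.example.io.LongWritable;
    import edu.uci.ics.pregelix.example.io.VLongWritable;

    /** Illustrative only: fixed vs. variable message footprints. */
    public class SizeFootprintSketch {
        public static void main(String[] args) {
            LongWritable fixed = new LongWritable(Long.MAX_VALUE);
            System.out.println(fixed.sizeInBytes());  // 8: always eight bytes

            VLongWritable small = new VLongWritable(1L);
            VLongWritable large = new VLongWritable(Long.MAX_VALUE);
            System.out.println(small.sizeInBytes());  // 1: varints shrink small values
            System.out.println(large.sizeInBytes());  // 9: worst case for a 64-bit value
        }
    }
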
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/MessageOverflowVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/MessageOverflowVertex.java
new file mode 100644
index 0000000..d0221bf
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/MessageOverflowVertex.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat.TextVertexWriter;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.example.client.Client;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+/**
+ * Demonstrates message overflow with variable-size (VLongWritable) messages, using the open()/close() hooks to accumulate a count across compute() invocations.
+ */
+public class MessageOverflowVertex extends Vertex<VLongWritable, VLongWritable, VLongWritable, VLongWritable> {
+
+    private VLongWritable outputMsg = new VLongWritable(1);
+    private Random rand = new Random(System.currentTimeMillis());
+    private VLongWritable tmpVertexValue = new VLongWritable(0);
+    private int numOfMsgClones = 10000;
+    private int numIncomingMsgs = 0;
+
+    @Override
+    public void open() {
+        if (getSuperstep() == 2) {
+            numIncomingMsgs = 0;
+        }
+    }
+
+    @Override
+    public void compute(Iterator<VLongWritable> msgIterator) {
+        if (getSuperstep() == 1) {
+            for (int i = 0; i < numOfMsgClones; i++) {
+                outputMsg.set(Math.abs(rand.nextLong()));
+                sendMsgToAllEdges(outputMsg);
+            }
+            tmpVertexValue.set(0);
+            setVertexValue(tmpVertexValue);
+        }
+        if (getSuperstep() == 2) {
+            while (msgIterator.hasNext()) {
+                msgIterator.next();
+                numIncomingMsgs++;
+            }
+        }
+    }
+
+    @Override
+    public void close() {
+        if (getSuperstep() == 2) {
+            tmpVertexValue.set(numIncomingMsgs);
+            setVertexValue(tmpVertexValue);
+            voteToHalt();
+        }
+    }
+
+    @Override
+    public String toString() {
+        return getVertexId() + " " + getVertexValue();
+    }
+
+    /**
+     * Simple VertexWriter that writes each vertex id and value as a line of text.
+     */
+    public static class SimpleMessageOverflowVertexWriter extends
+            TextVertexWriter<VLongWritable, VLongWritable, VLongWritable> {
+        public SimpleMessageOverflowVertexWriter(RecordWriter<Text, Text> lineRecordWriter) {
+            super(lineRecordWriter);
+        }
+
+        @Override
+        public void writeVertex(Vertex<VLongWritable, VLongWritable, VLongWritable, ?> vertex) throws IOException,
+                InterruptedException {
+            getRecordWriter().write(new Text(vertex.getVertexId().toString()),
+                    new Text(vertex.getVertexValue().toString()));
+        }
+    }
+
+    public static class SimpleMessageOverflowVertexOutputFormat extends
+            TextVertexOutputFormat<VLongWritable, VLongWritable, VLongWritable> {
+
+        @Override
+        public VertexWriter<VLongWritable, VLongWritable, VLongWritable> createVertexWriter(TaskAttemptContext context)
+                throws IOException, InterruptedException {
+            RecordWriter<Text, Text> recordWriter = textOutputFormat.getRecordWriter(context);
+            return new SimpleMessageOverflowVertexWriter(recordWriter);
+        }
+
+    }
+
+    public static void main(String[] args) throws Exception {
+        PregelixJob job = new PregelixJob(MessageOverflowVertex.class.getSimpleName());
+        job.setVertexClass(MessageOverflowVertex.class);
+        job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+        job.setVertexOutputFormatClass(SimpleMessageOverflowVertexOutputFormat.class);
+        job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+        job.setDynamicVertexValueSize(true);
+        Client.run(args, job);
+    }
+
+}
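
Unlike the fixed-size variant, this vertex accumulates its message count in a field because, under overflow, the runtime may invoke compute() several times for the same vertex within one superstep, one message slice per call; open() runs before the first slice and close() after the last. A self-contained sketch of that presumed call order (invented names, no real runtime involved):

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    /** Self-contained illustration of the open / repeated-compute / close lifecycle. */
    public class OverflowLifecycleSketch {
        private int count = 0; // survives across slices, like numIncomingMsgs above

        void open() { count = 0; }

        void compute(Iterator<Long> slice) {
            while (slice.hasNext()) {
                slice.next();
                count++;
            }
        }

        void close() { System.out.println("total messages: " + count); }

        public static void main(String[] args) {
            OverflowLifecycleSketch v = new OverflowLifecycleSketch();
            List<List<Long>> slices = Arrays.asList(Arrays.asList(1L, 2L), Arrays.asList(3L));
            v.open(); // once, before the first slice of the superstep
            for (List<Long> s : slices) {
                v.compute(s.iterator()); // may run several times under overflow
            }
            v.close(); // once, after the last slice; prints "total messages: 3"
        }
    }
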
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
index 8664667..a866c1c 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
@@ -21,7 +21,6 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -45,6 +44,7 @@
 import edu.uci.ics.pregelix.example.client.Client;
 import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
 import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.io.DoubleWritable;
 import edu.uci.ics.pregelix.example.io.VLongWritable;
 
 /**
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java
index 6a42636..1bb33b8 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java
@@ -22,7 +22,6 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.ByteWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.RecordWriter;
@@ -42,6 +41,7 @@
 import edu.uci.ics.pregelix.example.client.Client;
 import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
 import edu.uci.ics.pregelix.example.inputformat.TextReachibilityVertexInputFormat;
+import edu.uci.ics.pregelix.example.io.ByteWritable;
 import edu.uci.ics.pregelix.example.io.VLongWritable;
 
 /**
@@ -116,7 +116,7 @@
     }
 
     @Override
-    public void compute(Iterator<ByteWritable> msgIterator) {
+    public void compute(Iterator<ByteWritable> msgIterator) throws Exception {
         if (sourceId < 0) {
             sourceId = getContext().getConfiguration().getLong(SOURCE_ID, SOURCE_ID_DEFAULT);
         }
@@ -171,13 +171,20 @@
         return getVertexId() + " " + getVertexValue();
     }
 
-    private void signalTerminate() {
-        Configuration conf = getContext().getConfiguration();
-        try {
-            IterationUtils.writeForceTerminationState(conf, BspUtils.getJobId(conf));
-            writeReachibilityResult(conf, true);
-        } catch (Exception e) {
-            throw new IllegalStateException(e);
+    private void signalTerminate() throws Exception {
+        writeReachibilityResult(getContext().getConfiguration(), true);
+        terminateJob();
+    }
+
+    private void writeReachibilityResult(Configuration conf, boolean terminate) throws IOException {
+        FileSystem dfs = FileSystem.get(conf);
+        String pathStr = IterationUtils.TMP_DIR + BspUtils.getJobId(conf) + "reachibility";
+        Path path = new Path(pathStr);
+        if (!dfs.exists(path)) {
+            FSDataOutputStream output = dfs.create(path, true);
+            output.writeBoolean(terminate);
+            output.flush();
+            output.close();
         }
     }
 
@@ -187,22 +194,6 @@
         }
     }
 
-    private void writeReachibilityResult(Configuration conf, boolean terminate) {
-        try {
-            FileSystem dfs = FileSystem.get(conf);
-            String pathStr = IterationUtils.TMP_DIR + BspUtils.getJobId(conf) + "reachibility";
-            Path path = new Path(pathStr);
-            if (!dfs.exists(path)) {
-                FSDataOutputStream output = dfs.create(path, true);
-                output.writeBoolean(terminate);
-                output.flush();
-                output.close();
-            }
-        } catch (IOException e) {
-            throw new IllegalStateException(e);
-        }
-    }
-
     private static boolean readReachibilityResult(Configuration conf) {
         try {
             FileSystem dfs = FileSystem.get(conf);
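
The refactoring above keeps the write-once HDFS flag-file idiom for the reachability result while delegating force-termination to terminateJob(). A hedged sketch of that idiom as a generic helper (hypothetical class and names, mirroring the calls used above):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    /** Hypothetical helper for the write-once HDFS flag-file pattern. */
    public final class FlagFiles {

        /** Writes a boolean flag file once; later writers are no-ops. */
        public static void writeOnce(Configuration conf, String pathStr, boolean value)
                throws IOException {
            FileSystem dfs = FileSystem.get(conf);
            Path path = new Path(pathStr);
            if (!dfs.exists(path)) {
                FSDataOutputStream output = dfs.create(path, true);
                output.writeBoolean(value);
                output.flush();
                output.close();
            }
        }

        /** The flag file's existence doubles as a boolean signal. */
        public static boolean exists(Configuration conf, String pathStr) throws IOException {
            return FileSystem.get(conf).exists(new Path(pathStr));
        }
    }
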
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java
index 41c26b1..648f168 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java
@@ -19,7 +19,6 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -32,6 +31,7 @@
 import edu.uci.ics.pregelix.example.client.Client;
 import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
 import edu.uci.ics.pregelix.example.inputformat.TextShortestPathsInputFormat;
+import edu.uci.ics.pregelix.example.io.DoubleWritable;
 import edu.uci.ics.pregelix.example.io.VLongWritable;
 
 /**
@@ -127,7 +127,7 @@
         }
         voteToHalt();
     }
-    
+
     @Override
     public String toString() {
         return getVertexId() + " " + getVertexValue();
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java
index 6cb1f4a..f60387a 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java
@@ -82,7 +82,7 @@
         String[] inputs = options.inputPaths.split(";");
         FileInputFormat.setInputPaths(job, inputs[0]);
         for (int i = 1; i < inputs.length; i++)
-            FileInputFormat.addInputPaths(job, inputs[0]);
+            FileInputFormat.addInputPaths(job, inputs[i]);
         FileOutputFormat.setOutputPath(job, new Path(options.outputPath));
         job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, options.numVertices);
         job.getConfiguration().setLong(PregelixJob.NUM_EDGES, options.numEdges);
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextPageRankInputFormat.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextPageRankInputFormat.java
index f46d9c3..67681d3 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextPageRankInputFormat.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextPageRankInputFormat.java
@@ -18,7 +18,6 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -31,6 +30,7 @@
 import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat;
 import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat.TextVertexReader;
 import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.example.io.DoubleWritable;
 import edu.uci.ics.pregelix.example.io.VLongWritable;
 
 public class TextPageRankInputFormat extends
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextShortestPathsInputFormat.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextShortestPathsInputFormat.java
index 013a063..3ea4a9f 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextShortestPathsInputFormat.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextShortestPathsInputFormat.java
@@ -18,7 +18,6 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -31,6 +30,7 @@
 import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat;
 import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat.TextVertexReader;
 import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.example.io.DoubleWritable;
 import edu.uci.ics.pregelix.example.io.VLongWritable;
 
 public class TextShortestPathsInputFormat extends
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/BooleanWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/BooleanWritable.java
new file mode 100644
index 0000000..c943288
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/BooleanWritable.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example.io;
+
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+
+/**
+ * Writable for Boolean values.
+ */
+public class BooleanWritable extends org.apache.hadoop.io.BooleanWritable implements WritableSizable {
+
+    public BooleanWritable(boolean value) {
+        super(value);
+    }
+
+    public BooleanWritable() {
+        super();
+    }
+
+    public int sizeInBytes() {
+        return 1;
+    }
+
+}
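
Each wrapper in this series follows the same recipe: extend the corresponding Hadoop type and report an upper bound on the serialized size via sizeInBytes(). A sketch of a composite message type built the same way, assuming WritableSizable extends Hadoop's Writable (which the AdjacencyListWritable change below implies); the class is illustrative only:

    package edu.uci.ics.pregelix.example.io;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import edu.uci.ics.pregelix.api.io.WritableSizable;

    /** Hypothetical composite message: sizeInBytes() sums its parts. */
    public class VLongPairWritable implements WritableSizable {
        private final VLongWritable first = new VLongWritable();
        private final VLongWritable second = new VLongWritable();

        public void set(long f, long s) {
            first.set(f);
            second.set(s);
        }

        @Override
        public void write(DataOutput out) throws IOException {
            first.write(out);
            second.write(out);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            first.readFields(in);
            second.readFields(in);
        }

        @Override
        public int sizeInBytes() {
            return first.sizeInBytes() + second.sizeInBytes();
        }
    }
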
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/ByteWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/ByteWritable.java
new file mode 100644
index 0000000..2a1fd22
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/ByteWritable.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example.io;
+
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+
+/**
+ * Writable for Byte values.
+ */
+public class ByteWritable extends org.apache.hadoop.io.ByteWritable implements WritableSizable {
+
+    public ByteWritable(byte value) {
+        super(value);
+    }
+
+    public ByteWritable() {
+        super();
+    }
+
+    public int sizeInBytes() {
+        return 1;
+    }
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/BytesWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/BytesWritable.java
new file mode 100644
index 0000000..04a5549
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/BytesWritable.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example.io;
+
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+
+/**
+ * Writable for Bytes values.
+ */
+public class BytesWritable extends org.apache.hadoop.io.BytesWritable implements WritableSizable {
+
+    public BytesWritable(byte[] value) {
+        super(value);
+    }
+
+    public BytesWritable() {
+        super();
+    }
+
+    @Override
+    public int sizeInBytes() {
+        return getLength() + 4; // plus the 4-byte length field written before the payload
+    }
+
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/DoubleWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/DoubleWritable.java
new file mode 100644
index 0000000..ebc7fe4
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/DoubleWritable.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example.io;
+
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+
+/**
+ * Writable for Double values.
+ */
+public class DoubleWritable extends org.apache.hadoop.io.DoubleWritable implements WritableSizable {
+
+    public DoubleWritable(double value) {
+        super(value);
+    }
+
+    public DoubleWritable() {
+        super();
+    }
+
+    public int sizeInBytes() {
+        return 8;
+    }
+
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/FloatWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/FloatWritable.java
new file mode 100644
index 0000000..6772b0a
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/FloatWritable.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example.io;
+
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+
+/** A WritableComparable for floats. */
+public class FloatWritable extends org.apache.hadoop.io.FloatWritable implements WritableSizable {
+
+    public FloatWritable(float value) {
+        super(value);
+    }
+
+    public FloatWritable() {
+        super();
+    }
+
+    public int sizeInBytes() {
+        return 4;
+    }
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/IntWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/IntWritable.java
new file mode 100644
index 0000000..4944232
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/IntWritable.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example.io;
+
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+
+/** A WritableComparable for ints. */
+public class IntWritable extends org.apache.hadoop.io.IntWritable implements WritableSizable {
+
+    public IntWritable(int value) {
+        super(value);
+    }
+
+    public IntWritable() {
+        super();
+    }
+
+    public int sizeInBytes() {
+        return 4;
+    }
+
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/LongWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/LongWritable.java
new file mode 100644
index 0000000..3ecab79
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/LongWritable.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example.io;
+
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+
+/** A WritableComparable for longs. */
+public class LongWritable extends org.apache.hadoop.io.LongWritable implements WritableSizable {
+
+    public LongWritable(long value) {
+        super(value);
+    }
+
+    public LongWritable() {
+        super();
+    }
+
+    public int sizeInBytes() {
+        return 8;
+    }
+
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/NullWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/NullWritable.java
new file mode 100644
index 0000000..a2f184a
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/NullWritable.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+
+/** Singleton Writable with no data. */
+@SuppressWarnings("rawtypes")
+public class NullWritable implements WritableComparable, WritableSizable {
+
+    private static final NullWritable THIS = new NullWritable();
+
+    private NullWritable() {
+    } // no public ctor
+
+    /** Returns the single instance of this class. */
+    public static NullWritable get() {
+        return THIS;
+    }
+
+    public String toString() {
+        return "(null)";
+    }
+
+    public int sizeInBytes() {
+        return 0;
+    }
+
+    public int hashCode() {
+        return 0;
+    }
+
+    public int compareTo(Object other) {
+        if (!(other instanceof NullWritable)) {
+            throw new ClassCastException("can't compare " + other.getClass().getName() + " to NullWritable");
+        }
+        return 0;
+    }
+
+    public boolean equals(Object other) {
+        return other instanceof NullWritable;
+    }
+
+    public void readFields(DataInput in) throws IOException {
+    }
+
+    public void write(DataOutput out) throws IOException {
+    }
+
+    /** A Comparator &quot;optimized&quot; for NullWritable. */
+    public static class Comparator extends WritableComparator {
+        public Comparator() {
+            super(NullWritable.class);
+        }
+
+        /**
+         * Compare the buffers in serialized form.
+         */
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            assert 0 == l1;
+            assert 0 == l2;
+            return 0;
+        }
+    }
+
+    static { // register this comparator
+        WritableComparator.define(NullWritable.class, new Comparator());
+    }
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VIntWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VIntWritable.java
new file mode 100644
index 0000000..94df74f
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VIntWritable.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableUtils;
+
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+
+/**
+ * A WritableComparable for integer values stored in variable-length format.
+ * Such values take between one and five bytes. Smaller values take fewer bytes.
+ * 
+ * @see org.apache.hadoop.io.WritableUtils#readVInt(DataInput)
+ */
+@SuppressWarnings("rawtypes")
+public class VIntWritable implements WritableComparable, WritableSizable {
+    private int value;
+
+    public VIntWritable() {
+    }
+
+    public VIntWritable(int value) {
+        set(value);
+    }
+
+    public int sizeInBytes() {
+        return 5; // worst case: a vint occupies at most five bytes
+    }
+
+    /** Set the value of this VIntWritable. */
+    public void set(int value) {
+        this.value = value;
+    }
+
+    /** Return the value of this VIntWritable. */
+    public int get() {
+        return value;
+    }
+
+    public void readFields(DataInput in) throws IOException {
+        value = WritableUtils.readVInt(in);
+    }
+
+    public void write(DataOutput out) throws IOException {
+        WritableUtils.writeVInt(out, value);
+    }
+
+    /** Returns true iff <code>o</code> is a VIntWritable with the same value. */
+    public boolean equals(Object o) {
+        if (!(o instanceof VIntWritable))
+            return false;
+        VIntWritable other = (VIntWritable) o;
+        return this.value == other.value;
+    }
+
+    public int hashCode() {
+        return value;
+    }
+
+    /** Compares two VIntWritables. */
+    public int compareTo(Object o) {
+        int thisValue = this.value;
+        int thatValue = ((VIntWritable) o).value;
+        return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
+    }
+
+    public String toString() {
+        return Integer.toString(value);
+    }
+
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VLongWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VLongWritable.java
index e12d930..ec1109f 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VLongWritable.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VLongWritable.java
@@ -22,16 +22,26 @@
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
 
+import edu.uci.ics.pregelix.api.io.WritableSizable;
 import edu.uci.ics.pregelix.api.util.SerDeUtils;
 
 /**
  * A WritableComparable for longs in a variable-length format. Such values take
- * between one and five bytes. Smaller values take fewer bytes.
+ * between one and nine bytes. Smaller values take fewer bytes.
  * 
  * @see org.apache.hadoop.io.WritableUtils#readVLong(DataInput)
  */
 @SuppressWarnings("rawtypes")
-public class VLongWritable implements WritableComparable {
+public class VLongWritable implements WritableComparable, WritableSizable {
+    private static final long ONE_BYTE_MAX = (1L << 7) - 1;
+    private static final long TWO_BYTE_MAX = (1L << 14) - 1;
+    private static final long THREE_BYTE_MAX = (1L << 21) - 1;
+    private static final long FOUR_BYTE_MAX = (1L << 28) - 1;
+    private static final long FIVE_BYTE_MAX = (1L << 35) - 1;
+    private static final long SIX_BYTE_MAX = (1L << 42) - 1;
+    private static final long SEVEN_BYTE_MAX = (1L << 49) - 1;
+    private static final long EIGHT_BYTE_MAX = (1L << 56) - 1;
+
     private long value;
 
     public VLongWritable() {
@@ -41,6 +51,28 @@
         set(value);
     }
 
+    public int sizeInBytes() {
+        if (value >= 0 && value <= ONE_BYTE_MAX) {
+            return 1;
+        } else if (value > ONE_BYTE_MAX && value <= TWO_BYTE_MAX) {
+            return 2;
+        } else if (value > TWO_BYTE_MAX && value <= THREE_BYTE_MAX) {
+            return 3;
+        } else if (value > THREE_BYTE_MAX && value <= FOUR_BYTE_MAX) {
+            return 4;
+        } else if (value > FOUR_BYTE_MAX && value <= FIVE_BYTE_MAX) {
+            return 5;
+        } else if (value > FIVE_BYTE_MAX && value <= SIX_BYTE_MAX) {
+            return 6;
+        } else if (value > SIX_BYTE_MAX && value <= SEVEN_BYTE_MAX) {
+            return 7;
+        } else if (value > SEVEN_BYTE_MAX && value <= EIGHT_BYTE_MAX) {
+            return 8;
+        } else {
+            return 9;
+        }
+    }
+
     /** Set the value of this LongWritable. */
     public void set(long value) {
         this.value = value;
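
The comparison ladder above reflects an encoding that spends 7 payload bits per byte, capped at 9 bytes. A hypothetical closed form of the same computation (helper names invented, not part of the patch):

    /** Hypothetical closed form equivalent to the ladder in sizeInBytes(). */
    public final class VLongSizeSketch {

        static int vlongSize(long value) {
            if (value < 0) {
                return 9; // negative values always need the widest form
            }
            int bits = 64 - Long.numberOfLeadingZeros(value | 1); // at least 1 bit
            return Math.min(9, (bits + 6) / 7); // ceil(bits / 7), capped at 9
        }

        public static void main(String[] args) {
            System.out.println(vlongSize(127));            // 1
            System.out.println(vlongSize(128));            // 2
            System.out.println(vlongSize(Long.MAX_VALUE)); // 9
        }
    }
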
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/AdjacencyListWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/AdjacencyListWritable.java
index 0a58c00..dd86a45 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/AdjacencyListWritable.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/AdjacencyListWritable.java
@@ -23,14 +23,13 @@
 import java.util.Set;
 import java.util.TreeSet;
 
-import org.apache.hadoop.io.Writable;
-
+import edu.uci.ics.pregelix.api.io.WritableSizable;
 import edu.uci.ics.pregelix.example.io.VLongWritable;
 
 /**
  * The adjacency list contains <src, list-of-neighbors>
  */
-public class AdjacencyListWritable implements Writable {
+public class AdjacencyListWritable implements WritableSizable {
 
     private VLongWritable sourceVertex = new VLongWritable();
     private Set<VLongWritable> destinationVertexes = new TreeSet<VLongWritable>();
@@ -96,4 +95,13 @@
         return destinationVertexes.contains(v);
     }
 
+    @Override
+    public int sizeInBytes() {
+        int size = 4; // the size of list bytes
+        for (VLongWritable dest : destinationVertexes) {
+            size += dest.sizeInBytes();
+        }
+        return size;
+    }
+
 }
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
index 15117a1..13cec61 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
@@ -26,8 +26,13 @@
 import edu.uci.ics.pregelix.api.util.DefaultVertexPartitioner;
 import edu.uci.ics.pregelix.example.ConnectedComponentsVertex;
 import edu.uci.ics.pregelix.example.ConnectedComponentsVertex.SimpleConnectedComponentsVertexOutputFormat;
+import edu.uci.ics.pregelix.example.EarlyTerminationVertex;
+import edu.uci.ics.pregelix.example.EarlyTerminationVertex.SimpleEarlyTerminattionVertexOutputFormat;
 import edu.uci.ics.pregelix.example.GraphMutationVertex;
 import edu.uci.ics.pregelix.example.GraphMutationVertex.SimpleGraphMutationVertexOutputFormat;
+import edu.uci.ics.pregelix.example.MessageOverflowFixedsizeVertex;
+import edu.uci.ics.pregelix.example.MessageOverflowVertex;
+import edu.uci.ics.pregelix.example.MessageOverflowVertex.SimpleMessageOverflowVertexOutputFormat;
 import edu.uci.ics.pregelix.example.PageRankVertex;
 import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexOutputFormat;
 import edu.uci.ics.pregelix.example.PageRankVertex.SimulatedPageRankVertexInputFormat;
@@ -233,7 +238,7 @@
         job.setVertexInputFormatClass(TextMaximalCliqueInputFormat.class);
         job.setVertexOutputFormatClass(MaximalCliqueVertexOutputFormat.class);
         job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
-        job.setMutationOrVariableSizedUpdateHeavy(true);
+        job.setLSMStorage(true);
         FileInputFormat.setInputPaths(job, HDFS_INPUTPATH3);
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH3));
         job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
@@ -247,7 +252,7 @@
         job.setVertexInputFormatClass(TextMaximalCliqueInputFormat.class);
         job.setVertexOutputFormatClass(MaximalCliqueVertexOutputFormat.class);
         job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
-        job.setMutationOrVariableSizedUpdateHeavy(true);
+        job.setLSMStorage(true);
         FileInputFormat.setInputPaths(job, HDFS_INPUTPATH4);
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH3));
         job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
@@ -262,7 +267,7 @@
         job.setVertexOutputFormatClass(MaximalCliqueVertexOutputFormat.class);
         job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
         job.setVertexPartitionerClass(DefaultVertexPartitioner.class);
-        job.setMutationOrVariableSizedUpdateHeavy(true);
+        job.setLSMStorage(true);
         FileInputFormat.setInputPaths(job, HDFS_INPUTPATH5);
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH3));
         job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
@@ -280,6 +285,59 @@
         job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
     }
 
+    private static void generateMessageOverflowFixedsizeJob(String jobName, String outputPath) throws IOException {
+        PregelixJob job = new PregelixJob(jobName);
+        job.setVertexClass(MessageOverflowFixedsizeVertex.class);
+        job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+        job.setVertexOutputFormatClass(MessageOverflowFixedsizeVertex.SimpleMessageOverflowVertexOutputFormat.class);
+        job.setFrameSize(2048);
+        FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
+        FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
+        job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+        job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+    }
+
+    private static void generateMessageOverflowJob(String jobName, String outputPath) throws IOException {
+        PregelixJob job = new PregelixJob(jobName);
+        job.setVertexClass(MessageOverflowVertex.class);
+        job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+        job.setVertexOutputFormatClass(SimpleMessageOverflowVertexOutputFormat.class);
+        job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+        job.setDynamicVertexValueSize(true);
+        job.setFrameSize(2048);
+        FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
+        FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
+        job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+        job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+    }
+
+    private static void generateMessageOverflowJobLSM(String jobName, String outputPath) throws IOException {
+        PregelixJob job = new PregelixJob(jobName);
+        job.setVertexClass(MessageOverflowVertex.class);
+        job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+        job.setVertexOutputFormatClass(SimpleMessageOverflowVertexOutputFormat.class);
+        job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+        job.setDynamicVertexValueSize(true);
+        job.setFrameSize(2048);
+        job.setLSMStorage(true);
+        FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
+        FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
+        job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+        job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+    }
+
+    private static void generateEarlyTerminationJob(String jobName, String outputPath) throws IOException {
+        PregelixJob job = new PregelixJob(jobName);
+        job.setVertexClass(EarlyTerminationVertex.class);
+        job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+        job.setVertexOutputFormatClass(SimpleEarlyTerminattionVertexOutputFormat.class);
+        job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+        FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
+        FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
+        job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+        job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+    }
+
     private static void genPageRank() throws IOException {
         generatePageRankJob("PageRank", outputBase + "PageRank.xml");
         generatePageRankJobReal("PageRank", outputBase + "PageRankReal.xml");
@@ -319,6 +377,16 @@
         generateGraphMutationJob("Graph Mutation", outputBase + "GraphMutation.xml");
     }
 
+    private static void genMessageOverflow() throws IOException {
+        generateMessageOverflowJob("Message Overflow", outputBase + "MessageOverflow.xml");
+        generateMessageOverflowJobLSM("Message Overflow LSM", outputBase + "MessageOverflowLSM.xml");
+        generateMessageOverflowFixedsizeJob("Message Overflow Fixedsize", outputBase + "MessageOverflowFixedsize.xml");
+    }
+
+    private static void genEarlyTermination() throws IOException {
+        generateEarlyTerminationJob("Early Termination", outputBase + "EarlyTermination.xml");
+    }
+
     public static void main(String[] args) throws IOException {
         genPageRank();
         genShortestPath();
@@ -327,5 +395,7 @@
         genTriangleCounting();
         genMaximalClique();
         genGraphMutation();
+        genMessageOverflow();
+        genEarlyTermination();
     }
 }
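
The storage-flag rename in the hunks above is the behavioral core of this part of the patch: jobs that were tagged setMutationOrVariableSizedUpdateHeavy(true) now request the same treatment via setLSMStorage(true), which names the mechanism (LSM-backed vertex storage) rather than the workload. A minimal sketch of the new flag in isolation; MyVertex and the paths below are placeholders, not part of the patch:

    // Sketch: opting a job into LSM vertex storage with the renamed setter.
    // setLSMStorage(true) is the call introduced by this patch; everything
    // else here is illustrative boilerplate.
    PregelixJob job = new PregelixJob("LSM storage example");
    job.setVertexClass(MyVertex.class);  // hypothetical vertex implementation
    job.setLSMStorage(true);             // formerly setMutationOrVariableSizedUpdateHeavy(true)
    FileInputFormat.setInputPaths(job, "/input/graph");
    FileOutputFormat.setOutputPath(job, new Path("/output/result"));
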
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/lib/io/SizeEstimationTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/lib/io/SizeEstimationTest.java
new file mode 100644
index 0000000..196b114
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/lib/io/SizeEstimationTest.java
@@ -0,0 +1,151 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example.lib.io;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.util.Random;
+
+import org.junit.Test;
+
+import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+import edu.uci.ics.pregelix.example.io.BooleanWritable;
+import edu.uci.ics.pregelix.example.io.ByteWritable;
+import edu.uci.ics.pregelix.example.io.DoubleWritable;
+import edu.uci.ics.pregelix.example.io.FloatWritable;
+import edu.uci.ics.pregelix.example.io.IntWritable;
+import edu.uci.ics.pregelix.example.io.LongWritable;
+import edu.uci.ics.pregelix.example.io.NullWritable;
+import edu.uci.ics.pregelix.example.io.VIntWritable;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+/**
+ * @author yingyib
+ */
+public class SizeEstimationTest {
+
+    @Test
+    public void testVLong() throws Exception {
+        Random rand = new Random(System.currentTimeMillis());
+        MsgList<WritableSizable> msgList = new MsgList<WritableSizable>();
+        for (int i = 0; i < 1000000; i++) {
+            msgList.add(new VLongWritable(Math.abs(rand.nextLong())));
+        }
+        verifySizeEstimation(msgList);
+    }
+
+    @Test
+    public void testLong() throws Exception {
+        Random rand = new Random(System.currentTimeMillis());
+        MsgList<WritableSizable> msgList = new MsgList<WritableSizable>();
+        for (int i = 0; i < 1000000; i++) {
+            msgList.add(new LongWritable(rand.nextLong()));
+        }
+        verifySizeEstimation(msgList);
+    }
+
+    @Test
+    public void testBoolean() throws Exception {
+        Random rand = new Random(System.currentTimeMillis());
+        MsgList<WritableSizable> msgList = new MsgList<WritableSizable>();
+        for (int i = 0; i < 1000000; i++) {
+            msgList.add(new BooleanWritable(rand.nextBoolean()));
+        }
+        verifySizeEstimation(msgList);
+    }
+
+    @Test
+    public void testByte() throws Exception {
+        Random rand = new Random(System.currentTimeMillis());
+        MsgList<WritableSizable> msgList = new MsgList<WritableSizable>();
+        for (int i = 0; i < 1000000; i++) {
+            msgList.add(new ByteWritable((byte) rand.nextInt()));
+        }
+        verifySizeEstimation(msgList);
+    }
+
+    @Test
+    public void testDouble() throws Exception {
+        Random rand = new Random(System.currentTimeMillis());
+        MsgList<WritableSizable> msgList = new MsgList<WritableSizable>();
+        for (int i = 0; i < 1000000; i++) {
+            msgList.add(new DoubleWritable(rand.nextDouble()));
+        }
+        verifySizeEstimation(msgList);
+    }
+
+    @Test
+    public void testFloat() throws Exception {
+        Random rand = new Random(System.currentTimeMillis());
+        MsgList<WritableSizable> msgList = new MsgList<WritableSizable>();
+        for (int i = 0; i < 1000000; i++) {
+            msgList.add(new FloatWritable(rand.nextFloat()));
+        }
+        verifySizeEstimation(msgList);
+    }
+
+    @Test
+    public void testNull() throws Exception {
+        MsgList<WritableSizable> msgList = new MsgList<WritableSizable>();
+        for (int i = 0; i < 1000000; i++) {
+            msgList.add(NullWritable.get());
+        }
+        verifySizeEstimation(msgList);
+    }
+
+    @Test
+    public void testVInt() throws Exception {
+        Random rand = new Random(System.currentTimeMillis());
+        MsgList<WritableSizable> msgList = new MsgList<WritableSizable>();
+        for (int i = 0; i < 1000000; i++) {
+            msgList.add(new VIntWritable(rand.nextInt()));
+        }
+        verifySizeEstimation(msgList);
+    }
+
+    @Test
+    public void testInt() throws Exception {
+        Random rand = new Random(System.currentTimeMillis());
+        MsgList<WritableSizable> msgList = new MsgList<WritableSizable>();
+        for (int i = 0; i < 1000000; i++) {
+            msgList.add(new IntWritable(rand.nextInt()));
+        }
+        verifySizeEstimation(msgList);
+    }
+
+    private void verifySizeEstimation(MsgList<WritableSizable> msgList) throws Exception {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutput dos = new DataOutputStream(bos);
+        // start from 5 bytes, assumed here to cover MsgList's own serialization header
+        int accumulatedSize = 5;
+        for (int i = 0; i < msgList.size(); i++) {
+            bos.reset();
+            WritableSizable value = msgList.get(i);
+            value.write(dos);
+            if (value.sizeInBytes() < bos.size()) {
+                throw new Exception(value + " estimated size (" + value.sizeInBytes()
+                        + ") is smaller than the actual size (" + bos.size() + ")");
+            }
+            accumulatedSize += value.sizeInBytes();
+        }
+        bos.reset();
+        msgList.write(dos);
+        if (accumulatedSize < bos.size()) {
+            throw new Exception("Estimated list size (" + accumulatedSize + ") is smaller than the actual size"
+                    + bos.size());
+        }
+    }
+
+}
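
SizeEstimationTest pins down the WritableSizable contract the message pipeline relies on: sizeInBytes() must never under-report what write() actually emits, both per message and accumulated across a MsgList (plus the small list header assumed above), so frame-size estimates stay conservative. A minimal sketch of a conforming implementation; the LongPairWritable class is hypothetical, not part of the patch:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import edu.uci.ics.pregelix.api.io.WritableSizable;

    // Hypothetical message type honoring the invariant the test checks:
    // sizeInBytes() >= the byte count produced by write().
    public class LongPairWritable implements WritableSizable {
        private long first;
        private long second;

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(first);   // 8 bytes
            out.writeLong(second);  // 8 bytes
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            first = in.readLong();
            second = in.readLong();
        }

        @Override
        public int sizeInBytes() {
            return 16; // exact for fixed-width fields; variable-width types must round up
        }
    }
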
diff --git a/pregelix/pregelix-example/src/test/resources/expected/EarlyTermination/part-0 b/pregelix/pregelix-example/src/test/resources/expected/EarlyTermination/part-0
new file mode 100644
index 0000000..60a55af
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/EarlyTermination/part-0
@@ -0,0 +1,5 @@
+0	2
+4	2
+8	2
+12	2
+16	2
diff --git a/pregelix/pregelix-example/src/test/resources/expected/EarlyTermination/part-1 b/pregelix/pregelix-example/src/test/resources/expected/EarlyTermination/part-1
new file mode 100644
index 0000000..32ee93f
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/EarlyTermination/part-1
@@ -0,0 +1,5 @@
+1	2
+5	2
+9	2
+13	2
+17	2
diff --git a/pregelix/pregelix-example/src/test/resources/expected/EarlyTermination/part-2 b/pregelix/pregelix-example/src/test/resources/expected/EarlyTermination/part-2
new file mode 100644
index 0000000..542ccae
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/EarlyTermination/part-2
@@ -0,0 +1,5 @@
+2	0
+6	0
+10	0
+14	0
+18	0
diff --git a/pregelix/pregelix-example/src/test/resources/expected/EarlyTermination/part-3 b/pregelix/pregelix-example/src/test/resources/expected/EarlyTermination/part-3
new file mode 100644
index 0000000..ff0e5b8
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/EarlyTermination/part-3
@@ -0,0 +1,5 @@
+3	1
+7	1
+11	1
+15	1
+19	1
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MessageOverflow/part-0 b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflow/part-0
new file mode 100644
index 0000000..db5f679
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflow/part-0
@@ -0,0 +1,5 @@
+0	10000
+4	70000
+8	30000
+12	90000
+16	50000
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MessageOverflow/part-1 b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflow/part-1
new file mode 100644
index 0000000..3dc4629
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflow/part-1
@@ -0,0 +1,5 @@
+1	100000
+5	60000
+9	20000
+13	80000
+17	40000
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MessageOverflow/part-2 b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflow/part-2
new file mode 100644
index 0000000..bc95831
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflow/part-2
@@ -0,0 +1,5 @@
+2	90000
+6	50000
+10	10000
+14	70000
+18	30000
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MessageOverflow/part-3 b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflow/part-3
new file mode 100644
index 0000000..b619cd7
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflow/part-3
@@ -0,0 +1,5 @@
+3	80000
+7	40000
+11	100000
+15	60000
+19	20000
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowFixedsize/part-0 b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowFixedsize/part-0
new file mode 100644
index 0000000..db5f679
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowFixedsize/part-0
@@ -0,0 +1,5 @@
+0	10000
+4	70000
+8	30000
+12	90000
+16	50000
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowFixedsize/part-1 b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowFixedsize/part-1
new file mode 100644
index 0000000..3dc4629
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowFixedsize/part-1
@@ -0,0 +1,5 @@
+1	100000
+5	60000
+9	20000
+13	80000
+17	40000
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowFixedsize/part-2 b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowFixedsize/part-2
new file mode 100644
index 0000000..bc95831
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowFixedsize/part-2
@@ -0,0 +1,5 @@
+2	90000
+6	50000
+10	10000
+14	70000
+18	30000
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowFixedsize/part-3 b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowFixedsize/part-3
new file mode 100644
index 0000000..b619cd7
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowFixedsize/part-3
@@ -0,0 +1,5 @@
+3	80000
+7	40000
+11	100000
+15	60000
+19	20000
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowLSM/part-0 b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowLSM/part-0
new file mode 100644
index 0000000..db5f679
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowLSM/part-0
@@ -0,0 +1,5 @@
+0	10000
+4	70000
+8	30000
+12	90000
+16	50000
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowLSM/part-1 b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowLSM/part-1
new file mode 100644
index 0000000..3dc4629
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowLSM/part-1
@@ -0,0 +1,5 @@
+1	100000
+5	60000
+9	20000
+13	80000
+17	40000
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowLSM/part-2 b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowLSM/part-2
new file mode 100644
index 0000000..bc95831
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowLSM/part-2
@@ -0,0 +1,5 @@
+2	90000
+6	50000
+10	10000
+14	70000
+18	30000
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowLSM/part-3 b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowLSM/part-3
new file mode 100644
index 0000000..b619cd7
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowLSM/part-3
@@ -0,0 +1,5 @@
+3	80000
+7	40000
+11	100000
+15	60000
+19	20000
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/EarlyTermination.xml b/pregelix/pregelix-example/src/test/resources/jobs/EarlyTermination.xml
new file mode 100644
index 0000000..d908da8
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/jobs/EarlyTermination.xml
@@ -0,0 +1,142 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
+<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
+<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
+<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
+<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
+<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
+<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
+<property><name>mapred.input.dir</name><value>file:/webmap</value></property>
+<property><name>mapred.submit.replication</name><value>10</value></property>
+<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
+<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
+<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
+<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
+<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
+<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
+<property><name>keep.failed.task.files</name><value>false</value></property>
+<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
+<property><name>io.bytes.per.checksum</name><value>512</value></property>
+<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
+<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
+<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
+<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
+<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
+<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
+<property><name>fs.checkpoint.period</name><value>3600</value></property>
+<property><name>mapred.child.tmp</name><value>./tmp</value></property>
+<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
+<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
+<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
+<property><name>mapred.output.dir</name><value>/result</value></property>
+<property><name>io.map.index.skip</name><value>0</value></property>
+<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
+<property><name>mapred.output.compress</name><value>false</value></property>
+<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
+<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
+<property><name>fs.checkpoint.size</name><value>67108864</value></property>
+<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
+<property><name>mapred.job.name</name><value>Early Termination</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
+<property><name>local.cache.size</name><value>10737418240</value></property>
+<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
+<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
+<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
+<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
+<property><name>mapred.task.timeout</name><value>600000</value></property>
+<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
+<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
+<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
+<property><name>ipc.client.kill.max</name><value>10</value></property>
+<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
+<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
+<property><name>io.sort.record.percent</name><value>0.05</value></property>
+<property><name>hadoop.security.authorization</name><value>false</value></property>
+<property><name>mapred.max.tracker.failures</name><value>4</value></property>
+<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>pregelix.numVertices</name><value>20</value></property>
+<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
+<property><name>mapred.map.tasks</name><value>2</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
+<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
+<property><name>fs.default.name</name><value>file:///</value></property>
+<property><name>tasktracker.http.threads</name><value>40</value></property>
+<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
+<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
+<property><name>mapred.reduce.tasks</name><value>1</value></property>
+<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.EarlyTerminationVertex</value></property>
+<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
+<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
+<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
+<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
+<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
+<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
+<property><name>hadoop.native.lib</name><value>true</value></property>
+<property><name>fs.s3.block.size</name><value>67108864</value></property>
+<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
+<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
+<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
+<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
+<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
+<property><name>mapred.queue.names</name><value>default</value></property>
+<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
+<property><name>mapred.job.tracker</name><value>local</value></property>
+<property><name>io.skip.checksum.errors</name><value>false</value></property>
+<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
+<property><name>fs.s3.maxRetries</name><value>4</value></property>
+<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
+<property><name>fs.trash.interval</name><value>0</value></property>
+<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
+<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
+<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
+<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
+<property><name>io.sort.mb</name><value>100</value></property>
+<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
+<property><name>io.sort.factor</name><value>10</value></property>
+<property><name>mapred.task.profile</name><value>false</value></property>
+<property><name>job.end.retry.interval</name><value>30000</value></property>
+<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
+<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
+<property><name>webinterface.private.actions</name><value>false</value></property>
+<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
+<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
+<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
+<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
+<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.compress.map.output</name><value>false</value></property>
+<property><name>io.sort.spill.percent</name><value>0.80</value></property>
+<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
+<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
+<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
+<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
+<property><name>job.end.retry.attempts</name><value>0</value></property>
+<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.EarlyTerminationVertex$SimpleEarlyTerminattionVertexOutputFormat</value></property>
+<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
+<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
+<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
+<property><name>mapred.job.queue.name</name><value>default</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
+<property><name>topology.script.number.args</name><value>100</value></property>
+<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
+<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
+<property><name>mapred.task.cache.levels</name><value>2</value></property>
+<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
+<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
+<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>mapred.min.split.size</name><value>0</value></property>
+<property><name>mapred.map.max.attempts</name><value>4</value></property>
+<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
+<property><name>mapred.acls.enabled</name><value>false</value></property>
+</configuration>
\ No newline at end of file
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflow.xml b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflow.xml
new file mode 100644
index 0000000..8316c64
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflow.xml
@@ -0,0 +1,144 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
+<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
+<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
+<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
+<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
+<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
+<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
+<property><name>mapred.input.dir</name><value>file:/webmap</value></property>
+<property><name>mapred.submit.replication</name><value>10</value></property>
+<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
+<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
+<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
+<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
+<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
+<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
+<property><name>keep.failed.task.files</name><value>false</value></property>
+<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
+<property><name>io.bytes.per.checksum</name><value>512</value></property>
+<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
+<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
+<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
+<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
+<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
+<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
+<property><name>fs.checkpoint.period</name><value>3600</value></property>
+<property><name>mapred.child.tmp</name><value>./tmp</value></property>
+<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
+<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
+<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
+<property><name>mapred.output.dir</name><value>/result</value></property>
+<property><name>io.map.index.skip</name><value>0</value></property>
+<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
+<property><name>mapred.output.compress</name><value>false</value></property>
+<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
+<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
+<property><name>fs.checkpoint.size</name><value>67108864</value></property>
+<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
+<property><name>mapred.job.name</name><value>Message Overflow</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
+<property><name>local.cache.size</name><value>10737418240</value></property>
+<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
+<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
+<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
+<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
+<property><name>mapred.task.timeout</name><value>600000</value></property>
+<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
+<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
+<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
+<property><name>ipc.client.kill.max</name><value>10</value></property>
+<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
+<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
+<property><name>io.sort.record.percent</name><value>0.05</value></property>
+<property><name>hadoop.security.authorization</name><value>false</value></property>
+<property><name>mapred.max.tracker.failures</name><value>4</value></property>
+<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>pregelix.numVertices</name><value>20</value></property>
+<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
+<property><name>mapred.map.tasks</name><value>2</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
+<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
+<property><name>fs.default.name</name><value>file:///</value></property>
+<property><name>tasktracker.http.threads</name><value>40</value></property>
+<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
+<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
+<property><name>mapred.reduce.tasks</name><value>1</value></property>
+<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.MessageOverflowVertex</value></property>
+<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
+<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
+<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
+<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
+<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
+<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
+<property><name>hadoop.native.lib</name><value>true</value></property>
+<property><name>fs.s3.block.size</name><value>67108864</value></property>
+<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
+<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
+<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
+<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
+<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
+<property><name>mapred.queue.names</name><value>default</value></property>
+<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
+<property><name>mapred.job.tracker</name><value>local</value></property>
+<property><name>io.skip.checksum.errors</name><value>false</value></property>
+<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
+<property><name>fs.s3.maxRetries</name><value>4</value></property>
+<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
+<property><name>fs.trash.interval</name><value>0</value></property>
+<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
+<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
+<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
+<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
+<property><name>io.sort.mb</name><value>100</value></property>
+<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
+<property><name>io.sort.factor</name><value>10</value></property>
+<property><name>mapred.task.profile</name><value>false</value></property>
+<property><name>job.end.retry.interval</name><value>30000</value></property>
+<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
+<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
+<property><name>webinterface.private.actions</name><value>false</value></property>
+<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
+<property><name>pregelix.framesize</name><value>2048</value></property>
+<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
+<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
+<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
+<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.compress.map.output</name><value>false</value></property>
+<property><name>io.sort.spill.percent</name><value>0.80</value></property>
+<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
+<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
+<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
+<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
+<property><name>job.end.retry.attempts</name><value>0</value></property>
+<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.MessageOverflowVertex$SimpleMessageOverflowVertexOutputFormat</value></property>
+<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
+<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
+<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
+<property><name>mapred.job.queue.name</name><value>default</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>pregelix.incStateLength</name><value>true</value></property>
+<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
+<property><name>topology.script.number.args</name><value>100</value></property>
+<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
+<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
+<property><name>mapred.task.cache.levels</name><value>2</value></property>
+<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
+<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
+<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>mapred.min.split.size</name><value>0</value></property>
+<property><name>mapred.map.max.attempts</name><value>4</value></property>
+<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
+<property><name>mapred.acls.enabled</name><value>false</value></property>
+</configuration>
\ No newline at end of file
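
Read against the generator code earlier in the patch, this file shows the setter-to-property mapping: setFrameSize(2048) surfaces as pregelix.framesize, setDynamicVertexValueSize(true) as pregelix.incStateLength, setNoramlizedKeyComputerClass(...) as pregelix.nmkComputerClass, and the vertex and format classes as pregelix.vertexClass, pregelix.vertexInputFormatClass, and pregelix.vertexOutputFormatClass. A short sketch of reading those knobs back with the stock Hadoop Configuration API; the file location is illustrative:

    // Sketch: recovering the Pregelix-specific settings from a generated job XML.
    // Property names are taken verbatim from MessageOverflow.xml above.
    Configuration conf = new Configuration(false);
    conf.addResource(new Path("MessageOverflow.xml"));  // illustrative location
    int frameSize = conf.getInt("pregelix.framesize", -1);                     // 2048 here
    boolean growingValues = conf.getBoolean("pregelix.incStateLength", false); // true here
    String vertexClass = conf.get("pregelix.vertexClass");
    long numVertices = conf.getLong("pregelix.numVertices", 0L);               // 20 here
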
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowFixedsize.xml b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowFixedsize.xml
new file mode 100644
index 0000000..a894ccd
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowFixedsize.xml
@@ -0,0 +1,142 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
+<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
+<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
+<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
+<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
+<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
+<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
+<property><name>mapred.input.dir</name><value>file:/webmap</value></property>
+<property><name>mapred.submit.replication</name><value>10</value></property>
+<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
+<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
+<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
+<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
+<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
+<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
+<property><name>keep.failed.task.files</name><value>false</value></property>
+<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
+<property><name>io.bytes.per.checksum</name><value>512</value></property>
+<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
+<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
+<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
+<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
+<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
+<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
+<property><name>fs.checkpoint.period</name><value>3600</value></property>
+<property><name>mapred.child.tmp</name><value>./tmp</value></property>
+<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
+<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
+<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
+<property><name>mapred.output.dir</name><value>/result</value></property>
+<property><name>io.map.index.skip</name><value>0</value></property>
+<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
+<property><name>mapred.output.compress</name><value>false</value></property>
+<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
+<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
+<property><name>fs.checkpoint.size</name><value>67108864</value></property>
+<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
+<property><name>mapred.job.name</name><value>Message Overflow Fixedsize</value></property>
+<property><name>local.cache.size</name><value>10737418240</value></property>
+<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
+<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
+<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
+<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
+<property><name>mapred.task.timeout</name><value>600000</value></property>
+<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
+<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
+<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
+<property><name>ipc.client.kill.max</name><value>10</value></property>
+<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
+<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
+<property><name>io.sort.record.percent</name><value>0.05</value></property>
+<property><name>hadoop.security.authorization</name><value>false</value></property>
+<property><name>mapred.max.tracker.failures</name><value>4</value></property>
+<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>pregelix.numVertices</name><value>20</value></property>
+<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
+<property><name>mapred.map.tasks</name><value>2</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
+<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
+<property><name>fs.default.name</name><value>file:///</value></property>
+<property><name>tasktracker.http.threads</name><value>40</value></property>
+<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
+<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
+<property><name>mapred.reduce.tasks</name><value>1</value></property>
+<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.MessageOverflowFixedsizeVertex</value></property>
+<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
+<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
+<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
+<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
+<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
+<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
+<property><name>hadoop.native.lib</name><value>true</value></property>
+<property><name>fs.s3.block.size</name><value>67108864</value></property>
+<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
+<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
+<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
+<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
+<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
+<property><name>mapred.queue.names</name><value>default</value></property>
+<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
+<property><name>mapred.job.tracker</name><value>local</value></property>
+<property><name>io.skip.checksum.errors</name><value>false</value></property>
+<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
+<property><name>fs.s3.maxRetries</name><value>4</value></property>
+<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
+<property><name>fs.trash.interval</name><value>0</value></property>
+<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
+<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
+<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
+<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
+<property><name>io.sort.mb</name><value>100</value></property>
+<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
+<property><name>io.sort.factor</name><value>10</value></property>
+<property><name>mapred.task.profile</name><value>false</value></property>
+<property><name>job.end.retry.interval</name><value>30000</value></property>
+<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
+<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
+<property><name>webinterface.private.actions</name><value>false</value></property>
+<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
+<property><name>pregelix.framesize</name><value>2048</value></property>
+<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
+<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
+<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
+<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.compress.map.output</name><value>false</value></property>
+<property><name>io.sort.spill.percent</name><value>0.80</value></property>
+<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
+<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
+<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
+<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
+<property><name>job.end.retry.attempts</name><value>0</value></property>
+<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.MessageOverflowFixedsizeVertex$SimpleMessageOverflowVertexOutputFormat</value></property>
+<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
+<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
+<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
+<property><name>mapred.job.queue.name</name><value>default</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
+<property><name>topology.script.number.args</name><value>100</value></property>
+<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
+<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
+<property><name>mapred.task.cache.levels</name><value>2</value></property>
+<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
+<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
+<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>mapred.min.split.size</name><value>0</value></property>
+<property><name>mapred.map.max.attempts</name><value>4</value></property>
+<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
+<property><name>mapred.acls.enabled</name><value>false</value></property>
+</configuration>
\ No newline at end of file
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowLSM.xml b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowLSM.xml
new file mode 100644
index 0000000..a9f8925
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowLSM.xml
@@ -0,0 +1,145 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
+<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
+<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
+<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
+<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
+<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
+<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
+<property><name>mapred.input.dir</name><value>file:/webmap</value></property>
+<property><name>mapred.submit.replication</name><value>10</value></property>
+<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
+<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
+<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
+<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
+<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
+<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
+<property><name>keep.failed.task.files</name><value>false</value></property>
+<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
+<property><name>io.bytes.per.checksum</name><value>512</value></property>
+<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
+<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
+<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
+<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
+<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
+<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
+<property><name>fs.checkpoint.period</name><value>3600</value></property>
+<property><name>mapred.child.tmp</name><value>./tmp</value></property>
+<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
+<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
+<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
+<property><name>mapred.output.dir</name><value>/result</value></property>
+<property><name>io.map.index.skip</name><value>0</value></property>
+<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
+<property><name>mapred.output.compress</name><value>false</value></property>
+<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
+<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
+<property><name>fs.checkpoint.size</name><value>67108864</value></property>
+<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
+<property><name>mapred.job.name</name><value>Message Overflow LSM</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
+<property><name>local.cache.size</name><value>10737418240</value></property>
+<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
+<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
+<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
+<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
+<property><name>mapred.task.timeout</name><value>600000</value></property>
+<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
+<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
+<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
+<property><name>ipc.client.kill.max</name><value>10</value></property>
+<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
+<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
+<property><name>io.sort.record.percent</name><value>0.05</value></property>
+<property><name>hadoop.security.authorization</name><value>false</value></property>
+<property><name>mapred.max.tracker.failures</name><value>4</value></property>
+<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>pregelix.numVertices</name><value>20</value></property>
+<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
+<property><name>mapred.map.tasks</name><value>2</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
+<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
+<property><name>fs.default.name</name><value>file:///</value></property>
+<property><name>tasktracker.http.threads</name><value>40</value></property>
+<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
+<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
+<property><name>mapred.reduce.tasks</name><value>1</value></property>
+<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.MessageOverflowVertex</value></property>
+<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
+<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
+<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
+<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
+<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
+<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
+<property><name>hadoop.native.lib</name><value>true</value></property>
+<property><name>fs.s3.block.size</name><value>67108864</value></property>
+<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
+<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
+<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
+<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
+<property><name>pregelix.updateIntensive</name><value>true</value></property>
+<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
+<property><name>mapred.queue.names</name><value>default</value></property>
+<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
+<property><name>mapred.job.tracker</name><value>local</value></property>
+<property><name>io.skip.checksum.errors</name><value>false</value></property>
+<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
+<property><name>fs.s3.maxRetries</name><value>4</value></property>
+<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
+<property><name>fs.trash.interval</name><value>0</value></property>
+<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
+<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
+<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
+<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
+<property><name>io.sort.mb</name><value>100</value></property>
+<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
+<property><name>io.sort.factor</name><value>10</value></property>
+<property><name>mapred.task.profile</name><value>false</value></property>
+<property><name>job.end.retry.interval</name><value>30000</value></property>
+<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
+<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
+<property><name>webinterface.private.actions</name><value>false</value></property>
+<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
+<property><name>pregelix.framesize</name><value>2048</value></property>
+<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
+<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
+<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
+<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.compress.map.output</name><value>false</value></property>
+<property><name>io.sort.spill.percent</name><value>0.80</value></property>
+<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
+<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
+<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
+<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
+<property><name>job.end.retry.attempts</name><value>0</value></property>
+<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.MessageOverflowVertex$SimpleMessageOverflowVertexOutputFormat</value></property>
+<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
+<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
+<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
+<property><name>mapred.job.queue.name</name><value>default</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>pregelix.incStateLength</name><value>true</value></property>
+<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
+<property><name>topology.script.number.args</name><value>100</value></property>
+<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
+<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
+<property><name>mapred.task.cache.levels</name><value>2</value></property>
+<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
+<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
+<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>mapred.min.split.size</name><value>0</value></property>
+<property><name>mapred.map.max.attempts</name><value>4</value></property>
+<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
+<property><name>mapred.acls.enabled</name><value>false</value></property>
+</configuration>
\ No newline at end of file
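The generated job file above mixes stock Hadoop defaults with the Pregelix-specific keys that drive this test: pregelix.vertexClass and pregelix.vertexInputFormatClass select the program, pregelix.numVertices records the input size, pregelix.framesize pins a deliberately small 2KB frame so that per-vertex message lists overflow, and pregelix.updateIntensive appears to select the LSM storage variant that gives the file its name. As a rough sketch, the same settings could be assembled programmatically with a plain Hadoop Configuration; the string keys are taken verbatim from the XML, while bypassing whatever helper methods the Pregelix job wrapper offers is an assumption made to keep the snippet self-contained.

import org.apache.hadoop.conf.Configuration;

public class MessageOverflowLSMConfig {
    public static Configuration build() {
        Configuration conf = new Configuration();
        conf.set("mapred.job.name", "Message Overflow LSM");
        conf.set("mapred.input.dir", "file:/webmap");
        conf.set("mapred.output.dir", "/result");
        conf.set("pregelix.vertexClass",
                "edu.uci.ics.pregelix.example.MessageOverflowVertex");
        conf.set("pregelix.vertexInputFormatClass",
                "edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat");
        conf.setLong("pregelix.numVertices", 20);
        // A 2KB frame forces message lists to span several frames.
        conf.setInt("pregelix.framesize", 2048);
        // Semantics inferred from the key names: vertex state may grow between
        // supersteps, and the workload is update-intensive (LSM index variant).
        conf.setBoolean("pregelix.incStateLength", true);
        conf.setBoolean("pregelix.updateIntensive", true);
        return conf;
    }
}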
diff --git a/pregelix/pregelix-runtime/pom.xml b/pregelix/pregelix-runtime/pom.xml
index 54e2256..6564eb0 100644
--- a/pregelix/pregelix-runtime/pom.xml
+++ b/pregelix/pregelix-runtime/pom.xml
@@ -21,7 +21,7 @@
 	<parent>
     		<groupId>edu.uci.ics.hyracks</groupId>
     		<artifactId>pregelix</artifactId>
-    		<version>0.2.7-SNAPSHOT</version>
+    		<version>0.2.10-SNAPSHOT</version>
   	</parent>
 
 
@@ -88,89 +88,89 @@
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>pregelix-api</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>pregelix-dataflow-std</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>pregelix-dataflow</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-dataflow-std</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-api</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-dataflow-common</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-data-std</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-storage-am-common</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-storage-am-btree</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-storage-am-lsm-common</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-control-cc</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-control-nc</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-ipc</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index 16ecf6c..d46457c 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -32,8 +32,8 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
+import edu.uci.ics.pregelix.api.graph.MsgList;
 import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.util.ArrayListWritable;
 import edu.uci.ics.pregelix.api.util.ArrayListWritable.ArrayIterator;
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.api.util.FrameTupleUtils;
@@ -168,11 +168,16 @@
                 tbAlive.reset();
 
                 vertex = (Vertex) tuple[3];
+
+                if (vertex.isPartitionTerminated()) {
+                    vertex.voteToHalt();
+                    return;
+                }
                 vertex.setOutputWriters(writers);
                 vertex.setOutputAppenders(appenders);
                 vertex.setOutputTupleBuilders(tbs);
 
-                ArrayListWritable msgContentList = (ArrayListWritable) tuple[1];
+                MsgList msgContentList = (MsgList) tuple[1];
                 msgContentList.reset(msgIterator);
 
                 if (!msgIterator.hasNext() && vertex.isHalted()) {
@@ -183,9 +188,15 @@
                 }
 
                 try {
+                    if (msgContentList.segmentStart()) {
+                        vertex.open();
+                    }
                     vertex.compute(msgIterator);
+                    if (msgContentList.segmentEnd()) {
+                        vertex.close();
+                    }
                     vertex.finishCompute();
-                } catch (IOException e) {
+                } catch (Exception e) {
                     throw new HyracksDataException(e);
                 }
 
@@ -194,7 +205,6 @@
                  */
                 if (terminate && (!vertex.isHalted() || vertex.hasMessage() || vertex.createdNewLiveVertex()))
                     terminate = false;
-
                 aggregator.step(vertex);
             }
 
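This hunk, together with the matching one in StartComputeUpdateFunctionFactory below, wires the new segmented-message protocol into the compute loop: when a vertex's incoming MsgList is split across frames, vertex.open() runs at the first segment and vertex.close() at the last, so compute() may be invoked several times per superstep, and a vertex in an already-terminated partition short-circuits by voting to halt. A minimal sketch of a vertex written against that contract follows; the type parameters, the example-module VLongWritable, and treating open()/close() as user-overridable hooks are assumptions for illustration, not part of this patch.

import java.util.Iterator;

import org.apache.hadoop.io.FloatWritable;

import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.example.io.VLongWritable;

public class SegmentAwareVertex extends Vertex<VLongWritable, VLongWritable, FloatWritable, VLongWritable> {
    // Per-superstep state must live here, because one superstep's messages
    // may arrive through several compute() calls (one per segment).
    private long runningSum;

    @Override
    public void open() {
        runningSum = 0; // called before the first message segment
    }

    @Override
    public void compute(Iterator<VLongWritable> msgIterator) throws Exception {
        while (msgIterator.hasNext()) {
            runningSum += msgIterator.next().get(); // may see only one segment
        }
    }

    @Override
    public void close() {
        // Called after the last segment: every message has been consumed.
        setVertexValue(new VLongWritable(runningSum));
        voteToHalt();
    }
}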
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
index fa7e0a1..eba75c9 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
@@ -172,6 +172,10 @@
                 tbAlive.reset();
 
                 vertex = (Vertex) tuple[1];
+                if (vertex.isPartitionTerminated()) {
+                    vertex.voteToHalt();
+                    return;
+                }
                 vertex.setOutputWriters(writers);
                 vertex.setOutputAppenders(appenders);
                 vertex.setOutputTupleBuilders(tbs);
@@ -184,12 +188,13 @@
                 }
 
                 try {
+                    vertex.open();
                     vertex.compute(msgIterator);
+                    vertex.close();
                     vertex.finishCompute();
-                } catch (IOException e) {
+                } catch (Exception e) {
                     throw new HyracksDataException(e);
                 }
-
                 /**
                  * this partition should not terminate
                  */
@@ -200,6 +205,7 @@
                  * call the global aggregator
                  */
                 aggregator.step(vertex);
+
             }
 
             @Override
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
index 77f28e4..3d52a45 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
@@ -14,22 +14,27 @@
  */
 package edu.uci.ics.pregelix.runtime.simpleagg;
 
+import java.nio.ByteBuffer;
+
 import org.apache.commons.lang3.tuple.Pair;
 
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.group.IClusteredAggregatorDescriptorFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IAggregateFunction;
 import edu.uci.ics.pregelix.dataflow.std.base.IAggregateFunctionFactory;
 
-public class AccumulatingAggregatorFactory implements IAggregatorDescriptorFactory {
+public class AccumulatingAggregatorFactory implements IClusteredAggregatorDescriptorFactory {
 
     private static final long serialVersionUID = 1L;
     private IAggregateFunctionFactory[] aggFactories;
@@ -41,52 +46,56 @@
     @SuppressWarnings("unchecked")
     @Override
     public IAggregatorDescriptor createAggregator(final IHyracksTaskContext ctx, RecordDescriptor inRecordDesc,
-            RecordDescriptor outRecordDescriptor, int[] aggKeys, int[] partialKeys) throws HyracksDataException {
+            RecordDescriptor outRecordDescriptor, final int[] groupFields, int[] partialgroupFields,
+            final IFrameWriter writer, final ByteBuffer outputFrame, final FrameTupleAppender appender)
+            throws HyracksDataException {
+        final int frameSize = ctx.getFrameSize();
+        final ArrayTupleBuilder internalTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
 
         return new IAggregatorDescriptor() {
-
             private FrameTupleReference ftr = new FrameTupleReference();
+            private int groupKeySize = 0;
+            private int metaSlotSize = 4;
+
+            @Override
+            public AggregateState createAggregateStates() {
+                IAggregateFunction[] agg = new IAggregateFunction[aggFactories.length];
+                ArrayBackedValueStorage[] aggOutput = new ArrayBackedValueStorage[aggFactories.length];
+                for (int i = 0; i < agg.length; i++) {
+                    aggOutput[i] = new ArrayBackedValueStorage();
+                    try {
+                        agg[i] = aggFactories[i].createAggregateFunction(ctx, aggOutput[i], writer);
+                    } catch (Exception e) {
+                        throw new IllegalStateException(e);
+                    }
+                }
+                return new AggregateState(Pair.of(aggOutput, agg));
+            }
 
             @Override
             public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
-                Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
-                ArrayBackedValueStorage[] aggOutput = aggState.getLeft();
-                IAggregateFunction[] agg = aggState.getRight();
-
-                // initialize aggregate functions
-                for (int i = 0; i < agg.length; i++) {
-                    aggOutput[i].reset();
-                    try {
-                        agg[i].init();
-                    } catch (Exception e) {
-                        throw new HyracksDataException(e);
-                    }
+                setGroupKeySize(accessor, tIndex);
+                initAggregateFunctions(state, true);
+                int stateSize = estimateStep(accessor, tIndex, state);
+                if (stateSize > frameSize) {
+                    throw new HyracksDataException(
+                            "Message combiner intermediate data size "
+                                    + stateSize
+                                    + " is larger than frame size! Check the size estimation implementation in the message combiner.");
                 }
-
-                ftr.reset(accessor, tIndex);
-                for (int i = 0; i < agg.length; i++) {
-                    try {
-                        agg[i].step(ftr);
-                    } catch (Exception e) {
-                        throw new HyracksDataException(e);
-                    }
-                }
+                singleStep(accessor, tIndex, state);
             }
 
             @Override
             public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
                     int stateTupleIndex, AggregateState state) throws HyracksDataException {
-                Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
-                IAggregateFunction[] agg = aggState.getRight();
-                ftr.reset(accessor, tIndex);
-                for (int i = 0; i < agg.length; i++) {
-                    try {
-                        agg[i].step(ftr);
-                    } catch (Exception e) {
-                        throw new HyracksDataException(e);
-                    }
+                int stateSize = estimateStep(accessor, tIndex, state);
+                if (stateSize > frameSize) {
+                    emitResultTuple(accessor, tIndex, state);
+                    initAggregateFunctions(state, false);
                 }
+                singleStep(accessor, tIndex, state);
             }
 
             @Override
@@ -97,7 +106,7 @@
                 IAggregateFunction[] agg = aggState.getRight();
                 for (int i = 0; i < agg.length; i++) {
                     try {
-                        agg[i].finish();
+                        agg[i].finishAll();
                         tupleBuilder.addField(aggOutput[i].getByteArray(), aggOutput[i].getStartOffset(),
                                 aggOutput[i].getLength());
                     } catch (Exception e) {
@@ -107,21 +116,6 @@
             }
 
             @Override
-            public AggregateState createAggregateStates() {
-                IAggregateFunction[] agg = new IAggregateFunction[aggFactories.length];
-                ArrayBackedValueStorage[] aggOutput = new ArrayBackedValueStorage[aggFactories.length];
-                for (int i = 0; i < agg.length; i++) {
-                    aggOutput[i] = new ArrayBackedValueStorage();
-                    try {
-                        agg[i] = aggFactories[i].createAggregateFunction(ctx, aggOutput[i]);
-                    } catch (Exception e) {
-                        throw new IllegalStateException(e);
-                    }
-                }
-                return new AggregateState(Pair.of(aggOutput, agg));
-            }
-
-            @Override
             public void reset() {
 
             }
@@ -137,6 +131,97 @@
 
             }
 
+            private void initAggregateFunctions(AggregateState state, boolean all) throws HyracksDataException {
+                Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
+                ArrayBackedValueStorage[] aggOutput = aggState.getLeft();
+                IAggregateFunction[] agg = aggState.getRight();
+
+                /**
+                 * initialize aggregate functions
+                 */
+                for (int i = 0; i < agg.length; i++) {
+                    aggOutput[i].reset();
+                    try {
+                        if (all) {
+                            agg[i].initAll();
+                        } else {
+                            agg[i].init();
+                        }
+                    } catch (Exception e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+            }
+
+            private void singleStep(IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+                    throws HyracksDataException {
+                Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
+                IAggregateFunction[] agg = aggState.getRight();
+                ftr.reset(accessor, tIndex);
+                for (int i = 0; i < agg.length; i++) {
+                    try {
+                        agg[i].step(ftr);
+                    } catch (Exception e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+            }
+
+            private int estimateStep(IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+                    throws HyracksDataException {
+                int size = metaSlotSize + groupKeySize;
+                Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
+                IAggregateFunction[] agg = aggState.getRight();
+                ftr.reset(accessor, tIndex);
+                for (int i = 0; i < agg.length; i++) {
+                    try {
+                        size += agg[i].estimateStep(ftr) + metaSlotSize;
+                    } catch (Exception e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+                return size;
+            }
+
+            private void emitResultTuple(IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+                    throws HyracksDataException {
+                internalTupleBuilder.reset();
+                for (int j = 0; j < groupFields.length; j++) {
+                    internalTupleBuilder.addField(accessor, tIndex, groupFields[j]);
+                }
+                Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
+                ArrayBackedValueStorage[] aggOutput = aggState.getLeft();
+                IAggregateFunction[] agg = aggState.getRight();
+                for (int i = 0; i < agg.length; i++) {
+                    try {
+                        agg[i].finish();
+                        internalTupleBuilder.addField(aggOutput[i].getByteArray(), aggOutput[i].getStartOffset(),
+                                aggOutput[i].getLength());
+                    } catch (Exception e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+                if (!appender.appendSkipEmptyField(internalTupleBuilder.getFieldEndOffsets(),
+                        internalTupleBuilder.getByteArray(), 0, internalTupleBuilder.getSize())) {
+                    FrameUtils.flushFrame(outputFrame, writer);
+                    appender.reset(outputFrame, true);
+                    if (!appender.appendSkipEmptyField(internalTupleBuilder.getFieldEndOffsets(),
+                            internalTupleBuilder.getByteArray(), 0, internalTupleBuilder.getSize())) {
+                        throw new HyracksDataException("The output cannot fit into a frame.");
+                    }
+                }
+            }
+
+            public void setGroupKeySize(IFrameTupleAccessor accessor, int tIndex) {
+                groupKeySize = 0;
+                for (int i = 0; i < groupFields.length; i++) {
+                    int fIndex = groupFields[i];
+                    int fStartOffset = accessor.getFieldStartOffset(tIndex, fIndex);
+                    int fLen = accessor.getFieldEndOffset(tIndex, fIndex) - fStartOffset;
+                    groupKeySize += fLen + metaSlotSize;
+                }
+            }
+
         };
     }
 }
\ No newline at end of file
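The rewritten AccumulatingAggregatorFactory turns the clustered group-by into a frame-bounded aggregation: before each incoming tuple is accumulated, estimateStep() projects the post-step state size (group key, per-function estimates, and slot overhead), and if the projection exceeds the frame size the current state is flushed as a partial result tuple (emitResultTuple) and the functions are re-initialized before stepping. Isolated from the Hyracks types, the control flow looks roughly like the sketch below; SizeBoundAggregator and FrameBoundedRunner are hypothetical names used purely for illustration.

import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for one aggregate function's estimate/step/flush contract. */
interface SizeBoundAggregator<T, R> {
    int estimateStep(T input); // projected state size if input were accumulated next
    void step(T input);        // fold input into the running state
    R flushPartial();          // emit the current state as a partial result and reset
}

final class FrameBoundedRunner<T, R> {
    private final int frameSize;
    private final SizeBoundAggregator<T, R> agg;
    private final List<R> partials = new ArrayList<>();

    FrameBoundedRunner(int frameSize, SizeBoundAggregator<T, R> agg) {
        this.frameSize = frameSize;
        this.agg = agg;
    }

    // Mirrors aggregate(...) above: spill the running state before the step
    // that would overflow the frame, then keep accumulating.
    void aggregate(T input) {
        if (agg.estimateStep(input) > frameSize) {
            partials.add(agg.flushPartial());
        }
        agg.step(input);
    }

    // Mirrors the finishAll path above: flush whatever remains, then hand
    // back every partial produced for this group.
    List<R> finish() {
        partials.add(agg.flushPartial());
        return partials;
    }
}

The invariant this buys is that downstream consumers never see a group state larger than one frame; the init() path still hard-fails when even the very first tuple's projected state exceeds one frame, since there is then no smaller partial to flush.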
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
index 8090dff..5bc30a2 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.io.WritableComparable;
 
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
@@ -33,6 +34,7 @@
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 import edu.uci.ics.pregelix.api.graph.MessageCombiner;
 import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IAggregateFunction;
@@ -54,10 +56,11 @@
     private MsgList msgList = new MsgList();
     private boolean keyRead = false;
 
-    public AggregationFunction(IHyracksTaskContext ctx, IConfigurationFactory confFactory, DataOutput output,
-            boolean isFinalStage, boolean partialAggAsInput) throws HyracksDataException {
+    public AggregationFunction(IHyracksTaskContext ctx, IConfigurationFactory confFactory, DataOutput tmpOutput,
+            IFrameWriter groupByOutputWriter, boolean isFinalStage, boolean partialAggAsInput)
+            throws HyracksDataException {
         this.conf = confFactory.createConfiguration(ctx);
-        this.output = output;
+        this.output = tmpOutput;
         this.isFinalStage = isFinalStage;
         this.partialAggAsInput = partialAggAsInput;
         msgList.setConf(this.conf);
@@ -68,6 +71,12 @@
     }
 
     @Override
+    public void initAll() throws HyracksDataException {
+        keyRead = false;
+        combiner.initAll(msgList);
+    }
+
+    @Override
     public void init() throws HyracksDataException {
         keyRead = false;
         combiner.init(msgList);
@@ -75,6 +84,43 @@
 
     @Override
     public void step(IFrameTupleReference tuple) throws HyracksDataException {
+        if (!partialAggAsInput) {
+            combiner.stepPartial(key, (WritableSizable) value);
+        } else {
+            combiner.stepFinal(key, value);
+        }
+    }
+
+    @Override
+    public void finish() throws HyracksDataException {
+        try {
+            if (!isFinalStage) {
+                combinedResult = combiner.finishPartial();
+            } else {
+                combinedResult = combiner.finishFinal();
+            }
+            combinedResult.write(output);
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public void finishAll() throws HyracksDataException {
+        try {
+            if (!isFinalStage) {
+                combinedResult = combiner.finishPartial();
+            } else {
+                combinedResult = combiner.finishFinalAll();
+            }
+            combinedResult.write(output);
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public int estimateStep(IFrameTupleReference tuple) throws HyracksDataException {
         FrameTupleReference ftr = (FrameTupleReference) tuple;
         IFrameTupleAccessor fta = ftr.getFrameTupleAccessor();
         ByteBuffer buffer = fta.getBuffer();
@@ -94,28 +140,13 @@
             }
             value.readFields(valueInput);
             if (!partialAggAsInput) {
-                combiner.stepPartial(key, value);
+                return combiner.estimateAccumulatedStateByteSizePartial(key, (WritableSizable) value);
             } else {
-                combiner.stepFinal(key, value);
+                return combiner.estimateAccumulatedStateByteSizeFinal(key, value);
             }
         } catch (IOException e) {
             throw new HyracksDataException(e);
         }
-
-    }
-
-    @Override
-    public void finish() throws HyracksDataException {
-        try {
-            if (!isFinalStage) {
-                combinedResult = combiner.finishPartial();
-            } else {
-                combinedResult = combiner.finishFinal();
-            }
-            combinedResult.write(output);
-        } catch (IOException e) {
-            throw new HyracksDataException(e);
-        }
     }
 
 }
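AggregationFunction now splits the combiner lifecycle in two: init()/finish() bracket each frame-bounded chunk of a group (the spill path above), while initAll()/finishAll() bracket the whole group, and estimateStep() defers to the combiner's estimateAccumulatedStateByteSizePartial/Final hooks to predict state growth. A minimal sum combiner against that contract might look as follows; the generic parameters and the example-module io types are assumptions, the method names match the calls visible in this diff, and initAll/finishFinalAll are left at their presumed defaults because a running sum carries no per-chunk state.

import edu.uci.ics.pregelix.api.graph.MessageCombiner;
import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.example.io.DoubleWritable;
import edu.uci.ics.pregelix.example.io.VLongWritable;

public class SumCombiner extends MessageCombiner<VLongWritable, DoubleWritable, DoubleWritable> {
    private double sum;
    private MsgList<DoubleWritable> msgList;

    @Override
    public void init(MsgList providedMsgList) {
        this.msgList = providedMsgList;
        sum = 0.0;
    }

    @Override
    public void stepPartial(VLongWritable vertexIndex, DoubleWritable msg) {
        sum += msg.get();
    }

    @Override
    public void stepFinal(VLongWritable vertexIndex, DoubleWritable partialAggregate) {
        sum += partialAggregate.get();
    }

    @Override
    public DoubleWritable finishPartial() {
        return new DoubleWritable(sum);
    }

    @Override
    public MsgList<DoubleWritable> finishFinal() {
        msgList.clear();
        msgList.add(new DoubleWritable(sum));
        return msgList;
    }

    // Size hooks consumed by estimateStep(...) above: a running sum is a
    // single 8-byte double no matter how many messages were combined.
    @Override
    public int estimateAccumulatedStateByteSizePartial(VLongWritable vertexIndex, DoubleWritable msg) {
        return 8;
    }

    @Override
    public int estimateAccumulatedStateByteSizeFinal(VLongWritable vertexIndex, DoubleWritable partialAggregate) {
        return 8;
    }
}

Because its size estimate is constant, a combiner like this never triggers the spill path; a combiner whose state grows with every message (for example one that appends to a MsgList) is the case the frame-size check and partial flush above are designed for.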
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java
index 33dfa5d..54eccf5 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java
@@ -17,6 +17,7 @@
 
 import java.io.DataOutput;
 
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
@@ -37,9 +38,9 @@
     }
 
     @Override
-    public IAggregateFunction createAggregateFunction(IHyracksTaskContext ctx, IDataOutputProvider provider)
-            throws HyracksException {
+    public IAggregateFunction createAggregateFunction(IHyracksTaskContext ctx, IDataOutputProvider provider,
+            IFrameWriter writer) throws HyracksException {
         DataOutput output = provider.getDataOutput();
-        return new AggregationFunction(ctx, confFactory, output, isFinalStage, partialAggAsInput);
+        return new AggregationFunction(ctx, confFactory, output, writer, isFinalStage, partialAggAsInput);
     }
 }