add message overflow support
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java
index 08c7151..5ea6413 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java
@@ -20,6 +20,7 @@
 import org.apache.hadoop.io.WritableComparable;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
 
 /**
  * This is the abstract class to implement for aggregating the state of all the vertices globally in the graph.
@@ -39,7 +40,7 @@
  */
 
 @SuppressWarnings("rawtypes")
-public abstract class GlobalAggregator<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable, P extends Writable, F extends Writable> {
+public abstract class GlobalAggregator<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable, P extends Writable, F extends Writable> {
     /**
      * initialize aggregator
      */
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java
index f5daf99..eb11c3a 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java
@@ -19,6 +19,7 @@
 import org.apache.hadoop.io.WritableComparable;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
 
 /**
  * This is the abstract class to implement for combining of messages that are sent to the same vertex.
@@ -36,7 +37,7 @@
  *        the type of the partially combined messages
  */
 @SuppressWarnings("rawtypes")
-public abstract class MessageCombiner<I extends WritableComparable, M extends Writable, P extends Writable> {
+public abstract class MessageCombiner<I extends WritableComparable, M extends WritableSizable, P extends Writable> {
 
     /**
      * initialize combiner
@@ -82,4 +83,18 @@
      * @return the final message List
      */
     public abstract MsgList<M> finishFinal();
+
+    /**
+     * @return the accumulated byte size
+     */
+    public int estimateAccumulatedStateByteSizePartial(I vertexIndex, M msg) throws HyracksDataException {
+        return 0;
+    }
+    
+    /**
+     * @return the accumulated byte size
+     */
+    public int estimateAccumulatedStateByteSizeFinal(I vertexIndex, P partialAggregate) throws HyracksDataException {
+        return 0;
+    }
 }
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
index 104f396..2209200 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
@@ -15,8 +15,7 @@
 
 package edu.uci.ics.pregelix.api.graph;
 
-import org.apache.hadoop.io.Writable;
-
+import edu.uci.ics.pregelix.api.io.WritableSizable;
 import edu.uci.ics.pregelix.api.util.ArrayListWritable;
 import edu.uci.ics.pregelix.api.util.BspUtils;
 
@@ -27,7 +26,7 @@
  * @param <M>
  *            message type
  */
-public class MsgList<M extends Writable> extends ArrayListWritable<M> {
+public class MsgList<M extends WritableSizable> extends ArrayListWritable<M> {
     /** Defining a layout version for a serializable class. */
     private static final long serialVersionUID = 1L;
 
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
index 4175078..9cdd8d0 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -31,6 +31,7 @@
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.api.util.SerDeUtils;
 
@@ -48,7 +49,7 @@
  *            Message value type
  */
 @SuppressWarnings("rawtypes")
-public abstract class Vertex<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+public abstract class Vertex<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable>
         implements Writable {
     private static long superstep = 0;
     /** Class-wide number of vertices */
@@ -569,4 +570,15 @@
         Vertex.context = context;
     }
 
+    @Override
+    public int hashCode() {
+        return vertexId.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object object) {
+        Vertex vertex = (Vertex) object;
+        return vertexId.equals(vertex.getVertexId());
+    }
+
 }
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/Sizable.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/Sizable.java
new file mode 100644
index 0000000..568500b
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/Sizable.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.io;
+
+/**
+ * @author yingyib
+ */
+public interface Sizable {
+
+    public int sizeInBytes();
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexInputFormat.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexInputFormat.java
index c841b1a..73af190 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexInputFormat.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexInputFormat.java
@@ -40,7 +40,7 @@
  *            Message data
  */
 @SuppressWarnings("rawtypes")
-public abstract class VertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> {
+public abstract class VertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable> {
     /**
      * Logically split the vertices for a graph processing application.
      * Each {@link InputSplit} is then assigned to a worker for processing.
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexReader.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexReader.java
index e6c62ba..ba8b561 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexReader.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexReader.java
@@ -39,7 +39,7 @@
  *            Message data
  */
 @SuppressWarnings("rawtypes")
-public interface VertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> {
+public interface VertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable> {
     /**
      * Use the input split and context t o setup reading the vertices.
      * Guaranteed to be called prior to any other function.
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/WritableSizable.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/WritableSizable.java
new file mode 100644
index 0000000..ee13f76
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/WritableSizable.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.io;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * @author yingyib
+ */
+public interface WritableSizable extends Writable, Sizable {
+
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexInputFormat.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexInputFormat.java
index 985bcff..1d3c427 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexInputFormat.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexInputFormat.java
@@ -26,13 +26,14 @@
 
 import edu.uci.ics.pregelix.api.io.BasicGenInputSplit;
 import edu.uci.ics.pregelix.api.io.VertexInputFormat;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
 
 /**
  * This VertexInputFormat is meant for testing/debugging. It simply generates
  * some vertex data that can be consumed by test applications.
  */
 @SuppressWarnings("rawtypes")
-public abstract class GeneratedVertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+public abstract class GeneratedVertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable>
         extends VertexInputFormat<I, V, E, M> {
 
     @Override
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexReader.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexReader.java
index 92c8728..376d45d 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexReader.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexReader.java
@@ -25,6 +25,7 @@
 
 import edu.uci.ics.pregelix.api.io.BasicGenInputSplit;
 import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
 
 /**
  * Used by GeneratedVertexInputFormat to read some generated data
@@ -37,7 +38,7 @@
  *            Edge value
  */
 @SuppressWarnings("rawtypes")
-public abstract class GeneratedVertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+public abstract class GeneratedVertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable>
         implements VertexReader<I, V, E, M> {
     /** Records read so far */
     protected long recordsRead = 0;
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/text/TextVertexInputFormat.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/text/TextVertexInputFormat.java
index 2254ae4..0faf516 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/text/TextVertexInputFormat.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/text/TextVertexInputFormat.java
@@ -30,6 +30,7 @@
 
 import edu.uci.ics.pregelix.api.io.VertexInputFormat;
 import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
 
 /**
  * Abstract class that users should subclass to use their own text based vertex
@@ -45,7 +46,7 @@
  *            Message value
  */
 @SuppressWarnings("rawtypes")
-public abstract class TextVertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+public abstract class TextVertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable>
         extends VertexInputFormat<I, V, E, M> {
     /** Uses the TextInputFormat to do everything */
     protected TextInputFormat textInputFormat = new TextInputFormat();
@@ -62,7 +63,7 @@
      * @param <E>
      *            Edge value
      */
-    public static abstract class TextVertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+    public static abstract class TextVertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable>
             implements VertexReader<I, V, E, M> {
         /** Internal line record reader */
         private final RecordReader<LongWritable, Text> lineRecordReader;
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java
index 7a9e5d5..1683541 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java
@@ -141,9 +141,6 @@
         used = 0;
         this.clear();
         int numValues = in.readInt(); // read number of values
-        if (numValues > 100) {
-            System.out.println("num values: " + numValues);
-        }
         for (int i = 0; i < numValues; i++) {
             M value = allocateValue();
             value.readFields(in); // read a value
@@ -153,9 +150,6 @@
 
     public void write(DataOutput out) throws IOException {
         int numValues = size();
-        if (numValues > 100) {
-            System.out.println("write num values: " + numValues);
-        }
         out.writeInt(numValues); // write number of values
         for (int i = 0; i < numValues; i++) {
             get(i).write(out);
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index 03c37dc..8964eb6 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -28,6 +28,7 @@
 import edu.uci.ics.pregelix.api.graph.VertexPartitioner;
 import edu.uci.ics.pregelix.api.io.VertexInputFormat;
 import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 
 /**
@@ -49,7 +50,7 @@
      * @return User's vertex input format class
      */
     @SuppressWarnings({ "rawtypes", "unchecked" })
-    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> Class<? extends VertexInputFormat<I, V, E, M>> getVertexInputFormatClass(
+    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable> Class<? extends VertexInputFormat<I, V, E, M>> getVertexInputFormatClass(
             Configuration conf) {
         return (Class<? extends VertexInputFormat<I, V, E, M>>) conf.getClass(PregelixJob.VERTEX_INPUT_FORMAT_CLASS,
                 null, VertexInputFormat.class);
@@ -63,7 +64,7 @@
      * @return Instantiated user vertex input format class
      */
     @SuppressWarnings("rawtypes")
-    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> VertexInputFormat<I, V, E, M> createVertexInputFormat(
+    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable> VertexInputFormat<I, V, E, M> createVertexInputFormat(
             Configuration conf) {
         Class<? extends VertexInputFormat<I, V, E, M>> vertexInputFormatClass = getVertexInputFormatClass(conf);
         VertexInputFormat<I, V, E, M> inputFormat = ReflectionUtils.newInstance(vertexInputFormatClass, conf);
@@ -106,7 +107,7 @@
      * @return User's vertex combiner class
      */
     @SuppressWarnings({ "rawtypes", "unchecked" })
-    public static <I extends WritableComparable, M extends Writable, P extends Writable> Class<? extends MessageCombiner<I, M, P>> getMessageCombinerClass(
+    public static <I extends WritableComparable, M extends WritableSizable, P extends Writable> Class<? extends MessageCombiner<I, M, P>> getMessageCombinerClass(
             Configuration conf) {
         return (Class<? extends MessageCombiner<I, M, P>>) conf.getClass(PregelixJob.Message_COMBINER_CLASS,
                 DefaultMessageCombiner.class, MessageCombiner.class);
@@ -120,7 +121,7 @@
      * @return User's vertex combiner class
      */
     @SuppressWarnings({ "rawtypes", "unchecked" })
-    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable, P extends Writable, F extends Writable> Class<? extends GlobalAggregator<I, V, E, M, P, F>> getGlobalAggregatorClass(
+    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable, P extends Writable, F extends Writable> Class<? extends GlobalAggregator<I, V, E, M, P, F>> getGlobalAggregatorClass(
             Configuration conf) {
         return (Class<? extends GlobalAggregator<I, V, E, M, P, F>>) conf.getClass(PregelixJob.GLOBAL_AGGREGATOR_CLASS,
                 GlobalCountAggregator.class, GlobalAggregator.class);
@@ -138,7 +139,7 @@
      * @return Instantiated user vertex combiner class
      */
     @SuppressWarnings("rawtypes")
-    public static <I extends WritableComparable, M extends Writable, P extends Writable> MessageCombiner<I, M, P> createMessageCombiner(
+    public static <I extends WritableComparable, M extends WritableSizable, P extends Writable> MessageCombiner<I, M, P> createMessageCombiner(
             Configuration conf) {
         Class<? extends MessageCombiner<I, M, P>> vertexCombinerClass = getMessageCombinerClass(conf);
         return ReflectionUtils.newInstance(vertexCombinerClass, conf);
@@ -164,7 +165,7 @@
      * @return Instantiated user vertex combiner class
      */
     @SuppressWarnings("rawtypes")
-    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable, P extends Writable, F extends Writable> GlobalAggregator<I, V, E, M, P, F> createGlobalAggregator(
+    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable, P extends Writable, F extends Writable> GlobalAggregator<I, V, E, M, P, F> createGlobalAggregator(
             Configuration conf) {
         Class<? extends GlobalAggregator<I, V, E, M, P, F>> globalAggregatorClass = getGlobalAggregatorClass(conf);
         return ReflectionUtils.newInstance(globalAggregatorClass, conf);
@@ -178,7 +179,7 @@
      * @return User's vertex class
      */
     @SuppressWarnings({ "rawtypes", "unchecked" })
-    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> Class<? extends Vertex<I, V, E, M>> getVertexClass(
+    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable> Class<? extends Vertex<I, V, E, M>> getVertexClass(
             Configuration conf) {
         return (Class<? extends Vertex<I, V, E, M>>) conf.getClass(PregelixJob.VERTEX_CLASS, null, Vertex.class);
     }
@@ -191,7 +192,7 @@
      * @return Instantiated user vertex
      */
     @SuppressWarnings("rawtypes")
-    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> Vertex<I, V, E, M> createVertex(
+    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable> Vertex<I, V, E, M> createVertex(
             Configuration conf) {
         Class<? extends Vertex<I, V, E, M>> vertexClass = getVertexClass(conf);
         Vertex<I, V, E, M> vertex = ReflectionUtils.newInstance(vertexClass, conf);
@@ -369,7 +370,7 @@
      *            Configuration to check
      * @return Instantiated user vertex message value
      */
-    public static <M extends Writable> M createMessageValue(Configuration conf) {
+    public static <M extends WritableSizable> M createMessageValue(Configuration conf) {
         Class<M> messageValueClass = getMessageValueClass(conf);
         try {
             return messageValueClass.newInstance();
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultMessageCombiner.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultMessageCombiner.java
index d2d90a2..50663b3 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultMessageCombiner.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultMessageCombiner.java
@@ -14,32 +14,39 @@
  */
 package edu.uci.ics.pregelix.api.util;
 
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.pregelix.api.graph.MessageCombiner;
 import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
 
 @SuppressWarnings({ "rawtypes", "unchecked" })
-public class DefaultMessageCombiner<I extends WritableComparable, M extends Writable> extends
+public class DefaultMessageCombiner<I extends WritableComparable, M extends WritableSizable> extends
         MessageCombiner<I, M, MsgList> {
     private MsgList<M> msgList;
+    private int metaSlot = 8;
+    private int accumulatedSize = metaSlot;
 
     @Override
     public void init(MsgList providedMsgList) {
         this.msgList = providedMsgList;
         this.msgList.clearElements();
+        this.accumulatedSize = metaSlot;
     }
 
     @Override
     public void stepPartial(I vertexIndex, M msg) throws HyracksDataException {
         msgList.addElement(msg);
+        accumulatedSize += msg.sizeInBytes();
     }
 
     @Override
     public void stepFinal(I vertexIndex, MsgList partialAggregate) throws HyracksDataException {
         msgList.addAllElements(partialAggregate);
+        for (int i = 0; i < partialAggregate.size(); i++) {
+            accumulatedSize += ((M) partialAggregate.get(i)).sizeInBytes();
+        }
     }
 
     @Override
@@ -52,4 +59,18 @@
         return msgList;
     }
 
+    @Override
+    public int estimateAccumulatedStateByteSizePartial(I vertexIndex, M msg) throws HyracksDataException {
+        return accumulatedSize + msg.sizeInBytes();
+    }
+
+    @Override
+    public int estimateAccumulatedStateByteSizeFinal(I vertexIndex, MsgList partialAggregate)
+            throws HyracksDataException {
+        int size = accumulatedSize;
+        for (int i = 0; i < partialAggregate.size(); i++) {
+            size += ((M) partialAggregate.get(i)).sizeInBytes();
+        }
+        return size;
+    }
 }
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/GlobalCountAggregator.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/GlobalCountAggregator.java
index ffc6526..9a95f09 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/GlobalCountAggregator.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/GlobalCountAggregator.java
@@ -21,9 +21,10 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
 import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
 
 @SuppressWarnings("rawtypes")
-public class GlobalCountAggregator<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+public class GlobalCountAggregator<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable>
         extends GlobalAggregator<I, V, E, M, LongWritable, LongWritable> {
 
     private LongWritable state = new LongWritable(0);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index db6c2c8..c144ddd 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -32,8 +32,6 @@
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
@@ -54,6 +52,8 @@
 import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.group.ClusteredGroupOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.group.IClusteredAggregatorDescriptorFactory;
 import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinFunctionUpdateOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
@@ -178,18 +178,18 @@
         /**
          * construct local pre-clustered group-by operator
          */
-        IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false,
-                false);
-        PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+        IClusteredAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf,
+                false, false);
+        ClusteredGroupOperatorDescriptor localGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
         ClusterConfig.setLocationConstraint(spec, localGby);
 
         /**
          * construct global group-by operator
          */
-        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(conf,
-                true, true);
-        PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+        IClusteredAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(
+                conf, true, true);
+        ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactoryFinal, rdFinal);
         ClusterConfig.setLocationConstraint(spec, globalGby);
 
@@ -383,18 +383,18 @@
         /**
          * construct local pre-clustered group-by operator
          */
-        IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false,
-                false);
-        PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+        IClusteredAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf,
+                false, false);
+        ClusteredGroupOperatorDescriptor localGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
         ClusterConfig.setLocationConstraint(spec, localGby);
 
         /**
          * construct global group-by operator
          */
-        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(conf,
-                true, true);
-        PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+        IClusteredAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(
+                conf, true, true);
+        ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactoryFinal, rdFinal);
         ClusterConfig.setLocationConstraint(spec, globalGby);
 
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index 3af8921..c29ea18 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -32,8 +32,6 @@
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
@@ -54,6 +52,8 @@
 import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.group.ClusteredGroupOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.group.IClusteredAggregatorDescriptorFactory;
 import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinFunctionUpdateOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.std.TreeSearchFunctionUpdateOperatorDescriptor;
@@ -144,9 +144,9 @@
         /**
          * construct local pre-clustered group-by operator
          */
-        IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false,
-                false);
-        PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+        IClusteredAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf,
+                false, false);
+        ClusteredGroupOperatorDescriptor localGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
         ClusterConfig.setLocationConstraint(spec, localGby);
 
@@ -155,9 +155,9 @@
          */
         RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
                 MsgList.class.getName());
-        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(conf,
-                true, true);
-        PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+        IClusteredAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(
+                conf, true, true);
+        ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactoryFinal, rdFinal);
         ClusterConfig.setLocationConstraint(spec, globalGby);
 
@@ -198,8 +198,8 @@
         int[] fieldPermutation = new int[] { 0, 1 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(), null,
-                NoOpOperationCallbackFactory.INSTANCE);
+                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, insertOp);
 
         /**
@@ -208,8 +208,8 @@
         int[] fieldPermutationDelete = new int[] { 0 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE, getIndexDataflowHelperFactory(),
-                null, NoOpOperationCallbackFactory.INSTANCE);
+                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
+                getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
@@ -323,9 +323,10 @@
 
         IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
                 spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
-                JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true, getIndexDataflowHelperFactory(), true,
-                nullWriterFactories, inputRdFactory, 5, new ComputeUpdateFunctionFactory(confFactory), preHookFactory,
-                null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete);
+                JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true,
+                getIndexDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
+                new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
+                rdPartialAggregate, rdInsert, rdDelete);
         ClusterConfig.setLocationConstraint(spec, join);
 
         /**
@@ -342,18 +343,18 @@
         /**
          * construct local pre-clustered group-by operator
          */
-        IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false,
-                false);
-        PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+        IClusteredAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf,
+                false, false);
+        ClusteredGroupOperatorDescriptor localGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
         ClusterConfig.setLocationConstraint(spec, localGby);
 
         /**
          * construct global group-by operator
          */
-        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(conf,
-                true, true);
-        PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+        IClusteredAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(
+                conf, true, true);
+        ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactoryFinal, rdFinal);
         ClusterConfig.setLocationConstraint(spec, globalGby);
 
@@ -394,8 +395,8 @@
         int[] fieldPermutation = new int[] { 0, 1 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(), null,
-                NoOpOperationCallbackFactory.INSTANCE);
+                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, insertOp);
 
         /**
@@ -404,8 +405,8 @@
         int[] fieldPermutationDelete = new int[] { 0 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE, getIndexDataflowHelperFactory(),
-                null, NoOpOperationCallbackFactory.INSTANCE);
+                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
+                getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index 50949aa..dc61971 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -31,8 +31,6 @@
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
@@ -53,6 +51,8 @@
 import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.group.ClusteredGroupOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.group.IClusteredAggregatorDescriptorFactory;
 import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinFunctionUpdateOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.std.TreeSearchFunctionUpdateOperatorDescriptor;
@@ -148,9 +148,9 @@
          */
         RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
                 MsgList.class.getName());
-        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(conf,
-                true, false);
-        PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+        IClusteredAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(
+                conf, true, false);
+        ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactoryFinal, rdFinal);
         ClusterConfig.setLocationConstraint(spec, globalGby);
 
@@ -190,8 +190,8 @@
         int[] fieldPermutation = new int[] { 0, 1 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(), null,
-                NoOpOperationCallbackFactory.INSTANCE);
+                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, insertOp);
 
         /**
@@ -200,8 +200,8 @@
         int[] fieldPermutationDelete = new int[] { 0 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE, getIndexDataflowHelperFactory(),
-                null, NoOpOperationCallbackFactory.INSTANCE);
+                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
+                getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
@@ -312,9 +312,10 @@
 
         IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
                 spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
-                JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true, getIndexDataflowHelperFactory(), true,
-                nullWriterFactories, inputRdFactory, 5, new ComputeUpdateFunctionFactory(confFactory), preHookFactory,
-                null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete);
+                JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true,
+                getIndexDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
+                new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
+                rdPartialAggregate, rdInsert, rdDelete);
         ClusterConfig.setLocationConstraint(spec, join);
 
         /**
@@ -331,9 +332,9 @@
         /**
          * construct global group-by operator
          */
-        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(conf,
-                true, false);
-        PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+        IClusteredAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(
+                conf, true, false);
+        ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactoryFinal, rdFinal);
         ClusterConfig.setLocationConstraint(spec, globalGby);
 
@@ -371,8 +372,8 @@
         int[] fieldPermutation = new int[] { 0, 1 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(), null,
-                NoOpOperationCallbackFactory.INSTANCE);
+                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, insertOp);
 
         /**
@@ -381,8 +382,8 @@
         int[] fieldPermutationDelete = new int[] { 0 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE, getIndexDataflowHelperFactory(),
-                null, NoOpOperationCallbackFactory.INSTANCE);
+                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
+                getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index 362e413..34f723f 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -31,8 +31,6 @@
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
@@ -53,6 +51,8 @@
 import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.group.ClusteredGroupOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.group.IClusteredAggregatorDescriptorFactory;
 import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinFunctionUpdateOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.std.TreeSearchFunctionUpdateOperatorDescriptor;
@@ -143,9 +143,9 @@
         /**
          * construct local pre-clustered group-by operator
          */
-        IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false,
-                false);
-        PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+        IClusteredAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf,
+                false, false);
+        ClusteredGroupOperatorDescriptor localGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
         ClusterConfig.setLocationConstraint(spec, localGby);
 
@@ -161,9 +161,9 @@
          */
         RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
                 MsgList.class.getName());
-        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(conf,
-                true, true);
-        PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+        IClusteredAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(
+                conf, true, true);
+        ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactoryFinal, rdFinal);
         ClusterConfig.setLocationConstraint(spec, globalGby);
 
@@ -204,8 +204,8 @@
         int[] fieldPermutation = new int[] { 0, 1 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(), null,
-                NoOpOperationCallbackFactory.INSTANCE);
+                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, insertOp);
 
         /**
@@ -214,8 +214,8 @@
         int[] fieldPermutationDelete = new int[] { 0 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE, getIndexDataflowHelperFactory(),
-                null, NoOpOperationCallbackFactory.INSTANCE);
+                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
+                getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
@@ -326,9 +326,10 @@
 
         IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
                 spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
-                JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true, getIndexDataflowHelperFactory(), true,
-                nullWriterFactories, inputRdFactory, 5, new ComputeUpdateFunctionFactory(confFactory), preHookFactory,
-                null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete);
+                JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true,
+                getIndexDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
+                new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
+                rdPartialAggregate, rdInsert, rdDelete);
         ClusterConfig.setLocationConstraint(spec, join);
 
         /**
@@ -345,9 +346,9 @@
         /**
          * construct local pre-clustered group-by operator
          */
-        IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false,
-                false);
-        PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+        IClusteredAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf,
+                false, false);
+        ClusteredGroupOperatorDescriptor localGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
         ClusterConfig.setLocationConstraint(spec, localGby);
 
@@ -361,9 +362,9 @@
         /**
          * construct global group-by operator
          */
-        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(conf,
-                true, true);
-        PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+        IClusteredAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(
+                conf, true, true);
+        ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactoryFinal, rdFinal);
         ClusterConfig.setLocationConstraint(spec, globalGby);
 
@@ -404,8 +405,8 @@
         int[] fieldPermutation = new int[] { 0, 1 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(), null,
-                NoOpOperationCallbackFactory.INSTANCE);
+                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, insertOp);
 
         /**
@@ -414,8 +415,8 @@
         int[] fieldPermutationDelete = new int[] { 0 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE, getIndexDataflowHelperFactory(),
-                null, NoOpOperationCallbackFactory.INSTANCE);
+                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
+                getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java
index 0876893..3e01109 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java
@@ -21,9 +21,9 @@
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
 import edu.uci.ics.pregelix.core.runtime.touchpoint.WritableRecordDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.group.IClusteredAggregatorDescriptorFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IAggregateFunctionFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
 import edu.uci.ics.pregelix.runtime.simpleagg.AccumulatingAggregatorFactory;
@@ -75,11 +75,11 @@
         return rdFactory;
     }
 
-    public static IAggregatorDescriptorFactory getAccumulatingAggregatorFactory(Configuration conf, boolean isFinal,
-            boolean partialAggAsInput) {
+    public static IClusteredAggregatorDescriptorFactory getAccumulatingAggregatorFactory(Configuration conf,
+            boolean isFinal, boolean partialAggAsInput) {
         IAggregateFunctionFactory aggFuncFactory = new AggregationFunctionFactory(new ConfigurationFactory(conf),
                 isFinal, partialAggAsInput);
-        IAggregatorDescriptorFactory aggregatorFactory = new AccumulatingAggregatorFactory(
+        IClusteredAggregatorDescriptorFactory aggregatorFactory = new AccumulatingAggregatorFactory(
                 new IAggregateFunctionFactory[] { aggFuncFactory });
         return aggregatorFactory;
     }
diff --git a/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunction.java b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunction.java
index 97db63f..4a0a4e0 100644
--- a/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunction.java
+++ b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunction.java
@@ -24,4 +24,6 @@
     public void step(IFrameTupleReference tuple) throws HyracksDataException;
 
     public void finish() throws HyracksDataException;
+
+    public int estimateStep(IFrameTupleReference tuple) throws HyracksDataException;
 }
diff --git a/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunctionFactory.java b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunctionFactory.java
index 58795d1..d5364da 100644
--- a/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunctionFactory.java
+++ b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunctionFactory.java
@@ -16,11 +16,12 @@
 
 import java.io.Serializable;
 
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
 
 public interface IAggregateFunctionFactory extends Serializable {
-	public IAggregateFunction createAggregateFunction(IHyracksTaskContext ctx,
-			IDataOutputProvider provider) throws HyracksException;
+    public IAggregateFunction createAggregateFunction(IHyracksTaskContext ctx, IDataOutputProvider provider,
+            IFrameWriter writer) throws HyracksException;
 }
\ No newline at end of file
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/group/ClusteredGroupOperatorDescriptor.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/group/ClusteredGroupOperatorDescriptor.java
new file mode 100644
index 0000000..bb41953
--- /dev/null
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/group/ClusteredGroupOperatorDescriptor.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.group;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+
+public class ClusteredGroupOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private final int[] groupFields;
+    private final IBinaryComparatorFactory[] comparatorFactories;
+    private final IClusteredAggregatorDescriptorFactory aggregatorFactory;
+
+    private static final long serialVersionUID = 1L;
+
+    public ClusteredGroupOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] groupFields,
+            IBinaryComparatorFactory[] comparatorFactories, IClusteredAggregatorDescriptorFactory aggregatorFactory,
+            RecordDescriptor recordDescriptor) {
+        super(spec, 1, 1);
+        this.groupFields = groupFields;
+        this.comparatorFactories = comparatorFactories;
+        this.aggregatorFactory = aggregatorFactory;
+        recordDescriptors[0] = recordDescriptor;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+            throws HyracksDataException {
+        return new ClusteredGroupOperatorNodePushable(ctx, groupFields, comparatorFactories, aggregatorFactory,
+                recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), recordDescriptors[0]);
+    }
+}
\ No newline at end of file
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/group/ClusteredGroupOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/group/ClusteredGroupOperatorNodePushable.java
new file mode 100644
index 0000000..a95a46e
--- /dev/null
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/group/ClusteredGroupOperatorNodePushable.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.group;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+class ClusteredGroupOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+    private final IHyracksTaskContext ctx;
+    private final int[] groupFields;
+    private final IBinaryComparatorFactory[] comparatorFactories;
+    private final IClusteredAggregatorDescriptorFactory aggregatorFactory;
+    private final RecordDescriptor inRecordDescriptor;
+    private final RecordDescriptor outRecordDescriptor;
+    private ClusteredGroupWriter pgw;
+
+    ClusteredGroupOperatorNodePushable(IHyracksTaskContext ctx, int[] groupFields,
+            IBinaryComparatorFactory[] comparatorFactories, IClusteredAggregatorDescriptorFactory aggregatorFactory,
+            RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor) {
+        this.ctx = ctx;
+        this.groupFields = groupFields;
+        this.comparatorFactories = comparatorFactories;
+        this.aggregatorFactory = aggregatorFactory;
+        this.inRecordDescriptor = inRecordDescriptor;
+        this.outRecordDescriptor = outRecordDescriptor;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+        for (int i = 0; i < comparatorFactories.length; ++i) {
+            comparators[i] = comparatorFactories[i].createBinaryComparator();
+        }
+        final ByteBuffer copyFrame = ctx.allocateFrame();
+        final FrameTupleAccessor copyFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDescriptor);
+        copyFrameAccessor.reset(copyFrame);
+        ByteBuffer outFrame = ctx.allocateFrame();
+        final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+        appender.reset(outFrame, true);
+        pgw = new ClusteredGroupWriter(ctx, groupFields, comparators, aggregatorFactory, inRecordDescriptor,
+                outRecordDescriptor, writer);
+        pgw.open();
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        pgw.nextFrame(buffer);
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        pgw.fail();
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        pgw.close();
+    }
+}
\ No newline at end of file
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/group/ClusteredGroupWriter.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/group/ClusteredGroupWriter.java
new file mode 100644
index 0000000..4b4a1c3
--- /dev/null
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/group/ClusteredGroupWriter.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.group;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+
+public class ClusteredGroupWriter implements IFrameWriter {
+    private final int[] groupFields;
+    private final IBinaryComparator[] comparators;
+    private final IAggregatorDescriptor aggregator;
+    private final AggregateState aggregateState;
+    private final IFrameWriter writer;
+    private final ByteBuffer copyFrame;
+    private final FrameTupleAccessor inFrameAccessor;
+    private final FrameTupleAccessor copyFrameAccessor;
+
+    private final ByteBuffer outFrame;
+    private final FrameTupleAppender appender;
+    private final ArrayTupleBuilder tupleBuilder;
+
+    private boolean first;
+
+    public ClusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[] comparators,
+            IClusteredAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor inRecordDesc,
+            RecordDescriptor outRecordDesc, IFrameWriter writer) throws HyracksDataException {
+        this.groupFields = groupFields;
+        this.comparators = comparators;
+        this.writer = writer;
+        copyFrame = ctx.allocateFrame();
+        inFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
+        copyFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
+        copyFrameAccessor.reset(copyFrame);
+
+        outFrame = ctx.allocateFrame();
+        appender = new FrameTupleAppender(ctx.getFrameSize());
+        appender.reset(outFrame, true);
+
+        tupleBuilder = new ArrayTupleBuilder(outRecordDesc.getFields().length);
+        this.aggregator = aggregatorFactory.createAggregator(ctx, inRecordDesc, outRecordDesc, groupFields, groupFields, writer, outFrame, appender);
+        this.aggregateState = aggregator.createAggregateStates();
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        writer.open();
+        first = true;
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        inFrameAccessor.reset(buffer);
+        int nTuples = inFrameAccessor.getTupleCount();
+        for (int i = 0; i < nTuples; ++i) {
+            if (first) {
+
+                tupleBuilder.reset();
+                for (int j = 0; j < groupFields.length; j++) {
+                    tupleBuilder.addField(inFrameAccessor, i, groupFields[j]);
+                }
+                aggregator.init(tupleBuilder, inFrameAccessor, i, aggregateState);
+
+                first = false;
+
+            } else {
+                if (i == 0) {
+                    switchGroupIfRequired(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1, inFrameAccessor, i);
+                } else {
+                    switchGroupIfRequired(inFrameAccessor, i - 1, inFrameAccessor, i);
+                }
+
+            }
+        }
+        FrameUtils.copy(buffer, copyFrame);
+    }
+
+    private void switchGroupIfRequired(FrameTupleAccessor prevTupleAccessor, int prevTupleIndex,
+            FrameTupleAccessor currTupleAccessor, int currTupleIndex) throws HyracksDataException {
+        if (!sameGroup(prevTupleAccessor, prevTupleIndex, currTupleAccessor, currTupleIndex)) {
+            writeOutput(prevTupleAccessor, prevTupleIndex);
+
+            tupleBuilder.reset();
+            for (int j = 0; j < groupFields.length; j++) {
+                tupleBuilder.addField(currTupleAccessor, currTupleIndex, groupFields[j]);
+            }
+            aggregator.init(tupleBuilder, currTupleAccessor, currTupleIndex, aggregateState);
+        } else {
+            aggregator.aggregate(currTupleAccessor, currTupleIndex, null, 0, aggregateState);
+        }
+    }
+
+    private void writeOutput(final FrameTupleAccessor lastTupleAccessor, int lastTupleIndex)
+            throws HyracksDataException {
+        tupleBuilder.reset();
+        for (int j = 0; j < groupFields.length; j++) {
+            tupleBuilder.addField(lastTupleAccessor, lastTupleIndex, groupFields[j]);
+        }
+        aggregator.outputFinalResult(tupleBuilder, lastTupleAccessor, lastTupleIndex, aggregateState);
+        if (!appender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+                tupleBuilder.getSize())) {
+            FrameUtils.flushFrame(outFrame, writer);
+            appender.reset(outFrame, true);
+            if (!appender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+                    tupleBuilder.getSize())) {
+                throw new HyracksDataException("The output of size " + tupleBuilder.getSize()
+                        + " cannot be fit into a frame of size " + outFrame.array().length);
+            }
+        }
+
+    }
+
+    private boolean sameGroup(FrameTupleAccessor a1, int t1Idx, FrameTupleAccessor a2, int t2Idx) {
+        for (int i = 0; i < comparators.length; ++i) {
+            int fIdx = groupFields[i];
+            int s1 = a1.getTupleStartOffset(t1Idx) + a1.getFieldSlotsLength() + a1.getFieldStartOffset(t1Idx, fIdx);
+            int l1 = a1.getFieldLength(t1Idx, fIdx);
+            int s2 = a2.getTupleStartOffset(t2Idx) + a2.getFieldSlotsLength() + a2.getFieldStartOffset(t2Idx, fIdx);
+            int l2 = a2.getFieldLength(t2Idx, fIdx);
+            if (comparators[i].compare(a1.getBuffer().array(), s1, l1, a2.getBuffer().array(), s2, l2) != 0) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        writer.fail();
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        if (!first) {
+            writeOutput(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1);
+            if (appender.getTupleCount() > 0) {
+                FrameUtils.flushFrame(outFrame, writer);
+            }
+        }
+        aggregateState.close();
+        writer.close();
+    }
+}
\ No newline at end of file
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/group/IClusteredAggregatorDescriptorFactory.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/group/IClusteredAggregatorDescriptorFactory.java
new file mode 100644
index 0000000..3256f08
--- /dev/null
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/group/IClusteredAggregatorDescriptorFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.group;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+
+/**
+ *
+ */
+public interface IClusteredAggregatorDescriptorFactory extends Serializable {
+
+    IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, int[] keyFields, final int[] keyFieldsInPartialResults,
+            IFrameWriter resultWriter, ByteBuffer outputFrame, FrameTupleAppender appender) throws HyracksDataException;
+
+}
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
index 5156dbf..7221cb5 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
@@ -157,16 +157,34 @@
             ITupleReference tupleRef = cursor.getTuple();
 
             /**
+             * merge with updated tuple
+             */
+            ITupleReference indexEntryTuple = tupleRef;
+            ITupleReference cachedUpdatedLastTuple = updateBuffer.getLastTuple();
+            if (cachedUpdatedLastTuple != null) {
+                if (compare(cachedUpdatedLastTuple, tupleRef) == 0) {
+                    indexEntryTuple = cachedUpdatedLastTuple;
+                }
+            }
+
+            /**
              * call the update function
              */
-            functionProxy.functionCall(leftAccessor, tIndex, tupleRef, cloneUpdateTb);
+            functionProxy.functionCall(leftAccessor, tIndex, indexEntryTuple, cloneUpdateTb);
 
-            //doing copy update
-            CopyUpdateUtil.copyUpdate(tempTupleReference, tupleRef, updateBuffer, cloneUpdateTb, indexAccessor, cursor,
-                    rangePred);
+            /**
+             * doing copy update
+             */
+            CopyUpdateUtil.copyUpdate(tempTupleReference, indexEntryTuple, updateBuffer, cloneUpdateTb, indexAccessor,
+                    cursor, rangePred);
         }
     }
 
+    /** compare tuples */
+    private int compare(ITupleReference left, ITupleReference right) throws Exception {
+        return lowKeySearchCmp.compare(left, right);
+    }
+
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         accessor.reset(buffer);
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
index 4ca7533..b21cd2a 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
@@ -195,8 +195,10 @@
                 // TODO: currently use low key only, check what they mean
                 int cmp = compare(lowKey, currentTopTuple);
                 if (cmp <= 0) {
-                    if (cmp == 0)
+                    if (cmp == 0) {
                         outputMatch(i);
+                        currentTopTuple = cursor.getTuple();
+                    }
                     i++;
                 } else {
                     moveTreeCursor();
@@ -262,16 +264,28 @@
     }
 
     //for the join match casesos
-    private void writeResults(IFrameTupleAccessor leftAccessor, int tIndex, ITupleReference frameTuple)
+    private void writeResults(IFrameTupleAccessor leftAccessor, int tIndex, ITupleReference indexTuple)
             throws Exception {
         /**
+         * merge with the cached tuple, if any
+         */
+        ITupleReference indexEntryTuple = indexTuple;
+        ITupleReference cachedUpdatedLastTuple = updateBuffer.getLastTuple();
+        if (cachedUpdatedLastTuple != null) {
+            if (compare(cachedUpdatedLastTuple, indexTuple) == 0) {
+                indexEntryTuple = cachedUpdatedLastTuple;
+            }
+        }
+        /**
          * function call
          */
-        functionProxy.functionCall(leftAccessor, tIndex, frameTuple, cloneUpdateTb);
+        functionProxy.functionCall(leftAccessor, tIndex, indexEntryTuple, cloneUpdateTb);
 
-        //doing clone update
-        CopyUpdateUtil.copyUpdate(tempTupleReference, frameTuple, updateBuffer, cloneUpdateTb, indexAccessor, cursor,
-                rangePred);
+        /**
+         * doing clone update
+         */
+        CopyUpdateUtil.copyUpdate(tempTupleReference, indexEntryTuple, updateBuffer, cloneUpdateTb, indexAccessor,
+                cursor, rangePred);
     }
 
     /** write result for outer case */
@@ -290,4 +304,4 @@
     public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
         writers[index] = writer;
     }
-}
\ No newline at end of file
+}
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java
index b2be366..ea1e02e 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java
@@ -25,6 +25,7 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
 
@@ -41,6 +42,7 @@
     private final FrameTupleAppender appender;
     private final IHyracksTaskContext ctx;
     private final FrameTupleReference tuple = new FrameTupleReference();
+    private final FrameTupleReference lastTuple = new FrameTupleReference();
     private final int frameSize;
     private IFrameTupleAccessor fta;
 
@@ -104,6 +106,21 @@
         appender.reset(buffer, true);
     }
 
+    /**
+     * return the last updated
+     * 
+     * @throws HyracksDataException
+     */
+    public ITupleReference getLastTuple() throws HyracksDataException {
+        fta.reset(buffers.get(currentInUse));
+        int tupleIndex = fta.getTupleCount() - 1;
+        if (tupleIndex < 0) {
+            return null;
+        }
+        lastTuple.reset(fta, tupleIndex);
+        return lastTuple;
+    }
+
     private void allocate(int index) throws HyracksDataException {
         if (index >= buffers.size()) {
             buffers.add(ctx.allocateFrame());
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
index 07d2d57..f3ee65e 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
@@ -34,9 +34,9 @@
 import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat.TextVertexWriter;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 import edu.uci.ics.pregelix.example.client.Client;
-import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
 import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
-import edu.uci.ics.pregelix.example.io.VLongWritable;
+import edu.uci.ics.pregelix.example.lib.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.lib.io.VLongWritable;
 
 /**
  * Demonstrates the basic Pregel connected components implementation, for undirected graph (e.g., Facebook, LinkedIn graph).
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/GraphMutationVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/GraphMutationVertex.java
index 7cf8408..0c949de 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/GraphMutationVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/GraphMutationVertex.java
@@ -18,7 +18,6 @@
 import java.io.IOException;
 import java.util.Iterator;
 
-import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.RecordWriter;
@@ -30,9 +29,10 @@
 import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat.TextVertexWriter;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 import edu.uci.ics.pregelix.example.client.Client;
-import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
 import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
-import edu.uci.ics.pregelix.example.io.VLongWritable;
+import edu.uci.ics.pregelix.example.lib.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.lib.io.DoubleWritable;
+import edu.uci.ics.pregelix.example.lib.io.VLongWritable;
 
 /**
  * Demonstrates the basic graph vertex insert/delete implementation.
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/MessageOverflowFixedsizeVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/MessageOverflowFixedsizeVertex.java
new file mode 100644
index 0000000..4adde7e
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/MessageOverflowFixedsizeVertex.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat.TextVertexWriter;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.example.client.Client;
+import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.lib.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.lib.io.LongWritable;
+import edu.uci.ics.pregelix.example.lib.io.VLongWritable;
+
+/**
+ * Demonstrates the basic Pregel PageRank implementation.
+ */
+public class MessageOverflowFixedsizeVertex extends Vertex<VLongWritable, LongWritable, VLongWritable, LongWritable> {
+
+    private LongWritable outputMsg = new LongWritable(1);
+    private Random rand = new Random(System.currentTimeMillis());
+    private LongWritable tmpVertexValue = new LongWritable(0);
+    private int numOfMsgClones = 250000;
+
+    @Override
+    public void compute(Iterator<LongWritable> msgIterator) {
+        if (getSuperstep() == 1) {
+            for (int i = 0; i < numOfMsgClones; i++) {
+                outputMsg.set(Math.abs(rand.nextLong()));
+                sendMsgToAllEdges(outputMsg);
+            }
+            tmpVertexValue.set(0);
+            setVertexValue(tmpVertexValue);
+        }
+        if (getSuperstep() == 2) {
+            long numOfMsg = getVertexValue().get();
+            while (msgIterator.hasNext()) {
+                msgIterator.next();
+                numOfMsg++;
+            }
+            tmpVertexValue.set(numOfMsg);
+            setVertexValue(tmpVertexValue);
+            voteToHalt();
+        }
+    }
+
+    @Override
+    public String toString() {
+        return getVertexId() + " " + getVertexValue();
+    }
+
+    /**
+     * Simple VertexWriter that support
+     */
+    public static class SimpleMessageOverflowVertexWriter extends
+            TextVertexWriter<VLongWritable, LongWritable, VLongWritable> {
+        public SimpleMessageOverflowVertexWriter(RecordWriter<Text, Text> lineRecordWriter) {
+            super(lineRecordWriter);
+        }
+
+        @Override
+        public void writeVertex(Vertex<VLongWritable, LongWritable, VLongWritable, ?> vertex) throws IOException,
+                InterruptedException {
+            getRecordWriter().write(new Text(vertex.getVertexId().toString()),
+                    new Text(vertex.getVertexValue().toString()));
+        }
+    }
+
+    public static class SimpleMessageOverflowVertexOutputFormat extends
+            TextVertexOutputFormat<VLongWritable, LongWritable, VLongWritable> {
+
+        @Override
+        public VertexWriter<VLongWritable, LongWritable, VLongWritable> createVertexWriter(TaskAttemptContext context)
+                throws IOException, InterruptedException {
+            RecordWriter<Text, Text> recordWriter = textOutputFormat.getRecordWriter(context);
+            return new SimpleMessageOverflowVertexWriter(recordWriter);
+        }
+
+    }
+
+    public static void main(String[] args) throws Exception {
+        PregelixJob job = new PregelixJob(PageRankVertex.class.getSimpleName());
+        job.setVertexClass(PageRankVertex.class);
+        job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+        job.setVertexOutputFormatClass(SimpleMessageOverflowVertexOutputFormat.class);
+        job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+        job.setDynamicVertexValueSize(true);
+        Client.run(args, job);
+    }
+
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/MessageOverflowVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/MessageOverflowVertex.java
new file mode 100644
index 0000000..d7248af
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/MessageOverflowVertex.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat.TextVertexWriter;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.example.client.Client;
+import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.lib.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.lib.io.VLongWritable;
+
+/**
+ * Demonstrates the basic Pregel PageRank implementation.
+ */
+public class MessageOverflowVertex extends Vertex<VLongWritable, VLongWritable, VLongWritable, VLongWritable> {
+
+    private VLongWritable outputMsg = new VLongWritable(1);
+    private Random rand = new Random(System.currentTimeMillis());
+    private VLongWritable tmpVertexValue = new VLongWritable(0);
+    private int numOfMsgClones = 250000;
+
+    @Override
+    public void compute(Iterator<VLongWritable> msgIterator) {
+        if (getSuperstep() == 1) {
+            for (int i = 0; i < numOfMsgClones; i++) {
+                outputMsg.set(Math.abs(rand.nextLong()));
+                sendMsgToAllEdges(outputMsg);
+            }
+            tmpVertexValue.set(0);
+            setVertexValue(tmpVertexValue);
+        }
+        if (getSuperstep() == 2) {
+            long numOfMsg = getVertexValue().get();
+            while (msgIterator.hasNext()) {
+                msgIterator.next();
+                numOfMsg++;
+            }
+            tmpVertexValue.set(numOfMsg);
+            setVertexValue(tmpVertexValue);
+            voteToHalt();
+        }
+    }
+
+    @Override
+    public String toString() {
+        return getVertexId() + " " + getVertexValue();
+    }
+
+    /**
+     * Simple VertexWriter that support
+     */
+    public static class SimpleMessageOverflowVertexWriter extends
+            TextVertexWriter<VLongWritable, VLongWritable, VLongWritable> {
+        public SimpleMessageOverflowVertexWriter(RecordWriter<Text, Text> lineRecordWriter) {
+            super(lineRecordWriter);
+        }
+
+        @Override
+        public void writeVertex(Vertex<VLongWritable, VLongWritable, VLongWritable, ?> vertex) throws IOException,
+                InterruptedException {
+            getRecordWriter().write(new Text(vertex.getVertexId().toString()),
+                    new Text(vertex.getVertexValue().toString()));
+        }
+    }
+
+    public static class SimpleMessageOverflowVertexOutputFormat extends
+            TextVertexOutputFormat<VLongWritable, VLongWritable, VLongWritable> {
+
+        @Override
+        public VertexWriter<VLongWritable, VLongWritable, VLongWritable> createVertexWriter(TaskAttemptContext context)
+                throws IOException, InterruptedException {
+            RecordWriter<Text, Text> recordWriter = textOutputFormat.getRecordWriter(context);
+            return new SimpleMessageOverflowVertexWriter(recordWriter);
+        }
+
+    }
+
+    public static void main(String[] args) throws Exception {
+        PregelixJob job = new PregelixJob(PageRankVertex.class.getSimpleName());
+        job.setVertexClass(PageRankVertex.class);
+        job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+        job.setVertexOutputFormatClass(SimpleMessageOverflowVertexOutputFormat.class);
+        job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+        job.setDynamicVertexValueSize(true);
+        Client.run(args, job);
+    }
+
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
index 8664667..ab128cd 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
@@ -21,7 +21,6 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -43,9 +42,10 @@
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.example.client.Client;
-import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
 import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
-import edu.uci.ics.pregelix.example.io.VLongWritable;
+import edu.uci.ics.pregelix.example.lib.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.lib.io.DoubleWritable;
+import edu.uci.ics.pregelix.example.lib.io.VLongWritable;
 
 /**
  * Demonstrates the basic Pregel PageRank implementation.
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java
index 6a42636..58cc00b 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java
@@ -22,7 +22,6 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.ByteWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.RecordWriter;
@@ -40,9 +39,10 @@
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
 import edu.uci.ics.pregelix.example.client.Client;
-import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
 import edu.uci.ics.pregelix.example.inputformat.TextReachibilityVertexInputFormat;
-import edu.uci.ics.pregelix.example.io.VLongWritable;
+import edu.uci.ics.pregelix.example.lib.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.lib.io.ByteWritable;
+import edu.uci.ics.pregelix.example.lib.io.VLongWritable;
 
 /**
  * Demonstrates the basic Pregel reachibility query implementation, for undirected graph (e.g., Facebook, LinkedIn graph).
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java
index 41c26b1..117e553 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java
@@ -19,7 +19,6 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -30,9 +29,10 @@
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexOutputFormat;
 import edu.uci.ics.pregelix.example.client.Client;
-import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
 import edu.uci.ics.pregelix.example.inputformat.TextShortestPathsInputFormat;
-import edu.uci.ics.pregelix.example.io.VLongWritable;
+import edu.uci.ics.pregelix.example.lib.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.lib.io.DoubleWritable;
+import edu.uci.ics.pregelix.example.lib.io.VLongWritable;
 
 /**
  * Demonstrates the basic Pregel shortest paths implementation.
@@ -127,7 +127,7 @@
         }
         voteToHalt();
     }
-    
+
     @Override
     public String toString() {
         return getVertexId() + " " + getVertexValue();
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextConnectedComponentsInputFormat.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextConnectedComponentsInputFormat.java
index 4062c74..2c9ed3a 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextConnectedComponentsInputFormat.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextConnectedComponentsInputFormat.java
@@ -30,7 +30,7 @@
 import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat;
 import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat.TextVertexReader;
 import edu.uci.ics.pregelix.api.util.BspUtils;
-import edu.uci.ics.pregelix.example.io.VLongWritable;
+import edu.uci.ics.pregelix.example.lib.io.VLongWritable;
 
 public class TextConnectedComponentsInputFormat extends
         TextVertexInputFormat<VLongWritable, VLongWritable, FloatWritable, VLongWritable> {
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextPageRankInputFormat.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextPageRankInputFormat.java
index f46d9c3..f0fb988 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextPageRankInputFormat.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextPageRankInputFormat.java
@@ -18,7 +18,6 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -31,7 +30,8 @@
 import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat;
 import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat.TextVertexReader;
 import edu.uci.ics.pregelix.api.util.BspUtils;
-import edu.uci.ics.pregelix.example.io.VLongWritable;
+import edu.uci.ics.pregelix.example.lib.io.DoubleWritable;
+import edu.uci.ics.pregelix.example.lib.io.VLongWritable;
 
 public class TextPageRankInputFormat extends
         TextVertexInputFormat<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextReachibilityVertexInputFormat.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextReachibilityVertexInputFormat.java
index 5cf6c1c..b7da216 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextReachibilityVertexInputFormat.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextReachibilityVertexInputFormat.java
@@ -30,7 +30,7 @@
 import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat;
 import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat.TextVertexReader;
 import edu.uci.ics.pregelix.api.util.BspUtils;
-import edu.uci.ics.pregelix.example.io.VLongWritable;
+import edu.uci.ics.pregelix.example.lib.io.VLongWritable;
 
 public class TextReachibilityVertexInputFormat extends
         TextVertexInputFormat<VLongWritable, VLongWritable, FloatWritable, VLongWritable> {
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextShortestPathsInputFormat.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextShortestPathsInputFormat.java
index 013a063..5345a33 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextShortestPathsInputFormat.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextShortestPathsInputFormat.java
@@ -18,7 +18,6 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -31,7 +30,8 @@
 import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat;
 import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat.TextVertexReader;
 import edu.uci.ics.pregelix.api.util.BspUtils;
-import edu.uci.ics.pregelix.example.io.VLongWritable;
+import edu.uci.ics.pregelix.example.lib.io.DoubleWritable;
+import edu.uci.ics.pregelix.example.lib.io.VLongWritable;
 
 public class TextShortestPathsInputFormat extends
         TextVertexInputFormat<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/data/VLongNormalizedKeyComputer.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/lib/data/VLongNormalizedKeyComputer.java
similarity index 97%
rename from pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/data/VLongNormalizedKeyComputer.java
rename to pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/lib/data/VLongNormalizedKeyComputer.java
index 7d824ea..2085b7e 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/data/VLongNormalizedKeyComputer.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/lib/data/VLongNormalizedKeyComputer.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.pregelix.example.data;
+package edu.uci.ics.pregelix.example.lib.data;
 
 import edu.uci.ics.pregelix.api.graph.NormalizedKeyComputer;
 import edu.uci.ics.pregelix.api.util.SerDeUtils;
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/lib/io/BooleanWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/lib/io/BooleanWritable.java
new file mode 100644
index 0000000..d8e54dd
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/lib/io/BooleanWritable.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example.lib.io;
+
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+
+/**
+ * Writable for Boolean values.
+ */
+public class BooleanWritable extends org.apache.hadoop.io.BooleanWritable implements WritableSizable {
+
+    public BooleanWritable(boolean value) {
+        super(value);
+    }
+
+    public BooleanWritable() {
+        super();
+    }
+
+    public int sizeInBytes() {
+        return 1;
+    }
+
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/lib/io/ByteWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/lib/io/ByteWritable.java
new file mode 100644
index 0000000..e7ef23b
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/lib/io/ByteWritable.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example.lib.io;
+
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+
+/**
+ * Writable for Byte values.
+ */
+public class ByteWritable extends org.apache.hadoop.io.ByteWritable implements WritableSizable {
+
+    public ByteWritable(byte value) {
+        super(value);
+    }
+
+    public ByteWritable() {
+        super();
+    }
+
+    public int sizeInBytes() {
+        return 1;
+    }
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/lib/io/BytesWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/lib/io/BytesWritable.java
new file mode 100644
index 0000000..465e7c0
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/lib/io/BytesWritable.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example.lib.io;
+
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+
+/**
+ * Writable for Bytes values.
+ */
+public class BytesWritable extends org.apache.hadoop.io.BytesWritable implements WritableSizable {
+
+    public BytesWritable(byte[] value) {
+        super(value);
+    }
+
+    public BytesWritable() {
+        super();
+    }
+
+    @Override
+    public int sizeInBytes() {
+        return getLength() + 4; // add the integer size slot
+    }
+
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/lib/io/DoubleWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/lib/io/DoubleWritable.java
new file mode 100644
index 0000000..7692684
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/lib/io/DoubleWritable.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example.lib.io;
+
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+
+/**
+ * Writable for Double values.
+ */
+public class DoubleWritable extends org.apache.hadoop.io.DoubleWritable implements WritableSizable {
+
+    public DoubleWritable(double value) {
+        super(value);
+    }
+
+    public DoubleWritable() {
+        super();
+    }
+
+    public int sizeInBytes() {
+        return 8;
+    }
+
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/lib/io/FloatWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/lib/io/FloatWritable.java
new file mode 100644
index 0000000..0b067a7
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/lib/io/FloatWritable.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example.lib.io;
+
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+
+/** A WritableComparable for floats. */
+public class FloatWritable extends org.apache.hadoop.io.FloatWritable implements WritableSizable {
+
+    public FloatWritable(float value) {
+        super(value);
+    }
+
+    public FloatWritable() {
+        super();
+    }
+
+    public int sizeInBytes() {
+        return 4;
+    }
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/lib/io/IntWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/lib/io/IntWritable.java
new file mode 100644
index 0000000..7b29afe
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/lib/io/IntWritable.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example.lib.io;
+
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+
+/** A WritableComparable for ints. */
+public class IntWritable extends org.apache.hadoop.io.IntWritable implements WritableSizable {
+
+    public IntWritable(int value) {
+        super(value);
+    }
+
+    public IntWritable() {
+        super();
+    }
+
+    public int sizeInBytes() {
+        return 4;
+    }
+
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/lib/io/LongWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/lib/io/LongWritable.java
new file mode 100644
index 0000000..72e2bf9
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/lib/io/LongWritable.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example.lib.io;
+
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+
+/** A WritableComparable for longs. */
+public class LongWritable extends org.apache.hadoop.io.LongWritable implements WritableSizable {
+
+    public LongWritable(long value) {
+        super(value);
+    }
+
+    public LongWritable() {
+        super();
+    }
+
+    public int sizeInBytes() {
+        return 8;
+    }
+
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/lib/io/NullWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/lib/io/NullWritable.java
new file mode 100644
index 0000000..d443c1d
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/lib/io/NullWritable.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example.lib.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+
+/** Singleton Writable with no data. */
+@SuppressWarnings("rawtypes")
+public class NullWritable implements WritableComparable, WritableSizable {
+
+    private static final NullWritable THIS = new NullWritable();
+
+    private NullWritable() {
+    } // no public ctor
+
+    /** Returns the single instance of this class. */
+    public static NullWritable get() {
+        return THIS;
+    }
+
+    public String toString() {
+        return "(null)";
+    }
+
+    public int sizeInBytes() {
+        return 0;
+    }
+
+    public int hashCode() {
+        return 0;
+    }
+
+    public int compareTo(Object other) {
+        if (!(other instanceof NullWritable)) {
+            throw new ClassCastException("can't compare " + other.getClass().getName() + " to NullWritable");
+        }
+        return 0;
+    }
+
+    public boolean equals(Object other) {
+        return other instanceof NullWritable;
+    }
+
+    public void readFields(DataInput in) throws IOException {
+    }
+
+    public void write(DataOutput out) throws IOException {
+    }
+
+    /** A Comparator &quot;optimized&quot; for NullWritable. */
+    public static class Comparator extends WritableComparator {
+        public Comparator() {
+            super(NullWritable.class);
+        }
+
+        /**
+         * Compare the buffers in serialized form.
+         */
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            assert 0 == l1;
+            assert 0 == l2;
+            return 0;
+        }
+    }
+
+    static { // register this comparator
+        WritableComparator.define(NullWritable.class, new Comparator());
+    }
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/lib/io/VIntWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/lib/io/VIntWritable.java
new file mode 100644
index 0000000..9041209
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/lib/io/VIntWritable.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example.lib.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableUtils;
+
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+
+/**
+ * A WritableComparable for integer values stored in variable-length format.
+ * Such values take between one and five bytes. Smaller values take fewer bytes.
+ * 
+ * @see org.apache.hadoop.io.WritableUtils#readVInt(DataInput)
+ */
+@SuppressWarnings("rawtypes")
+public class VIntWritable implements WritableComparable, WritableSizable {
+    private int value;
+
+    public VIntWritable() {
+    }
+
+    public VIntWritable(int value) {
+        set(value);
+    }
+
+    public int sizeInBytes() {
+        return 4;
+    }
+
+    /** Set the value of this VIntWritable. */
+    public void set(int value) {
+        this.value = value;
+    }
+
+    /** Return the value of this VIntWritable. */
+    public int get() {
+        return value;
+    }
+
+    public void readFields(DataInput in) throws IOException {
+        value = WritableUtils.readVInt(in);
+    }
+
+    public void write(DataOutput out) throws IOException {
+        WritableUtils.writeVInt(out, value);
+    }
+
+    /** Returns true iff <code>o</code> is a VIntWritable with the same value. */
+    public boolean equals(Object o) {
+        if (!(o instanceof VIntWritable))
+            return false;
+        VIntWritable other = (VIntWritable) o;
+        return this.value == other.value;
+    }
+
+    public int hashCode() {
+        return value;
+    }
+
+    /** Compares two VIntWritables. */
+    public int compareTo(Object o) {
+        int thisValue = this.value;
+        int thatValue = ((VIntWritable) o).value;
+        return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
+    }
+
+    public String toString() {
+        return Integer.toString(value);
+    }
+
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VLongWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/lib/io/VLongWritable.java
similarity index 69%
rename from pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VLongWritable.java
rename to pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/lib/io/VLongWritable.java
index e12d930..57bd386 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VLongWritable.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/lib/io/VLongWritable.java
@@ -13,7 +13,7 @@
  * limitations under the License.
  */
 
-package edu.uci.ics.pregelix.example.io;
+package edu.uci.ics.pregelix.example.lib.io;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -22,16 +22,26 @@
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
 
+import edu.uci.ics.pregelix.api.io.WritableSizable;
 import edu.uci.ics.pregelix.api.util.SerDeUtils;
 
 /**
  * A WritableComparable for longs in a variable-length format. Such values take
- * between one and five bytes. Smaller values take fewer bytes.
+ * between one and nine bytes. Smaller values take fewer bytes.
  * 
  * @see org.apache.hadoop.io.WritableUtils#readVLong(DataInput)
  */
 @SuppressWarnings("rawtypes")
-public class VLongWritable implements WritableComparable {
+public class VLongWritable implements WritableComparable, WritableSizable {
+    private static long ONE_BYTE_MAX = 2 ^ 7 - 1;
+    private static long TWO_BYTE_MAX = 2 ^ 14 - 1;
+    private static long THREE_BYTE_MAX = 2 ^ 21 - 1;
+    private static long FOUR_BYTE_MAX = 2 ^ 28 - 1;
+    private static long FIVE_BYTE_MAX = 2 ^ 35 - 1;;
+    private static long SIX_BYTE_MAX = 2 ^ 42 - 1;;
+    private static long SEVEN_BYTE_MAX = 2 ^ 49 - 1;;
+    private static long EIGHT_BYTE_MAX = 2 ^ 54 - 1;;
+
     private long value;
 
     public VLongWritable() {
@@ -41,6 +51,28 @@
         set(value);
     }
 
+    public int sizeInBytes() {
+        if (value >= 0 && value <= ONE_BYTE_MAX) {
+            return 1;
+        } else if (value > ONE_BYTE_MAX && value <= TWO_BYTE_MAX) {
+            return 2;
+        } else if (value > TWO_BYTE_MAX && value <= THREE_BYTE_MAX) {
+            return 3;
+        } else if (value > THREE_BYTE_MAX && value <= FOUR_BYTE_MAX) {
+            return 4;
+        } else if (value > FOUR_BYTE_MAX && value <= FIVE_BYTE_MAX) {
+            return 5;
+        } else if (value > FIVE_BYTE_MAX && value <= SIX_BYTE_MAX) {
+            return 6;
+        } else if (value > SIX_BYTE_MAX && value <= SEVEN_BYTE_MAX) {
+            return 7;
+        } else if (value > SEVEN_BYTE_MAX && value <= EIGHT_BYTE_MAX) {
+            return 8;
+        } else {
+            return 9;
+        }
+    }
+
     /** Set the value of this LongWritable. */
     public void set(long value) {
         this.value = value;
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/utils/VertexAggregator.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/lib/utils/VertexAggregator.java
similarity index 98%
rename from pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/utils/VertexAggregator.java
rename to pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/lib/utils/VertexAggregator.java
index 1d3f15c..b21614c 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/utils/VertexAggregator.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/lib/utils/VertexAggregator.java
@@ -13,7 +13,7 @@
  * limitations under the License.
  */
 
-package edu.uci.ics.pregelix.example.utils;
+package edu.uci.ics.pregelix.example.lib.utils;
 
 import java.io.IOException;
 import java.util.Iterator;
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/utils/VertexSorter.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/lib/utils/VertexSorter.java
similarity index 98%
rename from pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/utils/VertexSorter.java
rename to pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/lib/utils/VertexSorter.java
index 855d371..f17960f 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/utils/VertexSorter.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/lib/utils/VertexSorter.java
@@ -13,7 +13,7 @@
  * limitations under the License.
  */
 
-package edu.uci.ics.pregelix.example.utils;
+package edu.uci.ics.pregelix.example.lib.utils;
 
 import java.io.IOException;
 import java.util.Iterator;
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/AdjacencyListWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/AdjacencyListWritable.java
index 0a58c00..6c5eec7 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/AdjacencyListWritable.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/AdjacencyListWritable.java
@@ -23,14 +23,13 @@
 import java.util.Set;
 import java.util.TreeSet;
 
-import org.apache.hadoop.io.Writable;
-
-import edu.uci.ics.pregelix.example.io.VLongWritable;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+import edu.uci.ics.pregelix.example.lib.io.VLongWritable;
 
 /**
  * The adjacency list contains <src, list-of-neighbors>
  */
-public class AdjacencyListWritable implements Writable {
+public class AdjacencyListWritable implements WritableSizable {
 
     private VLongWritable sourceVertex = new VLongWritable();
     private Set<VLongWritable> destinationVertexes = new TreeSet<VLongWritable>();
@@ -96,4 +95,13 @@
         return destinationVertexes.contains(v);
     }
 
+    @Override
+    public int sizeInBytes() {
+        int size = 4; // the size of list bytes
+        for (VLongWritable dest : destinationVertexes) {
+            size += dest.sizeInBytes();
+        }
+        return size;
+    }
+
 }
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/CliquesWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/CliquesWritable.java
index c08bac0..046436f 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/CliquesWritable.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/CliquesWritable.java
@@ -23,7 +23,7 @@
 
 import org.apache.hadoop.io.Writable;
 
-import edu.uci.ics.pregelix.example.io.VLongWritable;
+import edu.uci.ics.pregelix.example.lib.io.VLongWritable;
 
 /**
  * The representation of cliques stored in a vertex.
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/MaximalCliqueAggregator.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/MaximalCliqueAggregator.java
index fe01d2b..3036c36 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/MaximalCliqueAggregator.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/MaximalCliqueAggregator.java
@@ -19,7 +19,7 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
 import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.example.io.VLongWritable;
+import edu.uci.ics.pregelix.example.lib.io.VLongWritable;
 
 /**
  * The global aggregator aggregates the count of triangles
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/MaximalCliqueVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/MaximalCliqueVertex.java
index 13c3bf5..9622fbd 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/MaximalCliqueVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/MaximalCliqueVertex.java
@@ -39,8 +39,8 @@
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
 import edu.uci.ics.pregelix.example.client.Client;
-import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
-import edu.uci.ics.pregelix.example.io.VLongWritable;
+import edu.uci.ics.pregelix.example.lib.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.lib.io.VLongWritable;
 import edu.uci.ics.pregelix.example.trianglecounting.TriangleCountingVertex;
 
 /**
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/TextMaximalCliqueInputFormat.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/TextMaximalCliqueInputFormat.java
index 2107e02..34721a6 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/TextMaximalCliqueInputFormat.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/TextMaximalCliqueInputFormat.java
@@ -31,7 +31,7 @@
 import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat;
 import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat.TextVertexReader;
 import edu.uci.ics.pregelix.api.util.BspUtils;
-import edu.uci.ics.pregelix.example.io.VLongWritable;
+import edu.uci.ics.pregelix.example.lib.io.VLongWritable;
 
 public class TextMaximalCliqueInputFormat extends
         TextVertexInputFormat<VLongWritable, CliquesWritable, NullWritable, AdjacencyListWritable> {
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/trianglecounting/TextTriangleCountingInputFormat.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/trianglecounting/TextTriangleCountingInputFormat.java
index c963b45..c94d1c6 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/trianglecounting/TextTriangleCountingInputFormat.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/trianglecounting/TextTriangleCountingInputFormat.java
@@ -29,7 +29,7 @@
 import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat;
 import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat.TextVertexReader;
 import edu.uci.ics.pregelix.api.util.BspUtils;
-import edu.uci.ics.pregelix.example.io.VLongWritable;
+import edu.uci.ics.pregelix.example.lib.io.VLongWritable;
 
 public class TextTriangleCountingInputFormat extends
         TextVertexInputFormat<VLongWritable, VLongWritable, VLongWritable, VLongWritable> {
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/trianglecounting/TriangleCountingAggregator.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/trianglecounting/TriangleCountingAggregator.java
index 06d119f..4e4b626 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/trianglecounting/TriangleCountingAggregator.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/trianglecounting/TriangleCountingAggregator.java
@@ -17,7 +17,7 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
 import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.example.io.VLongWritable;
+import edu.uci.ics.pregelix.example.lib.io.VLongWritable;
 
 /**
  * The global aggregator aggregates the count of triangles
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/trianglecounting/TriangleCountingVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/trianglecounting/TriangleCountingVertex.java
index a8d85ab..d337d86 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/trianglecounting/TriangleCountingVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/trianglecounting/TriangleCountingVertex.java
@@ -34,8 +34,8 @@
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
 import edu.uci.ics.pregelix.example.client.Client;
-import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
-import edu.uci.ics.pregelix.example.io.VLongWritable;
+import edu.uci.ics.pregelix.example.lib.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.lib.io.VLongWritable;
 
 /**
  * The triangle counting example -- counting the triangles in an undirected graph.
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
index 670620f..840c0d2 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
@@ -28,17 +28,20 @@
 import edu.uci.ics.pregelix.example.ConnectedComponentsVertex.SimpleConnectedComponentsVertexOutputFormat;
 import edu.uci.ics.pregelix.example.GraphMutationVertex;
 import edu.uci.ics.pregelix.example.GraphMutationVertex.SimpleGraphMutationVertexOutputFormat;
+import edu.uci.ics.pregelix.example.MessageOverflowFixedsizeVertex;
+import edu.uci.ics.pregelix.example.MessageOverflowVertex;
+import edu.uci.ics.pregelix.example.MessageOverflowVertex.SimpleMessageOverflowVertexOutputFormat;
 import edu.uci.ics.pregelix.example.PageRankVertex;
 import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexOutputFormat;
 import edu.uci.ics.pregelix.example.PageRankVertex.SimulatedPageRankVertexInputFormat;
 import edu.uci.ics.pregelix.example.ReachabilityVertex;
 import edu.uci.ics.pregelix.example.ReachabilityVertex.SimpleReachibilityVertexOutputFormat;
 import edu.uci.ics.pregelix.example.ShortestPathsVertex;
-import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
 import edu.uci.ics.pregelix.example.inputformat.TextConnectedComponentsInputFormat;
 import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
 import edu.uci.ics.pregelix.example.inputformat.TextReachibilityVertexInputFormat;
 import edu.uci.ics.pregelix.example.inputformat.TextShortestPathsInputFormat;
+import edu.uci.ics.pregelix.example.lib.data.VLongNormalizedKeyComputer;
 import edu.uci.ics.pregelix.example.maximalclique.MaximalCliqueAggregator;
 import edu.uci.ics.pregelix.example.maximalclique.MaximalCliqueVertex;
 import edu.uci.ics.pregelix.example.maximalclique.MaximalCliqueVertex.MaximalCliqueVertexOutputFormat;
@@ -280,6 +283,47 @@
         job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
     }
 
+    private static void generateMessageOverflowFixedsizeJob(String jobName, String outputPath) throws IOException {
+        PregelixJob job = new PregelixJob(jobName);
+        job.setVertexClass(MessageOverflowFixedsizeVertex.class);
+        job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+        job.setVertexOutputFormatClass(MessageOverflowFixedsizeVertex.SimpleMessageOverflowVertexOutputFormat.class);
+        job.setFrameSize(2048);
+        FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
+        FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
+        job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+        job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+    }
+
+    private static void generateMessageOverflowJob(String jobName, String outputPath) throws IOException {
+        PregelixJob job = new PregelixJob(jobName);
+        job.setVertexClass(MessageOverflowVertex.class);
+        job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+        job.setVertexOutputFormatClass(SimpleMessageOverflowVertexOutputFormat.class);
+        job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+        job.setDynamicVertexValueSize(true);
+        job.setFrameSize(2048);
+        FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
+        FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
+        job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+        job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+    }
+
+    private static void generateMessageOverflowJobLSM(String jobName, String outputPath) throws IOException {
+        PregelixJob job = new PregelixJob(jobName);
+        job.setVertexClass(MessageOverflowVertex.class);
+        job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+        job.setVertexOutputFormatClass(SimpleMessageOverflowVertexOutputFormat.class);
+        job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+        job.setDynamicVertexValueSize(true);
+        job.setFrameSize(2048);
+        job.setLSMStorage(true);
+        FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
+        FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
+        job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+        job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+    }
+
     private static void genPageRank() throws IOException {
         generatePageRankJob("PageRank", outputBase + "PageRank.xml");
         generatePageRankJobReal("PageRank", outputBase + "PageRankReal.xml");
@@ -319,6 +363,12 @@
         generateGraphMutationJob("Graph Mutation", outputBase + "GraphMutation.xml");
     }
 
+    private static void genMessageOverflow() throws IOException {
+        generateMessageOverflowJob("Message Overflow", outputBase + "MessageOverflow.xml");
+        generateMessageOverflowJobLSM("Message Overflow LSM", outputBase + "MessageOverflowLSM.xml");
+        generateMessageOverflowFixedsizeJob("Message Overflow Fixedsize", outputBase + "MessageOverflowFixedsize.xml");
+    }
+
     public static void main(String[] args) throws IOException {
         genPageRank();
         genShortestPath();
@@ -327,5 +377,6 @@
         genTriangleCounting();
         genMaximalClique();
         genGraphMutation();
+        genMessageOverflow();
     }
 }
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/lib/io/VLongWritableTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/lib/io/VLongWritableTest.java
new file mode 100644
index 0000000..d487dcd
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/lib/io/VLongWritableTest.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example.lib.io;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.util.Random;
+
+import org.junit.Test;
+
+import edu.uci.ics.pregelix.api.graph.MsgList;
+
+/**
+ * @author yingyib
+ */
+public class VLongWritableTest {
+
+    @Test
+    public void test() throws Exception {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutput dos = new DataOutputStream(bos);
+        Random rand = new Random(System.currentTimeMillis());
+        MsgList<VLongWritable> msgList = new MsgList<VLongWritable>();
+        int accumulatedSize = 4;
+        for (int i = 0; i < 1000000; i++) {
+            bos.reset();
+            VLongWritable value = new VLongWritable(Math.abs(rand.nextLong()));
+            value.write(dos);
+            if (value.sizeInBytes() < bos.size()) {
+                throw new Exception(value + " estimated size (" + value.sizeInBytes()
+                        + ") is smaller than the actual size" + bos.size());
+            }
+            msgList.add(value);
+            accumulatedSize += value.sizeInBytes();
+        }
+        bos.reset();
+        msgList.write(dos);
+        if (accumulatedSize < bos.size()) {
+            throw new Exception("Estimated list size (" + accumulatedSize + ") is smaller than the actual size"
+                    + bos.size());
+        }
+    }
+
+}
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
index decbde8..9718a6b 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
@@ -37,7 +37,7 @@
 <property><name>fs.checkpoint.size</name><value>67108864</value></property>
 <property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
 <property><name>mapred.job.name</name><value>ConnectedComponents</value></property>
-<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.lib.data.VLongNormalizedKeyComputer</value></property>
 <property><name>local.cache.size</name><value>10737418240</value></property>
 <property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
 <property><name>mapred.userlog.limit.kb</name><value>0</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
index cca66bb..2e7aedc 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
@@ -37,7 +37,7 @@
 <property><name>fs.checkpoint.size</name><value>67108864</value></property>
 <property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
 <property><name>mapred.job.name</name><value>ConnectedComponents</value></property>
-<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.lib.data.VLongNormalizedKeyComputer</value></property>
 <property><name>local.cache.size</name><value>10737418240</value></property>
 <property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
 <property><name>mapred.userlog.limit.kb</name><value>0</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/GraphMutation.xml b/pregelix/pregelix-example/src/test/resources/jobs/GraphMutation.xml
index d5ec8f1..8b7f0fb 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/GraphMutation.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/GraphMutation.xml
@@ -37,7 +37,7 @@
 <property><name>fs.checkpoint.size</name><value>67108864</value></property>
 <property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
 <property><name>mapred.job.name</name><value>Graph Mutation</value></property>
-<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.lib.data.VLongNormalizedKeyComputer</value></property>
 <property><name>local.cache.size</name><value>10737418240</value></property>
 <property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
 <property><name>mapred.userlog.limit.kb</name><value>0</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique.xml b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique.xml
index b4c42e6..f82f931 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique.xml
@@ -37,7 +37,7 @@
 <property><name>fs.checkpoint.size</name><value>67108864</value></property>
 <property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
 <property><name>mapred.job.name</name><value>Maximal Clique</value></property>
-<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.lib.data.VLongNormalizedKeyComputer</value></property>
 <property><name>local.cache.size</name><value>10737418240</value></property>
 <property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
 <property><name>mapred.userlog.limit.kb</name><value>0</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique2.xml b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique2.xml
index 6cf075b..42cc010 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique2.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique2.xml
@@ -37,7 +37,7 @@
 <property><name>fs.checkpoint.size</name><value>67108864</value></property>
 <property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
 <property><name>mapred.job.name</name><value>Maximal Clique 2</value></property>
-<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.lib.data.VLongNormalizedKeyComputer</value></property>
 <property><name>local.cache.size</name><value>10737418240</value></property>
 <property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
 <property><name>mapred.userlog.limit.kb</name><value>0</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique3.xml b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique3.xml
index 49e2e6f..b9d3a68 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique3.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique3.xml
@@ -37,7 +37,7 @@
 <property><name>fs.checkpoint.size</name><value>67108864</value></property>
 <property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
 <property><name>mapred.job.name</name><value>Maximal Clique 3</value></property>
-<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.lib.data.VLongNormalizedKeyComputer</value></property>
 <property><name>local.cache.size</name><value>10737418240</value></property>
 <property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
 <property><name>mapred.userlog.limit.kb</name><value>0</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflow.xml b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflow.xml
new file mode 100644
index 0000000..4c5cfc0
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflow.xml
@@ -0,0 +1,144 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
+<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
+<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
+<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
+<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
+<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
+<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
+<property><name>mapred.input.dir</name><value>file:/webmap</value></property>
+<property><name>mapred.submit.replication</name><value>10</value></property>
+<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
+<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
+<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
+<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
+<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
+<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
+<property><name>keep.failed.task.files</name><value>false</value></property>
+<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
+<property><name>io.bytes.per.checksum</name><value>512</value></property>
+<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
+<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
+<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
+<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
+<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
+<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
+<property><name>fs.checkpoint.period</name><value>3600</value></property>
+<property><name>mapred.child.tmp</name><value>./tmp</value></property>
+<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
+<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
+<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
+<property><name>mapred.output.dir</name><value>/result</value></property>
+<property><name>io.map.index.skip</name><value>0</value></property>
+<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
+<property><name>mapred.output.compress</name><value>false</value></property>
+<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
+<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
+<property><name>fs.checkpoint.size</name><value>67108864</value></property>
+<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
+<property><name>mapred.job.name</name><value>Message Overflow</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.lib.data.VLongNormalizedKeyComputer</value></property>
+<property><name>local.cache.size</name><value>10737418240</value></property>
+<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
+<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
+<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
+<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
+<property><name>mapred.task.timeout</name><value>600000</value></property>
+<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
+<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
+<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
+<property><name>ipc.client.kill.max</name><value>10</value></property>
+<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
+<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
+<property><name>io.sort.record.percent</name><value>0.05</value></property>
+<property><name>hadoop.security.authorization</name><value>false</value></property>
+<property><name>mapred.max.tracker.failures</name><value>4</value></property>
+<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>pregelix.numVertices</name><value>20</value></property>
+<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
+<property><name>mapred.map.tasks</name><value>2</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
+<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
+<property><name>fs.default.name</name><value>file:///</value></property>
+<property><name>tasktracker.http.threads</name><value>40</value></property>
+<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
+<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
+<property><name>mapred.reduce.tasks</name><value>1</value></property>
+<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.MessageOverflowVertex</value></property>
+<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
+<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
+<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
+<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
+<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
+<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
+<property><name>hadoop.native.lib</name><value>true</value></property>
+<property><name>fs.s3.block.size</name><value>67108864</value></property>
+<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
+<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
+<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
+<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
+<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
+<property><name>mapred.queue.names</name><value>default</value></property>
+<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
+<property><name>mapred.job.tracker</name><value>local</value></property>
+<property><name>io.skip.checksum.errors</name><value>false</value></property>
+<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
+<property><name>fs.s3.maxRetries</name><value>4</value></property>
+<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
+<property><name>fs.trash.interval</name><value>0</value></property>
+<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
+<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
+<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
+<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
+<property><name>io.sort.mb</name><value>100</value></property>
+<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
+<property><name>io.sort.factor</name><value>10</value></property>
+<property><name>mapred.task.profile</name><value>false</value></property>
+<property><name>job.end.retry.interval</name><value>30000</value></property>
+<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
+<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
+<property><name>webinterface.private.actions</name><value>false</value></property>
+<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
+<property><name>pregelix.framesize</name><value>4096</value></property>
+<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
+<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
+<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
+<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.compress.map.output</name><value>false</value></property>
+<property><name>io.sort.spill.percent</name><value>0.80</value></property>
+<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
+<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
+<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
+<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
+<property><name>job.end.retry.attempts</name><value>0</value></property>
+<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.MessageOverflowVertex$SimpleMessageOverflowVertexOutputFormat</value></property>
+<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
+<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
+<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
+<property><name>mapred.job.queue.name</name><value>default</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>pregelix.incStateLength</name><value>true</value></property>
+<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
+<property><name>topology.script.number.args</name><value>100</value></property>
+<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
+<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
+<property><name>mapred.task.cache.levels</name><value>2</value></property>
+<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
+<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
+<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>mapred.min.split.size</name><value>0</value></property>
+<property><name>mapred.map.max.attempts</name><value>4</value></property>
+<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
+<property><name>mapred.acls.enabled</name><value>false</value></property>
+</configuration>
\ No newline at end of file
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowFixedsize.xml b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowFixedsize.xml
new file mode 100644
index 0000000..10fab63
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowFixedsize.xml
@@ -0,0 +1,142 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
+<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
+<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
+<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
+<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
+<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
+<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
+<property><name>mapred.input.dir</name><value>file:/webmap</value></property>
+<property><name>mapred.submit.replication</name><value>10</value></property>
+<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
+<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
+<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
+<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
+<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
+<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
+<property><name>keep.failed.task.files</name><value>false</value></property>
+<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
+<property><name>io.bytes.per.checksum</name><value>512</value></property>
+<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
+<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
+<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
+<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
+<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
+<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
+<property><name>fs.checkpoint.period</name><value>3600</value></property>
+<property><name>mapred.child.tmp</name><value>./tmp</value></property>
+<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
+<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
+<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
+<property><name>mapred.output.dir</name><value>/result</value></property>
+<property><name>io.map.index.skip</name><value>0</value></property>
+<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
+<property><name>mapred.output.compress</name><value>false</value></property>
+<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
+<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
+<property><name>fs.checkpoint.size</name><value>67108864</value></property>
+<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
+<property><name>mapred.job.name</name><value>Message Overflow Fixedsize</value></property>
+<property><name>local.cache.size</name><value>10737418240</value></property>
+<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
+<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
+<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
+<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
+<property><name>mapred.task.timeout</name><value>600000</value></property>
+<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
+<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
+<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
+<property><name>ipc.client.kill.max</name><value>10</value></property>
+<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
+<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
+<property><name>io.sort.record.percent</name><value>0.05</value></property>
+<property><name>hadoop.security.authorization</name><value>false</value></property>
+<property><name>mapred.max.tracker.failures</name><value>4</value></property>
+<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>pregelix.numVertices</name><value>20</value></property>
+<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
+<property><name>mapred.map.tasks</name><value>2</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
+<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
+<property><name>fs.default.name</name><value>file:///</value></property>
+<property><name>tasktracker.http.threads</name><value>40</value></property>
+<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
+<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
+<property><name>mapred.reduce.tasks</name><value>1</value></property>
+<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.MessageOverflowFixedsizeVertex</value></property>
+<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
+<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
+<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
+<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
+<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
+<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
+<property><name>hadoop.native.lib</name><value>true</value></property>
+<property><name>fs.s3.block.size</name><value>67108864</value></property>
+<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
+<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
+<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
+<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
+<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
+<property><name>mapred.queue.names</name><value>default</value></property>
+<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
+<property><name>mapred.job.tracker</name><value>local</value></property>
+<property><name>io.skip.checksum.errors</name><value>false</value></property>
+<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
+<property><name>fs.s3.maxRetries</name><value>4</value></property>
+<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
+<property><name>fs.trash.interval</name><value>0</value></property>
+<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
+<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
+<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
+<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
+<property><name>io.sort.mb</name><value>100</value></property>
+<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
+<property><name>io.sort.factor</name><value>10</value></property>
+<property><name>mapred.task.profile</name><value>false</value></property>
+<property><name>job.end.retry.interval</name><value>30000</value></property>
+<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
+<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
+<property><name>webinterface.private.actions</name><value>false</value></property>
+<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
+<property><name>pregelix.framesize</name><value>4096</value></property>
+<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
+<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
+<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
+<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.compress.map.output</name><value>false</value></property>
+<property><name>io.sort.spill.percent</name><value>0.80</value></property>
+<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
+<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
+<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
+<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
+<property><name>job.end.retry.attempts</name><value>0</value></property>
+<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.MessageOverflowFixedsizeVertex$SimpleMessageOverflowVertexOutputFormat</value></property>
+<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
+<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
+<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
+<property><name>mapred.job.queue.name</name><value>default</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
+<property><name>topology.script.number.args</name><value>100</value></property>
+<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
+<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
+<property><name>mapred.task.cache.levels</name><value>2</value></property>
+<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
+<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
+<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>mapred.min.split.size</name><value>0</value></property>
+<property><name>mapred.map.max.attempts</name><value>4</value></property>
+<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
+<property><name>mapred.acls.enabled</name><value>false</value></property>
+</configuration>
\ No newline at end of file
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowLSM.xml b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowLSM.xml
new file mode 100644
index 0000000..1998c0e
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowLSM.xml
@@ -0,0 +1,145 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
+<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
+<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
+<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
+<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
+<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
+<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
+<property><name>mapred.input.dir</name><value>file:/webmap</value></property>
+<property><name>mapred.submit.replication</name><value>10</value></property>
+<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
+<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
+<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
+<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
+<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
+<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
+<property><name>keep.failed.task.files</name><value>false</value></property>
+<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
+<property><name>io.bytes.per.checksum</name><value>512</value></property>
+<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
+<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
+<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
+<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
+<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
+<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
+<property><name>fs.checkpoint.period</name><value>3600</value></property>
+<property><name>mapred.child.tmp</name><value>./tmp</value></property>
+<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
+<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
+<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
+<property><name>mapred.output.dir</name><value>/result</value></property>
+<property><name>io.map.index.skip</name><value>0</value></property>
+<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
+<property><name>mapred.output.compress</name><value>false</value></property>
+<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
+<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
+<property><name>fs.checkpoint.size</name><value>67108864</value></property>
+<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
+<property><name>mapred.job.name</name><value>Message Overflow LSM</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.lib.data.VLongNormalizedKeyComputer</value></property>
+<property><name>local.cache.size</name><value>10737418240</value></property>
+<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
+<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
+<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
+<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
+<property><name>mapred.task.timeout</name><value>600000</value></property>
+<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
+<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
+<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
+<property><name>ipc.client.kill.max</name><value>10</value></property>
+<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
+<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
+<property><name>io.sort.record.percent</name><value>0.05</value></property>
+<property><name>hadoop.security.authorization</name><value>false</value></property>
+<property><name>mapred.max.tracker.failures</name><value>4</value></property>
+<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>pregelix.numVertices</name><value>20</value></property>
+<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
+<property><name>mapred.map.tasks</name><value>2</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
+<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
+<property><name>fs.default.name</name><value>file:///</value></property>
+<property><name>tasktracker.http.threads</name><value>40</value></property>
+<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
+<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
+<property><name>mapred.reduce.tasks</name><value>1</value></property>
+<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.MessageOverflowVertex</value></property>
+<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
+<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
+<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
+<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
+<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
+<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
+<property><name>hadoop.native.lib</name><value>true</value></property>
+<property><name>fs.s3.block.size</name><value>67108864</value></property>
+<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
+<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
+<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
+<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
+<property><name>pregelix.updateIntensive</name><value>true</value></property>
+<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
+<property><name>mapred.queue.names</name><value>default</value></property>
+<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
+<property><name>mapred.job.tracker</name><value>local</value></property>
+<property><name>io.skip.checksum.errors</name><value>false</value></property>
+<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
+<property><name>fs.s3.maxRetries</name><value>4</value></property>
+<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
+<property><name>fs.trash.interval</name><value>0</value></property>
+<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
+<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
+<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
+<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
+<property><name>io.sort.mb</name><value>100</value></property>
+<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
+<property><name>io.sort.factor</name><value>10</value></property>
+<property><name>mapred.task.profile</name><value>false</value></property>
+<property><name>job.end.retry.interval</name><value>30000</value></property>
+<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
+<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
+<property><name>webinterface.private.actions</name><value>false</value></property>
+<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
+<property><name>pregelix.framesize</name><value>4096</value></property>
+<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
+<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
+<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
+<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.compress.map.output</name><value>false</value></property>
+<property><name>io.sort.spill.percent</name><value>0.80</value></property>
+<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
+<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
+<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
+<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
+<property><name>job.end.retry.attempts</name><value>0</value></property>
+<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.MessageOverflowVertex$SimpleMessageOverflowVertexOutputFormat</value></property>
+<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
+<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
+<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
+<property><name>mapred.job.queue.name</name><value>default</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>pregelix.incStateLength</name><value>true</value></property>
+<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
+<property><name>topology.script.number.args</name><value>100</value></property>
+<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
+<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
+<property><name>mapred.task.cache.levels</name><value>2</value></property>
+<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
+<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
+<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>mapred.min.split.size</name><value>0</value></property>
+<property><name>mapred.map.max.attempts</name><value>4</value></property>
+<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
+<property><name>mapred.acls.enabled</name><value>false</value></property>
+</configuration>
\ No newline at end of file
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml
index 65e0b30..a4808ff 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml
@@ -37,7 +37,7 @@
 <property><name>fs.checkpoint.size</name><value>67108864</value></property>
 <property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
 <property><name>mapred.job.name</name><value>PageRank</value></property>
-<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.lib.data.VLongNormalizedKeyComputer</value></property>
 <property><name>local.cache.size</name><value>10737418240</value></property>
 <property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
 <property><name>mapred.userlog.limit.kb</name><value>0</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
index 9e1e0b0..8a9df8a 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
@@ -37,7 +37,7 @@
 <property><name>fs.checkpoint.size</name><value>67108864</value></property>
 <property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
 <property><name>mapred.job.name</name><value>PageRank</value></property>
-<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.lib.data.VLongNormalizedKeyComputer</value></property>
 <property><name>local.cache.size</name><value>10737418240</value></property>
 <property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
 <property><name>mapred.userlog.limit.kb</name><value>0</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
index c4366d7..35ddda8 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
@@ -37,7 +37,7 @@
 <property><name>fs.checkpoint.size</name><value>67108864</value></property>
 <property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
 <property><name>mapred.job.name</name><value>PageRank</value></property>
-<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.lib.data.VLongNormalizedKeyComputer</value></property>
 <property><name>local.cache.size</name><value>10737418240</value></property>
 <property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
 <property><name>mapred.userlog.limit.kb</name><value>0</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealDynamic.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealDynamic.xml
index c05a4da..38dcf39 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealDynamic.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealDynamic.xml
@@ -37,7 +37,7 @@
 <property><name>fs.checkpoint.size</name><value>67108864</value></property>
 <property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
 <property><name>mapred.job.name</name><value>PageRank</value></property>
-<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.lib.data.VLongNormalizedKeyComputer</value></property>
 <property><name>local.cache.size</name><value>10737418240</value></property>
 <property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
 <property><name>mapred.userlog.limit.kb</name><value>0</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml
index ac0d508..0c381d6 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml
@@ -37,7 +37,7 @@
 <property><name>fs.checkpoint.size</name><value>67108864</value></property>
 <property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
 <property><name>mapred.job.name</name><value>PageRank</value></property>
-<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.lib.data.VLongNormalizedKeyComputer</value></property>
 <property><name>local.cache.size</name><value>10737418240</value></property>
 <property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
 <property><name>mapred.userlog.limit.kb</name><value>0</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplex.xml b/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplex.xml
index 225429a..f77e175 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplex.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplex.xml
@@ -37,7 +37,7 @@
 <property><name>fs.checkpoint.size</name><value>67108864</value></property>
 <property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
 <property><name>mapred.job.name</name><value>Reachibility</value></property>
-<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.lib.data.VLongNormalizedKeyComputer</value></property>
 <property><name>local.cache.size</name><value>10737418240</value></property>
 <property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
 <property><name>mapred.userlog.limit.kb</name><value>0</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplexNoConnectivity.xml b/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplexNoConnectivity.xml
index bd9da92..469bd2a 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplexNoConnectivity.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplexNoConnectivity.xml
@@ -37,7 +37,7 @@
 <property><name>fs.checkpoint.size</name><value>67108864</value></property>
 <property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
 <property><name>mapred.job.name</name><value>Reachibility</value></property>
-<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.lib.data.VLongNormalizedKeyComputer</value></property>
 <property><name>local.cache.size</name><value>10737418240</value></property>
 <property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
 <property><name>mapred.userlog.limit.kb</name><value>0</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml
index 9acd7bc..fe335aa 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml
@@ -37,7 +37,7 @@
 <property><name>fs.checkpoint.size</name><value>67108864</value></property>
 <property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
 <property><name>mapred.job.name</name><value>ShortestPaths</value></property>
-<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.lib.data.VLongNormalizedKeyComputer</value></property>
 <property><name>local.cache.size</name><value>10737418240</value></property>
 <property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
 <property><name>mapred.userlog.limit.kb</name><value>0</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml
index 6c25575..ee41e0a 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml
@@ -37,7 +37,7 @@
 <property><name>fs.checkpoint.size</name><value>67108864</value></property>
 <property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
 <property><name>mapred.job.name</name><value>ShortestPaths</value></property>
-<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.lib.data.VLongNormalizedKeyComputer</value></property>
 <property><name>local.cache.size</name><value>10737418240</value></property>
 <property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
 <property><name>mapred.userlog.limit.kb</name><value>0</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/TriangleCounting.xml b/pregelix/pregelix-example/src/test/resources/jobs/TriangleCounting.xml
index 4a40a6a..616cdb0 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/TriangleCounting.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/TriangleCounting.xml
@@ -37,7 +37,7 @@
 <property><name>fs.checkpoint.size</name><value>67108864</value></property>
 <property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
 <property><name>mapred.job.name</name><value>Triangle Counting</value></property>
-<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.lib.data.VLongNormalizedKeyComputer</value></property>
 <property><name>local.cache.size</name><value>10737418240</value></property>
 <property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
 <property><name>mapred.userlog.limit.kb</name><value>0</value></property>
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
index 77f28e4..3195619 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
@@ -14,22 +14,27 @@
  */
 package edu.uci.ics.pregelix.runtime.simpleagg;
 
+import java.nio.ByteBuffer;
+
 import org.apache.commons.lang3.tuple.Pair;
 
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.group.IClusteredAggregatorDescriptorFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IAggregateFunction;
 import edu.uci.ics.pregelix.dataflow.std.base.IAggregateFunctionFactory;
 
-public class AccumulatingAggregatorFactory implements IAggregatorDescriptorFactory {
+public class AccumulatingAggregatorFactory implements IClusteredAggregatorDescriptorFactory {
 
     private static final long serialVersionUID = 1L;
     private IAggregateFunctionFactory[] aggFactories;
@@ -41,70 +46,16 @@
     @SuppressWarnings("unchecked")
     @Override
     public IAggregatorDescriptor createAggregator(final IHyracksTaskContext ctx, RecordDescriptor inRecordDesc,
-            RecordDescriptor outRecordDescriptor, int[] aggKeys, int[] partialKeys) throws HyracksDataException {
+            RecordDescriptor outRecordDescriptor, final int[] groupFields, int[] partialgroupFields,
+            final IFrameWriter writer, final ByteBuffer outputFrame, final FrameTupleAppender appender)
+            throws HyracksDataException {
+        final int frameSize = ctx.getFrameSize();
+        final ArrayTupleBuilder internalTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
 
         return new IAggregatorDescriptor() {
-
             private FrameTupleReference ftr = new FrameTupleReference();
-
-            @Override
-            public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
-                    AggregateState state) throws HyracksDataException {
-                Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
-                ArrayBackedValueStorage[] aggOutput = aggState.getLeft();
-                IAggregateFunction[] agg = aggState.getRight();
-
-                // initialize aggregate functions
-                for (int i = 0; i < agg.length; i++) {
-                    aggOutput[i].reset();
-                    try {
-                        agg[i].init();
-                    } catch (Exception e) {
-                        throw new HyracksDataException(e);
-                    }
-                }
-
-                ftr.reset(accessor, tIndex);
-                for (int i = 0; i < agg.length; i++) {
-                    try {
-                        agg[i].step(ftr);
-                    } catch (Exception e) {
-                        throw new HyracksDataException(e);
-                    }
-                }
-            }
-
-            @Override
-            public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
-                    int stateTupleIndex, AggregateState state) throws HyracksDataException {
-                Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
-                IAggregateFunction[] agg = aggState.getRight();
-                ftr.reset(accessor, tIndex);
-                for (int i = 0; i < agg.length; i++) {
-                    try {
-                        agg[i].step(ftr);
-                    } catch (Exception e) {
-                        throw new HyracksDataException(e);
-                    }
-                }
-            }
-
-            @Override
-            public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
-                    AggregateState state) throws HyracksDataException {
-                Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
-                ArrayBackedValueStorage[] aggOutput = aggState.getLeft();
-                IAggregateFunction[] agg = aggState.getRight();
-                for (int i = 0; i < agg.length; i++) {
-                    try {
-                        agg[i].finish();
-                        tupleBuilder.addField(aggOutput[i].getByteArray(), aggOutput[i].getStartOffset(),
-                                aggOutput[i].getLength());
-                    } catch (Exception e) {
-                        throw new HyracksDataException(e);
-                    }
-                }
-            }
+            private int groupKeySize = 0;
+            private int metaSlotSize = 4;
 
             @Override
             public AggregateState createAggregateStates() {
@@ -113,7 +64,7 @@
                 for (int i = 0; i < agg.length; i++) {
                     aggOutput[i] = new ArrayBackedValueStorage();
                     try {
-                        agg[i] = aggFactories[i].createAggregateFunction(ctx, aggOutput[i]);
+                        agg[i] = aggFactories[i].createAggregateFunction(ctx, aggOutput[i], writer);
                     } catch (Exception e) {
                         throw new IllegalStateException(e);
                     }
@@ -122,6 +73,38 @@
             }
 
             @Override
+            public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                setGroupKeySize(accessor, tIndex);
+                initAggregateFunctions(state);
+                int stateSize = estimateStep(accessor, tIndex, state);
+                if (stateSize > frameSize) {
+                    throw new HyracksDataException(
+                            "Message combiner intermediate data size "
+                                    + stateSize
+                                    + " is larger than frame size! Check the size estimattion implementation in the message combiner.");
+                }
+                singleStep(accessor, tIndex, state);
+            }
+
+            @Override
+            public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+                    int stateTupleIndex, AggregateState state) throws HyracksDataException {
+                int stateSize = estimateStep(accessor, tIndex, state);
+                if (stateSize > frameSize) {
+                    emitResultTuple(accessor, tIndex, state);
+                    initAggregateFunctions(state);
+                }
+                singleStep(accessor, tIndex, state);
+            }
+
+            @Override
+            public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                fillResultTupleBuilder(tupleBuilder, state);
+            }
+
+            @Override
             public void reset() {
 
             }
@@ -137,6 +120,98 @@
 
             }
 
+            private void initAggregateFunctions(AggregateState state) throws HyracksDataException {
+                Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
+                ArrayBackedValueStorage[] aggOutput = aggState.getLeft();
+                IAggregateFunction[] agg = aggState.getRight();
+
+                /**
+                 * initialize aggregate functions
+                 */
+                for (int i = 0; i < agg.length; i++) {
+                    aggOutput[i].reset();
+                    try {
+                        agg[i].init();
+                    } catch (Exception e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+            }
+
+            private void singleStep(IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+                    throws HyracksDataException {
+                Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
+                IAggregateFunction[] agg = aggState.getRight();
+                ftr.reset(accessor, tIndex);
+                for (int i = 0; i < agg.length; i++) {
+                    try {
+                        agg[i].step(ftr);
+                    } catch (Exception e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+            }
+
+            private int estimateStep(IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+                    throws HyracksDataException {
+                int size = metaSlotSize + groupKeySize;
+                Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
+                IAggregateFunction[] agg = aggState.getRight();
+                ftr.reset(accessor, tIndex);
+                for (int i = 0; i < agg.length; i++) {
+                    try {
+                        size += agg[i].estimateStep(ftr) + metaSlotSize;
+                    } catch (Exception e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+                return size;
+            }
+
+            private void emitResultTuple(IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+                    throws HyracksDataException {
+                internalTupleBuilder.reset();
+                for (int j = 0; j < groupFields.length; j++) {
+                    internalTupleBuilder.addField(accessor, tIndex, groupFields[j]);
+                }
+                fillResultTupleBuilder(internalTupleBuilder, state);
+                if (!appender.appendSkipEmptyField(internalTupleBuilder.getFieldEndOffsets(),
+                        internalTupleBuilder.getByteArray(), 0, internalTupleBuilder.getSize())) {
+                    FrameUtils.flushFrame(outputFrame, writer);
+                    appender.reset(outputFrame, true);
+                    if (!appender.appendSkipEmptyField(internalTupleBuilder.getFieldEndOffsets(),
+                            internalTupleBuilder.getByteArray(), 0, internalTupleBuilder.getSize())) {
+                        throw new HyracksDataException("The output cannot be fit into a frame.");
+                    }
+                }
+            }
+
+            public void setGroupKeySize(IFrameTupleAccessor accessor, int tIndex) {
+                groupKeySize = 0;
+                for (int i = 0; i < groupFields.length; i++) {
+                    int fIndex = groupFields[i];
+                    int fStartOffset = accessor.getFieldStartOffset(tIndex, fIndex);
+                    int fLen = accessor.getFieldEndOffset(tIndex, fIndex) - fStartOffset;
+                    groupKeySize += fLen + metaSlotSize;
+                }
+            }
+
+            private void fillResultTupleBuilder(ArrayTupleBuilder tupleBuilder, AggregateState state)
+                    throws HyracksDataException {
+                Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
+                ArrayBackedValueStorage[] aggOutput = aggState.getLeft();
+                IAggregateFunction[] agg = aggState.getRight();
+                for (int i = 0; i < agg.length; i++) {
+                    try {
+                        agg[i].finish();
+                        tupleBuilder.addField(aggOutput[i].getByteArray(), aggOutput[i].getStartOffset(),
+                                aggOutput[i].getLength());
+                    } catch (Exception e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+            }
+
         };
     }
 }
\ No newline at end of file
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
index 8090dff..6821e31 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.io.WritableComparable;
 
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
@@ -33,6 +34,7 @@
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 import edu.uci.ics.pregelix.api.graph.MessageCombiner;
 import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IAggregateFunction;
@@ -54,10 +56,11 @@
     private MsgList msgList = new MsgList();
     private boolean keyRead = false;
 
-    public AggregationFunction(IHyracksTaskContext ctx, IConfigurationFactory confFactory, DataOutput output,
-            boolean isFinalStage, boolean partialAggAsInput) throws HyracksDataException {
+    public AggregationFunction(IHyracksTaskContext ctx, IConfigurationFactory confFactory, DataOutput tmpOutput,
+            IFrameWriter groupByOutputWriter, boolean isFinalStage, boolean partialAggAsInput)
+            throws HyracksDataException {
         this.conf = confFactory.createConfiguration(ctx);
-        this.output = output;
+        this.output = tmpOutput;
         this.isFinalStage = isFinalStage;
         this.partialAggAsInput = partialAggAsInput;
         msgList.setConf(this.conf);
@@ -75,6 +78,29 @@
 
     @Override
     public void step(IFrameTupleReference tuple) throws HyracksDataException {
+        if (!partialAggAsInput) {
+            combiner.stepPartial(key, (WritableSizable) value);
+        } else {
+            combiner.stepFinal(key, value);
+        }
+    }
+
+    @Override
+    public void finish() throws HyracksDataException {
+        try {
+            if (!isFinalStage) {
+                combinedResult = combiner.finishPartial();
+            } else {
+                combinedResult = combiner.finishFinal();
+            }
+            combinedResult.write(output);
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public int estimateStep(IFrameTupleReference tuple) throws HyracksDataException {
         FrameTupleReference ftr = (FrameTupleReference) tuple;
         IFrameTupleAccessor fta = ftr.getFrameTupleAccessor();
         ByteBuffer buffer = fta.getBuffer();
@@ -94,28 +120,13 @@
             }
             value.readFields(valueInput);
             if (!partialAggAsInput) {
-                combiner.stepPartial(key, value);
+                return combiner.estimateAccumulatedStateByteSizePartial(key, (WritableSizable) value);
             } else {
-                combiner.stepFinal(key, value);
+                return combiner.estimateAccumulatedStateByteSizeFinal(key, value);
             }
         } catch (IOException e) {
             throw new HyracksDataException(e);
         }
-
-    }
-
-    @Override
-    public void finish() throws HyracksDataException {
-        try {
-            if (!isFinalStage) {
-                combinedResult = combiner.finishPartial();
-            } else {
-                combinedResult = combiner.finishFinal();
-            }
-            combinedResult.write(output);
-        } catch (IOException e) {
-            throw new HyracksDataException(e);
-        }
     }
 
 }
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java
index 33dfa5d..54eccf5 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java
@@ -17,6 +17,7 @@
 
 import java.io.DataOutput;
 
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
@@ -37,9 +38,9 @@
     }
 
     @Override
-    public IAggregateFunction createAggregateFunction(IHyracksTaskContext ctx, IDataOutputProvider provider)
-            throws HyracksException {
+    public IAggregateFunction createAggregateFunction(IHyracksTaskContext ctx, IDataOutputProvider provider,
+            IFrameWriter writer) throws HyracksException {
         DataOutput output = provider.getDataOutput();
-        return new AggregationFunction(ctx, confFactory, output, isFinalStage, partialAggAsInput);
+        return new AggregationFunction(ctx, confFactory, output, writer, isFinalStage, partialAggAsInput);
     }
 }