merge from zheilbron/hyracks_msr
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/EarlyTerminationVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/EarlyTerminationVertex.java
new file mode 100644
index 0000000..e369d29
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/EarlyTerminationVertex.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat.TextVertexWriter;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.example.client.Client;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+/**
+ * Demonstrates the basic Pregel PageRank implementation.
+ */
+public class EarlyTerminationVertex extends Vertex<VLongWritable, VLongWritable, VLongWritable, VLongWritable> {
+    private VLongWritable tempValue = new VLongWritable();
+
+    @Override
+    public void compute(Iterator<VLongWritable> msgIterator) {
+        if (getSuperstep() == 1) {
+            if (getVertexId().get() % 4 == 2) {
+                terminatePartition();
+            } else {
+                tempValue.set(1);
+                setVertexValue(tempValue);
+            }
+        }
+        if (getSuperstep() == 2) {
+            if (getVertexId().get() % 4 == 3) {
+                terminatePartition();
+            } else {
+                tempValue.set(2);
+                setVertexValue(tempValue);
+                voteToHalt();
+            }
+        }
+    }
+
+    @Override
+    public String toString() {
+        return getVertexId() + " " + getVertexValue();
+    }
+
+    /**
+     * Simple VertexWriter that support
+     */
+    public static class SimpleEarlyTerminattionVertexWriter extends
+            TextVertexWriter<VLongWritable, VLongWritable, VLongWritable> {
+        public SimpleEarlyTerminattionVertexWriter(RecordWriter<Text, Text> lineRecordWriter) {
+            super(lineRecordWriter);
+        }
+
+        @Override
+        public void writeVertex(Vertex<VLongWritable, VLongWritable, VLongWritable, ?> vertex) throws IOException,
+                InterruptedException {
+            getRecordWriter().write(new Text(vertex.getVertexId().toString()),
+                    new Text(vertex.getVertexValue().toString()));
+        }
+    }
+
+    public static class SimpleEarlyTerminattionVertexOutputFormat extends
+            TextVertexOutputFormat<VLongWritable, VLongWritable, VLongWritable> {
+
+        @Override
+        public VertexWriter<VLongWritable, VLongWritable, VLongWritable> createVertexWriter(TaskAttemptContext context)
+                throws IOException, InterruptedException {
+            RecordWriter<Text, Text> recordWriter = textOutputFormat.getRecordWriter(context);
+            return new SimpleEarlyTerminattionVertexWriter(recordWriter);
+        }
+
+    }
+
+    public static void main(String[] args) throws Exception {
+        PregelixJob job = new PregelixJob(EarlyTerminationVertex.class.getSimpleName());
+        job.setVertexClass(EarlyTerminationVertex.class);
+        job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+        job.setVertexOutputFormatClass(SimpleEarlyTerminattionVertexOutputFormat.class);
+        job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+        job.setDynamicVertexValueSize(true);
+        Client.run(args, job);
+    }
+
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/GraphMutationVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/GraphMutationVertex.java
index 7cf8408..7fae776 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/GraphMutationVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/GraphMutationVertex.java
@@ -18,7 +18,6 @@
 import java.io.IOException;
 import java.util.Iterator;
 
-import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.RecordWriter;
@@ -32,6 +31,7 @@
 import edu.uci.ics.pregelix.example.client.Client;
 import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
 import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.io.DoubleWritable;
 import edu.uci.ics.pregelix.example.io.VLongWritable;
 
 /**
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/MessageOverflowFixedsizeVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/MessageOverflowFixedsizeVertex.java
new file mode 100644
index 0000000..6c3c752
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/MessageOverflowFixedsizeVertex.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat.TextVertexWriter;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.example.client.Client;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.io.LongWritable;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+/**
+ * Demonstrates the basic Pregel PageRank implementation.
+ */
+public class MessageOverflowFixedsizeVertex extends Vertex<VLongWritable, LongWritable, VLongWritable, LongWritable> {
+
+    private LongWritable outputMsg = new LongWritable(1);
+    private Random rand = new Random(System.currentTimeMillis());
+    private LongWritable tmpVertexValue = new LongWritable(0);
+    private int numOfMsgClones = 10000;
+
+    @Override
+    public void compute(Iterator<LongWritable> msgIterator) {
+        if (getSuperstep() == 1) {
+            for (int i = 0; i < numOfMsgClones; i++) {
+                outputMsg.set(Math.abs(rand.nextLong()));
+                sendMsgToAllEdges(outputMsg);
+            }
+            tmpVertexValue.set(0);
+            setVertexValue(tmpVertexValue);
+        }
+        if (getSuperstep() == 2) {
+            long numOfMsg = getVertexValue().get();
+            while (msgIterator.hasNext()) {
+                msgIterator.next();
+                numOfMsg++;
+            }
+            tmpVertexValue.set(numOfMsg);
+            setVertexValue(tmpVertexValue);
+            voteToHalt();
+        }
+    }
+
+    @Override
+    public String toString() {
+        return getVertexId() + " " + getVertexValue();
+    }
+
+    /**
+     * Simple VertexWriter that support
+     */
+    public static class SimpleMessageOverflowVertexWriter extends
+            TextVertexWriter<VLongWritable, LongWritable, VLongWritable> {
+        public SimpleMessageOverflowVertexWriter(RecordWriter<Text, Text> lineRecordWriter) {
+            super(lineRecordWriter);
+        }
+
+        @Override
+        public void writeVertex(Vertex<VLongWritable, LongWritable, VLongWritable, ?> vertex) throws IOException,
+                InterruptedException {
+            getRecordWriter().write(new Text(vertex.getVertexId().toString()),
+                    new Text(vertex.getVertexValue().toString()));
+        }
+    }
+
+    public static class SimpleMessageOverflowVertexOutputFormat extends
+            TextVertexOutputFormat<VLongWritable, LongWritable, VLongWritable> {
+
+        @Override
+        public VertexWriter<VLongWritable, LongWritable, VLongWritable> createVertexWriter(TaskAttemptContext context)
+                throws IOException, InterruptedException {
+            RecordWriter<Text, Text> recordWriter = textOutputFormat.getRecordWriter(context);
+            return new SimpleMessageOverflowVertexWriter(recordWriter);
+        }
+
+    }
+
+    public static void main(String[] args) throws Exception {
+        PregelixJob job = new PregelixJob(MessageOverflowFixedsizeVertex.class.getSimpleName());
+        job.setVertexClass(MessageOverflowFixedsizeVertex.class);
+        job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+        job.setVertexOutputFormatClass(SimpleMessageOverflowVertexOutputFormat.class);
+        job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+        job.setDynamicVertexValueSize(true);
+        Client.run(args, job);
+    }
+
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/MessageOverflowVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/MessageOverflowVertex.java
new file mode 100644
index 0000000..d0221bf
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/MessageOverflowVertex.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat.TextVertexWriter;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.example.client.Client;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+/**
+ * Demonstrates the basic Pregel PageRank implementation.
+ */
+public class MessageOverflowVertex extends Vertex<VLongWritable, VLongWritable, VLongWritable, VLongWritable> {
+
+    private VLongWritable outputMsg = new VLongWritable(1);
+    private Random rand = new Random(System.currentTimeMillis());
+    private VLongWritable tmpVertexValue = new VLongWritable(0);
+    private int numOfMsgClones = 10000;
+    private int numIncomingMsgs = 0;
+
+    @Override
+    public void open() {
+        if (getSuperstep() == 2) {
+            numIncomingMsgs = 0;
+        }
+    }
+
+    @Override
+    public void compute(Iterator<VLongWritable> msgIterator) {
+        if (getSuperstep() == 1) {
+            for (int i = 0; i < numOfMsgClones; i++) {
+                outputMsg.set(Math.abs(rand.nextLong()));
+                sendMsgToAllEdges(outputMsg);
+            }
+            tmpVertexValue.set(0);
+            setVertexValue(tmpVertexValue);
+        }
+        if (getSuperstep() == 2) {
+            while (msgIterator.hasNext()) {
+                msgIterator.next();
+                numIncomingMsgs++;
+            }
+        }
+    }
+
+    @Override
+    public void close() {
+        if (getSuperstep() == 2) {
+            tmpVertexValue.set(numIncomingMsgs);
+            setVertexValue(tmpVertexValue);
+            voteToHalt();
+        }
+    }
+
+    @Override
+    public String toString() {
+        return getVertexId() + " " + getVertexValue();
+    }
+
+    /**
+     * Simple VertexWriter that support
+     */
+    public static class SimpleMessageOverflowVertexWriter extends
+            TextVertexWriter<VLongWritable, VLongWritable, VLongWritable> {
+        public SimpleMessageOverflowVertexWriter(RecordWriter<Text, Text> lineRecordWriter) {
+            super(lineRecordWriter);
+        }
+
+        @Override
+        public void writeVertex(Vertex<VLongWritable, VLongWritable, VLongWritable, ?> vertex) throws IOException,
+                InterruptedException {
+            getRecordWriter().write(new Text(vertex.getVertexId().toString()),
+                    new Text(vertex.getVertexValue().toString()));
+        }
+    }
+
+    public static class SimpleMessageOverflowVertexOutputFormat extends
+            TextVertexOutputFormat<VLongWritable, VLongWritable, VLongWritable> {
+
+        @Override
+        public VertexWriter<VLongWritable, VLongWritable, VLongWritable> createVertexWriter(TaskAttemptContext context)
+                throws IOException, InterruptedException {
+            RecordWriter<Text, Text> recordWriter = textOutputFormat.getRecordWriter(context);
+            return new SimpleMessageOverflowVertexWriter(recordWriter);
+        }
+
+    }
+
+    public static void main(String[] args) throws Exception {
+        PregelixJob job = new PregelixJob(MessageOverflowVertex.class.getSimpleName());
+        job.setVertexClass(MessageOverflowVertex.class);
+        job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+        job.setVertexOutputFormatClass(SimpleMessageOverflowVertexOutputFormat.class);
+        job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+        job.setDynamicVertexValueSize(true);
+        Client.run(args, job);
+    }
+
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
index 8664667..a866c1c 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
@@ -21,7 +21,6 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -45,6 +44,7 @@
 import edu.uci.ics.pregelix.example.client.Client;
 import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
 import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.io.DoubleWritable;
 import edu.uci.ics.pregelix.example.io.VLongWritable;
 
 /**
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java
index 6a42636..1bb33b8 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java
@@ -22,7 +22,6 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.ByteWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.RecordWriter;
@@ -42,6 +41,7 @@
 import edu.uci.ics.pregelix.example.client.Client;
 import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
 import edu.uci.ics.pregelix.example.inputformat.TextReachibilityVertexInputFormat;
+import edu.uci.ics.pregelix.example.io.ByteWritable;
 import edu.uci.ics.pregelix.example.io.VLongWritable;
 
 /**
@@ -116,7 +116,7 @@
     }
 
     @Override
-    public void compute(Iterator<ByteWritable> msgIterator) {
+    public void compute(Iterator<ByteWritable> msgIterator) throws Exception {
         if (sourceId < 0) {
             sourceId = getContext().getConfiguration().getLong(SOURCE_ID, SOURCE_ID_DEFAULT);
         }
@@ -171,13 +171,20 @@
         return getVertexId() + " " + getVertexValue();
     }
 
-    private void signalTerminate() {
-        Configuration conf = getContext().getConfiguration();
-        try {
-            IterationUtils.writeForceTerminationState(conf, BspUtils.getJobId(conf));
-            writeReachibilityResult(conf, true);
-        } catch (Exception e) {
-            throw new IllegalStateException(e);
+    private void signalTerminate() throws Exception {
+        writeReachibilityResult(getContext().getConfiguration(), true);
+        terminateJob();
+    }
+
+    private void writeReachibilityResult(Configuration conf, boolean terminate) throws IOException {
+        FileSystem dfs = FileSystem.get(conf);
+        String pathStr = IterationUtils.TMP_DIR + BspUtils.getJobId(conf) + "reachibility";
+        Path path = new Path(pathStr);
+        if (!dfs.exists(path)) {
+            FSDataOutputStream output = dfs.create(path, true);
+            output.writeBoolean(terminate);
+            output.flush();
+            output.close();
         }
     }
 
@@ -187,22 +194,6 @@
         }
     }
 
-    private void writeReachibilityResult(Configuration conf, boolean terminate) {
-        try {
-            FileSystem dfs = FileSystem.get(conf);
-            String pathStr = IterationUtils.TMP_DIR + BspUtils.getJobId(conf) + "reachibility";
-            Path path = new Path(pathStr);
-            if (!dfs.exists(path)) {
-                FSDataOutputStream output = dfs.create(path, true);
-                output.writeBoolean(terminate);
-                output.flush();
-                output.close();
-            }
-        } catch (IOException e) {
-            throw new IllegalStateException(e);
-        }
-    }
-
     private static boolean readReachibilityResult(Configuration conf) {
         try {
             FileSystem dfs = FileSystem.get(conf);
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java
index 41c26b1..648f168 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java
@@ -19,7 +19,6 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -32,6 +31,7 @@
 import edu.uci.ics.pregelix.example.client.Client;
 import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
 import edu.uci.ics.pregelix.example.inputformat.TextShortestPathsInputFormat;
+import edu.uci.ics.pregelix.example.io.DoubleWritable;
 import edu.uci.ics.pregelix.example.io.VLongWritable;
 
 /**
@@ -127,7 +127,7 @@
         }
         voteToHalt();
     }
-    
+
     @Override
     public String toString() {
         return getVertexId() + " " + getVertexValue();
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java
index 6cb1f4a..f60387a 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java
@@ -82,7 +82,7 @@
         String[] inputs = options.inputPaths.split(";");
         FileInputFormat.setInputPaths(job, inputs[0]);
         for (int i = 1; i < inputs.length; i++)
-            FileInputFormat.addInputPaths(job, inputs[0]);
+            FileInputFormat.addInputPaths(job, inputs[i]);
         FileOutputFormat.setOutputPath(job, new Path(options.outputPath));
         job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, options.numVertices);
         job.getConfiguration().setLong(PregelixJob.NUM_EDGES, options.numEdges);
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextPageRankInputFormat.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextPageRankInputFormat.java
index f46d9c3..67681d3 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextPageRankInputFormat.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextPageRankInputFormat.java
@@ -18,7 +18,6 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -31,6 +30,7 @@
 import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat;
 import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat.TextVertexReader;
 import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.example.io.DoubleWritable;
 import edu.uci.ics.pregelix.example.io.VLongWritable;
 
 public class TextPageRankInputFormat extends
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextShortestPathsInputFormat.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextShortestPathsInputFormat.java
index 013a063..3ea4a9f 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextShortestPathsInputFormat.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextShortestPathsInputFormat.java
@@ -18,7 +18,6 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -31,6 +30,7 @@
 import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat;
 import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat.TextVertexReader;
 import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.example.io.DoubleWritable;
 import edu.uci.ics.pregelix.example.io.VLongWritable;
 
 public class TextShortestPathsInputFormat extends
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/BooleanWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/BooleanWritable.java
new file mode 100644
index 0000000..c943288
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/BooleanWritable.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example.io;
+
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+
+/**
+ * Writable for Boolean values.
+ */
+public class BooleanWritable extends org.apache.hadoop.io.BooleanWritable implements WritableSizable {
+
+    public BooleanWritable(boolean value) {
+        super(value);
+    }
+
+    public BooleanWritable() {
+        super();
+    }
+
+    public int sizeInBytes() {
+        return 1;
+    }
+
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/ByteWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/ByteWritable.java
new file mode 100644
index 0000000..2a1fd22
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/ByteWritable.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example.io;
+
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+
+/**
+ * Writable for Byte values.
+ */
+public class ByteWritable extends org.apache.hadoop.io.ByteWritable implements WritableSizable {
+
+    public ByteWritable(byte value) {
+        super(value);
+    }
+
+    public ByteWritable() {
+        super();
+    }
+
+    public int sizeInBytes() {
+        return 1;
+    }
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/BytesWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/BytesWritable.java
new file mode 100644
index 0000000..04a5549
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/BytesWritable.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example.io;
+
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+
+/**
+ * Writable for Bytes values.
+ */
+public class BytesWritable extends org.apache.hadoop.io.BytesWritable implements WritableSizable {
+
+    public BytesWritable(byte[] value) {
+        super(value);
+    }
+
+    public BytesWritable() {
+        super();
+    }
+
+    @Override
+    public int sizeInBytes() {
+        return getLength() + 4; // add the integer size slot
+    }
+
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/DoubleWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/DoubleWritable.java
new file mode 100644
index 0000000..ebc7fe4
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/DoubleWritable.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example.io;
+
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+
+/**
+ * Writable for Double values.
+ */
+public class DoubleWritable extends org.apache.hadoop.io.DoubleWritable implements WritableSizable {
+
+    public DoubleWritable(double value) {
+        super(value);
+    }
+
+    public DoubleWritable() {
+        super();
+    }
+
+    public int sizeInBytes() {
+        return 8;
+    }
+
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/FloatWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/FloatWritable.java
new file mode 100644
index 0000000..6772b0a
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/FloatWritable.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example.io;
+
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+
+/** A WritableComparable for floats. */
+public class FloatWritable extends org.apache.hadoop.io.FloatWritable implements WritableSizable {
+
+    public FloatWritable(float value) {
+        super(value);
+    }
+
+    public FloatWritable() {
+        super();
+    }
+
+    public int sizeInBytes() {
+        return 4;
+    }
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/IntWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/IntWritable.java
new file mode 100644
index 0000000..4944232
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/IntWritable.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example.io;
+
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+
+/** A WritableComparable for ints. */
+public class IntWritable extends org.apache.hadoop.io.IntWritable implements WritableSizable {
+
+    public IntWritable(int value) {
+        super(value);
+    }
+
+    public IntWritable() {
+        super();
+    }
+
+    public int sizeInBytes() {
+        return 4;
+    }
+
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/LongWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/LongWritable.java
new file mode 100644
index 0000000..3ecab79
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/LongWritable.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example.io;
+
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+
+/** A WritableComparable for longs. */
+public class LongWritable extends org.apache.hadoop.io.LongWritable implements WritableSizable {
+
+    public LongWritable(long value) {
+        super(value);
+    }
+
+    public LongWritable() {
+        super();
+    }
+
+    public int sizeInBytes() {
+        return 8;
+    }
+
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/NullWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/NullWritable.java
new file mode 100644
index 0000000..a2f184a
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/NullWritable.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+
+/** Singleton Writable with no data. */
+@SuppressWarnings("rawtypes")
+public class NullWritable implements WritableComparable, WritableSizable {
+
+    private static final NullWritable THIS = new NullWritable();
+
+    private NullWritable() {
+    } // no public ctor
+
+    /** Returns the single instance of this class. */
+    public static NullWritable get() {
+        return THIS;
+    }
+
+    public String toString() {
+        return "(null)";
+    }
+
+    public int sizeInBytes() {
+        return 0;
+    }
+
+    public int hashCode() {
+        return 0;
+    }
+
+    public int compareTo(Object other) {
+        if (!(other instanceof NullWritable)) {
+            throw new ClassCastException("can't compare " + other.getClass().getName() + " to NullWritable");
+        }
+        return 0;
+    }
+
+    public boolean equals(Object other) {
+        return other instanceof NullWritable;
+    }
+
+    public void readFields(DataInput in) throws IOException {
+    }
+
+    public void write(DataOutput out) throws IOException {
+    }
+
+    /** A Comparator &quot;optimized&quot; for NullWritable. */
+    public static class Comparator extends WritableComparator {
+        public Comparator() {
+            super(NullWritable.class);
+        }
+
+        /**
+         * Compare the buffers in serialized form.
+         */
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            assert 0 == l1;
+            assert 0 == l2;
+            return 0;
+        }
+    }
+
+    static { // register this comparator
+        WritableComparator.define(NullWritable.class, new Comparator());
+    }
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VIntWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VIntWritable.java
new file mode 100644
index 0000000..94df74f
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VIntWritable.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableUtils;
+
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+
+/**
+ * A WritableComparable for integer values stored in variable-length format.
+ * Such values take between one and five bytes. Smaller values take fewer bytes.
+ * 
+ * @see org.apache.hadoop.io.WritableUtils#readVInt(DataInput)
+ */
+@SuppressWarnings("rawtypes")
+public class VIntWritable implements WritableComparable, WritableSizable {
+    private int value;
+
+    public VIntWritable() {
+    }
+
+    public VIntWritable(int value) {
+        set(value);
+    }
+
+    public int sizeInBytes() {
+        return 5;
+    }
+
+    /** Set the value of this VIntWritable. */
+    public void set(int value) {
+        this.value = value;
+    }
+
+    /** Return the value of this VIntWritable. */
+    public int get() {
+        return value;
+    }
+
+    public void readFields(DataInput in) throws IOException {
+        value = WritableUtils.readVInt(in);
+    }
+
+    public void write(DataOutput out) throws IOException {
+        WritableUtils.writeVInt(out, value);
+    }
+
+    /** Returns true iff <code>o</code> is a VIntWritable with the same value. */
+    public boolean equals(Object o) {
+        if (!(o instanceof VIntWritable))
+            return false;
+        VIntWritable other = (VIntWritable) o;
+        return this.value == other.value;
+    }
+
+    public int hashCode() {
+        return value;
+    }
+
+    /** Compares two VIntWritables. */
+    public int compareTo(Object o) {
+        int thisValue = this.value;
+        int thatValue = ((VIntWritable) o).value;
+        return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
+    }
+
+    public String toString() {
+        return Integer.toString(value);
+    }
+
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VLongWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VLongWritable.java
index e12d930..ec1109f 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VLongWritable.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VLongWritable.java
@@ -22,16 +22,26 @@
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
 
+import edu.uci.ics.pregelix.api.io.WritableSizable;
 import edu.uci.ics.pregelix.api.util.SerDeUtils;
 
 /**
  * A WritableComparable for longs in a variable-length format. Such values take
- * between one and five bytes. Smaller values take fewer bytes.
+ * between one and nine bytes. Smaller values take fewer bytes.
  * 
  * @see org.apache.hadoop.io.WritableUtils#readVLong(DataInput)
  */
 @SuppressWarnings("rawtypes")
-public class VLongWritable implements WritableComparable {
+public class VLongWritable implements WritableComparable, WritableSizable {
+    private static long ONE_BYTE_MAX = 2 ^ 7 - 1;
+    private static long TWO_BYTE_MAX = 2 ^ 14 - 1;
+    private static long THREE_BYTE_MAX = 2 ^ 21 - 1;
+    private static long FOUR_BYTE_MAX = 2 ^ 28 - 1;
+    private static long FIVE_BYTE_MAX = 2 ^ 35 - 1;;
+    private static long SIX_BYTE_MAX = 2 ^ 42 - 1;;
+    private static long SEVEN_BYTE_MAX = 2 ^ 49 - 1;;
+    private static long EIGHT_BYTE_MAX = 2 ^ 54 - 1;;
+
     private long value;
 
     public VLongWritable() {
@@ -41,6 +51,28 @@
         set(value);
     }
 
+    public int sizeInBytes() {
+        if (value >= 0 && value <= ONE_BYTE_MAX) {
+            return 1;
+        } else if (value > ONE_BYTE_MAX && value <= TWO_BYTE_MAX) {
+            return 2;
+        } else if (value > TWO_BYTE_MAX && value <= THREE_BYTE_MAX) {
+            return 3;
+        } else if (value > THREE_BYTE_MAX && value <= FOUR_BYTE_MAX) {
+            return 4;
+        } else if (value > FOUR_BYTE_MAX && value <= FIVE_BYTE_MAX) {
+            return 5;
+        } else if (value > FIVE_BYTE_MAX && value <= SIX_BYTE_MAX) {
+            return 6;
+        } else if (value > SIX_BYTE_MAX && value <= SEVEN_BYTE_MAX) {
+            return 7;
+        } else if (value > SEVEN_BYTE_MAX && value <= EIGHT_BYTE_MAX) {
+            return 8;
+        } else {
+            return 9;
+        }
+    }
+
     /** Set the value of this LongWritable. */
     public void set(long value) {
         this.value = value;
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/AdjacencyListWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/AdjacencyListWritable.java
index 0a58c00..dd86a45 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/AdjacencyListWritable.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/AdjacencyListWritable.java
@@ -23,14 +23,13 @@
 import java.util.Set;
 import java.util.TreeSet;
 
-import org.apache.hadoop.io.Writable;
-
+import edu.uci.ics.pregelix.api.io.WritableSizable;
 import edu.uci.ics.pregelix.example.io.VLongWritable;
 
 /**
  * The adjacency list contains <src, list-of-neighbors>
  */
-public class AdjacencyListWritable implements Writable {
+public class AdjacencyListWritable implements WritableSizable {
 
     private VLongWritable sourceVertex = new VLongWritable();
     private Set<VLongWritable> destinationVertexes = new TreeSet<VLongWritable>();
@@ -96,4 +95,13 @@
         return destinationVertexes.contains(v);
     }
 
+    @Override
+    public int sizeInBytes() {
+        int size = 4; // the size of list bytes
+        for (VLongWritable dest : destinationVertexes) {
+            size += dest.sizeInBytes();
+        }
+        return size;
+    }
+
 }
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
index 15117a1..13cec61 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
@@ -26,8 +26,13 @@
 import edu.uci.ics.pregelix.api.util.DefaultVertexPartitioner;
 import edu.uci.ics.pregelix.example.ConnectedComponentsVertex;
 import edu.uci.ics.pregelix.example.ConnectedComponentsVertex.SimpleConnectedComponentsVertexOutputFormat;
+import edu.uci.ics.pregelix.example.EarlyTerminationVertex;
+import edu.uci.ics.pregelix.example.EarlyTerminationVertex.SimpleEarlyTerminattionVertexOutputFormat;
 import edu.uci.ics.pregelix.example.GraphMutationVertex;
 import edu.uci.ics.pregelix.example.GraphMutationVertex.SimpleGraphMutationVertexOutputFormat;
+import edu.uci.ics.pregelix.example.MessageOverflowFixedsizeVertex;
+import edu.uci.ics.pregelix.example.MessageOverflowVertex;
+import edu.uci.ics.pregelix.example.MessageOverflowVertex.SimpleMessageOverflowVertexOutputFormat;
 import edu.uci.ics.pregelix.example.PageRankVertex;
 import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexOutputFormat;
 import edu.uci.ics.pregelix.example.PageRankVertex.SimulatedPageRankVertexInputFormat;
@@ -233,7 +238,7 @@
         job.setVertexInputFormatClass(TextMaximalCliqueInputFormat.class);
         job.setVertexOutputFormatClass(MaximalCliqueVertexOutputFormat.class);
         job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
-        job.setMutationOrVariableSizedUpdateHeavy(true);
+        job.setLSMStorage(true);
         FileInputFormat.setInputPaths(job, HDFS_INPUTPATH3);
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH3));
         job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
@@ -247,7 +252,7 @@
         job.setVertexInputFormatClass(TextMaximalCliqueInputFormat.class);
         job.setVertexOutputFormatClass(MaximalCliqueVertexOutputFormat.class);
         job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
-        job.setMutationOrVariableSizedUpdateHeavy(true);
+        job.setLSMStorage(true);
         FileInputFormat.setInputPaths(job, HDFS_INPUTPATH4);
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH3));
         job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
@@ -262,7 +267,7 @@
         job.setVertexOutputFormatClass(MaximalCliqueVertexOutputFormat.class);
         job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
         job.setVertexPartitionerClass(DefaultVertexPartitioner.class);
-        job.setMutationOrVariableSizedUpdateHeavy(true);
+        job.setLSMStorage(true);
         FileInputFormat.setInputPaths(job, HDFS_INPUTPATH5);
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH3));
         job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
@@ -280,6 +285,59 @@
         job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
     }
 
+    private static void generateMessageOverflowFixedsizeJob(String jobName, String outputPath) throws IOException {
+        PregelixJob job = new PregelixJob(jobName);
+        job.setVertexClass(MessageOverflowFixedsizeVertex.class);
+        job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+        job.setVertexOutputFormatClass(MessageOverflowFixedsizeVertex.SimpleMessageOverflowVertexOutputFormat.class);
+        job.setFrameSize(2048);
+        FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
+        FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
+        job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+        job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+    }
+
+    private static void generateMessageOverflowJob(String jobName, String outputPath) throws IOException {
+        PregelixJob job = new PregelixJob(jobName);
+        job.setVertexClass(MessageOverflowVertex.class);
+        job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+        job.setVertexOutputFormatClass(SimpleMessageOverflowVertexOutputFormat.class);
+        job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+        job.setDynamicVertexValueSize(true);
+        job.setFrameSize(2048);
+        FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
+        FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
+        job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+        job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+    }
+
+    private static void generateMessageOverflowJobLSM(String jobName, String outputPath) throws IOException {
+        PregelixJob job = new PregelixJob(jobName);
+        job.setVertexClass(MessageOverflowVertex.class);
+        job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+        job.setVertexOutputFormatClass(SimpleMessageOverflowVertexOutputFormat.class);
+        job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+        job.setDynamicVertexValueSize(true);
+        job.setFrameSize(2048);
+        job.setLSMStorage(true);
+        FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
+        FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
+        job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+        job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+    }
+
+    private static void generateEarlyTerminationJob(String jobName, String outputPath) throws IOException {
+        PregelixJob job = new PregelixJob(jobName);
+        job.setVertexClass(EarlyTerminationVertex.class);
+        job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+        job.setVertexOutputFormatClass(SimpleEarlyTerminattionVertexOutputFormat.class);
+        job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+        FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
+        FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
+        job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+        job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+    }
+
     private static void genPageRank() throws IOException {
         generatePageRankJob("PageRank", outputBase + "PageRank.xml");
         generatePageRankJobReal("PageRank", outputBase + "PageRankReal.xml");
@@ -319,6 +377,16 @@
         generateGraphMutationJob("Graph Mutation", outputBase + "GraphMutation.xml");
     }
 
+    private static void genMessageOverflow() throws IOException {
+        generateMessageOverflowJob("Message Overflow", outputBase + "MessageOverflow.xml");
+        generateMessageOverflowJobLSM("Message Overflow LSM", outputBase + "MessageOverflowLSM.xml");
+        generateMessageOverflowFixedsizeJob("Message Overflow Fixedsize", outputBase + "MessageOverflowFixedsize.xml");
+    }
+
+    private static void genEarlyTermination() throws IOException {
+        generateEarlyTerminationJob("Early Termination", outputBase + "EarlyTermination.xml");
+    }
+
     public static void main(String[] args) throws IOException {
         genPageRank();
         genShortestPath();
@@ -327,5 +395,7 @@
         genTriangleCounting();
         genMaximalClique();
         genGraphMutation();
+        genMessageOverflow();
+        genEarlyTermination();
     }
 }
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/lib/io/SizeEstimationTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/lib/io/SizeEstimationTest.java
new file mode 100644
index 0000000..196b114
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/lib/io/SizeEstimationTest.java
@@ -0,0 +1,151 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example.lib.io;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.util.Random;
+
+import org.junit.Test;
+
+import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+import edu.uci.ics.pregelix.example.io.BooleanWritable;
+import edu.uci.ics.pregelix.example.io.ByteWritable;
+import edu.uci.ics.pregelix.example.io.DoubleWritable;
+import edu.uci.ics.pregelix.example.io.IntWritable;
+import edu.uci.ics.pregelix.example.io.LongWritable;
+import edu.uci.ics.pregelix.example.io.NullWritable;
+import edu.uci.ics.pregelix.example.io.VIntWritable;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+/**
+ * @author yingyib
+ */
+public class SizeEstimationTest {
+
+    @Test
+    public void testVLong() throws Exception {
+        Random rand = new Random(System.currentTimeMillis());
+        MsgList<WritableSizable> msgList = new MsgList<WritableSizable>();
+        for (int i = 0; i < 1000000; i++) {
+            msgList.add(new VLongWritable(Math.abs(rand.nextLong())));
+        }
+        verifySizeEstimation(msgList);
+    }
+
+    @Test
+    public void testLong() throws Exception {
+        Random rand = new Random(System.currentTimeMillis());
+        MsgList<WritableSizable> msgList = new MsgList<WritableSizable>();
+        for (int i = 0; i < 1000000; i++) {
+            msgList.add(new LongWritable(rand.nextLong()));
+        }
+        verifySizeEstimation(msgList);
+    }
+
+    @Test
+    public void testBoolean() throws Exception {
+        Random rand = new Random(System.currentTimeMillis());
+        MsgList<WritableSizable> msgList = new MsgList<WritableSizable>();
+        for (int i = 0; i < 1000000; i++) {
+            msgList.add(new BooleanWritable(rand.nextBoolean()));
+        }
+        verifySizeEstimation(msgList);
+    }
+
+    @Test
+    public void testByte() throws Exception {
+        Random rand = new Random(System.currentTimeMillis());
+        MsgList<WritableSizable> msgList = new MsgList<WritableSizable>();
+        for (int i = 0; i < 1000000; i++) {
+            msgList.add(new ByteWritable((byte) rand.nextInt()));
+        }
+        verifySizeEstimation(msgList);
+    }
+
+    @Test
+    public void testDouble() throws Exception {
+        Random rand = new Random(System.currentTimeMillis());
+        MsgList<WritableSizable> msgList = new MsgList<WritableSizable>();
+        for (int i = 0; i < 1000000; i++) {
+            msgList.add(new DoubleWritable(rand.nextDouble()));
+        }
+        verifySizeEstimation(msgList);
+    }
+
+    @Test
+    public void testFloat() throws Exception {
+        Random rand = new Random(System.currentTimeMillis());
+        MsgList<WritableSizable> msgList = new MsgList<WritableSizable>();
+        for (int i = 0; i < 1000000; i++) {
+            msgList.add(new DoubleWritable(rand.nextFloat()));
+        }
+        verifySizeEstimation(msgList);
+    }
+    
+    @Test
+    public void testNull() throws Exception {
+        MsgList<WritableSizable> msgList = new MsgList<WritableSizable>();
+        for (int i = 0; i < 1000000; i++) {
+            msgList.add(NullWritable.get());
+        }
+        verifySizeEstimation(msgList);
+    }
+    
+    @Test
+    public void testVInt() throws Exception {
+        Random rand = new Random(System.currentTimeMillis());
+        MsgList<WritableSizable> msgList = new MsgList<WritableSizable>();
+        for (int i = 0; i < 1000000; i++) {
+            msgList.add(new VIntWritable(rand.nextInt()));
+        }
+        verifySizeEstimation(msgList);
+    }
+    
+    @Test
+    public void testInt() throws Exception {
+        Random rand = new Random(System.currentTimeMillis());
+        MsgList<WritableSizable> msgList = new MsgList<WritableSizable>();
+        for (int i = 0; i < 1000000; i++) {
+            msgList.add(new IntWritable(rand.nextInt()));
+        }
+        verifySizeEstimation(msgList);
+    }
+
+    private void verifySizeEstimation(MsgList<WritableSizable> msgList) throws Exception {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutput dos = new DataOutputStream(bos);
+        int accumulatedSize = 5;
+        for (int i = 0; i < msgList.size(); i++) {
+            bos.reset();
+            WritableSizable value = msgList.get(i);
+            value.write(dos);
+            if (value.sizeInBytes() < bos.size()) {
+                throw new Exception(value + " estimated size (" + value.sizeInBytes()
+                        + ") is smaller than the actual size" + bos.size());
+            }
+            accumulatedSize += value.sizeInBytes();
+        }
+        bos.reset();
+        msgList.write(dos);
+        if (accumulatedSize < bos.size()) {
+            throw new Exception("Estimated list size (" + accumulatedSize + ") is smaller than the actual size"
+                    + bos.size());
+        }
+    }
+
+}
diff --git a/pregelix/pregelix-example/src/test/resources/expected/EarlyTermination/part-0 b/pregelix/pregelix-example/src/test/resources/expected/EarlyTermination/part-0
new file mode 100644
index 0000000..60a55af
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/EarlyTermination/part-0
@@ -0,0 +1,5 @@
+0	2
+4	2
+8	2
+12	2
+16	2
diff --git a/pregelix/pregelix-example/src/test/resources/expected/EarlyTermination/part-1 b/pregelix/pregelix-example/src/test/resources/expected/EarlyTermination/part-1
new file mode 100644
index 0000000..32ee93f
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/EarlyTermination/part-1
@@ -0,0 +1,5 @@
+1	2
+5	2
+9	2
+13	2
+17	2
diff --git a/pregelix/pregelix-example/src/test/resources/expected/EarlyTermination/part-2 b/pregelix/pregelix-example/src/test/resources/expected/EarlyTermination/part-2
new file mode 100644
index 0000000..542ccae
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/EarlyTermination/part-2
@@ -0,0 +1,5 @@
+2	0
+6	0
+10	0
+14	0
+18	0
diff --git a/pregelix/pregelix-example/src/test/resources/expected/EarlyTermination/part-3 b/pregelix/pregelix-example/src/test/resources/expected/EarlyTermination/part-3
new file mode 100644
index 0000000..ff0e5b8
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/EarlyTermination/part-3
@@ -0,0 +1,5 @@
+3	1
+7	1
+11	1
+15	1
+19	1
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MessageOverflow/part-0 b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflow/part-0
new file mode 100644
index 0000000..db5f679
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflow/part-0
@@ -0,0 +1,5 @@
+0	10000
+4	70000
+8	30000
+12	90000
+16	50000
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MessageOverflow/part-1 b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflow/part-1
new file mode 100644
index 0000000..3dc4629
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflow/part-1
@@ -0,0 +1,5 @@
+1	100000
+5	60000
+9	20000
+13	80000
+17	40000
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MessageOverflow/part-2 b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflow/part-2
new file mode 100644
index 0000000..bc95831
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflow/part-2
@@ -0,0 +1,5 @@
+2	90000
+6	50000
+10	10000
+14	70000
+18	30000
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MessageOverflow/part-3 b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflow/part-3
new file mode 100644
index 0000000..b619cd7
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflow/part-3
@@ -0,0 +1,5 @@
+3	80000
+7	40000
+11	100000
+15	60000
+19	20000
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowFixedsize/part-0 b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowFixedsize/part-0
new file mode 100644
index 0000000..db5f679
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowFixedsize/part-0
@@ -0,0 +1,5 @@
+0	10000
+4	70000
+8	30000
+12	90000
+16	50000
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowFixedsize/part-1 b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowFixedsize/part-1
new file mode 100644
index 0000000..3dc4629
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowFixedsize/part-1
@@ -0,0 +1,5 @@
+1	100000
+5	60000
+9	20000
+13	80000
+17	40000
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowFixedsize/part-2 b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowFixedsize/part-2
new file mode 100644
index 0000000..bc95831
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowFixedsize/part-2
@@ -0,0 +1,5 @@
+2	90000
+6	50000
+10	10000
+14	70000
+18	30000
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowFixedsize/part-3 b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowFixedsize/part-3
new file mode 100644
index 0000000..b619cd7
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowFixedsize/part-3
@@ -0,0 +1,5 @@
+3	80000
+7	40000
+11	100000
+15	60000
+19	20000
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowLSM/part-0 b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowLSM/part-0
new file mode 100644
index 0000000..db5f679
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowLSM/part-0
@@ -0,0 +1,5 @@
+0	10000
+4	70000
+8	30000
+12	90000
+16	50000
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowLSM/part-1 b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowLSM/part-1
new file mode 100644
index 0000000..3dc4629
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowLSM/part-1
@@ -0,0 +1,5 @@
+1	100000
+5	60000
+9	20000
+13	80000
+17	40000
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowLSM/part-2 b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowLSM/part-2
new file mode 100644
index 0000000..bc95831
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowLSM/part-2
@@ -0,0 +1,5 @@
+2	90000
+6	50000
+10	10000
+14	70000
+18	30000
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowLSM/part-3 b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowLSM/part-3
new file mode 100644
index 0000000..b619cd7
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowLSM/part-3
@@ -0,0 +1,5 @@
+3	80000
+7	40000
+11	100000
+15	60000
+19	20000
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/EarlyTermination.xml b/pregelix/pregelix-example/src/test/resources/jobs/EarlyTermination.xml
new file mode 100644
index 0000000..d908da8
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/jobs/EarlyTermination.xml
@@ -0,0 +1,142 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
+<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
+<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
+<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
+<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
+<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
+<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
+<property><name>mapred.input.dir</name><value>file:/webmap</value></property>
+<property><name>mapred.submit.replication</name><value>10</value></property>
+<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
+<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
+<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
+<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
+<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
+<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
+<property><name>keep.failed.task.files</name><value>false</value></property>
+<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
+<property><name>io.bytes.per.checksum</name><value>512</value></property>
+<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
+<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
+<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
+<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
+<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
+<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
+<property><name>fs.checkpoint.period</name><value>3600</value></property>
+<property><name>mapred.child.tmp</name><value>./tmp</value></property>
+<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
+<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
+<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
+<property><name>mapred.output.dir</name><value>/result</value></property>
+<property><name>io.map.index.skip</name><value>0</value></property>
+<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
+<property><name>mapred.output.compress</name><value>false</value></property>
+<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
+<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
+<property><name>fs.checkpoint.size</name><value>67108864</value></property>
+<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
+<property><name>mapred.job.name</name><value>Early Termination</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
+<property><name>local.cache.size</name><value>10737418240</value></property>
+<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
+<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
+<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
+<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
+<property><name>mapred.task.timeout</name><value>600000</value></property>
+<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
+<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
+<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
+<property><name>ipc.client.kill.max</name><value>10</value></property>
+<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
+<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
+<property><name>io.sort.record.percent</name><value>0.05</value></property>
+<property><name>hadoop.security.authorization</name><value>false</value></property>
+<property><name>mapred.max.tracker.failures</name><value>4</value></property>
+<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>pregelix.numVertices</name><value>20</value></property>
+<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
+<property><name>mapred.map.tasks</name><value>2</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
+<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
+<property><name>fs.default.name</name><value>file:///</value></property>
+<property><name>tasktracker.http.threads</name><value>40</value></property>
+<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
+<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
+<property><name>mapred.reduce.tasks</name><value>1</value></property>
+<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.EarlyTerminationVertex</value></property>
+<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
+<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
+<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
+<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
+<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
+<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
+<property><name>hadoop.native.lib</name><value>true</value></property>
+<property><name>fs.s3.block.size</name><value>67108864</value></property>
+<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
+<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
+<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
+<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
+<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
+<property><name>mapred.queue.names</name><value>default</value></property>
+<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
+<property><name>mapred.job.tracker</name><value>local</value></property>
+<property><name>io.skip.checksum.errors</name><value>false</value></property>
+<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
+<property><name>fs.s3.maxRetries</name><value>4</value></property>
+<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
+<property><name>fs.trash.interval</name><value>0</value></property>
+<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
+<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
+<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
+<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
+<property><name>io.sort.mb</name><value>100</value></property>
+<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
+<property><name>io.sort.factor</name><value>10</value></property>
+<property><name>mapred.task.profile</name><value>false</value></property>
+<property><name>job.end.retry.interval</name><value>30000</value></property>
+<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
+<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
+<property><name>webinterface.private.actions</name><value>false</value></property>
+<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
+<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
+<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
+<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
+<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.compress.map.output</name><value>false</value></property>
+<property><name>io.sort.spill.percent</name><value>0.80</value></property>
+<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
+<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
+<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
+<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
+<property><name>job.end.retry.attempts</name><value>0</value></property>
+<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.EarlyTerminationVertex$SimpleEarlyTerminattionVertexOutputFormat</value></property>
+<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
+<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
+<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
+<property><name>mapred.job.queue.name</name><value>default</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
+<property><name>topology.script.number.args</name><value>100</value></property>
+<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
+<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
+<property><name>mapred.task.cache.levels</name><value>2</value></property>
+<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
+<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
+<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>mapred.min.split.size</name><value>0</value></property>
+<property><name>mapred.map.max.attempts</name><value>4</value></property>
+<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
+<property><name>mapred.acls.enabled</name><value>false</value></property>
+</configuration>
\ No newline at end of file
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflow.xml b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflow.xml
new file mode 100644
index 0000000..8316c64
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflow.xml
@@ -0,0 +1,144 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
+<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
+<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
+<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
+<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
+<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
+<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
+<property><name>mapred.input.dir</name><value>file:/webmap</value></property>
+<property><name>mapred.submit.replication</name><value>10</value></property>
+<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
+<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
+<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
+<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
+<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
+<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
+<property><name>keep.failed.task.files</name><value>false</value></property>
+<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
+<property><name>io.bytes.per.checksum</name><value>512</value></property>
+<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
+<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
+<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
+<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
+<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
+<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
+<property><name>fs.checkpoint.period</name><value>3600</value></property>
+<property><name>mapred.child.tmp</name><value>./tmp</value></property>
+<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
+<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
+<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
+<property><name>mapred.output.dir</name><value>/result</value></property>
+<property><name>io.map.index.skip</name><value>0</value></property>
+<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
+<property><name>mapred.output.compress</name><value>false</value></property>
+<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
+<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
+<property><name>fs.checkpoint.size</name><value>67108864</value></property>
+<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
+<property><name>mapred.job.name</name><value>Message Overflow</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
+<property><name>local.cache.size</name><value>10737418240</value></property>
+<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
+<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
+<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
+<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
+<property><name>mapred.task.timeout</name><value>600000</value></property>
+<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
+<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
+<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
+<property><name>ipc.client.kill.max</name><value>10</value></property>
+<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
+<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
+<property><name>io.sort.record.percent</name><value>0.05</value></property>
+<property><name>hadoop.security.authorization</name><value>false</value></property>
+<property><name>mapred.max.tracker.failures</name><value>4</value></property>
+<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>pregelix.numVertices</name><value>20</value></property>
+<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
+<property><name>mapred.map.tasks</name><value>2</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
+<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
+<property><name>fs.default.name</name><value>file:///</value></property>
+<property><name>tasktracker.http.threads</name><value>40</value></property>
+<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
+<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
+<property><name>mapred.reduce.tasks</name><value>1</value></property>
+<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.MessageOverflowVertex</value></property>
+<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
+<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
+<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
+<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
+<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
+<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
+<property><name>hadoop.native.lib</name><value>true</value></property>
+<property><name>fs.s3.block.size</name><value>67108864</value></property>
+<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
+<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
+<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
+<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
+<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
+<property><name>mapred.queue.names</name><value>default</value></property>
+<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
+<property><name>mapred.job.tracker</name><value>local</value></property>
+<property><name>io.skip.checksum.errors</name><value>false</value></property>
+<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
+<property><name>fs.s3.maxRetries</name><value>4</value></property>
+<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
+<property><name>fs.trash.interval</name><value>0</value></property>
+<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
+<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
+<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
+<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
+<property><name>io.sort.mb</name><value>100</value></property>
+<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
+<property><name>io.sort.factor</name><value>10</value></property>
+<property><name>mapred.task.profile</name><value>false</value></property>
+<property><name>job.end.retry.interval</name><value>30000</value></property>
+<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
+<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
+<property><name>webinterface.private.actions</name><value>false</value></property>
+<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
+<property><name>pregelix.framesize</name><value>2048</value></property>
+<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
+<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
+<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
+<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.compress.map.output</name><value>false</value></property>
+<property><name>io.sort.spill.percent</name><value>0.80</value></property>
+<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
+<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
+<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
+<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
+<property><name>job.end.retry.attempts</name><value>0</value></property>
+<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.MessageOverflowVertex$SimpleMessageOverflowVertexOutputFormat</value></property>
+<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
+<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
+<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
+<property><name>mapred.job.queue.name</name><value>default</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>pregelix.incStateLength</name><value>true</value></property>
+<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
+<property><name>topology.script.number.args</name><value>100</value></property>
+<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
+<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
+<property><name>mapred.task.cache.levels</name><value>2</value></property>
+<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
+<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
+<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>mapred.min.split.size</name><value>0</value></property>
+<property><name>mapred.map.max.attempts</name><value>4</value></property>
+<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
+<property><name>mapred.acls.enabled</name><value>false</value></property>
+</configuration>
\ No newline at end of file
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowFixedsize.xml b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowFixedsize.xml
new file mode 100644
index 0000000..a894ccd
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowFixedsize.xml
@@ -0,0 +1,142 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
+<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
+<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
+<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
+<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
+<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
+<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
+<property><name>mapred.input.dir</name><value>file:/webmap</value></property>
+<property><name>mapred.submit.replication</name><value>10</value></property>
+<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
+<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
+<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
+<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
+<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
+<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
+<property><name>keep.failed.task.files</name><value>false</value></property>
+<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
+<property><name>io.bytes.per.checksum</name><value>512</value></property>
+<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
+<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
+<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
+<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
+<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
+<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
+<property><name>fs.checkpoint.period</name><value>3600</value></property>
+<property><name>mapred.child.tmp</name><value>./tmp</value></property>
+<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
+<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
+<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
+<property><name>mapred.output.dir</name><value>/result</value></property>
+<property><name>io.map.index.skip</name><value>0</value></property>
+<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
+<property><name>mapred.output.compress</name><value>false</value></property>
+<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
+<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
+<property><name>fs.checkpoint.size</name><value>67108864</value></property>
+<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
+<property><name>mapred.job.name</name><value>Message Overflow Fixedsize</value></property>
+<property><name>local.cache.size</name><value>10737418240</value></property>
+<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
+<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
+<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
+<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
+<property><name>mapred.task.timeout</name><value>600000</value></property>
+<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
+<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
+<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
+<property><name>ipc.client.kill.max</name><value>10</value></property>
+<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
+<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
+<property><name>io.sort.record.percent</name><value>0.05</value></property>
+<property><name>hadoop.security.authorization</name><value>false</value></property>
+<property><name>mapred.max.tracker.failures</name><value>4</value></property>
+<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>pregelix.numVertices</name><value>20</value></property>
+<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
+<property><name>mapred.map.tasks</name><value>2</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
+<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
+<property><name>fs.default.name</name><value>file:///</value></property>
+<property><name>tasktracker.http.threads</name><value>40</value></property>
+<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
+<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
+<property><name>mapred.reduce.tasks</name><value>1</value></property>
+<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.MessageOverflowFixedsizeVertex</value></property>
+<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
+<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
+<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
+<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
+<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
+<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
+<property><name>hadoop.native.lib</name><value>true</value></property>
+<property><name>fs.s3.block.size</name><value>67108864</value></property>
+<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
+<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
+<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
+<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
+<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
+<property><name>mapred.queue.names</name><value>default</value></property>
+<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
+<property><name>mapred.job.tracker</name><value>local</value></property>
+<property><name>io.skip.checksum.errors</name><value>false</value></property>
+<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
+<property><name>fs.s3.maxRetries</name><value>4</value></property>
+<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
+<property><name>fs.trash.interval</name><value>0</value></property>
+<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
+<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
+<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
+<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
+<property><name>io.sort.mb</name><value>100</value></property>
+<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
+<property><name>io.sort.factor</name><value>10</value></property>
+<property><name>mapred.task.profile</name><value>false</value></property>
+<property><name>job.end.retry.interval</name><value>30000</value></property>
+<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
+<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
+<property><name>webinterface.private.actions</name><value>false</value></property>
+<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
+<property><name>pregelix.framesize</name><value>2048</value></property>
+<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
+<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
+<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
+<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.compress.map.output</name><value>false</value></property>
+<property><name>io.sort.spill.percent</name><value>0.80</value></property>
+<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
+<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
+<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
+<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
+<property><name>job.end.retry.attempts</name><value>0</value></property>
+<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.MessageOverflowFixedsizeVertex$SimpleMessageOverflowVertexOutputFormat</value></property>
+<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
+<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
+<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
+<property><name>mapred.job.queue.name</name><value>default</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
+<property><name>topology.script.number.args</name><value>100</value></property>
+<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
+<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
+<property><name>mapred.task.cache.levels</name><value>2</value></property>
+<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
+<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
+<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>mapred.min.split.size</name><value>0</value></property>
+<property><name>mapred.map.max.attempts</name><value>4</value></property>
+<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
+<property><name>mapred.acls.enabled</name><value>false</value></property>
+</configuration>
\ No newline at end of file
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowLSM.xml b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowLSM.xml
new file mode 100644
index 0000000..a9f8925
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowLSM.xml
@@ -0,0 +1,145 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
+<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
+<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
+<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
+<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
+<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
+<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
+<property><name>mapred.input.dir</name><value>file:/webmap</value></property>
+<property><name>mapred.submit.replication</name><value>10</value></property>
+<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
+<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
+<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
+<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
+<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
+<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
+<property><name>keep.failed.task.files</name><value>false</value></property>
+<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
+<property><name>io.bytes.per.checksum</name><value>512</value></property>
+<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
+<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
+<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
+<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
+<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
+<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
+<property><name>fs.checkpoint.period</name><value>3600</value></property>
+<property><name>mapred.child.tmp</name><value>./tmp</value></property>
+<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
+<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
+<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
+<property><name>mapred.output.dir</name><value>/result</value></property>
+<property><name>io.map.index.skip</name><value>0</value></property>
+<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
+<property><name>mapred.output.compress</name><value>false</value></property>
+<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
+<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
+<property><name>fs.checkpoint.size</name><value>67108864</value></property>
+<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
+<property><name>mapred.job.name</name><value>Message Overflow LSM</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
+<property><name>local.cache.size</name><value>10737418240</value></property>
+<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
+<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
+<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
+<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
+<property><name>mapred.task.timeout</name><value>600000</value></property>
+<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
+<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
+<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
+<property><name>ipc.client.kill.max</name><value>10</value></property>
+<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
+<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
+<property><name>io.sort.record.percent</name><value>0.05</value></property>
+<property><name>hadoop.security.authorization</name><value>false</value></property>
+<property><name>mapred.max.tracker.failures</name><value>4</value></property>
+<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>pregelix.numVertices</name><value>20</value></property>
+<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
+<property><name>mapred.map.tasks</name><value>2</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
+<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
+<property><name>fs.default.name</name><value>file:///</value></property>
+<property><name>tasktracker.http.threads</name><value>40</value></property>
+<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
+<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
+<property><name>mapred.reduce.tasks</name><value>1</value></property>
+<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.MessageOverflowVertex</value></property>
+<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
+<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
+<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
+<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
+<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
+<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
+<property><name>hadoop.native.lib</name><value>true</value></property>
+<property><name>fs.s3.block.size</name><value>67108864</value></property>
+<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
+<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
+<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
+<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
+<property><name>pregelix.updateIntensive</name><value>true</value></property>
+<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
+<property><name>mapred.queue.names</name><value>default</value></property>
+<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
+<property><name>mapred.job.tracker</name><value>local</value></property>
+<property><name>io.skip.checksum.errors</name><value>false</value></property>
+<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
+<property><name>fs.s3.maxRetries</name><value>4</value></property>
+<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
+<property><name>fs.trash.interval</name><value>0</value></property>
+<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
+<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
+<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
+<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
+<property><name>io.sort.mb</name><value>100</value></property>
+<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
+<property><name>io.sort.factor</name><value>10</value></property>
+<property><name>mapred.task.profile</name><value>false</value></property>
+<property><name>job.end.retry.interval</name><value>30000</value></property>
+<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
+<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
+<property><name>webinterface.private.actions</name><value>false</value></property>
+<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
+<property><name>pregelix.framesize</name><value>2048</value></property>
+<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
+<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
+<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
+<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.compress.map.output</name><value>false</value></property>
+<property><name>io.sort.spill.percent</name><value>0.80</value></property>
+<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
+<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
+<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
+<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
+<property><name>job.end.retry.attempts</name><value>0</value></property>
+<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.MessageOverflowVertex$SimpleMessageOverflowVertexOutputFormat</value></property>
+<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
+<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
+<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
+<property><name>mapred.job.queue.name</name><value>default</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>pregelix.incStateLength</name><value>true</value></property>
+<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
+<property><name>topology.script.number.args</name><value>100</value></property>
+<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
+<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
+<property><name>mapred.task.cache.levels</name><value>2</value></property>
+<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
+<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
+<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>mapred.min.split.size</name><value>0</value></property>
+<property><name>mapred.map.max.attempts</name><value>4</value></property>
+<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
+<property><name>mapred.acls.enabled</name><value>false</value></property>
+</configuration>
\ No newline at end of file