merge from zheilbron/hyracks_msr
diff --git a/pregelix/pregelix-api/pom.xml b/pregelix/pregelix-api/pom.xml
index 305b50c..0152a15 100644
--- a/pregelix/pregelix-api/pom.xml
+++ b/pregelix/pregelix-api/pom.xml
@@ -21,7 +21,7 @@
 	<parent>
 		<groupId>edu.uci.ics.hyracks</groupId>
 		<artifactId>pregelix</artifactId>
-		<version>0.2.7-SNAPSHOT</version>
+		<version>0.2.10-SNAPSHOT</version>
 	</parent>
 
 	<properties>
@@ -82,7 +82,7 @@
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-dataflow-common</artifactId>
-			<version>0.2.7-SNAPSHOT</version>
+			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
@@ -95,7 +95,7 @@
 		<dependency>
                         <groupId>edu.uci.ics.hyracks</groupId>
                         <artifactId>hyracks-hdfs-core</artifactId>
-                        <version>0.2.7-SNAPSHOT</version>
+                        <version>0.2.10-SNAPSHOT</version>
                         <type>jar</type>
                         <scope>compile</scope>
                 </dependency>
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java
index 08c7151..5ea6413 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java
@@ -20,6 +20,7 @@
 import org.apache.hadoop.io.WritableComparable;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
 
 /**
  * This is the abstract class to implement for aggregating the state of all the vertices globally in the graph.
@@ -39,7 +40,7 @@
  */
 
 @SuppressWarnings("rawtypes")
-public abstract class GlobalAggregator<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable, P extends Writable, F extends Writable> {
+public abstract class GlobalAggregator<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable, P extends Writable, F extends Writable> {
     /**
      * initialize aggregator
      */
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java
index f5daf99..fa03c0c 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java
@@ -19,6 +19,7 @@
 import org.apache.hadoop.io.WritableComparable;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
 
 /**
  * This is the abstract class to implement for combining of messages that are sent to the same vertex.
@@ -36,7 +37,7 @@
  *        the type of the partially combined messages
  */
 @SuppressWarnings("rawtypes")
-public abstract class MessageCombiner<I extends WritableComparable, M extends Writable, P extends Writable> {
+public abstract class MessageCombiner<I extends WritableComparable, M extends WritableSizable, P extends Writable> {
 
     /**
      * initialize combiner
@@ -82,4 +83,36 @@
      * @return the final message List
      */
     public abstract MsgList<M> finishFinal();
+
+    /**
+     * init the combiner for all segmented bags for one key
+     * 
+     * @return the final message List
+     */
+    public void initAll(MsgList providedMsgList) {
+        init(providedMsgList);
+    }
+
+    /**
+     * finish final combiner for all segmented bags for one key
+     * 
+     * @return the final message List
+     */
+    public MsgList<M> finishFinalAll() {
+        return finishFinal();
+    }
+
+    /**
+     * @return the accumulated byte size
+     */
+    public int estimateAccumulatedStateByteSizePartial(I vertexIndex, M msg) throws HyracksDataException {
+        return 0;
+    }
+
+    /**
+     * @return the accumulated byte size
+     */
+    public int estimateAccumulatedStateByteSizeFinal(I vertexIndex, P partialAggregate) throws HyracksDataException {
+        return 0;
+    }
 }
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
index 104f396..51b62e4 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
@@ -15,8 +15,11 @@
 
 package edu.uci.ics.pregelix.api.graph;
 
-import org.apache.hadoop.io.Writable;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 
+import edu.uci.ics.pregelix.api.io.WritableSizable;
 import edu.uci.ics.pregelix.api.util.ArrayListWritable;
 import edu.uci.ics.pregelix.api.util.BspUtils;
 
@@ -27,9 +30,11 @@
  * @param <M>
  *            message type
  */
-public class MsgList<M extends Writable> extends ArrayListWritable<M> {
+public class MsgList<M extends WritableSizable> extends ArrayListWritable<M> {
     /** Defining a layout version for a serializable class. */
     private static final long serialVersionUID = 1L;
+    private byte start = 1;
+    private byte end = 2;
 
     /**
      * Default constructor.s
@@ -43,4 +48,34 @@
     public void setClass() {
         setClass((Class<M>) BspUtils.getMessageValueClass(getConf()));
     }
+
+    @Override
+    public void write(DataOutput output) throws IOException {
+        output.writeByte(start | end);
+        super.write(output);
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        byte startEnd = input.readByte();
+        this.start = (byte) (startEnd & 1);
+        this.end = (byte) (startEnd & 2);
+        super.readFields(input);
+    }
+
+    public final void setSegmentStart(boolean segStart) {
+        this.start = (byte) (segStart ? 1 : 0);
+    }
+
+    public final void setSegmentEnd(boolean segEnd) {
+        this.end = (byte) (segEnd ? 2 : 0);
+    }
+
+    public boolean segmentStart() {
+        return start == 1 ? true : false;
+    }
+
+    public boolean segmentEnd() {
+        return end == 2 ? true : false;
+    }
 }
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
index 4175078..c52130d 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -24,6 +24,7 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -31,7 +32,9 @@
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
 import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.api.util.JobStateUtils;
 import edu.uci.ics.pregelix.api.util.SerDeUtils;
 
 /**
@@ -48,7 +51,7 @@
  *            Message value type
  */
 @SuppressWarnings("rawtypes")
-public abstract class Vertex<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+public abstract class Vertex<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable>
         implements Writable {
     private static long superstep = 0;
     /** Class-wide number of vertices */
@@ -75,6 +78,8 @@
     private boolean hasMessage = false;
     /** created new vertex */
     private boolean createdNewLiveVertex = false;
+    /** terminate the partition */
+    private boolean terminatePartition = false;
 
     /**
      * use object pool for re-using objects
@@ -87,12 +92,23 @@
     private int usedValue = 0;
 
     /**
-     * The key method that users need to implement
+     * The key method that users need to implement to process
+     * incoming messages in each superstep.
+     * 1. In a superstep, this method can be called multiple times in a continuous manner for a single
+     * vertex, each of which is to process a batch of messages. (Note that
+     * this only happens for the case when the mssages for a single vertex
+     * exceed one frame.)
+     * 2. In each superstep, before any invocation of this method for a vertex,
+     * open() is called; after all the invocations of this method for the vertex,
+     * close is called.
+     * 3. In each partition, the vertex Java object is reused
+     * for all the vertice to be processed in the same partition. (The model
+     * is the same as the key-value objects in hadoop map tasks.)
      * 
      * @param msgIterator
      *            an iterator of incoming messages
      */
-    public abstract void compute(Iterator<M> msgIterator);
+    public abstract void compute(Iterator<M> msgIterator) throws Exception;
 
     /**
      * Add an edge for the vertex.
@@ -569,4 +585,62 @@
         Vertex.context = context;
     }
 
+    @Override
+    public int hashCode() {
+        return vertexId.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object object) {
+        Vertex vertex = (Vertex) object;
+        return vertexId.equals(vertex.getVertexId());
+    }
+
+    /**
+     * called immediately before invocations of compute() on a vertex
+     * Users can override this method to initiate the state for a vertex
+     * before the compute() invocations
+     */
+    public void open() {
+
+    }
+
+    /**
+     * called immediately after all the invocations of compute() on a vertex
+     * Users can override this method to initiate the state for a vertex
+     * before the compute() invocations
+     */
+    public void close() {
+
+    }
+
+    /**
+     * Terminate the current partition where the current vertex stays in.
+     * This will immediately take effect and the upcoming vertice in the
+     * same partition cannot be processed.
+     * 
+     */
+    protected final void terminatePartition() {
+        voteToHalt();
+        terminatePartition = true;
+    }
+
+    /**
+     * Terminate the Pregelix job.
+     * This will take effect only when the current iteration completed.
+     * 
+     * @throws Exception
+     */
+    protected void terminateJob() throws Exception {
+        Configuration conf = getContext().getConfiguration();
+        JobStateUtils.writeForceTerminationState(conf, BspUtils.getJobId(conf));
+    }
+
+    /***
+     * @return true if the partition is terminated; false otherwise
+     */
+    public boolean isPartitionTerminated() {
+        return terminatePartition;
+    }
+
 }
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/Sizable.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/Sizable.java
new file mode 100644
index 0000000..568500b
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/Sizable.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.io;
+
+/**
+ * @author yingyib
+ */
+public interface Sizable {
+
+    public int sizeInBytes();
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexInputFormat.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexInputFormat.java
index c841b1a..73af190 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexInputFormat.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexInputFormat.java
@@ -40,7 +40,7 @@
  *            Message data
  */
 @SuppressWarnings("rawtypes")
-public abstract class VertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> {
+public abstract class VertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable> {
     /**
      * Logically split the vertices for a graph processing application.
      * Each {@link InputSplit} is then assigned to a worker for processing.
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexReader.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexReader.java
index e6c62ba..ba8b561 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexReader.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexReader.java
@@ -39,7 +39,7 @@
  *            Message data
  */
 @SuppressWarnings("rawtypes")
-public interface VertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> {
+public interface VertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable> {
     /**
      * Use the input split and context t o setup reading the vertices.
      * Guaranteed to be called prior to any other function.
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/WritableSizable.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/WritableSizable.java
new file mode 100644
index 0000000..ee13f76
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/WritableSizable.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.io;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * @author yingyib
+ */
+public interface WritableSizable extends Writable, Sizable {
+
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexInputFormat.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexInputFormat.java
index 985bcff..1d3c427 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexInputFormat.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexInputFormat.java
@@ -26,13 +26,14 @@
 
 import edu.uci.ics.pregelix.api.io.BasicGenInputSplit;
 import edu.uci.ics.pregelix.api.io.VertexInputFormat;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
 
 /**
  * This VertexInputFormat is meant for testing/debugging. It simply generates
  * some vertex data that can be consumed by test applications.
  */
 @SuppressWarnings("rawtypes")
-public abstract class GeneratedVertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+public abstract class GeneratedVertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable>
         extends VertexInputFormat<I, V, E, M> {
 
     @Override
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexReader.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexReader.java
index 92c8728..376d45d 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexReader.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexReader.java
@@ -25,6 +25,7 @@
 
 import edu.uci.ics.pregelix.api.io.BasicGenInputSplit;
 import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
 
 /**
  * Used by GeneratedVertexInputFormat to read some generated data
@@ -37,7 +38,7 @@
  *            Edge value
  */
 @SuppressWarnings("rawtypes")
-public abstract class GeneratedVertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+public abstract class GeneratedVertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable>
         implements VertexReader<I, V, E, M> {
     /** Records read so far */
     protected long recordsRead = 0;
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/text/TextVertexInputFormat.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/text/TextVertexInputFormat.java
index 2254ae4..0faf516 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/text/TextVertexInputFormat.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/text/TextVertexInputFormat.java
@@ -30,6 +30,7 @@
 
 import edu.uci.ics.pregelix.api.io.VertexInputFormat;
 import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
 
 /**
  * Abstract class that users should subclass to use their own text based vertex
@@ -45,7 +46,7 @@
  *            Message value
  */
 @SuppressWarnings("rawtypes")
-public abstract class TextVertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+public abstract class TextVertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable>
         extends VertexInputFormat<I, V, E, M> {
     /** Uses the TextInputFormat to do everything */
     protected TextInputFormat textInputFormat = new TextInputFormat();
@@ -62,7 +63,7 @@
      * @param <E>
      *            Edge value
      */
-    public static abstract class TextVertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+    public static abstract class TextVertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable>
             implements VertexReader<I, V, E, M> {
         /** Internal line record reader */
         private final RecordReader<LongWritable, Text> lineRecordReader;
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index 4cddaf0..dae7818 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -198,7 +198,7 @@
      * 
      * @param updateHeavyFlag
      */
-    final public void setMutationOrVariableSizedUpdateHeavy(boolean variableSizedUpdateHeavyFlag) {
+    final public void setLSMStorage(boolean variableSizedUpdateHeavyFlag) {
         getConfiguration().setBoolean(UPDATE_INTENSIVE, variableSizedUpdateHeavyFlag);
     }
 }
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java
index 7a9e5d5..1683541 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java
@@ -141,9 +141,6 @@
         used = 0;
         this.clear();
         int numValues = in.readInt(); // read number of values
-        if (numValues > 100) {
-            System.out.println("num values: " + numValues);
-        }
         for (int i = 0; i < numValues; i++) {
             M value = allocateValue();
             value.readFields(in); // read a value
@@ -153,9 +150,6 @@
 
     public void write(DataOutput out) throws IOException {
         int numValues = size();
-        if (numValues > 100) {
-            System.out.println("write num values: " + numValues);
-        }
         out.writeInt(numValues); // write number of values
         for (int i = 0; i < numValues; i++) {
             get(i).write(out);
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index 03c37dc..ff4ee91 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -28,6 +28,7 @@
 import edu.uci.ics.pregelix.api.graph.VertexPartitioner;
 import edu.uci.ics.pregelix.api.io.VertexInputFormat;
 import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 
 /**
@@ -49,7 +50,7 @@
      * @return User's vertex input format class
      */
     @SuppressWarnings({ "rawtypes", "unchecked" })
-    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> Class<? extends VertexInputFormat<I, V, E, M>> getVertexInputFormatClass(
+    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable> Class<? extends VertexInputFormat<I, V, E, M>> getVertexInputFormatClass(
             Configuration conf) {
         return (Class<? extends VertexInputFormat<I, V, E, M>>) conf.getClass(PregelixJob.VERTEX_INPUT_FORMAT_CLASS,
                 null, VertexInputFormat.class);
@@ -63,7 +64,7 @@
      * @return Instantiated user vertex input format class
      */
     @SuppressWarnings("rawtypes")
-    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> VertexInputFormat<I, V, E, M> createVertexInputFormat(
+    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable> VertexInputFormat<I, V, E, M> createVertexInputFormat(
             Configuration conf) {
         Class<? extends VertexInputFormat<I, V, E, M>> vertexInputFormatClass = getVertexInputFormatClass(conf);
         VertexInputFormat<I, V, E, M> inputFormat = ReflectionUtils.newInstance(vertexInputFormatClass, conf);
@@ -106,7 +107,7 @@
      * @return User's vertex combiner class
      */
     @SuppressWarnings({ "rawtypes", "unchecked" })
-    public static <I extends WritableComparable, M extends Writable, P extends Writable> Class<? extends MessageCombiner<I, M, P>> getMessageCombinerClass(
+    public static <I extends WritableComparable, M extends WritableSizable, P extends Writable> Class<? extends MessageCombiner<I, M, P>> getMessageCombinerClass(
             Configuration conf) {
         return (Class<? extends MessageCombiner<I, M, P>>) conf.getClass(PregelixJob.Message_COMBINER_CLASS,
                 DefaultMessageCombiner.class, MessageCombiner.class);
@@ -120,7 +121,7 @@
      * @return User's vertex combiner class
      */
     @SuppressWarnings({ "rawtypes", "unchecked" })
-    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable, P extends Writable, F extends Writable> Class<? extends GlobalAggregator<I, V, E, M, P, F>> getGlobalAggregatorClass(
+    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable, P extends Writable, F extends Writable> Class<? extends GlobalAggregator<I, V, E, M, P, F>> getGlobalAggregatorClass(
             Configuration conf) {
         return (Class<? extends GlobalAggregator<I, V, E, M, P, F>>) conf.getClass(PregelixJob.GLOBAL_AGGREGATOR_CLASS,
                 GlobalCountAggregator.class, GlobalAggregator.class);
@@ -138,7 +139,7 @@
      * @return Instantiated user vertex combiner class
      */
     @SuppressWarnings("rawtypes")
-    public static <I extends WritableComparable, M extends Writable, P extends Writable> MessageCombiner<I, M, P> createMessageCombiner(
+    public static <I extends WritableComparable, M extends WritableSizable, P extends Writable> MessageCombiner<I, M, P> createMessageCombiner(
             Configuration conf) {
         Class<? extends MessageCombiner<I, M, P>> vertexCombinerClass = getMessageCombinerClass(conf);
         return ReflectionUtils.newInstance(vertexCombinerClass, conf);
@@ -164,7 +165,7 @@
      * @return Instantiated user vertex combiner class
      */
     @SuppressWarnings("rawtypes")
-    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable, P extends Writable, F extends Writable> GlobalAggregator<I, V, E, M, P, F> createGlobalAggregator(
+    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable, P extends Writable, F extends Writable> GlobalAggregator<I, V, E, M, P, F> createGlobalAggregator(
             Configuration conf) {
         Class<? extends GlobalAggregator<I, V, E, M, P, F>> globalAggregatorClass = getGlobalAggregatorClass(conf);
         return ReflectionUtils.newInstance(globalAggregatorClass, conf);
@@ -178,7 +179,7 @@
      * @return User's vertex class
      */
     @SuppressWarnings({ "rawtypes", "unchecked" })
-    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> Class<? extends Vertex<I, V, E, M>> getVertexClass(
+    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable> Class<? extends Vertex<I, V, E, M>> getVertexClass(
             Configuration conf) {
         return (Class<? extends Vertex<I, V, E, M>>) conf.getClass(PregelixJob.VERTEX_CLASS, null, Vertex.class);
     }
@@ -191,7 +192,7 @@
      * @return Instantiated user vertex
      */
     @SuppressWarnings("rawtypes")
-    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> Vertex<I, V, E, M> createVertex(
+    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable> Vertex<I, V, E, M> createVertex(
             Configuration conf) {
         Class<? extends Vertex<I, V, E, M>> vertexClass = getVertexClass(conf);
         Vertex<I, V, E, M> vertex = ReflectionUtils.newInstance(vertexClass, conf);
@@ -299,7 +300,7 @@
      * @return User's vertex message value class
      */
     @SuppressWarnings("unchecked")
-    public static <M extends Writable> Class<M> getMessageValueClass(Configuration conf) {
+    public static <M extends WritableSizable> Class<M> getMessageValueClass(Configuration conf) {
         if (conf == null)
             conf = defaultConf;
         return (Class<M>) conf.getClass(PregelixJob.MESSAGE_VALUE_CLASS, Writable.class);
@@ -369,7 +370,7 @@
      *            Configuration to check
      * @return Instantiated user vertex message value
      */
-    public static <M extends Writable> M createMessageValue(Configuration conf) {
+    public static <M extends WritableSizable> M createMessageValue(Configuration conf) {
         Class<M> messageValueClass = getMessageValueClass(conf);
         try {
             return messageValueClass.newInstance();
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultMessageCombiner.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultMessageCombiner.java
index d2d90a2..feb9e2f 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultMessageCombiner.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultMessageCombiner.java
@@ -14,42 +14,82 @@
  */
 package edu.uci.ics.pregelix.api.util;
 
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.pregelix.api.graph.MessageCombiner;
 import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
 
 @SuppressWarnings({ "rawtypes", "unchecked" })
-public class DefaultMessageCombiner<I extends WritableComparable, M extends Writable> extends
+public class DefaultMessageCombiner<I extends WritableComparable, M extends WritableSizable> extends
         MessageCombiner<I, M, MsgList> {
     private MsgList<M> msgList;
+    private int metaSlot = 8;
+    private int accumulatedSize = metaSlot;
 
     @Override
     public void init(MsgList providedMsgList) {
+        realInit(providedMsgList);
+        this.msgList.setSegmentStart(false);
+    }
+
+    private void realInit(MsgList providedMsgList) {
         this.msgList = providedMsgList;
         this.msgList.clearElements();
+        this.accumulatedSize = metaSlot;
     }
 
     @Override
     public void stepPartial(I vertexIndex, M msg) throws HyracksDataException {
         msgList.addElement(msg);
+        accumulatedSize += msg.sizeInBytes();
     }
 
     @Override
     public void stepFinal(I vertexIndex, MsgList partialAggregate) throws HyracksDataException {
         msgList.addAllElements(partialAggregate);
+        for (int i = 0; i < partialAggregate.size(); i++) {
+            accumulatedSize += ((M) partialAggregate.get(i)).sizeInBytes();
+        }
     }
 
     @Override
     public MsgList finishPartial() {
+        msgList.setSegmentEnd(false);
         return msgList;
     }
 
     @Override
     public MsgList<M> finishFinal() {
+        msgList.setSegmentEnd(false);
         return msgList;
     }
 
+    @Override
+    public void initAll(MsgList providedMsgList) {
+        realInit(providedMsgList);
+        msgList.setSegmentStart(true);
+    }
+
+    @Override
+    public MsgList<M> finishFinalAll() {
+        msgList.setSegmentEnd(true);
+        return msgList;
+    }
+
+    @Override
+    public int estimateAccumulatedStateByteSizePartial(I vertexIndex, M msg) throws HyracksDataException {
+        return accumulatedSize + msg.sizeInBytes();
+    }
+
+    @Override
+    public int estimateAccumulatedStateByteSizeFinal(I vertexIndex, MsgList partialAggregate)
+            throws HyracksDataException {
+        int size = accumulatedSize;
+        for (int i = 0; i < partialAggregate.size(); i++) {
+            size += ((M) partialAggregate.get(i)).sizeInBytes();
+        }
+        return size;
+    }
 }
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/GlobalCountAggregator.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/GlobalCountAggregator.java
index ffc6526..9a95f09 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/GlobalCountAggregator.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/GlobalCountAggregator.java
@@ -21,9 +21,10 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
 import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
 
 @SuppressWarnings("rawtypes")
-public class GlobalCountAggregator<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+public class GlobalCountAggregator<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable>
         extends GlobalAggregator<I, V, E, M, LongWritable, LongWritable> {
 
     private LongWritable state = new LongWritable(0);
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/JobStateUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/JobStateUtils.java
new file mode 100644
index 0000000..4a98167
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/JobStateUtils.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.util;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * @author yingyib
+ */
+public class JobStateUtils {
+
+    public static final String TMP_DIR = "/tmp/";
+
+    public static void writeForceTerminationState(Configuration conf, String jobId) throws HyracksDataException {
+        try {
+            FileSystem dfs = FileSystem.get(conf);
+            String pathStr = TMP_DIR + jobId + "fterm";
+            Path path = new Path(pathStr);
+            if (!dfs.exists(path)) {
+                FSDataOutputStream output = dfs.create(path, true);
+                output.writeBoolean(true);
+                output.flush();
+                output.close();
+            }
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public static boolean readForceTerminationState(Configuration conf, String jobId) throws HyracksDataException {
+        try {
+            FileSystem dfs = FileSystem.get(conf);
+            String pathStr = TMP_DIR + jobId + "fterm";
+            Path path = new Path(pathStr);
+            if (dfs.exists(path)) {
+                return true;
+            } else {
+                return false;
+            }
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+}