cross merge fullstack_release_candidate into trunk

git-svn-id: https://hyracks.googlecode.com/svn/trunk@3208 123451ca-8445-de46-9d55-352943316053
diff --git a/fullstack/pregelix/pregelix-api/pom.xml b/fullstack/pregelix/pregelix-api/pom.xml
index 66a0186..8212e1c 100644
--- a/fullstack/pregelix/pregelix-api/pom.xml
+++ b/fullstack/pregelix/pregelix-api/pom.xml
@@ -21,8 +21,9 @@
 				<artifactId>maven-compiler-plugin</artifactId>
 				<version>2.0.2</version>
 				<configuration>
-					<source>1.6</source>
-					<target>1.6</target>
+					<source>1.7</source>
+					<target>1.7</target>
+					<fork>true</fork>
 				</configuration>
 			</plugin>
 			<plugin>
@@ -41,6 +42,7 @@
 			</plugin>
 			<plugin>
 				<artifactId>maven-clean-plugin</artifactId>
+				<version>2.5</version>
 				<configuration>
 					<filesets>
 						<fileset>
@@ -76,11 +78,11 @@
 			<scope>test</scope>
 		</dependency>
 		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-core</artifactId>
-			<version>0.20.2</version>
-			<type>jar</type>
-			<scope>compile</scope>
-		</dependency>
+                        <groupId>edu.uci.ics.hyracks</groupId>
+                        <artifactId>hyracks-hdfs-core</artifactId>
+                        <version>0.2.3-SNAPSHOT</version>
+                        <type>jar</type>
+                        <scope>compile</scope>
+                </dependency>
 	</dependencies>
 </project>
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Edge.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Edge.java
index 4af35fe..e5f42fe 100644
--- a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Edge.java
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Edge.java
@@ -42,7 +42,7 @@
     private E edgeValue = null;
     /** Configuration - Used to instantiate classes */
     private Configuration conf = null;
-    /** Whether the edgeValue field is not null*/
+    /** Whether the edgeValue field is not null */
     private boolean hasEdgeValue = false;
 
     /**
@@ -115,8 +115,9 @@
         destVertexId.readFields(input);
         hasEdgeValue = input.readBoolean();
         if (hasEdgeValue) {
-            if (edgeValue == null)
+            if (edgeValue == null) {
                 edgeValue = (E) BspUtils.createEdgeValue(getConf());
+            }
             edgeValue.readFields(input);
         }
     }
@@ -128,8 +129,9 @@
         }
         destVertexId.write(output);
         output.writeBoolean(hasEdgeValue);
-        if (hasEdgeValue)
+        if (hasEdgeValue) {
             edgeValue.write(output);
+        }
     }
 
     @Override
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
index 734b1af..8d3d4c6 100644
--- a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
@@ -29,7 +29,7 @@
  */
 public class MsgList<M extends Writable> extends ArrayListWritable<M> {
     /** Defining a layout version for a serializable class. */
-    private static final long serialVersionUID = 100L;
+    private static final long serialVersionUID = 1L;
 
     /**
      * Default constructor.s
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
index 6856e9a..cd49184 100644
--- a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -26,7 +26,7 @@
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -49,384 +49,525 @@
  */
 @SuppressWarnings("rawtypes")
 public abstract class Vertex<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
-        implements Writable {
-    private static long superstep = 0;
-    /** Class-wide number of vertices */
-    private static long numVertices = -1;
-    /** Class-wide number of edges */
-    private static long numEdges = -1;
-    /** Vertex id */
-    private I vertexId = null;
-    /** Vertex value */
-    private V vertexValue = null;
-    /** Map of destination vertices and their edge values */
-    private final List<Edge<I, E>> destEdgeList = new ArrayList<Edge<I, E>>();
-    /** If true, do not do anymore computation on this vertex. */
-    boolean halt = false;
-    /** List of incoming messages from the previous superstep */
-    private final List<M> msgList = new ArrayList<M>();
-    /** map context */
-    private static Mapper.Context context = null;
-    /** a delegate for hyracks stuff */
-    private VertexDelegate<I, V, E, M> delegate = new VertexDelegate<I, V, E, M>(this);
-    /** this vertex is updated or not */
-    private boolean updated = false;
-    /** has outgoing messages */
-    private boolean hasMessage = false;
+		implements Writable {
+	private static long superstep = 0;
+	/** Class-wide number of vertices */
+	private static long numVertices = -1;
+	/** Class-wide number of edges */
+	private static long numEdges = -1;
+	/** Vertex id */
+	private I vertexId = null;
+	/** Vertex value */
+	private V vertexValue = null;
+	/** Map of destination vertices and their edge values */
+	private final List<Edge<I, E>> destEdgeList = new ArrayList<Edge<I, E>>();
+	/** If true, do not do anymore computation on this vertex. */
+	boolean halt = false;
+	/** List of incoming messages from the previous superstep */
+	private final List<M> msgList = new ArrayList<M>();
+	/** map context */
+	private static TaskAttemptContext context = null;
+	/** a delegate for hyracks stuff */
+	private VertexDelegate<I, V, E, M> delegate = new VertexDelegate<I, V, E, M>(
+			this);
+	/** this vertex is updated or not */
+	private boolean updated = false;
+	/** has outgoing messages */
+	private boolean hasMessage = false;
+	/** created new vertex */
+	private boolean createdNewLiveVertex = false;
 
-    /**
-     * use object pool for re-using objects
-     */
-    private List<Edge<I, E>> edgePool = new ArrayList<Edge<I, E>>();
-    private List<M> msgPool = new ArrayList<M>();
-    private List<V> valuePool = new ArrayList<V>();
-    private int usedEdge = 0;
-    private int usedMessage = 0;
-    private int usedValue = 0;
+	/**
+	 * use object pool for re-using objects
+	 */
+	private List<Edge<I, E>> edgePool = new ArrayList<Edge<I, E>>();
+	private List<M> msgPool = new ArrayList<M>();
+	private List<V> valuePool = new ArrayList<V>();
+	private int usedEdge = 0;
+	private int usedMessage = 0;
+	private int usedValue = 0;
 
-    /**
-     * The key method that users need to implement
-     * 
-     * @param msgIterator
-     *            an iterator of incoming messages
-     */
-    public abstract void compute(Iterator<M> msgIterator);
+	/**
+	 * The key method that users need to implement
+	 * 
+	 * @param msgIterator
+	 *            an iterator of incoming messages
+	 */
+	public abstract void compute(Iterator<M> msgIterator);
 
-    /**
-     * Add an edge for the vertex.
-     * 
-     * @param targetVertexId
-     * @param edgeValue
-     * @return successful or not
-     */
-    public final boolean addEdge(I targetVertexId, E edgeValue) {
-        Edge<I, E> edge = this.allocateEdge();
-        edge.setDestVertexId(targetVertexId);
-        edge.setEdgeValue(edgeValue);
-        destEdgeList.add(edge);
-        return true;
-    }
+	/**
+	 * Add an edge for the vertex.
+	 * 
+	 * @param targetVertexId
+	 * @param edgeValue
+	 * @return successful or not
+	 */
+	public final boolean addEdge(I targetVertexId, E edgeValue) {
+		Edge<I, E> edge = this.allocateEdge();
+		edge.setDestVertexId(targetVertexId);
+		edge.setEdgeValue(edgeValue);
+		destEdgeList.add(edge);
+		updated = true;
+		return true;
+	}
 
-    /**
-     * Initialize a new vertex
-     * 
-     * @param vertexId
-     * @param vertexValue
-     * @param edges
-     * @param messages
-     */
-    public void initialize(I vertexId, V vertexValue, Map<I, E> edges, List<M> messages) {
-        if (vertexId != null) {
-            setVertexId(vertexId);
-        }
-        if (vertexValue != null) {
-            setVertexValue(vertexValue);
-        }
-        destEdgeList.clear();
-        if (edges != null && !edges.isEmpty()) {
-            for (Map.Entry<I, E> entry : edges.entrySet()) {
-                destEdgeList.add(new Edge<I, E>(entry.getKey(), entry.getValue()));
-            }
-        }
-        if (messages != null && !messages.isEmpty()) {
-            msgList.addAll(messages);
-        }
-    }
+	/**
+	 * Initialize a new vertex
+	 * 
+	 * @param vertexId
+	 * @param vertexValue
+	 * @param edges
+	 * @param messages
+	 */
+	public void initialize(I vertexId, V vertexValue, Map<I, E> edges,
+			List<M> messages) {
+		if (vertexId != null) {
+			setVertexId(vertexId);
+		}
+		if (vertexValue != null) {
+			setVertexValue(vertexValue);
+		}
+		destEdgeList.clear();
+		if (edges != null && !edges.isEmpty()) {
+			for (Map.Entry<I, E> entry : edges.entrySet()) {
+				destEdgeList.add(new Edge<I, E>(entry.getKey(), entry
+						.getValue()));
+			}
+		}
+		if (messages != null && !messages.isEmpty()) {
+			msgList.addAll(messages);
+		}
+	}
 
-    /**
-     * reset a vertex object: clear its internal states
-     */
-    public void reset() {
-        usedEdge = 0;
-        usedMessage = 0;
-        usedValue = 0;
-    }
+	/**
+	 * reset a vertex object: clear its internal states
+	 */
+	public void reset() {
+		usedEdge = 0;
+		usedMessage = 0;
+		usedValue = 0;
+		updated = false;
+	}
 
-    private Edge<I, E> allocateEdge() {
-        Edge<I, E> edge;
-        if (usedEdge < edgePool.size()) {
-            edge = edgePool.get(usedEdge);
-            usedEdge++;
-        } else {
-            edge = new Edge<I, E>();
-            edgePool.add(edge);
-            usedEdge++;
-        }
-        return edge;
-    }
+	/**
+	 * Set the vertex id
+	 * 
+	 * @param vertexId
+	 */
+	public final void setVertexId(I vertexId) {
+		this.vertexId = vertexId;
+		delegate.setVertexId(vertexId);
+	}
 
-    private M allocateMessage() {
-        M message;
-        if (usedMessage < msgPool.size()) {
-            message = msgPool.get(usedEdge);
-            usedMessage++;
-        } else {
-            message = BspUtils.<M> createMessageValue(getContext().getConfiguration());
-            msgPool.add(message);
-            usedMessage++;
-        }
-        return message;
-    }
+	/**
+	 * Get the vertex id
+	 * 
+	 * @return vertex id
+	 */
+	public final I getVertexId() {
+		return vertexId;
+	}
 
-    private V allocateValue() {
-        V value;
-        if (usedValue < valuePool.size()) {
-            value = valuePool.get(usedEdge);
-            usedValue++;
-        } else {
-            value = BspUtils.<V> createVertexValue(getContext().getConfiguration());
-            valuePool.add(value);
-            usedValue++;
-        }
-        return value;
-    }
+	/**
+	 * Get the vertex value
+	 * 
+	 * @return the vertex value
+	 */
+	public final V getVertexValue() {
+		return vertexValue;
+	}
 
-    /**
-     * Set the vertex id
-     * 
-     * @param vertexId
-     */
-    public final void setVertexId(I vertexId) {
-        this.vertexId = vertexId;
-        delegate.setVertexId(vertexId);
-    }
+	/**
+	 * Set the vertex value
+	 * 
+	 * @param vertexValue
+	 */
+	public final void setVertexValue(V vertexValue) {
+		this.vertexValue = vertexValue;
+		this.updated = true;
+	}
 
-    /**
-     * Get the vertex id
-     * 
-     * @return vertex id
-     */
-    public final I getVertexId() {
-        return vertexId;
-    }
+	/***
+	 * Send a message to a specific vertex
+	 * 
+	 * @param id
+	 *            the receiver vertex id
+	 * @param msg
+	 *            the message
+	 */
+	public final void sendMsg(I id, M msg) {
+		if (msg == null) {
+			throw new IllegalArgumentException(
+					"sendMsg: Cannot send null message to " + id);
+		}
+		delegate.sendMsg(id, msg);
+		this.hasMessage = true;
+	}
 
-    /**
-     * Set the global superstep for all the vertices (internal use)
-     * 
-     * @param superstep
-     *            New superstep
-     */
-    public static void setSuperstep(long superstep) {
-        Vertex.superstep = superstep;
-    }
+	/**
+	 * Send a message to all direct outgoing neighbors
+	 * 
+	 * @param msg
+	 *            the message
+	 */
+	public final void sendMsgToAllEdges(M msg) {
+		if (msg == null) {
+			throw new IllegalArgumentException(
+					"sendMsgToAllEdges: Cannot send null message to all edges");
+		}
+		for (Edge<I, E> edge : destEdgeList) {
+			sendMsg(edge.getDestVertexId(), msg);
+		}
+	}
 
-    public static long getCurrentSuperstep() {
-        return superstep;
-    }
+	/**
+	 * Vote to halt. Once all vertex vote to halt and no more messages, a
+	 * Pregelix job will terminate.
+	 */
+	public final void voteToHalt() {
+		halt = true;
+		updated = true;
+	}
 
-    public final long getSuperstep() {
-        return superstep;
-    }
+	/**
+	 * @return the vertex is halted (true) or not (false)
+	 */
+	public final boolean isHalted() {
+		return halt;
+	}
 
-    public final V getVertexValue() {
-        return vertexValue;
-    }
+	@Override
+	final public void readFields(DataInput in) throws IOException {
+		reset();
+		if (vertexId == null)
+			vertexId = BspUtils.<I> createVertexIndex(getContext()
+					.getConfiguration());
+		vertexId.readFields(in);
+		delegate.setVertexId(vertexId);
+		boolean hasVertexValue = in.readBoolean();
 
-    public final void setVertexValue(V vertexValue) {
-        this.vertexValue = vertexValue;
-        this.updated = true;
-    }
+		if (hasVertexValue) {
+			vertexValue = allocateValue();
+			vertexValue.readFields(in);
+			delegate.setVertex(this);
+		}
+		destEdgeList.clear();
+		long edgeMapSize = SerDeUtils.readVLong(in);
+		for (long i = 0; i < edgeMapSize; ++i) {
+			Edge<I, E> edge = allocateEdge();
+			edge.setConf(getContext().getConfiguration());
+			edge.readFields(in);
+			addEdge(edge);
+		}
+		msgList.clear();
+		long msgListSize = SerDeUtils.readVLong(in);
+		for (long i = 0; i < msgListSize; ++i) {
+			M msg = allocateMessage();
+			msg.readFields(in);
+			msgList.add(msg);
+		}
+		halt = in.readBoolean();
+		updated = false;
+		hasMessage = false;
+		createdNewLiveVertex = false;
+	}
 
-    /**
-     * Set the total number of vertices from the last superstep.
-     * 
-     * @param numVertices
-     *            Aggregate vertices in the last superstep
-     */
-    public static void setNumVertices(long numVertices) {
-        Vertex.numVertices = numVertices;
-    }
+	@Override
+	public void write(DataOutput out) throws IOException {
+		vertexId.write(out);
+		out.writeBoolean(vertexValue != null);
+		if (vertexValue != null) {
+			vertexValue.write(out);
+		}
+		SerDeUtils.writeVLong(out, destEdgeList.size());
+		for (Edge<I, E> edge : destEdgeList) {
+			edge.write(out);
+		}
+		SerDeUtils.writeVLong(out, msgList.size());
+		for (M msg : msgList) {
+			msg.write(out);
+		}
+		out.writeBoolean(halt);
+	}
 
-    public final long getNumVertices() {
-        return numVertices;
-    }
+	/**
+	 * Get the list of incoming messages
+	 * 
+	 * @return the list of messages
+	 */
+	public List<M> getMsgList() {
+		return msgList;
+	}
 
-    /**
-     * Set the total number of edges from the last superstep.
-     * 
-     * @param numEdges
-     *            Aggregate edges in the last superstep
-     */
-    public static void setNumEdges(long numEdges) {
-        Vertex.numEdges = numEdges;
-    }
+	/**
+	 * Get outgoing edge list
+	 * 
+	 * @return a list of outgoing edges
+	 */
+	public List<Edge<I, E>> getEdges() {
+		return this.destEdgeList;
+	}
 
-    public final long getNumEdges() {
-        return numEdges;
-    }
+	@Override
+	@SuppressWarnings("unchecked")
+	public String toString() {
+		Collections.sort(destEdgeList);
+		StringBuffer edgeBuffer = new StringBuffer();
+		edgeBuffer.append("(");
+		for (Edge<I, E> edge : destEdgeList) {
+			edgeBuffer.append(edge.getDestVertexId()).append(",");
+		}
+		edgeBuffer.append(")");
+		return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue()
+				+ ", edges=" + edgeBuffer + ")";
+	}
 
-    /***
-     * Send a message to a specific vertex
-     * 
-     * @param id
-     *            the receiver vertex id
-     * @param msg
-     *            the message
-     */
-    public final void sendMsg(I id, M msg) {
-        if (msg == null) {
-            throw new IllegalArgumentException("sendMsg: Cannot send null message to " + id);
-        }
-        delegate.sendMsg(id, msg);
-        this.hasMessage = true;
-    }
+	/**
+	 * Get the number of outgoing edges
+	 * 
+	 * @return the number of outging edges
+	 */
+	public int getNumOutEdges() {
+		return destEdgeList.size();
+	}
 
-    /**
-     * Send a message to all direct outgoing neighbors
-     * 
-     * @param msg
-     *            the message
-     */
-    public final void sendMsgToAllEdges(M msg) {
-        if (msg == null) {
-            throw new IllegalArgumentException("sendMsgToAllEdges: Cannot send null message to all edges");
-        }
-        for (Edge<I, E> edge : destEdgeList) {
-            sendMsg(edge.getDestVertexId(), msg);
-        }
-    }
+	/**
+	 * Pregelix internal use only
+	 * 
+	 * @param writers
+	 */
+	public void setOutputWriters(List<IFrameWriter> writers) {
+		delegate.setOutputWriters(writers);
+	}
 
-    /**
-     * Vote to halt. Once all vertex vote to halt and no more messages, a
-     * Pregelix job will terminate.
-     */
-    public final void voteToHalt() {
-        halt = true;
-    }
+	/**
+	 * Pregelix internal use only
+	 * 
+	 * @param writers
+	 */
+	public void setOutputAppenders(List<FrameTupleAppender> appenders) {
+		delegate.setOutputAppenders(appenders);
+	}
 
-    /**
-     * @return the vertex is halted (true) or not (false)
-     */
-    public final boolean isHalted() {
-        return halt;
-    }
+	/**
+	 * Pregelix internal use only
+	 * 
+	 * @param writers
+	 */
+	public void setOutputTupleBuilders(List<ArrayTupleBuilder> tbs) {
+		delegate.setOutputTupleBuilders(tbs);
+	}
 
-    @Override
-    final public void readFields(DataInput in) throws IOException {
-        reset();
-        if (vertexId == null)
-            vertexId = BspUtils.<I> createVertexIndex(getContext().getConfiguration());
-        vertexId.readFields(in);
-        delegate.setVertexId(vertexId);
-        boolean hasVertexValue = in.readBoolean();
-        if (hasVertexValue) {
-            vertexValue = allocateValue();
-            vertexValue.readFields(in);
-            delegate.setVertex(this);
-        }
-        destEdgeList.clear();
-        long edgeMapSize = SerDeUtils.readVLong(in);
-        for (long i = 0; i < edgeMapSize; ++i) {
-            Edge<I, E> edge = allocateEdge();
-            edge.setConf(getContext().getConfiguration());
-            edge.readFields(in);
-            addEdge(edge);
-        }
-        msgList.clear();
-        long msgListSize = SerDeUtils.readVLong(in);
-        for (long i = 0; i < msgListSize; ++i) {
-            M msg = allocateMessage();
-            msg.readFields(in);
-            msgList.add(msg);
-        }
-        halt = in.readBoolean();
-        updated = false;
-        hasMessage = false;
-    }
+	/**
+	 * Pregelix internal use only
+	 * 
+	 * @param writers
+	 */
+	public void finishCompute() throws IOException {
+		delegate.finishCompute();
+	}
 
-    @Override
-    public void write(DataOutput out) throws IOException {
-        vertexId.write(out);
-        out.writeBoolean(vertexValue != null);
-        if (vertexValue != null) {
-            vertexValue.write(out);
-        }
-        SerDeUtils.writeVLong(out, destEdgeList.size());
-        for (Edge<I, E> edge : destEdgeList) {
-            edge.write(out);
-        }
-        SerDeUtils.writeVLong(out, msgList.size());
-        for (M msg : msgList) {
-            msg.write(out);
-        }
-        out.writeBoolean(halt);
-    }
+	/**
+	 * Pregelix internal use only
+	 */
+	public boolean hasUpdate() {
+		return this.updated;
+	}
 
-    private boolean addEdge(Edge<I, E> edge) {
-        edge.setConf(getContext().getConfiguration());
-        destEdgeList.add(edge);
-        return true;
-    }
+	/**
+	 * Pregelix internal use only
+	 */
+	public boolean hasMessage() {
+		return this.hasMessage;
+	}
 
-    /**
-     * Get the list of incoming messages
-     * 
-     * @return the list of messages
-     */
-    public List<M> getMsgList() {
-        return msgList;
-    }
+	/**
+	 * Pregelix internal use only
+	 */
+	public boolean createdNewLiveVertex() {
+		return this.createdNewLiveVertex;
+	}
 
-    /**
-     * Get outgoing edge list
-     * 
-     * @return a list of outgoing edges
-     */
-    public List<Edge<I, E>> getEdges() {
-        return this.destEdgeList;
-    }
+	/**
+	 * sort the edges
+	 */
+	@SuppressWarnings("unchecked")
+	public void sortEdges() {
+		updated = true;
+		Collections.sort(destEdgeList);
+	}
 
-    public final Mapper<?, ?, ?, ?>.Context getContext() {
-        return context;
-    }
+	/**
+	 * Allocate a new edge from the edge pool
+	 */
+	private Edge<I, E> allocateEdge() {
+		Edge<I, E> edge;
+		if (usedEdge < edgePool.size()) {
+			edge = edgePool.get(usedEdge);
+			usedEdge++;
+		} else {
+			edge = new Edge<I, E>();
+			edgePool.add(edge);
+			usedEdge++;
+		}
+		return edge;
+	}
 
-    public final static void setContext(Mapper<?, ?, ?, ?>.Context context) {
-        Vertex.context = context;
-    }
+	/**
+	 * Allocate a new message from the message pool
+	 */
+	private M allocateMessage() {
+		M message;
+		if (usedMessage < msgPool.size()) {
+			message = msgPool.get(usedEdge);
+			usedMessage++;
+		} else {
+			message = BspUtils.<M> createMessageValue(getContext()
+					.getConfiguration());
+			msgPool.add(message);
+			usedMessage++;
+		}
+		return message;
+	}
 
-    @SuppressWarnings("unchecked")
-    public String toString() {
-        Collections.sort(destEdgeList);
-        StringBuffer edgeBuffer = new StringBuffer();
-        edgeBuffer.append("(");
-        for (Edge<I, E> edge : destEdgeList) {
-            edgeBuffer.append(edge.getDestVertexId()).append(",");
-        }
-        edgeBuffer.append(")");
-        return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue() + ", edges=" + edgeBuffer + ")";
-    }
+	/**
+	 * Set the global superstep for all the vertices (internal use)
+	 * 
+	 * @param superstep
+	 *            New superstep
+	 */
+	public static final void setSuperstep(long superstep) {
+		Vertex.superstep = superstep;
+	}
 
-    public void setOutputWriters(List<IFrameWriter> writers) {
-        delegate.setOutputWriters(writers);
-    }
+	/**
+	 * Add an outgoing edge into the vertex
+	 * 
+	 * @param edge
+	 *            the edge to be added
+	 * @return true if the edge list changed as a result of this call
+	 */
+	public boolean addEdge(Edge<I, E> edge) {
+		edge.setConf(getContext().getConfiguration());
+		updated = true;
+		return destEdgeList.add(edge);
+	}
 
-    public void setOutputAppenders(List<FrameTupleAppender> appenders) {
-        delegate.setOutputAppenders(appenders);
-    }
+	/**
+	 * remove an outgoing edge in the graph
+	 * 
+	 * @param edge
+	 *            the edge to be removed
+	 * @return true if the edge is in the edge list of the vertex
+	 */
+	public boolean removeEdge(Edge<I, E> edge) {
+		updated = true;
+		return destEdgeList.remove(edge);
+	}
 
-    public void setOutputTupleBuilders(List<ArrayTupleBuilder> tbs) {
-        delegate.setOutputTupleBuilders(tbs);
-    }
+	/**
+	 * Add a new vertex into the graph
+	 * 
+	 * @param vertexId
+	 *            the vertex id
+	 * @param vertex
+	 *            the vertex
+	 */
+	public final void addVertex(I vertexId, Vertex vertex) {
+		createdNewLiveVertex |= !vertex.isHalted();
+		delegate.addVertex(vertexId, vertex);
+	}
 
-    public void finishCompute() throws IOException {
-        delegate.finishCompute();
-    }
+	/**
+	 * Delete a vertex from id
+	 * 
+	 * @param vertexId
+	 *            the vertex id
+	 */
+	public final void deleteVertex(I vertexId) {
+		delegate.deleteVertex(vertexId);
+	}
 
-    public boolean hasUpdate() {
-        return this.updated;
-    }
+	/**
+	 * Allocate a vertex value from the object pool
+	 * 
+	 * @return a vertex value instance
+	 */
+	private V allocateValue() {
+		V value;
+		if (usedValue < valuePool.size()) {
+			value = valuePool.get(usedValue);
+			usedValue++;
+		} else {
+			value = BspUtils.<V> createVertexValue(getContext()
+					.getConfiguration());
+			valuePool.add(value);
+			usedValue++;
+		}
+		return value;
+	}
 
-    public boolean hasMessage() {
-        return this.hasMessage;
-    }
+	/**
+	 * Get the current global superstep number
+	 * 
+	 * @return the current superstep number
+	 */
+	public static final long getSuperstep() {
+		return superstep;
+	}
 
-    public int getNumOutEdges() {
-        return destEdgeList.size();
-    }
+	/**
+	 * Set the total number of vertices from the last superstep.
+	 * 
+	 * @param numVertices
+	 *            Aggregate vertices in the last superstep
+	 */
+	public static final void setNumVertices(long numVertices) {
+		Vertex.numVertices = numVertices;
+	}
 
-    @SuppressWarnings("unchecked")
-    public void sortEdges() {
-        Collections.sort((List) destEdgeList);
-    }
+	/**
+	 * Get the number of vertexes in the graph
+	 * 
+	 * @return the number of vertexes in the graph
+	 */
+	public static final long getNumVertices() {
+		return numVertices;
+	}
+
+	/**
+	 * Set the total number of edges from the last superstep.
+	 * 
+	 * @param numEdges
+	 *            Aggregate edges in the last superstep
+	 */
+	public static void setNumEdges(long numEdges) {
+		Vertex.numEdges = numEdges;
+	}
+
+	/**
+	 * Get the number of edges from this graph
+	 * 
+	 * @return the number of edges in the graph
+	 */
+	public static final long getNumEdges() {
+		return numEdges;
+	}
+
+	/**
+	 * Pregelix internal use only
+	 */
+	public static final TaskAttemptContext getContext() {
+		return context;
+	}
+
+	/**
+	 * Pregelix internal use only
+	 * 
+	 * @param context
+	 */
+	public static final void setContext(TaskAttemptContext context) {
+		Vertex.context = context;
+	}
 
 }
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexDelegate.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexDelegate.java
index 7267f30..4b153c1 100644
--- a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexDelegate.java
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexDelegate.java
@@ -44,6 +44,16 @@
     private IFrameWriter aliveWriter;
     private FrameTupleAppender appenderAlive;
 
+    /** the tuple for insert */
+    private ArrayTupleBuilder insertTb;
+    private IFrameWriter insertWriter;
+    private FrameTupleAppender appenderInsert;
+
+    /** the tuple for insert */
+    private ArrayTupleBuilder deleteTb;
+    private IFrameWriter deleteWriter;
+    private FrameTupleAppender appenderDelete;
+
     /** message list */
     private MsgList dummyMessageList = new MsgList();
     /** whether alive message should be pushed out */
@@ -95,25 +105,70 @@
         this.vertexId = vertexId;
     }
 
+    public final void addVertex(I vertexId, Vertex vertex) {
+        try {
+            insertTb.reset();
+            DataOutput outputInsert = insertTb.getDataOutput();
+            vertexId.write(outputInsert);
+            insertTb.addFieldEndOffset();
+            vertex.write(outputInsert);
+            insertTb.addFieldEndOffset();
+            FrameTupleUtils.flushTuple(appenderInsert, insertTb, insertWriter);
+
+            /**
+             * push alive when necessary
+             */
+            if (pushAlive && !vertex.isHalted()) {
+                alive.reset();
+                DataOutput outputAlive = alive.getDataOutput();
+                vertexId.write(outputAlive);
+                alive.addFieldEndOffset();
+                dummyMessageList.write(outputAlive);
+                alive.addFieldEndOffset();
+                FrameTupleUtils.flushTuple(appenderAlive, alive, aliveWriter);
+            }
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    public final void deleteVertex(I vertexId) {
+        try {
+            deleteTb.reset();
+            DataOutput outputDelete = deleteTb.getDataOutput();
+            vertexId.write(outputDelete);
+            deleteTb.addFieldEndOffset();
+            FrameTupleUtils.flushTuple(appenderDelete, deleteTb, deleteWriter);
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
     public final void setOutputWriters(List<IFrameWriter> outputs) {
         msgWriter = outputs.get(0);
-        if (outputs.size() > 1) {
-            aliveWriter = outputs.get(1);
+        insertWriter = outputs.get(1);
+        deleteWriter = outputs.get(2);
+        if (outputs.size() > 3) {
+            aliveWriter = outputs.get(outputs.size() - 1);
             pushAlive = true;
         }
     }
 
     public final void setOutputAppenders(List<FrameTupleAppender> appenders) {
         appenderMsg = appenders.get(0);
-        if (appenders.size() > 1) {
-            appenderAlive = appenders.get(1);
+        appenderInsert = appenders.get(1);
+        appenderDelete = appenders.get(2);
+        if (appenders.size() > 3) {
+            appenderAlive = appenders.get(appenders.size() - 1);
         }
     }
 
     public final void setOutputTupleBuilders(List<ArrayTupleBuilder> tbs) {
         message = tbs.get(0);
-        if (tbs.size() > 1) {
-            alive = tbs.get(1);
+        insertTb = tbs.get(1);
+        deleteTb = tbs.get(2);
+        if (tbs.size() > 3) {
+            alive = tbs.get(tbs.size() - 1);
         }
     }
 }
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/BasicGenInputSplit.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/BasicGenInputSplit.java
index 7179737..ea33691 100644
--- a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/BasicGenInputSplit.java
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/BasicGenInputSplit.java
@@ -21,60 +21,64 @@
 import java.io.Serializable;
 
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 
 /**
  * This InputSplit will not give any ordering or location data. It is used
  * internally by BspInputFormat (which determines how many tasks to run the
  * application on). Users should not use this directly.
  */
-public class BasicGenInputSplit extends InputSplit implements Writable, Serializable {
-    private static final long serialVersionUID = 1L;
-    /** Number of splits */
-    private int numSplits = -1;
-    /** Split index */
-    private int splitIndex = -1;
+public class BasicGenInputSplit extends FileSplit implements Writable,
+		Serializable {
+	private static final long serialVersionUID = 1L;
+	/** Number of splits */
+	private int numSplits = -1;
+	/** Split index */
+	private int splitIndex = -1;
 
-    public BasicGenInputSplit() {
-    }
+	public BasicGenInputSplit() {
+		super(null, 0, 0, null);
+	}
 
-    public BasicGenInputSplit(int splitIndex, int numSplits) {
-        this.splitIndex = splitIndex;
-        this.numSplits = numSplits;
-    }
+	public BasicGenInputSplit(int splitIndex, int numSplits) {
+		super(null, 0, 0, null);
+		this.splitIndex = splitIndex;
+		this.numSplits = numSplits;
+	}
 
-    @Override
-    public long getLength() throws IOException, InterruptedException {
-        return 0;
-    }
+	@Override
+	public long getLength() {
+		return 0;
+	}
 
-    @Override
-    public String[] getLocations() throws IOException, InterruptedException {
-        return new String[] {};
-    }
+	@Override
+	public String[] getLocations() throws IOException {
+		return new String[] {};
+	}
 
-    @Override
-    public void readFields(DataInput in) throws IOException {
-        splitIndex = in.readInt();
-        numSplits = in.readInt();
-    }
+	@Override
+	public void readFields(DataInput in) throws IOException {
+		splitIndex = in.readInt();
+		numSplits = in.readInt();
+	}
 
-    @Override
-    public void write(DataOutput out) throws IOException {
-        out.writeInt(splitIndex);
-        out.writeInt(numSplits);
-    }
+	@Override
+	public void write(DataOutput out) throws IOException {
+		out.writeInt(splitIndex);
+		out.writeInt(numSplits);
+	}
 
-    public int getSplitIndex() {
-        return splitIndex;
-    }
+	public int getSplitIndex() {
+		return splitIndex;
+	}
 
-    public int getNumSplits() {
-        return numSplits;
-    }
+	public int getNumSplits() {
+		return numSplits;
+	}
 
-    @Override
-    public String toString() {
-        return "'" + getClass().getCanonicalName() + ", index=" + getSplitIndex() + ", num=" + getNumSplits();
-    }
+	@Override
+	public String toString() {
+		return "'" + getClass().getCanonicalName() + ", index="
+				+ getSplitIndex() + ", num=" + getNumSplits();
+	}
 }
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index 6ef7e13..8b6d1b6 100644
--- a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -60,8 +60,12 @@
     public static final String NUM_VERTICE = "pregelix.numVertices";
     /** num of edges */
     public static final String NUM_EDGES = "pregelix.numEdges";
+    /** increase state length */
+    public static final String INCREASE_STATE_LENGTH = "pregelix.incStateLength";
     /** job id */
     public static final String JOB_ID = "pregelix.jobid";
+    /** frame size */
+    public static final String FRAME_SIZE = "pregelix.framesize";
 
     /**
      * Constructor that will instantiate the configuration
@@ -130,8 +134,8 @@
     /**
      * Set the global aggregator class (optional)
      * 
-     * @param vertexCombinerClass
-     *            Determines how vertex messages are combined
+     * @param globalAggregatorClass
+     *            Determines how messages are globally aggregated
      */
     final public void setGlobalAggregatorClass(Class<?> globalAggregatorClass) {
         getConfiguration().setClass(GLOBAL_AGGREGATOR_CLASS, globalAggregatorClass, GlobalAggregator.class);
@@ -139,11 +143,27 @@
 
     /**
      * Set the job Id
-     * 
-     * @param vertexCombinerClass
-     *            Determines how vertex messages are combined
      */
     final public void setJobId(String jobId) {
         getConfiguration().set(JOB_ID, jobId);
     }
+
+    /**
+     * Set whether the vertex state length can be dynamically increased
+     * 
+     * @param jobId
+     */
+    final public void setDynamicVertexValueSize(boolean incStateLengthDynamically) {
+        getConfiguration().setBoolean(INCREASE_STATE_LENGTH, incStateLengthDynamically);
+    }
+
+    /**
+     * Set the frame size for a job
+     * 
+     * @param frameSize
+     *            the desired frame size
+     */
+    final public void setFrameSize(int frameSize) {
+        getConfiguration().setInt(FRAME_SIZE, frameSize);
+    }
 }
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index 7c4853f..ff9724d 100644
--- a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -410,4 +410,26 @@
             throw new IllegalArgumentException("createMessageValue: Illegally accessed", e);
         }
     }
+
+    /**
+     * Get the job configuration parameter whether the vertex states will increase dynamically
+     * 
+     * @param conf
+     *            the job configuration
+     * @return the boolean setting of the parameter, by default it is false
+     */
+    public static boolean getDynamicVertexValueSize(Configuration conf) {
+        return conf.getBoolean(PregelixJob.INCREASE_STATE_LENGTH, false);
+    }
+
+    /**
+     * Get the specified frame size
+     * 
+     * @param conf
+     *            the job configuration
+     * @return the specified frame size; -1 if it is not set by users
+     */
+    public static int getFrameSize(Configuration conf) {
+        return conf.getInt(PregelixJob.FRAME_SIZE, -1);
+    }
 }