fix the halt-activate issue reported by Anbang

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization@3269 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
index cd49184..e51d4bc 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -49,525 +49,524 @@
  */
 @SuppressWarnings("rawtypes")
 public abstract class Vertex<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
-		implements Writable {
-	private static long superstep = 0;
-	/** Class-wide number of vertices */
-	private static long numVertices = -1;
-	/** Class-wide number of edges */
-	private static long numEdges = -1;
-	/** Vertex id */
-	private I vertexId = null;
-	/** Vertex value */
-	private V vertexValue = null;
-	/** Map of destination vertices and their edge values */
-	private final List<Edge<I, E>> destEdgeList = new ArrayList<Edge<I, E>>();
-	/** If true, do not do anymore computation on this vertex. */
-	boolean halt = false;
-	/** List of incoming messages from the previous superstep */
-	private final List<M> msgList = new ArrayList<M>();
-	/** map context */
-	private static TaskAttemptContext context = null;
-	/** a delegate for hyracks stuff */
-	private VertexDelegate<I, V, E, M> delegate = new VertexDelegate<I, V, E, M>(
-			this);
-	/** this vertex is updated or not */
-	private boolean updated = false;
-	/** has outgoing messages */
-	private boolean hasMessage = false;
-	/** created new vertex */
-	private boolean createdNewLiveVertex = false;
+        implements Writable {
+    private static long superstep = 0;
+    /** Class-wide number of vertices */
+    private static long numVertices = -1;
+    /** Class-wide number of edges */
+    private static long numEdges = -1;
+    /** Vertex id */
+    private I vertexId = null;
+    /** Vertex value */
+    private V vertexValue = null;
+    /** Map of destination vertices and their edge values */
+    private final List<Edge<I, E>> destEdgeList = new ArrayList<Edge<I, E>>();
+    /** If true, do not do anymore computation on this vertex. */
+    boolean halt = false;
+    /** List of incoming messages from the previous superstep */
+    private final List<M> msgList = new ArrayList<M>();
+    /** map context */
+    private static TaskAttemptContext context = null;
+    /** a delegate for hyracks stuff */
+    private VertexDelegate<I, V, E, M> delegate = new VertexDelegate<I, V, E, M>(this);
+    /** this vertex is updated or not */
+    private boolean updated = false;
+    /** has outgoing messages */
+    private boolean hasMessage = false;
+    /** created new vertex */
+    private boolean createdNewLiveVertex = false;
 
-	/**
-	 * use object pool for re-using objects
-	 */
-	private List<Edge<I, E>> edgePool = new ArrayList<Edge<I, E>>();
-	private List<M> msgPool = new ArrayList<M>();
-	private List<V> valuePool = new ArrayList<V>();
-	private int usedEdge = 0;
-	private int usedMessage = 0;
-	private int usedValue = 0;
+    /**
+     * use object pool for re-using objects
+     */
+    private List<Edge<I, E>> edgePool = new ArrayList<Edge<I, E>>();
+    private List<M> msgPool = new ArrayList<M>();
+    private List<V> valuePool = new ArrayList<V>();
+    private int usedEdge = 0;
+    private int usedMessage = 0;
+    private int usedValue = 0;
 
-	/**
-	 * The key method that users need to implement
-	 * 
-	 * @param msgIterator
-	 *            an iterator of incoming messages
-	 */
-	public abstract void compute(Iterator<M> msgIterator);
+    /**
+     * The key method that users need to implement
+     * 
+     * @param msgIterator
+     *            an iterator of incoming messages
+     */
+    public abstract void compute(Iterator<M> msgIterator);
 
-	/**
-	 * Add an edge for the vertex.
-	 * 
-	 * @param targetVertexId
-	 * @param edgeValue
-	 * @return successful or not
-	 */
-	public final boolean addEdge(I targetVertexId, E edgeValue) {
-		Edge<I, E> edge = this.allocateEdge();
-		edge.setDestVertexId(targetVertexId);
-		edge.setEdgeValue(edgeValue);
-		destEdgeList.add(edge);
-		updated = true;
-		return true;
-	}
+    /**
+     * Add an edge for the vertex.
+     * 
+     * @param targetVertexId
+     * @param edgeValue
+     * @return successful or not
+     */
+    public final boolean addEdge(I targetVertexId, E edgeValue) {
+        Edge<I, E> edge = this.allocateEdge();
+        edge.setDestVertexId(targetVertexId);
+        edge.setEdgeValue(edgeValue);
+        destEdgeList.add(edge);
+        updated = true;
+        return true;
+    }
 
-	/**
-	 * Initialize a new vertex
-	 * 
-	 * @param vertexId
-	 * @param vertexValue
-	 * @param edges
-	 * @param messages
-	 */
-	public void initialize(I vertexId, V vertexValue, Map<I, E> edges,
-			List<M> messages) {
-		if (vertexId != null) {
-			setVertexId(vertexId);
-		}
-		if (vertexValue != null) {
-			setVertexValue(vertexValue);
-		}
-		destEdgeList.clear();
-		if (edges != null && !edges.isEmpty()) {
-			for (Map.Entry<I, E> entry : edges.entrySet()) {
-				destEdgeList.add(new Edge<I, E>(entry.getKey(), entry
-						.getValue()));
-			}
-		}
-		if (messages != null && !messages.isEmpty()) {
-			msgList.addAll(messages);
-		}
-	}
+    /**
+     * Initialize a new vertex
+     * 
+     * @param vertexId
+     * @param vertexValue
+     * @param edges
+     * @param messages
+     */
+    public void initialize(I vertexId, V vertexValue, Map<I, E> edges, List<M> messages) {
+        if (vertexId != null) {
+            setVertexId(vertexId);
+        }
+        if (vertexValue != null) {
+            setVertexValue(vertexValue);
+        }
+        destEdgeList.clear();
+        if (edges != null && !edges.isEmpty()) {
+            for (Map.Entry<I, E> entry : edges.entrySet()) {
+                destEdgeList.add(new Edge<I, E>(entry.getKey(), entry.getValue()));
+            }
+        }
+        if (messages != null && !messages.isEmpty()) {
+            msgList.addAll(messages);
+        }
+    }
 
-	/**
-	 * reset a vertex object: clear its internal states
-	 */
-	public void reset() {
-		usedEdge = 0;
-		usedMessage = 0;
-		usedValue = 0;
-		updated = false;
-	}
+    /**
+     * reset a vertex object: clear its internal states
+     */
+    public void reset() {
+        usedEdge = 0;
+        usedMessage = 0;
+        usedValue = 0;
+        updated = false;
+    }
 
-	/**
-	 * Set the vertex id
-	 * 
-	 * @param vertexId
-	 */
-	public final void setVertexId(I vertexId) {
-		this.vertexId = vertexId;
-		delegate.setVertexId(vertexId);
-	}
+    /**
+     * Set the vertex id
+     * 
+     * @param vertexId
+     */
+    public final void setVertexId(I vertexId) {
+        this.vertexId = vertexId;
+        delegate.setVertexId(vertexId);
+    }
 
-	/**
-	 * Get the vertex id
-	 * 
-	 * @return vertex id
-	 */
-	public final I getVertexId() {
-		return vertexId;
-	}
+    /**
+     * Get the vertex id
+     * 
+     * @return vertex id
+     */
+    public final I getVertexId() {
+        return vertexId;
+    }
 
-	/**
-	 * Get the vertex value
-	 * 
-	 * @return the vertex value
-	 */
-	public final V getVertexValue() {
-		return vertexValue;
-	}
+    /**
+     * Get the vertex value
+     * 
+     * @return the vertex value
+     */
+    public final V getVertexValue() {
+        return vertexValue;
+    }
 
-	/**
-	 * Set the vertex value
-	 * 
-	 * @param vertexValue
-	 */
-	public final void setVertexValue(V vertexValue) {
-		this.vertexValue = vertexValue;
-		this.updated = true;
-	}
+    /**
+     * Set the vertex value
+     * 
+     * @param vertexValue
+     */
+    public final void setVertexValue(V vertexValue) {
+        this.vertexValue = vertexValue;
+        this.updated = true;
+    }
 
-	/***
-	 * Send a message to a specific vertex
-	 * 
-	 * @param id
-	 *            the receiver vertex id
-	 * @param msg
-	 *            the message
-	 */
-	public final void sendMsg(I id, M msg) {
-		if (msg == null) {
-			throw new IllegalArgumentException(
-					"sendMsg: Cannot send null message to " + id);
-		}
-		delegate.sendMsg(id, msg);
-		this.hasMessage = true;
-	}
+    /***
+     * Send a message to a specific vertex
+     * 
+     * @param id
+     *            the receiver vertex id
+     * @param msg
+     *            the message
+     */
+    public final void sendMsg(I id, M msg) {
+        if (msg == null) {
+            throw new IllegalArgumentException("sendMsg: Cannot send null message to " + id);
+        }
+        delegate.sendMsg(id, msg);
+        this.hasMessage = true;
+    }
 
-	/**
-	 * Send a message to all direct outgoing neighbors
-	 * 
-	 * @param msg
-	 *            the message
-	 */
-	public final void sendMsgToAllEdges(M msg) {
-		if (msg == null) {
-			throw new IllegalArgumentException(
-					"sendMsgToAllEdges: Cannot send null message to all edges");
-		}
-		for (Edge<I, E> edge : destEdgeList) {
-			sendMsg(edge.getDestVertexId(), msg);
-		}
-	}
+    /**
+     * Send a message to all direct outgoing neighbors
+     * 
+     * @param msg
+     *            the message
+     */
+    public final void sendMsgToAllEdges(M msg) {
+        if (msg == null) {
+            throw new IllegalArgumentException("sendMsgToAllEdges: Cannot send null message to all edges");
+        }
+        for (Edge<I, E> edge : destEdgeList) {
+            sendMsg(edge.getDestVertexId(), msg);
+        }
+    }
 
-	/**
-	 * Vote to halt. Once all vertex vote to halt and no more messages, a
-	 * Pregelix job will terminate.
-	 */
-	public final void voteToHalt() {
-		halt = true;
-		updated = true;
-	}
+    /**
+     * Vote to halt. Once all vertex vote to halt and no more messages, a
+     * Pregelix job will terminate.
+     */
+    public final void voteToHalt() {
+        halt = true;
+        updated = true;
+    }
 
-	/**
-	 * @return the vertex is halted (true) or not (false)
-	 */
-	public final boolean isHalted() {
-		return halt;
-	}
+    /**
+     * Activate a halted vertex such that it is alive again.
+     */
+    public final void activate() {
+        halt = false;
+        updated = true;
+    }
 
-	@Override
-	final public void readFields(DataInput in) throws IOException {
-		reset();
-		if (vertexId == null)
-			vertexId = BspUtils.<I> createVertexIndex(getContext()
-					.getConfiguration());
-		vertexId.readFields(in);
-		delegate.setVertexId(vertexId);
-		boolean hasVertexValue = in.readBoolean();
+    /**
+     * @return the vertex is halted (true) or not (false)
+     */
+    public final boolean isHalted() {
+        return halt;
+    }
 
-		if (hasVertexValue) {
-			vertexValue = allocateValue();
-			vertexValue.readFields(in);
-			delegate.setVertex(this);
-		}
-		destEdgeList.clear();
-		long edgeMapSize = SerDeUtils.readVLong(in);
-		for (long i = 0; i < edgeMapSize; ++i) {
-			Edge<I, E> edge = allocateEdge();
-			edge.setConf(getContext().getConfiguration());
-			edge.readFields(in);
-			addEdge(edge);
-		}
-		msgList.clear();
-		long msgListSize = SerDeUtils.readVLong(in);
-		for (long i = 0; i < msgListSize; ++i) {
-			M msg = allocateMessage();
-			msg.readFields(in);
-			msgList.add(msg);
-		}
-		halt = in.readBoolean();
-		updated = false;
-		hasMessage = false;
-		createdNewLiveVertex = false;
-	}
+    @Override
+    final public void readFields(DataInput in) throws IOException {
+        reset();
+        if (vertexId == null)
+            vertexId = BspUtils.<I> createVertexIndex(getContext().getConfiguration());
+        vertexId.readFields(in);
+        delegate.setVertexId(vertexId);
+        boolean hasVertexValue = in.readBoolean();
 
-	@Override
-	public void write(DataOutput out) throws IOException {
-		vertexId.write(out);
-		out.writeBoolean(vertexValue != null);
-		if (vertexValue != null) {
-			vertexValue.write(out);
-		}
-		SerDeUtils.writeVLong(out, destEdgeList.size());
-		for (Edge<I, E> edge : destEdgeList) {
-			edge.write(out);
-		}
-		SerDeUtils.writeVLong(out, msgList.size());
-		for (M msg : msgList) {
-			msg.write(out);
-		}
-		out.writeBoolean(halt);
-	}
+        if (hasVertexValue) {
+            vertexValue = allocateValue();
+            vertexValue.readFields(in);
+            delegate.setVertex(this);
+        }
+        destEdgeList.clear();
+        long edgeMapSize = SerDeUtils.readVLong(in);
+        for (long i = 0; i < edgeMapSize; ++i) {
+            Edge<I, E> edge = allocateEdge();
+            edge.setConf(getContext().getConfiguration());
+            edge.readFields(in);
+            addEdge(edge);
+        }
+        msgList.clear();
+        long msgListSize = SerDeUtils.readVLong(in);
+        for (long i = 0; i < msgListSize; ++i) {
+            M msg = allocateMessage();
+            msg.readFields(in);
+            msgList.add(msg);
+        }
+        halt = in.readBoolean();
+        updated = false;
+        hasMessage = false;
+        createdNewLiveVertex = false;
+    }
 
-	/**
-	 * Get the list of incoming messages
-	 * 
-	 * @return the list of messages
-	 */
-	public List<M> getMsgList() {
-		return msgList;
-	}
+    @Override
+    public void write(DataOutput out) throws IOException {
+        vertexId.write(out);
+        out.writeBoolean(vertexValue != null);
+        if (vertexValue != null) {
+            vertexValue.write(out);
+        }
+        SerDeUtils.writeVLong(out, destEdgeList.size());
+        for (Edge<I, E> edge : destEdgeList) {
+            edge.write(out);
+        }
+        SerDeUtils.writeVLong(out, msgList.size());
+        for (M msg : msgList) {
+            msg.write(out);
+        }
+        out.writeBoolean(halt);
+    }
 
-	/**
-	 * Get outgoing edge list
-	 * 
-	 * @return a list of outgoing edges
-	 */
-	public List<Edge<I, E>> getEdges() {
-		return this.destEdgeList;
-	}
+    /**
+     * Get the list of incoming messages
+     * 
+     * @return the list of messages
+     */
+    public List<M> getMsgList() {
+        return msgList;
+    }
 
-	@Override
-	@SuppressWarnings("unchecked")
-	public String toString() {
-		Collections.sort(destEdgeList);
-		StringBuffer edgeBuffer = new StringBuffer();
-		edgeBuffer.append("(");
-		for (Edge<I, E> edge : destEdgeList) {
-			edgeBuffer.append(edge.getDestVertexId()).append(",");
-		}
-		edgeBuffer.append(")");
-		return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue()
-				+ ", edges=" + edgeBuffer + ")";
-	}
+    /**
+     * Get outgoing edge list
+     * 
+     * @return a list of outgoing edges
+     */
+    public List<Edge<I, E>> getEdges() {
+        return this.destEdgeList;
+    }
 
-	/**
-	 * Get the number of outgoing edges
-	 * 
-	 * @return the number of outging edges
-	 */
-	public int getNumOutEdges() {
-		return destEdgeList.size();
-	}
+    @Override
+    @SuppressWarnings("unchecked")
+    public String toString() {
+        Collections.sort(destEdgeList);
+        StringBuffer edgeBuffer = new StringBuffer();
+        edgeBuffer.append("(");
+        for (Edge<I, E> edge : destEdgeList) {
+            edgeBuffer.append(edge.getDestVertexId()).append(",");
+        }
+        edgeBuffer.append(")");
+        return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue() + ", edges=" + edgeBuffer + ")";
+    }
 
-	/**
-	 * Pregelix internal use only
-	 * 
-	 * @param writers
-	 */
-	public void setOutputWriters(List<IFrameWriter> writers) {
-		delegate.setOutputWriters(writers);
-	}
+    /**
+     * Get the number of outgoing edges
+     * 
+     * @return the number of outging edges
+     */
+    public int getNumOutEdges() {
+        return destEdgeList.size();
+    }
 
-	/**
-	 * Pregelix internal use only
-	 * 
-	 * @param writers
-	 */
-	public void setOutputAppenders(List<FrameTupleAppender> appenders) {
-		delegate.setOutputAppenders(appenders);
-	}
+    /**
+     * Pregelix internal use only
+     * 
+     * @param writers
+     */
+    public void setOutputWriters(List<IFrameWriter> writers) {
+        delegate.setOutputWriters(writers);
+    }
 
-	/**
-	 * Pregelix internal use only
-	 * 
-	 * @param writers
-	 */
-	public void setOutputTupleBuilders(List<ArrayTupleBuilder> tbs) {
-		delegate.setOutputTupleBuilders(tbs);
-	}
+    /**
+     * Pregelix internal use only
+     * 
+     * @param writers
+     */
+    public void setOutputAppenders(List<FrameTupleAppender> appenders) {
+        delegate.setOutputAppenders(appenders);
+    }
 
-	/**
-	 * Pregelix internal use only
-	 * 
-	 * @param writers
-	 */
-	public void finishCompute() throws IOException {
-		delegate.finishCompute();
-	}
+    /**
+     * Pregelix internal use only
+     * 
+     * @param writers
+     */
+    public void setOutputTupleBuilders(List<ArrayTupleBuilder> tbs) {
+        delegate.setOutputTupleBuilders(tbs);
+    }
 
-	/**
-	 * Pregelix internal use only
-	 */
-	public boolean hasUpdate() {
-		return this.updated;
-	}
+    /**
+     * Pregelix internal use only
+     * 
+     * @param writers
+     */
+    public void finishCompute() throws IOException {
+        delegate.finishCompute();
+    }
 
-	/**
-	 * Pregelix internal use only
-	 */
-	public boolean hasMessage() {
-		return this.hasMessage;
-	}
+    /**
+     * Pregelix internal use only
+     */
+    public boolean hasUpdate() {
+        return this.updated;
+    }
 
-	/**
-	 * Pregelix internal use only
-	 */
-	public boolean createdNewLiveVertex() {
-		return this.createdNewLiveVertex;
-	}
+    /**
+     * Pregelix internal use only
+     */
+    public boolean hasMessage() {
+        return this.hasMessage;
+    }
 
-	/**
-	 * sort the edges
-	 */
-	@SuppressWarnings("unchecked")
-	public void sortEdges() {
-		updated = true;
-		Collections.sort(destEdgeList);
-	}
+    /**
+     * Pregelix internal use only
+     */
+    public boolean createdNewLiveVertex() {
+        return this.createdNewLiveVertex;
+    }
 
-	/**
-	 * Allocate a new edge from the edge pool
-	 */
-	private Edge<I, E> allocateEdge() {
-		Edge<I, E> edge;
-		if (usedEdge < edgePool.size()) {
-			edge = edgePool.get(usedEdge);
-			usedEdge++;
-		} else {
-			edge = new Edge<I, E>();
-			edgePool.add(edge);
-			usedEdge++;
-		}
-		return edge;
-	}
+    /**
+     * sort the edges
+     */
+    @SuppressWarnings("unchecked")
+    public void sortEdges() {
+        updated = true;
+        Collections.sort(destEdgeList);
+    }
 
-	/**
-	 * Allocate a new message from the message pool
-	 */
-	private M allocateMessage() {
-		M message;
-		if (usedMessage < msgPool.size()) {
-			message = msgPool.get(usedEdge);
-			usedMessage++;
-		} else {
-			message = BspUtils.<M> createMessageValue(getContext()
-					.getConfiguration());
-			msgPool.add(message);
-			usedMessage++;
-		}
-		return message;
-	}
+    /**
+     * Allocate a new edge from the edge pool
+     */
+    private Edge<I, E> allocateEdge() {
+        Edge<I, E> edge;
+        if (usedEdge < edgePool.size()) {
+            edge = edgePool.get(usedEdge);
+            usedEdge++;
+        } else {
+            edge = new Edge<I, E>();
+            edgePool.add(edge);
+            usedEdge++;
+        }
+        return edge;
+    }
 
-	/**
-	 * Set the global superstep for all the vertices (internal use)
-	 * 
-	 * @param superstep
-	 *            New superstep
-	 */
-	public static final void setSuperstep(long superstep) {
-		Vertex.superstep = superstep;
-	}
+    /**
+     * Allocate a new message from the message pool
+     */
+    private M allocateMessage() {
+        M message;
+        if (usedMessage < msgPool.size()) {
+            message = msgPool.get(usedEdge);
+            usedMessage++;
+        } else {
+            message = BspUtils.<M> createMessageValue(getContext().getConfiguration());
+            msgPool.add(message);
+            usedMessage++;
+        }
+        return message;
+    }
 
-	/**
-	 * Add an outgoing edge into the vertex
-	 * 
-	 * @param edge
-	 *            the edge to be added
-	 * @return true if the edge list changed as a result of this call
-	 */
-	public boolean addEdge(Edge<I, E> edge) {
-		edge.setConf(getContext().getConfiguration());
-		updated = true;
-		return destEdgeList.add(edge);
-	}
+    /**
+     * Set the global superstep for all the vertices (internal use)
+     * 
+     * @param superstep
+     *            New superstep
+     */
+    public static final void setSuperstep(long superstep) {
+        Vertex.superstep = superstep;
+    }
 
-	/**
-	 * remove an outgoing edge in the graph
-	 * 
-	 * @param edge
-	 *            the edge to be removed
-	 * @return true if the edge is in the edge list of the vertex
-	 */
-	public boolean removeEdge(Edge<I, E> edge) {
-		updated = true;
-		return destEdgeList.remove(edge);
-	}
+    /**
+     * Add an outgoing edge into the vertex
+     * 
+     * @param edge
+     *            the edge to be added
+     * @return true if the edge list changed as a result of this call
+     */
+    public boolean addEdge(Edge<I, E> edge) {
+        edge.setConf(getContext().getConfiguration());
+        updated = true;
+        return destEdgeList.add(edge);
+    }
 
-	/**
-	 * Add a new vertex into the graph
-	 * 
-	 * @param vertexId
-	 *            the vertex id
-	 * @param vertex
-	 *            the vertex
-	 */
-	public final void addVertex(I vertexId, Vertex vertex) {
-		createdNewLiveVertex |= !vertex.isHalted();
-		delegate.addVertex(vertexId, vertex);
-	}
+    /**
+     * remove an outgoing edge in the graph
+     * 
+     * @param edge
+     *            the edge to be removed
+     * @return true if the edge is in the edge list of the vertex
+     */
+    public boolean removeEdge(Edge<I, E> edge) {
+        updated = true;
+        return destEdgeList.remove(edge);
+    }
 
-	/**
-	 * Delete a vertex from id
-	 * 
-	 * @param vertexId
-	 *            the vertex id
-	 */
-	public final void deleteVertex(I vertexId) {
-		delegate.deleteVertex(vertexId);
-	}
+    /**
+     * Add a new vertex into the graph
+     * 
+     * @param vertexId
+     *            the vertex id
+     * @param vertex
+     *            the vertex
+     */
+    public final void addVertex(I vertexId, Vertex vertex) {
+        createdNewLiveVertex |= !vertex.isHalted();
+        delegate.addVertex(vertexId, vertex);
+    }
 
-	/**
-	 * Allocate a vertex value from the object pool
-	 * 
-	 * @return a vertex value instance
-	 */
-	private V allocateValue() {
-		V value;
-		if (usedValue < valuePool.size()) {
-			value = valuePool.get(usedValue);
-			usedValue++;
-		} else {
-			value = BspUtils.<V> createVertexValue(getContext()
-					.getConfiguration());
-			valuePool.add(value);
-			usedValue++;
-		}
-		return value;
-	}
+    /**
+     * Delete a vertex from id
+     * 
+     * @param vertexId
+     *            the vertex id
+     */
+    public final void deleteVertex(I vertexId) {
+        delegate.deleteVertex(vertexId);
+    }
 
-	/**
-	 * Get the current global superstep number
-	 * 
-	 * @return the current superstep number
-	 */
-	public static final long getSuperstep() {
-		return superstep;
-	}
+    /**
+     * Allocate a vertex value from the object pool
+     * 
+     * @return a vertex value instance
+     */
+    private V allocateValue() {
+        V value;
+        if (usedValue < valuePool.size()) {
+            value = valuePool.get(usedValue);
+            usedValue++;
+        } else {
+            value = BspUtils.<V> createVertexValue(getContext().getConfiguration());
+            valuePool.add(value);
+            usedValue++;
+        }
+        return value;
+    }
 
-	/**
-	 * Set the total number of vertices from the last superstep.
-	 * 
-	 * @param numVertices
-	 *            Aggregate vertices in the last superstep
-	 */
-	public static final void setNumVertices(long numVertices) {
-		Vertex.numVertices = numVertices;
-	}
+    /**
+     * Get the current global superstep number
+     * 
+     * @return the current superstep number
+     */
+    public static final long getSuperstep() {
+        return superstep;
+    }
 
-	/**
-	 * Get the number of vertexes in the graph
-	 * 
-	 * @return the number of vertexes in the graph
-	 */
-	public static final long getNumVertices() {
-		return numVertices;
-	}
+    /**
+     * Set the total number of vertices from the last superstep.
+     * 
+     * @param numVertices
+     *            Aggregate vertices in the last superstep
+     */
+    public static final void setNumVertices(long numVertices) {
+        Vertex.numVertices = numVertices;
+    }
 
-	/**
-	 * Set the total number of edges from the last superstep.
-	 * 
-	 * @param numEdges
-	 *            Aggregate edges in the last superstep
-	 */
-	public static void setNumEdges(long numEdges) {
-		Vertex.numEdges = numEdges;
-	}
+    /**
+     * Get the number of vertexes in the graph
+     * 
+     * @return the number of vertexes in the graph
+     */
+    public static final long getNumVertices() {
+        return numVertices;
+    }
 
-	/**
-	 * Get the number of edges from this graph
-	 * 
-	 * @return the number of edges in the graph
-	 */
-	public static final long getNumEdges() {
-		return numEdges;
-	}
+    /**
+     * Set the total number of edges from the last superstep.
+     * 
+     * @param numEdges
+     *            Aggregate edges in the last superstep
+     */
+    public static void setNumEdges(long numEdges) {
+        Vertex.numEdges = numEdges;
+    }
 
-	/**
-	 * Pregelix internal use only
-	 */
-	public static final TaskAttemptContext getContext() {
-		return context;
-	}
+    /**
+     * Get the number of edges from this graph
+     * 
+     * @return the number of edges in the graph
+     */
+    public static final long getNumEdges() {
+        return numEdges;
+    }
 
-	/**
-	 * Pregelix internal use only
-	 * 
-	 * @param context
-	 */
-	public static final void setContext(TaskAttemptContext context) {
-		Vertex.context = context;
-	}
+    /**
+     * Pregelix internal use only
+     */
+    public static final TaskAttemptContext getContext() {
+        return context;
+    }
+
+    /**
+     * Pregelix internal use only
+     * 
+     * @param context
+     */
+    public static final void setContext(TaskAttemptContext context) {
+        Vertex.context = context;
+    }
 
 }
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index f7958d9..af95064 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -44,229 +44,249 @@
 
 @SuppressWarnings({ "rawtypes", "unchecked" })
 public class ComputeUpdateFunctionFactory implements IUpdateFunctionFactory {
-    private static final long serialVersionUID = 1L;
-    private final IConfigurationFactory confFactory;
+	private static final long serialVersionUID = 1L;
+	private final IConfigurationFactory confFactory;
 
-    public ComputeUpdateFunctionFactory(IConfigurationFactory confFactory) {
-        this.confFactory = confFactory;
-    }
+	public ComputeUpdateFunctionFactory(IConfigurationFactory confFactory) {
+		this.confFactory = confFactory;
+	}
 
-    @Override
-    public IUpdateFunction createFunction() {
-        return new IUpdateFunction() {
-            // for writing intermediate data
-            private final ArrayTupleBuilder tbMsg = new ArrayTupleBuilder(2);
-            private final ArrayTupleBuilder tbAlive = new ArrayTupleBuilder(2);
-            private final ArrayTupleBuilder tbTerminate = new ArrayTupleBuilder(1);
-            private final ArrayTupleBuilder tbGlobalAggregate = new ArrayTupleBuilder(1);
-            private final ArrayTupleBuilder tbInsert = new ArrayTupleBuilder(2);
-            private final ArrayTupleBuilder tbDelete = new ArrayTupleBuilder(1);
+	@Override
+	public IUpdateFunction createFunction() {
+		return new IUpdateFunction() {
+			// for writing intermediate data
+			private final ArrayTupleBuilder tbMsg = new ArrayTupleBuilder(2);
+			private final ArrayTupleBuilder tbAlive = new ArrayTupleBuilder(2);
+			private final ArrayTupleBuilder tbTerminate = new ArrayTupleBuilder(
+					1);
+			private final ArrayTupleBuilder tbGlobalAggregate = new ArrayTupleBuilder(
+					1);
+			private final ArrayTupleBuilder tbInsert = new ArrayTupleBuilder(2);
+			private final ArrayTupleBuilder tbDelete = new ArrayTupleBuilder(1);
 
-            // for writing out to message channel
-            private IFrameWriter writerMsg;
-            private FrameTupleAppender appenderMsg;
-            private ByteBuffer bufferMsg;
+			// for writing out to message channel
+			private IFrameWriter writerMsg;
+			private FrameTupleAppender appenderMsg;
+			private ByteBuffer bufferMsg;
 
-            // for writing out to alive message channel
-            private IFrameWriter writerAlive;
-            private FrameTupleAppender appenderAlive;
-            private ByteBuffer bufferAlive;
-            private boolean pushAlive;
+			// for writing out to alive message channel
+			private IFrameWriter writerAlive;
+			private FrameTupleAppender appenderAlive;
+			private ByteBuffer bufferAlive;
+			private boolean pushAlive;
 
-            // for writing out termination detection control channel
-            private IFrameWriter writerTerminate;
-            private FrameTupleAppender appenderTerminate;
-            private ByteBuffer bufferTerminate;
-            private boolean terminate = true;
+			// for writing out termination detection control channel
+			private IFrameWriter writerTerminate;
+			private FrameTupleAppender appenderTerminate;
+			private ByteBuffer bufferTerminate;
+			private boolean terminate = true;
 
-            // for writing out termination detection control channel
-            private IFrameWriter writerGlobalAggregate;
-            private FrameTupleAppender appenderGlobalAggregate;
-            private ByteBuffer bufferGlobalAggregate;
-            private GlobalAggregator aggregator;
+			// for writing out termination detection control channel
+			private IFrameWriter writerGlobalAggregate;
+			private FrameTupleAppender appenderGlobalAggregate;
+			private ByteBuffer bufferGlobalAggregate;
+			private GlobalAggregator aggregator;
 
-            // for writing out to insert vertex channel
-            private IFrameWriter writerInsert;
-            private FrameTupleAppender appenderInsert;
-            private ByteBuffer bufferInsert;
+			// for writing out to insert vertex channel
+			private IFrameWriter writerInsert;
+			private FrameTupleAppender appenderInsert;
+			private ByteBuffer bufferInsert;
 
-            // for writing out to delete vertex channel
-            private IFrameWriter writerDelete;
-            private FrameTupleAppender appenderDelete;
-            private ByteBuffer bufferDelete;
+			// for writing out to delete vertex channel
+			private IFrameWriter writerDelete;
+			private FrameTupleAppender appenderDelete;
+			private ByteBuffer bufferDelete;
 
-            private Vertex vertex;
-            private ResetableByteArrayOutputStream bbos = new ResetableByteArrayOutputStream();
-            private DataOutput output = new DataOutputStream(bbos);
+			private Vertex vertex;
+			private ResetableByteArrayOutputStream bbos = new ResetableByteArrayOutputStream();
+			private DataOutput output = new DataOutputStream(bbos);
 
-            private ArrayIterator msgIterator = new ArrayIterator();
-            private final List<IFrameWriter> writers = new ArrayList<IFrameWriter>();
-            private final List<FrameTupleAppender> appenders = new ArrayList<FrameTupleAppender>();
-            private final List<ArrayTupleBuilder> tbs = new ArrayList<ArrayTupleBuilder>();
-            private Configuration conf;
-            private boolean dynamicStateLength;
+			private ArrayIterator msgIterator = new ArrayIterator();
+			private final List<IFrameWriter> writers = new ArrayList<IFrameWriter>();
+			private final List<FrameTupleAppender> appenders = new ArrayList<FrameTupleAppender>();
+			private final List<ArrayTupleBuilder> tbs = new ArrayList<ArrayTupleBuilder>();
+			private Configuration conf;
+			private boolean dynamicStateLength;
 
-            @Override
-            public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
-                    throws HyracksDataException {
-                this.conf = confFactory.createConfiguration();
-                this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf);
-                this.aggregator = BspUtils.createGlobalAggregator(conf);
-                this.aggregator.init();
+			@Override
+			public void open(IHyracksTaskContext ctx, RecordDescriptor rd,
+					IFrameWriter... writers) throws HyracksDataException {
+				this.conf = confFactory.createConfiguration();
+				this.dynamicStateLength = BspUtils
+						.getDynamicVertexValueSize(conf);
+				this.aggregator = BspUtils.createGlobalAggregator(conf);
+				this.aggregator.init();
 
-                this.writerMsg = writers[0];
-                this.bufferMsg = ctx.allocateFrame();
-                this.appenderMsg = new FrameTupleAppender(ctx.getFrameSize());
-                this.appenderMsg.reset(bufferMsg, true);
-                this.writers.add(writerMsg);
-                this.appenders.add(appenderMsg);
+				this.writerMsg = writers[0];
+				this.bufferMsg = ctx.allocateFrame();
+				this.appenderMsg = new FrameTupleAppender(ctx.getFrameSize());
+				this.appenderMsg.reset(bufferMsg, true);
+				this.writers.add(writerMsg);
+				this.appenders.add(appenderMsg);
 
-                this.writerTerminate = writers[1];
-                this.bufferTerminate = ctx.allocateFrame();
-                this.appenderTerminate = new FrameTupleAppender(ctx.getFrameSize());
-                this.appenderTerminate.reset(bufferTerminate, true);
+				this.writerTerminate = writers[1];
+				this.bufferTerminate = ctx.allocateFrame();
+				this.appenderTerminate = new FrameTupleAppender(
+						ctx.getFrameSize());
+				this.appenderTerminate.reset(bufferTerminate, true);
 
-                this.writerGlobalAggregate = writers[2];
-                this.bufferGlobalAggregate = ctx.allocateFrame();
-                this.appenderGlobalAggregate = new FrameTupleAppender(ctx.getFrameSize());
-                this.appenderGlobalAggregate.reset(bufferGlobalAggregate, true);
+				this.writerGlobalAggregate = writers[2];
+				this.bufferGlobalAggregate = ctx.allocateFrame();
+				this.appenderGlobalAggregate = new FrameTupleAppender(
+						ctx.getFrameSize());
+				this.appenderGlobalAggregate.reset(bufferGlobalAggregate, true);
 
-                this.writerInsert = writers[3];
-                this.bufferInsert = ctx.allocateFrame();
-                this.appenderInsert = new FrameTupleAppender(ctx.getFrameSize());
-                this.appenderInsert.reset(bufferInsert, true);
-                this.writers.add(writerInsert);
-                this.appenders.add(appenderInsert);
+				this.writerInsert = writers[3];
+				this.bufferInsert = ctx.allocateFrame();
+				this.appenderInsert = new FrameTupleAppender(ctx.getFrameSize());
+				this.appenderInsert.reset(bufferInsert, true);
+				this.writers.add(writerInsert);
+				this.appenders.add(appenderInsert);
 
-                this.writerDelete = writers[4];
-                this.bufferDelete = ctx.allocateFrame();
-                this.appenderDelete = new FrameTupleAppender(ctx.getFrameSize());
-                this.appenderDelete.reset(bufferDelete, true);
-                this.writers.add(writerDelete);
-                this.appenders.add(appenderDelete);
+				this.writerDelete = writers[4];
+				this.bufferDelete = ctx.allocateFrame();
+				this.appenderDelete = new FrameTupleAppender(ctx.getFrameSize());
+				this.appenderDelete.reset(bufferDelete, true);
+				this.writers.add(writerDelete);
+				this.appenders.add(appenderDelete);
 
-                if (writers.length > 5) {
-                    this.writerAlive = writers[5];
-                    this.bufferAlive = ctx.allocateFrame();
-                    this.appenderAlive = new FrameTupleAppender(ctx.getFrameSize());
-                    this.appenderAlive.reset(bufferAlive, true);
-                    this.pushAlive = true;
-                    this.writers.add(writerAlive);
-                    this.appenders.add(appenderAlive);
-                }
+				if (writers.length > 5) {
+					this.writerAlive = writers[5];
+					this.bufferAlive = ctx.allocateFrame();
+					this.appenderAlive = new FrameTupleAppender(
+							ctx.getFrameSize());
+					this.appenderAlive.reset(bufferAlive, true);
+					this.pushAlive = true;
+					this.writers.add(writerAlive);
+					this.appenders.add(appenderAlive);
+				}
 
-                tbs.add(tbMsg);
-                tbs.add(tbInsert);
-                tbs.add(tbDelete);
-                tbs.add(tbAlive);
-            }
+				tbs.add(tbMsg);
+				tbs.add(tbInsert);
+				tbs.add(tbDelete);
+				tbs.add(tbAlive);
+			}
 
-            @Override
-            public void process(Object[] tuple) throws HyracksDataException {
-                // vertex Id, msg content List, vertex Id, vertex
-                tbMsg.reset();
-                tbAlive.reset();
+			@Override
+			public void process(Object[] tuple) throws HyracksDataException {
+				// vertex Id, msg content List, vertex Id, vertex
+				tbMsg.reset();
+				tbAlive.reset();
 
-                vertex = (Vertex) tuple[3];
-                vertex.setOutputWriters(writers);
-                vertex.setOutputAppenders(appenders);
-                vertex.setOutputTupleBuilders(tbs);
+				vertex = (Vertex) tuple[3];
+				vertex.setOutputWriters(writers);
+				vertex.setOutputAppenders(appenders);
+				vertex.setOutputTupleBuilders(tbs);
 
-                ArrayListWritable msgContentList = (ArrayListWritable) tuple[1];
-                msgContentList.reset(msgIterator);
+				ArrayListWritable msgContentList = (ArrayListWritable) tuple[1];
+				msgContentList.reset(msgIterator);
 
-                if (!msgIterator.hasNext() && vertex.isHalted())
-                    return;
+				if (!msgIterator.hasNext() && vertex.isHalted()) {
+					return;
+				}
+				if (vertex.isHalted()) {
+					vertex.activate();
+				}
 
-                try {
-                    vertex.compute(msgIterator);
-                    vertex.finishCompute();
-                } catch (IOException e) {
-                    throw new HyracksDataException(e);
-                }
+				try {
+					vertex.compute(msgIterator);
+					vertex.finishCompute();
+				} catch (IOException e) {
+					throw new HyracksDataException(e);
+				}
 
-                /**
-                 * this partition should not terminate
-                 */
-                if (terminate && (!vertex.isHalted() || vertex.hasMessage() || vertex.createdNewLiveVertex()))
-                    terminate = false;
+				/**
+				 * this partition should not terminate
+				 */
+				if (terminate
+						&& (!vertex.isHalted() || vertex.hasMessage() || vertex
+								.createdNewLiveVertex()))
+					terminate = false;
 
-                aggregator.step(vertex);
-            }
+				aggregator.step(vertex);
+			}
 
-            @Override
-            public void close() throws HyracksDataException {
-                FrameTupleUtils.flushTuplesFinal(appenderMsg, writerMsg);
-                FrameTupleUtils.flushTuplesFinal(appenderInsert, writerInsert);
-                FrameTupleUtils.flushTuplesFinal(appenderDelete, writerDelete);
+			@Override
+			public void close() throws HyracksDataException {
+				FrameTupleUtils.flushTuplesFinal(appenderMsg, writerMsg);
+				FrameTupleUtils.flushTuplesFinal(appenderInsert, writerInsert);
+				FrameTupleUtils.flushTuplesFinal(appenderDelete, writerDelete);
 
-                if (pushAlive)
-                    FrameTupleUtils.flushTuplesFinal(appenderAlive, writerAlive);
-                if (!terminate) {
-                    writeOutTerminationState();
-                }
+				if (pushAlive)
+					FrameTupleUtils
+							.flushTuplesFinal(appenderAlive, writerAlive);
+				if (!terminate) {
+					writeOutTerminationState();
+				}
 
-                /** write out global aggregate value */
-                writeOutGlobalAggregate();
-            }
+				/** write out global aggregate value */
+				writeOutGlobalAggregate();
+			}
 
-            private void writeOutGlobalAggregate() throws HyracksDataException {
-                try {
-                    /**
-                     * get partial aggregate result and flush to the final
-                     * aggregator
-                     */
-                    Writable agg = aggregator.finishPartial();
-                    agg.write(tbGlobalAggregate.getDataOutput());
-                    tbGlobalAggregate.addFieldEndOffset();
-                    appenderGlobalAggregate.append(tbGlobalAggregate.getFieldEndOffsets(),
-                            tbGlobalAggregate.getByteArray(), 0, tbGlobalAggregate.getSize());
-                    FrameTupleUtils.flushTuplesFinal(appenderGlobalAggregate, writerGlobalAggregate);
-                } catch (IOException e) {
-                    throw new HyracksDataException(e);
-                }
-            }
+			private void writeOutGlobalAggregate() throws HyracksDataException {
+				try {
+					/**
+					 * get partial aggregate result and flush to the final
+					 * aggregator
+					 */
+					Writable agg = aggregator.finishPartial();
+					agg.write(tbGlobalAggregate.getDataOutput());
+					tbGlobalAggregate.addFieldEndOffset();
+					appenderGlobalAggregate.append(
+							tbGlobalAggregate.getFieldEndOffsets(),
+							tbGlobalAggregate.getByteArray(), 0,
+							tbGlobalAggregate.getSize());
+					FrameTupleUtils.flushTuplesFinal(appenderGlobalAggregate,
+							writerGlobalAggregate);
+				} catch (IOException e) {
+					throw new HyracksDataException(e);
+				}
+			}
 
-            private void writeOutTerminationState() throws HyracksDataException {
-                try {
-                    tbTerminate.getDataOutput().writeLong(0);
-                    tbTerminate.addFieldEndOffset();
-                    appenderTerminate.append(tbTerminate.getFieldEndOffsets(), tbTerminate.getByteArray(), 0,
-                            tbTerminate.getSize());
-                    FrameTupleUtils.flushTuplesFinal(appenderTerminate, writerTerminate);
-                } catch (IOException e) {
-                    throw new HyracksDataException(e);
-                }
-            }
+			private void writeOutTerminationState() throws HyracksDataException {
+				try {
+					tbTerminate.getDataOutput().writeLong(0);
+					tbTerminate.addFieldEndOffset();
+					appenderTerminate.append(tbTerminate.getFieldEndOffsets(),
+							tbTerminate.getByteArray(), 0,
+							tbTerminate.getSize());
+					FrameTupleUtils.flushTuplesFinal(appenderTerminate,
+							writerTerminate);
+				} catch (IOException e) {
+					throw new HyracksDataException(e);
+				}
+			}
 
-            @Override
-            public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
-                try {
-                    if (vertex != null && vertex.hasUpdate()) {
-                        if (!dynamicStateLength) {
-                            // in-place update
-                            int fieldCount = tupleRef.getFieldCount();
-                            for (int i = 1; i < fieldCount; i++) {
-                                byte[] data = tupleRef.getFieldData(i);
-                                int offset = tupleRef.getFieldStart(i);
-                                bbos.setByteArray(data, offset);
-                                vertex.write(output);
-                            }
-                        } else {
-                            // write the vertex id
-                            DataOutput tbOutput = cloneUpdateTb.getDataOutput();
-                            vertex.getVertexId().write(tbOutput);
-                            cloneUpdateTb.addFieldEndOffset();
+			@Override
+			public void update(ITupleReference tupleRef,
+					ArrayTupleBuilder cloneUpdateTb)
+					throws HyracksDataException {
+				try {
+					if (vertex != null && vertex.hasUpdate()) {
+						if (!dynamicStateLength) {
+							// in-place update
+							int fieldCount = tupleRef.getFieldCount();
+							for (int i = 1; i < fieldCount; i++) {
+								byte[] data = tupleRef.getFieldData(i);
+								int offset = tupleRef.getFieldStart(i);
+								bbos.setByteArray(data, offset);
+								vertex.write(output);
+							}
+						} else {
+							// write the vertex id
+							DataOutput tbOutput = cloneUpdateTb.getDataOutput();
+							vertex.getVertexId().write(tbOutput);
+							cloneUpdateTb.addFieldEndOffset();
 
-                            // write the vertex value
-                            vertex.write(tbOutput);
-                            cloneUpdateTb.addFieldEndOffset();
-                        }
-                    }
-                } catch (IOException e) {
-                    throw new HyracksDataException(e);
-                }
-            }
-        };
-    }
+							// write the vertex value
+							vertex.write(tbOutput);
+							cloneUpdateTb.addFieldEndOffset();
+						}
+					}
+				} catch (IOException e) {
+					throw new HyracksDataException(e);
+				}
+			}
+		};
+	}
 }
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
index 0cf64a0..a241c9c 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
@@ -43,234 +43,255 @@
 import edu.uci.ics.pregelix.dataflow.util.ResetableByteArrayOutputStream;
 
 @SuppressWarnings({ "rawtypes", "unchecked" })
-public class StartComputeUpdateFunctionFactory implements IUpdateFunctionFactory {
-    private static final long serialVersionUID = 1L;
-    private final IConfigurationFactory confFactory;
+public class StartComputeUpdateFunctionFactory implements
+		IUpdateFunctionFactory {
+	private static final long serialVersionUID = 1L;
+	private final IConfigurationFactory confFactory;
 
-    public StartComputeUpdateFunctionFactory(IConfigurationFactory confFactory) {
-        this.confFactory = confFactory;
-    }
+	public StartComputeUpdateFunctionFactory(IConfigurationFactory confFactory) {
+		this.confFactory = confFactory;
+	}
 
-    @Override
-    public IUpdateFunction createFunction() {
-        return new IUpdateFunction() {
-            // for writing intermediate data
-            private final ArrayTupleBuilder tbMsg = new ArrayTupleBuilder(2);
-            private final ArrayTupleBuilder tbAlive = new ArrayTupleBuilder(2);
-            private final ArrayTupleBuilder tbTerminate = new ArrayTupleBuilder(1);
-            private final ArrayTupleBuilder tbGlobalAggregate = new ArrayTupleBuilder(1);
-            private final ArrayTupleBuilder tbInsert = new ArrayTupleBuilder(2);
-            private final ArrayTupleBuilder tbDelete = new ArrayTupleBuilder(1);
+	@Override
+	public IUpdateFunction createFunction() {
+		return new IUpdateFunction() {
+			// for writing intermediate data
+			private final ArrayTupleBuilder tbMsg = new ArrayTupleBuilder(2);
+			private final ArrayTupleBuilder tbAlive = new ArrayTupleBuilder(2);
+			private final ArrayTupleBuilder tbTerminate = new ArrayTupleBuilder(
+					1);
+			private final ArrayTupleBuilder tbGlobalAggregate = new ArrayTupleBuilder(
+					1);
+			private final ArrayTupleBuilder tbInsert = new ArrayTupleBuilder(2);
+			private final ArrayTupleBuilder tbDelete = new ArrayTupleBuilder(1);
 
-            // for writing out to message channel
-            private IFrameWriter writerMsg;
-            private FrameTupleAppender appenderMsg;
-            private ByteBuffer bufferMsg;
+			// for writing out to message channel
+			private IFrameWriter writerMsg;
+			private FrameTupleAppender appenderMsg;
+			private ByteBuffer bufferMsg;
 
-            // for writing out to alive message channel
-            private IFrameWriter writerAlive;
-            private FrameTupleAppender appenderAlive;
-            private ByteBuffer bufferAlive;
-            private boolean pushAlive;
+			// for writing out to alive message channel
+			private IFrameWriter writerAlive;
+			private FrameTupleAppender appenderAlive;
+			private ByteBuffer bufferAlive;
+			private boolean pushAlive;
 
-            // for writing out termination detection control channel
-            private IFrameWriter writerGlobalAggregate;
-            private FrameTupleAppender appenderGlobalAggregate;
-            private ByteBuffer bufferGlobalAggregate;
-            private GlobalAggregator aggregator;
+			// for writing out termination detection control channel
+			private IFrameWriter writerGlobalAggregate;
+			private FrameTupleAppender appenderGlobalAggregate;
+			private ByteBuffer bufferGlobalAggregate;
+			private GlobalAggregator aggregator;
 
-            // for writing out the global aggregate
-            private IFrameWriter writerTerminate;
-            private FrameTupleAppender appenderTerminate;
-            private ByteBuffer bufferTerminate;
-            private boolean terminate = true;
+			// for writing out the global aggregate
+			private IFrameWriter writerTerminate;
+			private FrameTupleAppender appenderTerminate;
+			private ByteBuffer bufferTerminate;
+			private boolean terminate = true;
 
-            // for writing out to insert vertex channel
-            private IFrameWriter writerInsert;
-            private FrameTupleAppender appenderInsert;
-            private ByteBuffer bufferInsert;
+			// for writing out to insert vertex channel
+			private IFrameWriter writerInsert;
+			private FrameTupleAppender appenderInsert;
+			private ByteBuffer bufferInsert;
 
-            // for writing out to delete vertex channel
-            private IFrameWriter writerDelete;
-            private FrameTupleAppender appenderDelete;
-            private ByteBuffer bufferDelete;
+			// for writing out to delete vertex channel
+			private IFrameWriter writerDelete;
+			private FrameTupleAppender appenderDelete;
+			private ByteBuffer bufferDelete;
 
-            // dummy empty msgList
-            private MsgList msgList = new MsgList();
-            private ArrayIterator msgIterator = new ArrayIterator();
+			// dummy empty msgList
+			private MsgList msgList = new MsgList();
+			private ArrayIterator msgIterator = new ArrayIterator();
 
-            private Vertex vertex;
-            private ResetableByteArrayOutputStream bbos = new ResetableByteArrayOutputStream();
-            private DataOutput output = new DataOutputStream(bbos);
+			private Vertex vertex;
+			private ResetableByteArrayOutputStream bbos = new ResetableByteArrayOutputStream();
+			private DataOutput output = new DataOutputStream(bbos);
 
-            private final List<IFrameWriter> writers = new ArrayList<IFrameWriter>();
-            private final List<FrameTupleAppender> appenders = new ArrayList<FrameTupleAppender>();
-            private final List<ArrayTupleBuilder> tbs = new ArrayList<ArrayTupleBuilder>();
-            private Configuration conf;
-            private boolean dynamicStateLength;
+			private final List<IFrameWriter> writers = new ArrayList<IFrameWriter>();
+			private final List<FrameTupleAppender> appenders = new ArrayList<FrameTupleAppender>();
+			private final List<ArrayTupleBuilder> tbs = new ArrayList<ArrayTupleBuilder>();
+			private Configuration conf;
+			private boolean dynamicStateLength;
 
-            @Override
-            public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
-                    throws HyracksDataException {
-                this.conf = confFactory.createConfiguration();
-                this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf);
-                this.aggregator = BspUtils.createGlobalAggregator(conf);
-                this.aggregator.init();
+			@Override
+			public void open(IHyracksTaskContext ctx, RecordDescriptor rd,
+					IFrameWriter... writers) throws HyracksDataException {
+				this.conf = confFactory.createConfiguration();
+				this.dynamicStateLength = BspUtils
+						.getDynamicVertexValueSize(conf);
+				this.aggregator = BspUtils.createGlobalAggregator(conf);
+				this.aggregator.init();
 
-                this.writerMsg = writers[0];
-                this.bufferMsg = ctx.allocateFrame();
-                this.appenderMsg = new FrameTupleAppender(ctx.getFrameSize());
-                this.appenderMsg.reset(bufferMsg, true);
-                this.writers.add(writerMsg);
-                this.appenders.add(appenderMsg);
+				this.writerMsg = writers[0];
+				this.bufferMsg = ctx.allocateFrame();
+				this.appenderMsg = new FrameTupleAppender(ctx.getFrameSize());
+				this.appenderMsg.reset(bufferMsg, true);
+				this.writers.add(writerMsg);
+				this.appenders.add(appenderMsg);
 
-                this.writerTerminate = writers[1];
-                this.bufferTerminate = ctx.allocateFrame();
-                this.appenderTerminate = new FrameTupleAppender(ctx.getFrameSize());
-                this.appenderTerminate.reset(bufferTerminate, true);
+				this.writerTerminate = writers[1];
+				this.bufferTerminate = ctx.allocateFrame();
+				this.appenderTerminate = new FrameTupleAppender(
+						ctx.getFrameSize());
+				this.appenderTerminate.reset(bufferTerminate, true);
 
-                this.writerGlobalAggregate = writers[2];
-                this.bufferGlobalAggregate = ctx.allocateFrame();
-                this.appenderGlobalAggregate = new FrameTupleAppender(ctx.getFrameSize());
-                this.appenderGlobalAggregate.reset(bufferGlobalAggregate, true);
+				this.writerGlobalAggregate = writers[2];
+				this.bufferGlobalAggregate = ctx.allocateFrame();
+				this.appenderGlobalAggregate = new FrameTupleAppender(
+						ctx.getFrameSize());
+				this.appenderGlobalAggregate.reset(bufferGlobalAggregate, true);
 
-                this.writerInsert = writers[3];
-                this.bufferInsert = ctx.allocateFrame();
-                this.appenderInsert = new FrameTupleAppender(ctx.getFrameSize());
-                this.appenderInsert.reset(bufferInsert, true);
-                this.writers.add(writerInsert);
-                this.appenders.add(appenderInsert);
+				this.writerInsert = writers[3];
+				this.bufferInsert = ctx.allocateFrame();
+				this.appenderInsert = new FrameTupleAppender(ctx.getFrameSize());
+				this.appenderInsert.reset(bufferInsert, true);
+				this.writers.add(writerInsert);
+				this.appenders.add(appenderInsert);
 
-                this.writerDelete = writers[4];
-                this.bufferDelete = ctx.allocateFrame();
-                this.appenderDelete = new FrameTupleAppender(ctx.getFrameSize());
-                this.appenderDelete.reset(bufferDelete, true);
-                this.writers.add(writerDelete);
-                this.appenders.add(appenderDelete);
+				this.writerDelete = writers[4];
+				this.bufferDelete = ctx.allocateFrame();
+				this.appenderDelete = new FrameTupleAppender(ctx.getFrameSize());
+				this.appenderDelete.reset(bufferDelete, true);
+				this.writers.add(writerDelete);
+				this.appenders.add(appenderDelete);
 
-                if (writers.length > 5) {
-                    this.writerAlive = writers[5];
-                    this.bufferAlive = ctx.allocateFrame();
-                    this.appenderAlive = new FrameTupleAppender(ctx.getFrameSize());
-                    this.appenderAlive.reset(bufferAlive, true);
-                    this.pushAlive = true;
-                    this.writers.add(writerAlive);
-                    this.appenders.add(appenderAlive);
-                }
-                msgList.reset(msgIterator);
+				if (writers.length > 5) {
+					this.writerAlive = writers[5];
+					this.bufferAlive = ctx.allocateFrame();
+					this.appenderAlive = new FrameTupleAppender(
+							ctx.getFrameSize());
+					this.appenderAlive.reset(bufferAlive, true);
+					this.pushAlive = true;
+					this.writers.add(writerAlive);
+					this.appenders.add(appenderAlive);
+				}
+				msgList.reset(msgIterator);
 
-                tbs.add(tbMsg);
-                tbs.add(tbInsert);
-                tbs.add(tbDelete);
-                tbs.add(tbAlive);
-            }
+				tbs.add(tbMsg);
+				tbs.add(tbInsert);
+				tbs.add(tbDelete);
+				tbs.add(tbAlive);
+			}
 
-            @Override
-            public void process(Object[] tuple) throws HyracksDataException {
-                // vertex Id, vertex
-                tbMsg.reset();
-                tbAlive.reset();
+			@Override
+			public void process(Object[] tuple) throws HyracksDataException {
+				// vertex Id, vertex
+				tbMsg.reset();
+				tbAlive.reset();
 
-                vertex = (Vertex) tuple[1];
-                vertex.setOutputWriters(writers);
-                vertex.setOutputAppenders(appenders);
-                vertex.setOutputTupleBuilders(tbs);
+				vertex = (Vertex) tuple[1];
+				vertex.setOutputWriters(writers);
+				vertex.setOutputAppenders(appenders);
+				vertex.setOutputTupleBuilders(tbs);
 
-                if (!msgIterator.hasNext() && vertex.isHalted())
-                    return;
+				if (!msgIterator.hasNext() && vertex.isHalted()) {
+					return;
+				}
+				if (vertex.isHalted()) {
+					vertex.activate();
+				}
 
-                try {
-                    vertex.compute(msgIterator);
-                    vertex.finishCompute();
-                } catch (IOException e) {
-                    throw new HyracksDataException(e);
-                }
+				try {
+					vertex.compute(msgIterator);
+					vertex.finishCompute();
+				} catch (IOException e) {
+					throw new HyracksDataException(e);
+				}
 
-                /**
-                 * this partition should not terminate
-                 */
-                if (terminate && (!vertex.isHalted() || vertex.hasMessage() || vertex.createdNewLiveVertex()))
-                    terminate = false;
+				/**
+				 * this partition should not terminate
+				 */
+				if (terminate
+						&& (!vertex.isHalted() || vertex.hasMessage() || vertex
+								.createdNewLiveVertex()))
+					terminate = false;
 
-                /**
-                 * call the global aggregator
-                 */
-                aggregator.step(vertex);
-            }
+				/**
+				 * call the global aggregator
+				 */
+				aggregator.step(vertex);
+			}
 
-            @Override
-            public void close() throws HyracksDataException {
-                FrameTupleUtils.flushTuplesFinal(appenderMsg, writerMsg);
-                FrameTupleUtils.flushTuplesFinal(appenderInsert, writerInsert);
-                FrameTupleUtils.flushTuplesFinal(appenderDelete, writerDelete);
+			@Override
+			public void close() throws HyracksDataException {
+				FrameTupleUtils.flushTuplesFinal(appenderMsg, writerMsg);
+				FrameTupleUtils.flushTuplesFinal(appenderInsert, writerInsert);
+				FrameTupleUtils.flushTuplesFinal(appenderDelete, writerDelete);
 
-                if (pushAlive)
-                    FrameTupleUtils.flushTuplesFinal(appenderAlive, writerAlive);
-                if (!terminate) {
-                    writeOutTerminationState();
-                }
+				if (pushAlive)
+					FrameTupleUtils
+							.flushTuplesFinal(appenderAlive, writerAlive);
+				if (!terminate) {
+					writeOutTerminationState();
+				}
 
-                /** write out global aggregate value */
-                writeOutGlobalAggregate();
-            }
+				/** write out global aggregate value */
+				writeOutGlobalAggregate();
+			}
 
-            private void writeOutGlobalAggregate() throws HyracksDataException {
-                try {
-                    /**
-                     * get partial aggregate result and flush to the final
-                     * aggregator
-                     */
-                    Writable agg = aggregator.finishPartial();
-                    agg.write(tbGlobalAggregate.getDataOutput());
-                    tbGlobalAggregate.addFieldEndOffset();
-                    appenderGlobalAggregate.append(tbGlobalAggregate.getFieldEndOffsets(),
-                            tbGlobalAggregate.getByteArray(), 0, tbGlobalAggregate.getSize());
-                    FrameTupleUtils.flushTuplesFinal(appenderGlobalAggregate, writerGlobalAggregate);
-                } catch (IOException e) {
-                    throw new HyracksDataException(e);
-                }
-            }
+			private void writeOutGlobalAggregate() throws HyracksDataException {
+				try {
+					/**
+					 * get partial aggregate result and flush to the final
+					 * aggregator
+					 */
+					Writable agg = aggregator.finishPartial();
+					agg.write(tbGlobalAggregate.getDataOutput());
+					tbGlobalAggregate.addFieldEndOffset();
+					appenderGlobalAggregate.append(
+							tbGlobalAggregate.getFieldEndOffsets(),
+							tbGlobalAggregate.getByteArray(), 0,
+							tbGlobalAggregate.getSize());
+					FrameTupleUtils.flushTuplesFinal(appenderGlobalAggregate,
+							writerGlobalAggregate);
+				} catch (IOException e) {
+					throw new HyracksDataException(e);
+				}
+			}
 
-            private void writeOutTerminationState() throws HyracksDataException {
-                try {
-                    tbTerminate.getDataOutput().writeLong(0);
-                    tbTerminate.addFieldEndOffset();
-                    appenderTerminate.append(tbTerminate.getFieldEndOffsets(), tbTerminate.getByteArray(), 0,
-                            tbTerminate.getSize());
-                    FrameTupleUtils.flushTuplesFinal(appenderTerminate, writerTerminate);
-                } catch (IOException e) {
-                    throw new HyracksDataException(e);
-                }
-            }
+			private void writeOutTerminationState() throws HyracksDataException {
+				try {
+					tbTerminate.getDataOutput().writeLong(0);
+					tbTerminate.addFieldEndOffset();
+					appenderTerminate.append(tbTerminate.getFieldEndOffsets(),
+							tbTerminate.getByteArray(), 0,
+							tbTerminate.getSize());
+					FrameTupleUtils.flushTuplesFinal(appenderTerminate,
+							writerTerminate);
+				} catch (IOException e) {
+					throw new HyracksDataException(e);
+				}
+			}
 
-            @Override
-            public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
-                try {
-                    if (vertex != null && vertex.hasUpdate()) {
-                        if (!dynamicStateLength) {
-                            // in-place update
-                            int fieldCount = tupleRef.getFieldCount();
-                            for (int i = 1; i < fieldCount; i++) {
-                                byte[] data = tupleRef.getFieldData(i);
-                                int offset = tupleRef.getFieldStart(i);
-                                bbos.setByteArray(data, offset);
-                                vertex.write(output);
-                            }
-                        } else {
-                            // write the vertex id
-                            DataOutput tbOutput = cloneUpdateTb.getDataOutput();
-                            vertex.getVertexId().write(tbOutput);
-                            cloneUpdateTb.addFieldEndOffset();
+			@Override
+			public void update(ITupleReference tupleRef,
+					ArrayTupleBuilder cloneUpdateTb)
+					throws HyracksDataException {
+				try {
+					if (vertex != null && vertex.hasUpdate()) {
+						if (!dynamicStateLength) {
+							// in-place update
+							int fieldCount = tupleRef.getFieldCount();
+							for (int i = 1; i < fieldCount; i++) {
+								byte[] data = tupleRef.getFieldData(i);
+								int offset = tupleRef.getFieldStart(i);
+								bbos.setByteArray(data, offset);
+								vertex.write(output);
+							}
+						} else {
+							// write the vertex id
+							DataOutput tbOutput = cloneUpdateTb.getDataOutput();
+							vertex.getVertexId().write(tbOutput);
+							cloneUpdateTb.addFieldEndOffset();
 
-                            // write the vertex value
-                            vertex.write(tbOutput);
-                            cloneUpdateTb.addFieldEndOffset();
-                        }
-                    }
-                } catch (IOException e) {
-                    throw new HyracksDataException(e);
-                }
-            }
-        };
-    }
+							// write the vertex value
+							vertex.write(tbOutput);
+							cloneUpdateTb.addFieldEndOffset();
+						}
+					}
+				} catch (IOException e) {
+					throw new HyracksDataException(e);
+				}
+			}
+		};
+	}
 }