minor refactoring and more comments

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@2088 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java
index 329e48a..cb27249 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java
@@ -7,10 +7,27 @@
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
+/**
+ * This is the abstract class to implement for aggregating the state of all the vertices globally in the graph.
+ * </p>
+ * The global aggregation of vertices in a distributed cluster include two phase:
+ * 1. a local phase which aggregates vertice sent from a single machine and produces
+ * the partially aggregated state;
+ * 2. a final phase which aggregates all partially aggregated states
+ * 
+ * @param <I extends Writable> vertex identifier type
+ * @param <E extends Writable> vertex value type
+ * @param <E extends Writable> edge type
+ * @param <M extends Writable> message type
+ * @param <P extends Writable>
+ *        the type of the partial aggregate state
+ * @param <F extends Writable> the type of the final aggregate value
+ */
+
 @SuppressWarnings("rawtypes")
 public abstract class GlobalAggregator<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable, P extends Writable, F extends Writable> {
     /**
-     * initialize combiner
+     * initialize aggregator
      */
     public abstract void init();
 
@@ -26,18 +43,18 @@
     /**
      * step through all intermediate aggregate result
      * 
-     * @param partialResult partial aggregate value
+     * @param partialResult
+     *            partial aggregate value
      */
     public abstract void step(P partialResult);
 
-    
     /**
      * finish partial aggregate
      * 
      * @return the final aggregate value
      */
     public abstract P finishPartial();
-    
+
     /**
      * finish final aggregate
      * 
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java
index fd540fd..e4f8ef9 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java
@@ -15,24 +15,35 @@
 
 package edu.uci.ics.pregelix.api.graph;
 
-import java.io.IOException;
-
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
 /**
- * interface to implement for combining of messages sent to the same vertex.
+ * This is the abstract class to implement for combining of messages that are sent to the same vertex.
+ * </p>
+ * This is similar to the concept of Combiner in Hadoop. The combining of messages in a distributed
+ * cluster include two phase:
+ * 1. a local phase which combines messages sent from a single machine and produces
+ * the partially combined message;
+ * 2. a final phase which combines messages at each receiver machine after the repartitioning (shuffling)
+ * and produces the final combined message
  * 
- * @param <I extends Writable> index
- * @param <M extends Writable> message data
+ * @param <I extends Writable> vertex identifier
+ * @param <M extends Writable> message body type
+ * @param <P extends Writable>
+ *        the type of the partially combined messages
  */
 @SuppressWarnings("rawtypes")
 public abstract class MessageCombiner<I extends WritableComparable, M extends Writable, P extends Writable> {
 
     /**
      * initialize combiner
+     * 
+     * @param providedMsgList
+     *            the provided msg list for user implementation to update, which *should* be returned
+     *            by the finishFinal() method
      */
     public abstract void init(MsgList providedMsgList);
 
@@ -40,29 +51,35 @@
      * step call for local combiner
      * 
      * @param vertexIndex
+     *            the receiver vertex identifier
      * @param msg
-     * @throws IOException
+     *            a single message body
+     * @throws HyracksDataException
      */
-    public abstract void step(I vertexIndex, M msg) throws HyracksDataException;
+    public abstract void stepPartial(I vertexIndex, M msg) throws HyracksDataException;
 
     /**
      * step call for global combiner
      * 
      * @param vertexIndex
-     * @param msg
-     * @throws IOException
+     *            the receiver vertex identifier
+     * @param partialAggregate
+     *            the partial aggregate value
+     * @throws HyracksDataException
      */
-    public abstract void step(P partialAggregate) throws HyracksDataException;
+    public abstract void stepFinal(I vertexIndex, P partialAggregate) throws HyracksDataException;
 
     /**
      * finish partial combiner
+     * 
+     * @return the intermediate combined message of type P
      */
     public abstract P finishPartial();
 
     /**
      * finish final combiner
      * 
-     * @return Message
+     * @return the final message List
      */
     public abstract MsgList<M> finishFinal();
 }
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
index b0cd533..6856e9a 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -31,16 +31,15 @@
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.pregelix.api.delegate.VertexDelegate;
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.api.util.SerDeUtils;
 
 /**
  * User applications should all inherit {@link Vertex}, and implement their own
- * compute method.
+ * *compute* method.
  * 
  * @param <I>
- *            Vertex id type
+ *            Vertex identifier type
  * @param <V>
  *            Vertex value type
  * @param <E>
@@ -50,393 +49,384 @@
  */
 @SuppressWarnings("rawtypes")
 public abstract class Vertex<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
-		implements Writable {
-	private static long superstep = 0;
-	/** Class-wide number of vertices */
-	private static long numVertices = -1;
-	/** Class-wide number of edges */
-	private static long numEdges = -1;
-	/** Vertex id */
-	private I vertexId = null;
-	/** Vertex value */
-	private V vertexValue = null;
-	/** Map of destination vertices and their edge values */
-	private final List<Edge<I, E>> destEdgeList = new ArrayList<Edge<I, E>>();
-	/** If true, do not do anymore computation on this vertex. */
-	boolean halt = false;
-	/** List of incoming messages from the previous superstep */
-	private final List<M> msgList = new ArrayList<M>();
-	/** map context */
-	private static Mapper.Context context = null;
-	/** a delegate for hyracks stuff */
-	private VertexDelegate<I, V, E, M> delegate = new VertexDelegate<I, V, E, M>(
-			this);
-	/** this vertex is updated or not */
-	private boolean updated = false;
-	/** has outgoing messages */
-	private boolean hasMessage = false;
+        implements Writable {
+    private static long superstep = 0;
+    /** Class-wide number of vertices */
+    private static long numVertices = -1;
+    /** Class-wide number of edges */
+    private static long numEdges = -1;
+    /** Vertex id */
+    private I vertexId = null;
+    /** Vertex value */
+    private V vertexValue = null;
+    /** Map of destination vertices and their edge values */
+    private final List<Edge<I, E>> destEdgeList = new ArrayList<Edge<I, E>>();
+    /** If true, do not do anymore computation on this vertex. */
+    boolean halt = false;
+    /** List of incoming messages from the previous superstep */
+    private final List<M> msgList = new ArrayList<M>();
+    /** map context */
+    private static Mapper.Context context = null;
+    /** a delegate for hyracks stuff */
+    private VertexDelegate<I, V, E, M> delegate = new VertexDelegate<I, V, E, M>(this);
+    /** this vertex is updated or not */
+    private boolean updated = false;
+    /** has outgoing messages */
+    private boolean hasMessage = false;
 
-	/**
-	 * use object pool for re-using objects
-	 */
-	private List<Edge<I, E>> edgePool = new ArrayList<Edge<I, E>>();
-	private List<M> msgPool = new ArrayList<M>();
-	private List<V> valuePool = new ArrayList<V>();
-	private int usedEdge = 0;
-	private int usedMessage = 0;
-	private int usedValue = 0;
+    /**
+     * use object pool for re-using objects
+     */
+    private List<Edge<I, E>> edgePool = new ArrayList<Edge<I, E>>();
+    private List<M> msgPool = new ArrayList<M>();
+    private List<V> valuePool = new ArrayList<V>();
+    private int usedEdge = 0;
+    private int usedMessage = 0;
+    private int usedValue = 0;
 
-	/**
-	 * The key method that users need to implement
-	 * 
-	 * @param msgIterator
-	 *            an iterator of incoming messages
-	 */
-	public abstract void compute(Iterator<M> msgIterator);
+    /**
+     * The key method that users need to implement
+     * 
+     * @param msgIterator
+     *            an iterator of incoming messages
+     */
+    public abstract void compute(Iterator<M> msgIterator);
 
-	/**
-	 * Add an edge for the vertex.
-	 * 
-	 * @param targetVertexId
-	 * @param edgeValue
-	 * @return successful or not
-	 */
-	public final boolean addEdge(I targetVertexId, E edgeValue) {
-		Edge<I, E> edge = this.allocateEdge();
-		edge.setDestVertexId(targetVertexId);
-		edge.setEdgeValue(edgeValue);
-		destEdgeList.add(edge);
-		return true;
-	}
+    /**
+     * Add an edge for the vertex.
+     * 
+     * @param targetVertexId
+     * @param edgeValue
+     * @return successful or not
+     */
+    public final boolean addEdge(I targetVertexId, E edgeValue) {
+        Edge<I, E> edge = this.allocateEdge();
+        edge.setDestVertexId(targetVertexId);
+        edge.setEdgeValue(edgeValue);
+        destEdgeList.add(edge);
+        return true;
+    }
 
-	/**
-	 * Initialize a new vertex
-	 * 
-	 * @param vertexId
-	 * @param vertexValue
-	 * @param edges
-	 * @param messages
-	 */
-	public void initialize(I vertexId, V vertexValue, Map<I, E> edges,
-			List<M> messages) {
-		if (vertexId != null) {
-			setVertexId(vertexId);
-		}
-		if (vertexValue != null) {
-			setVertexValue(vertexValue);
-		}
-		destEdgeList.clear();
-		if (edges != null && !edges.isEmpty()) {
-			for (Map.Entry<I, E> entry : edges.entrySet()) {
-				destEdgeList.add(new Edge<I, E>(entry.getKey(), entry
-						.getValue()));
-			}
-		}
-		if (messages != null && !messages.isEmpty()) {
-			msgList.addAll(messages);
-		}
-	}
+    /**
+     * Initialize a new vertex
+     * 
+     * @param vertexId
+     * @param vertexValue
+     * @param edges
+     * @param messages
+     */
+    public void initialize(I vertexId, V vertexValue, Map<I, E> edges, List<M> messages) {
+        if (vertexId != null) {
+            setVertexId(vertexId);
+        }
+        if (vertexValue != null) {
+            setVertexValue(vertexValue);
+        }
+        destEdgeList.clear();
+        if (edges != null && !edges.isEmpty()) {
+            for (Map.Entry<I, E> entry : edges.entrySet()) {
+                destEdgeList.add(new Edge<I, E>(entry.getKey(), entry.getValue()));
+            }
+        }
+        if (messages != null && !messages.isEmpty()) {
+            msgList.addAll(messages);
+        }
+    }
 
-	/**
-	 * reset a vertex object: clear its internal states
-	 */
-	public void reset() {
-		usedEdge = 0;
-		usedMessage = 0;
-		usedValue = 0;
-	}
+    /**
+     * reset a vertex object: clear its internal states
+     */
+    public void reset() {
+        usedEdge = 0;
+        usedMessage = 0;
+        usedValue = 0;
+    }
 
-	private Edge<I, E> allocateEdge() {
-		Edge<I, E> edge;
-		if (usedEdge < edgePool.size()) {
-			edge = edgePool.get(usedEdge);
-			usedEdge++;
-		} else {
-			edge = new Edge<I, E>();
-			edgePool.add(edge);
-			usedEdge++;
-		}
-		return edge;
-	}
+    private Edge<I, E> allocateEdge() {
+        Edge<I, E> edge;
+        if (usedEdge < edgePool.size()) {
+            edge = edgePool.get(usedEdge);
+            usedEdge++;
+        } else {
+            edge = new Edge<I, E>();
+            edgePool.add(edge);
+            usedEdge++;
+        }
+        return edge;
+    }
 
-	private M allocateMessage() {
-		M message;
-		if (usedMessage < msgPool.size()) {
-			message = msgPool.get(usedEdge);
-			usedMessage++;
-		} else {
-			message = BspUtils.<M> createMessageValue(getContext()
-					.getConfiguration());
-			msgPool.add(message);
-			usedMessage++;
-		}
-		return message;
-	}
+    private M allocateMessage() {
+        M message;
+        if (usedMessage < msgPool.size()) {
+            message = msgPool.get(usedEdge);
+            usedMessage++;
+        } else {
+            message = BspUtils.<M> createMessageValue(getContext().getConfiguration());
+            msgPool.add(message);
+            usedMessage++;
+        }
+        return message;
+    }
 
-	private V allocateValue() {
-		V value;
-		if (usedValue < valuePool.size()) {
-			value = valuePool.get(usedEdge);
-			usedValue++;
-		} else {
-			value = BspUtils.<V> createVertexValue(getContext()
-					.getConfiguration());
-			valuePool.add(value);
-			usedValue++;
-		}
-		return value;
-	}
+    private V allocateValue() {
+        V value;
+        if (usedValue < valuePool.size()) {
+            value = valuePool.get(usedEdge);
+            usedValue++;
+        } else {
+            value = BspUtils.<V> createVertexValue(getContext().getConfiguration());
+            valuePool.add(value);
+            usedValue++;
+        }
+        return value;
+    }
 
-	/**
-	 * Set the vertex id
-	 * 
-	 * @param vertexId
-	 */
-	public final void setVertexId(I vertexId) {
-		this.vertexId = vertexId;
-		delegate.setVertexId(vertexId);
-	}
+    /**
+     * Set the vertex id
+     * 
+     * @param vertexId
+     */
+    public final void setVertexId(I vertexId) {
+        this.vertexId = vertexId;
+        delegate.setVertexId(vertexId);
+    }
 
-	/**
-	 * Get the vertex id
-	 * 
-	 * @return vertex id
-	 */
-	public final I getVertexId() {
-		return vertexId;
-	}
+    /**
+     * Get the vertex id
+     * 
+     * @return vertex id
+     */
+    public final I getVertexId() {
+        return vertexId;
+    }
 
-	/**
-	 * Set the global superstep for all the vertices (internal use)
-	 * 
-	 * @param superstep
-	 *            New superstep
-	 */
-	public static void setSuperstep(long superstep) {
-		Vertex.superstep = superstep;
-	}
+    /**
+     * Set the global superstep for all the vertices (internal use)
+     * 
+     * @param superstep
+     *            New superstep
+     */
+    public static void setSuperstep(long superstep) {
+        Vertex.superstep = superstep;
+    }
 
-	public static long getCurrentSuperstep() {
-		return superstep;
-	}
+    public static long getCurrentSuperstep() {
+        return superstep;
+    }
 
-	public final long getSuperstep() {
-		return superstep;
-	}
+    public final long getSuperstep() {
+        return superstep;
+    }
 
-	public final V getVertexValue() {
-		return vertexValue;
-	}
+    public final V getVertexValue() {
+        return vertexValue;
+    }
 
-	public final void setVertexValue(V vertexValue) {
-		this.vertexValue = vertexValue;
-		this.updated = true;
-	}
+    public final void setVertexValue(V vertexValue) {
+        this.vertexValue = vertexValue;
+        this.updated = true;
+    }
 
-	/**
-	 * Set the total number of vertices from the last superstep.
-	 * 
-	 * @param numVertices
-	 *            Aggregate vertices in the last superstep
-	 */
-	public static void setNumVertices(long numVertices) {
-		Vertex.numVertices = numVertices;
-	}
+    /**
+     * Set the total number of vertices from the last superstep.
+     * 
+     * @param numVertices
+     *            Aggregate vertices in the last superstep
+     */
+    public static void setNumVertices(long numVertices) {
+        Vertex.numVertices = numVertices;
+    }
 
-	public final long getNumVertices() {
-		return numVertices;
-	}
+    public final long getNumVertices() {
+        return numVertices;
+    }
 
-	/**
-	 * Set the total number of edges from the last superstep.
-	 * 
-	 * @param numEdges
-	 *            Aggregate edges in the last superstep
-	 */
-	public static void setNumEdges(long numEdges) {
-		Vertex.numEdges = numEdges;
-	}
+    /**
+     * Set the total number of edges from the last superstep.
+     * 
+     * @param numEdges
+     *            Aggregate edges in the last superstep
+     */
+    public static void setNumEdges(long numEdges) {
+        Vertex.numEdges = numEdges;
+    }
 
-	public final long getNumEdges() {
-		return numEdges;
-	}
+    public final long getNumEdges() {
+        return numEdges;
+    }
 
-	/***
-	 * Send a message to a specific vertex
-	 * 
-	 * @param id
-	 *            the receiver vertex id
-	 * @param msg
-	 *            the message
-	 */
-	public final void sendMsg(I id, M msg) {
-		if (msg == null) {
-			throw new IllegalArgumentException(
-					"sendMsg: Cannot send null message to " + id);
-		}
-		delegate.sendMsg(id, msg);
-		this.hasMessage = true;
-	}
+    /***
+     * Send a message to a specific vertex
+     * 
+     * @param id
+     *            the receiver vertex id
+     * @param msg
+     *            the message
+     */
+    public final void sendMsg(I id, M msg) {
+        if (msg == null) {
+            throw new IllegalArgumentException("sendMsg: Cannot send null message to " + id);
+        }
+        delegate.sendMsg(id, msg);
+        this.hasMessage = true;
+    }
 
-	/**
-	 * Send a message to all direct outgoing neighbors
-	 * 
-	 * @param msg
-	 *            the message
-	 */
-	public final void sendMsgToAllEdges(M msg) {
-		if (msg == null) {
-			throw new IllegalArgumentException(
-					"sendMsgToAllEdges: Cannot send null message to all edges");
-		}
-		for (Edge<I, E> edge : destEdgeList) {
-			sendMsg(edge.getDestVertexId(), msg);
-		}
-	}
+    /**
+     * Send a message to all direct outgoing neighbors
+     * 
+     * @param msg
+     *            the message
+     */
+    public final void sendMsgToAllEdges(M msg) {
+        if (msg == null) {
+            throw new IllegalArgumentException("sendMsgToAllEdges: Cannot send null message to all edges");
+        }
+        for (Edge<I, E> edge : destEdgeList) {
+            sendMsg(edge.getDestVertexId(), msg);
+        }
+    }
 
-	/**
-	 * Vote to halt. Once all vertex vote to halt and no more messages, a
-	 * Pregelix job will terminate.
-	 */
-	public final void voteToHalt() {
-		halt = true;
-	}
+    /**
+     * Vote to halt. Once all vertex vote to halt and no more messages, a
+     * Pregelix job will terminate.
+     */
+    public final void voteToHalt() {
+        halt = true;
+    }
 
-	/**
-	 * @return the vertex is halted (true) or not (false)
-	 */
-	public final boolean isHalted() {
-		return halt;
-	}
+    /**
+     * @return the vertex is halted (true) or not (false)
+     */
+    public final boolean isHalted() {
+        return halt;
+    }
 
-	@Override
-	final public void readFields(DataInput in) throws IOException {
-		reset();
-		if (vertexId == null)
-			vertexId = BspUtils.<I> createVertexIndex(getContext()
-					.getConfiguration());
-		vertexId.readFields(in);
-		delegate.setVertexId(vertexId);
-		boolean hasVertexValue = in.readBoolean();
-		if (hasVertexValue) {
-			vertexValue = allocateValue();
-			vertexValue.readFields(in);
-			delegate.setVertex(this);
-		}
-		destEdgeList.clear();
-		long edgeMapSize = SerDeUtils.readVLong(in);
-		for (long i = 0; i < edgeMapSize; ++i) {
-			Edge<I, E> edge = allocateEdge();
-			edge.setConf(getContext().getConfiguration());
-			edge.readFields(in);
-			addEdge(edge);
-		}
-		msgList.clear();
-		long msgListSize = SerDeUtils.readVLong(in);
-		for (long i = 0; i < msgListSize; ++i) {
-			M msg = allocateMessage();
-			msg.readFields(in);
-			msgList.add(msg);
-		}
-		halt = in.readBoolean();
-		updated = false;
-		hasMessage = false;
-	}
+    @Override
+    final public void readFields(DataInput in) throws IOException {
+        reset();
+        if (vertexId == null)
+            vertexId = BspUtils.<I> createVertexIndex(getContext().getConfiguration());
+        vertexId.readFields(in);
+        delegate.setVertexId(vertexId);
+        boolean hasVertexValue = in.readBoolean();
+        if (hasVertexValue) {
+            vertexValue = allocateValue();
+            vertexValue.readFields(in);
+            delegate.setVertex(this);
+        }
+        destEdgeList.clear();
+        long edgeMapSize = SerDeUtils.readVLong(in);
+        for (long i = 0; i < edgeMapSize; ++i) {
+            Edge<I, E> edge = allocateEdge();
+            edge.setConf(getContext().getConfiguration());
+            edge.readFields(in);
+            addEdge(edge);
+        }
+        msgList.clear();
+        long msgListSize = SerDeUtils.readVLong(in);
+        for (long i = 0; i < msgListSize; ++i) {
+            M msg = allocateMessage();
+            msg.readFields(in);
+            msgList.add(msg);
+        }
+        halt = in.readBoolean();
+        updated = false;
+        hasMessage = false;
+    }
 
-	@Override
-	public void write(DataOutput out) throws IOException {
-		vertexId.write(out);
-		out.writeBoolean(vertexValue != null);
-		if (vertexValue != null) {
-			vertexValue.write(out);
-		}
-		SerDeUtils.writeVLong(out, destEdgeList.size());
-		for (Edge<I, E> edge : destEdgeList) {
-			edge.write(out);
-		}
-		SerDeUtils.writeVLong(out, msgList.size());
-		for (M msg : msgList) {
-			msg.write(out);
-		}
-		out.writeBoolean(halt);
-	}
+    @Override
+    public void write(DataOutput out) throws IOException {
+        vertexId.write(out);
+        out.writeBoolean(vertexValue != null);
+        if (vertexValue != null) {
+            vertexValue.write(out);
+        }
+        SerDeUtils.writeVLong(out, destEdgeList.size());
+        for (Edge<I, E> edge : destEdgeList) {
+            edge.write(out);
+        }
+        SerDeUtils.writeVLong(out, msgList.size());
+        for (M msg : msgList) {
+            msg.write(out);
+        }
+        out.writeBoolean(halt);
+    }
 
-	private boolean addEdge(Edge<I, E> edge) {
-		edge.setConf(getContext().getConfiguration());
-		destEdgeList.add(edge);
-		return true;
-	}
+    private boolean addEdge(Edge<I, E> edge) {
+        edge.setConf(getContext().getConfiguration());
+        destEdgeList.add(edge);
+        return true;
+    }
 
-	/**
-	 * Get the list of incoming messages
-	 * 
-	 * @return the list of messages
-	 */
-	public List<M> getMsgList() {
-		return msgList;
-	}
+    /**
+     * Get the list of incoming messages
+     * 
+     * @return the list of messages
+     */
+    public List<M> getMsgList() {
+        return msgList;
+    }
 
-	/**
-	 * Get outgoing edge list
-	 * 
-	 * @return a list of outgoing edges
-	 */
-	public List<Edge<I, E>> getEdges() {
-		return this.destEdgeList;
-	}
+    /**
+     * Get outgoing edge list
+     * 
+     * @return a list of outgoing edges
+     */
+    public List<Edge<I, E>> getEdges() {
+        return this.destEdgeList;
+    }
 
-	public final Mapper<?, ?, ?, ?>.Context getContext() {
-		return context;
-	}
+    public final Mapper<?, ?, ?, ?>.Context getContext() {
+        return context;
+    }
 
-	public final static void setContext(Mapper<?, ?, ?, ?>.Context context) {
-		Vertex.context = context;
-	}
+    public final static void setContext(Mapper<?, ?, ?, ?>.Context context) {
+        Vertex.context = context;
+    }
 
-	@SuppressWarnings("unchecked")
-	public String toString() {
-		Collections.sort(destEdgeList);
-		StringBuffer edgeBuffer = new StringBuffer();
-		edgeBuffer.append("(");
-		for (Edge<I, E> edge : destEdgeList) {
-			edgeBuffer.append(edge.getDestVertexId()).append(",");
-		}
-		edgeBuffer.append(")");
-		return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue()
-				+ ", edges=" + edgeBuffer + ")";
-	}
+    @SuppressWarnings("unchecked")
+    public String toString() {
+        Collections.sort(destEdgeList);
+        StringBuffer edgeBuffer = new StringBuffer();
+        edgeBuffer.append("(");
+        for (Edge<I, E> edge : destEdgeList) {
+            edgeBuffer.append(edge.getDestVertexId()).append(",");
+        }
+        edgeBuffer.append(")");
+        return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue() + ", edges=" + edgeBuffer + ")";
+    }
 
-	public void setOutputWriters(List<IFrameWriter> writers) {
-		delegate.setOutputWriters(writers);
-	}
+    public void setOutputWriters(List<IFrameWriter> writers) {
+        delegate.setOutputWriters(writers);
+    }
 
-	public void setOutputAppenders(List<FrameTupleAppender> appenders) {
-		delegate.setOutputAppenders(appenders);
-	}
+    public void setOutputAppenders(List<FrameTupleAppender> appenders) {
+        delegate.setOutputAppenders(appenders);
+    }
 
-	public void setOutputTupleBuilders(List<ArrayTupleBuilder> tbs) {
-		delegate.setOutputTupleBuilders(tbs);
-	}
+    public void setOutputTupleBuilders(List<ArrayTupleBuilder> tbs) {
+        delegate.setOutputTupleBuilders(tbs);
+    }
 
-	public void finishCompute() throws IOException {
-		delegate.finishCompute();
-	}
+    public void finishCompute() throws IOException {
+        delegate.finishCompute();
+    }
 
-	public boolean hasUpdate() {
-		return this.updated;
-	}
+    public boolean hasUpdate() {
+        return this.updated;
+    }
 
-	public boolean hasMessage() {
-		return this.hasMessage;
-	}
+    public boolean hasMessage() {
+        return this.hasMessage;
+    }
 
-	public int getNumOutEdges() {
-		return destEdgeList.size();
-	}
+    public int getNumOutEdges() {
+        return destEdgeList.size();
+    }
 
-	@SuppressWarnings("unchecked")
-	public void sortEdges() {
-		Collections.sort((List) destEdgeList);
-	}
+    @SuppressWarnings("unchecked")
+    public void sortEdges() {
+        Collections.sort((List) destEdgeList);
+    }
 
 }
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/delegate/VertexDelegate.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexDelegate.java
similarity index 93%
rename from pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/delegate/VertexDelegate.java
rename to pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexDelegate.java
index ec84fc1..7267f30 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/delegate/VertexDelegate.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexDelegate.java
@@ -13,7 +13,7 @@
  * limitations under the License.
  */
 
-package edu.uci.ics.pregelix.api.delegate;
+package edu.uci.ics.pregelix.api.graph;
 
 import java.io.DataOutput;
 import java.io.IOException;
@@ -25,12 +25,10 @@
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.pregelix.api.graph.MsgList;
-import edu.uci.ics.pregelix.api.graph.Vertex;
 import edu.uci.ics.pregelix.api.util.FrameTupleUtils;
 
 @SuppressWarnings("rawtypes")
-public class VertexDelegate<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> {
+class VertexDelegate<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> {
     /** Vertex id */
     private I vertexId = null;
     /** Vertex value */
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultMessageCombiner.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultMessageCombiner.java
index ccf19ee..1468431 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultMessageCombiner.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultMessageCombiner.java
@@ -19,12 +19,12 @@
     }
 
     @Override
-    public void step(I vertexIndex, M msg) throws HyracksDataException {
+    public void stepPartial(I vertexIndex, M msg) throws HyracksDataException {
         msgList.addElement(msg);
     }
 
     @Override
-    public void step(MsgList partialAggregate) throws HyracksDataException {
+    public void stepFinal(I vertexIndex, MsgList partialAggregate) throws HyracksDataException {
         msgList.addAllElements(partialAggregate);
     }
 
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
index 775c3ed..7c2d8c8 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
@@ -50,7 +50,7 @@
         private MsgList<VLongWritable> msgList;
 
         @Override
-        public void step(VLongWritable vertexIndex, VLongWritable msg) throws HyracksDataException {
+        public void stepPartial(VLongWritable vertexIndex, VLongWritable msg) throws HyracksDataException {
             long value = msg.get();
             if (min > value)
                 min = value;
@@ -64,7 +64,7 @@
         }
 
         @Override
-        public void step(VLongWritable partialAggregate) throws HyracksDataException {
+        public void stepFinal(VLongWritable vertexIndex, VLongWritable partialAggregate) throws HyracksDataException {
             if (min > partialAggregate.get())
                 min = partialAggregate.get();
         }
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
index 7e21245..3593a04 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
@@ -71,7 +71,7 @@
         }
 
         @Override
-        public void step(VLongWritable vertexIndex, DoubleWritable msg) throws HyracksDataException {
+        public void stepPartial(VLongWritable vertexIndex, DoubleWritable msg) throws HyracksDataException {
             sum += msg.get();
         }
 
@@ -82,7 +82,7 @@
         }
 
         @Override
-        public void step(DoubleWritable partialAggregate) throws HyracksDataException {
+        public void stepFinal(VLongWritable vertexIndex, DoubleWritable partialAggregate) throws HyracksDataException {
             sum += partialAggregate.get();
         }
 
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachibilityVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachibilityVertex.java
index 8c9c1ef..0396beb 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachibilityVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachibilityVertex.java
@@ -60,13 +60,13 @@
         }
 
         @Override
-        public void step(VLongWritable vertexIndex, ByteWritable msg) throws HyracksDataException {
+        public void stepPartial(VLongWritable vertexIndex, ByteWritable msg) throws HyracksDataException {
             int newState = agg.get() | msg.get();
             agg.set((byte) newState);
         }
 
         @Override
-        public void step(ByteWritable partialAggregate) throws HyracksDataException {
+        public void stepFinal(VLongWritable vertexIndex, ByteWritable partialAggregate) throws HyracksDataException {
             int newState = agg.get() | partialAggregate.get();
             agg.set((byte) newState);
         }
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java
index 89ea951..0a535a6 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java
@@ -46,7 +46,7 @@
         private MsgList<DoubleWritable> msgList;
 
         @Override
-        public void step(VLongWritable vertexIndex, DoubleWritable msg) throws HyracksDataException {
+        public void stepPartial(VLongWritable vertexIndex, DoubleWritable msg) throws HyracksDataException {
             double value = msg.get();
             if (min > value)
                 min = value;
@@ -66,7 +66,7 @@
         }
 
         @Override
-        public void step(DoubleWritable partialAggregate) throws HyracksDataException {
+        public void stepFinal(VLongWritable vertexIndex, DoubleWritable partialAggregate) throws HyracksDataException {
             double value = partialAggregate.get();
             if (min > value)
                 min = value;
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
index 6962aa5..1813dcc 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
@@ -93,9 +93,9 @@
             }
             value.readFields(valueInput);
             if (!partialAggAsInput) {
-                combiner.step(key, value);
+                combiner.stepPartial(key, value);
             } else {
-                combiner.step(value);
+                combiner.stepFinal(key, value);
             }
         } catch (IOException e) {
             throw new HyracksDataException(e);