svn merge -r3112:3163 https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization svn merge -r3173:3188 https://hyracks.googlecode.com/svn/branches/fullstack_release_candidate@3234 svn merge -r3209:3233 https://hyracks.googlecode.com/svn/trunk/fullstack

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@3237 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pom.xml b/pregelix/pom.xml
index 7d08fb7..0d2bdba 100644
--- a/pregelix/pom.xml
+++ b/pregelix/pom.xml
@@ -3,7 +3,7 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks</groupId>
   <artifactId>pregelix</artifactId>
-  <version>0.2.3-SNAPSHOT</version>
+  <version>0.2.4-SNAPSHOT</version>
   <packaging>pom</packaging>
   <name>pregelix</name>
 
diff --git a/pregelix/pregelix-api/pom.xml b/pregelix/pregelix-api/pom.xml
index 8212e1c..b8bfce9 100644
--- a/pregelix/pregelix-api/pom.xml
+++ b/pregelix/pregelix-api/pom.xml
@@ -7,7 +7,7 @@
 	<parent>
 		<groupId>edu.uci.ics.hyracks</groupId>
 		<artifactId>pregelix</artifactId>
-		<version>0.2.3-SNAPSHOT</version>
+		<version>0.2.4-SNAPSHOT</version>
 	</parent>
 
 	<properties>
@@ -67,7 +67,7 @@
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-dataflow-common</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
@@ -80,7 +80,7 @@
 		<dependency>
                         <groupId>edu.uci.ics.hyracks</groupId>
                         <artifactId>hyracks-hdfs-core</artifactId>
-                        <version>0.2.3-SNAPSHOT</version>
+                        <version>0.2.4-SNAPSHOT</version>
                         <type>jar</type>
                         <scope>compile</scope>
                 </dependency>
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
index 3a98fd9..724ec14 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -49,510 +49,531 @@
  */
 @SuppressWarnings("rawtypes")
 public abstract class Vertex<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
-        implements Writable {
-    private static long superstep = 0;
-    /** Class-wide number of vertices */
-    private static long numVertices = -1;
-    /** Class-wide number of edges */
-    private static long numEdges = -1;
-    /** Vertex id */
-    private I vertexId = null;
-    /** Vertex value */
-    private V vertexValue = null;
-    /** Map of destination vertices and their edge values */
-    private final List<Edge<I, E>> destEdgeList = new ArrayList<Edge<I, E>>();
-    /** If true, do not do anymore computation on this vertex. */
-    boolean halt = false;
-    /** List of incoming messages from the previous superstep */
-    private final List<M> msgList = new ArrayList<M>();
-    /** map context */
-    private static TaskAttemptContext context = null;
-    /** a delegate for hyracks stuff */
-    private VertexDelegate<I, V, E, M> delegate = new VertexDelegate<I, V, E, M>(this);
-    /** this vertex is updated or not */
-    private boolean updated = false;
-    /** has outgoing messages */
-    private boolean hasMessage = false;
-    /** created new vertex */
-    private boolean createdNewLiveVertex = false;
+		implements Writable {
+	private static long superstep = 0;
+	/** Class-wide number of vertices */
+	private static long numVertices = -1;
+	/** Class-wide number of edges */
+	private static long numEdges = -1;
+	/** Vertex id */
+	private I vertexId = null;
+	/** Vertex value */
+	private V vertexValue = null;
+	/** Map of destination vertices and their edge values */
+	private final List<Edge<I, E>> destEdgeList = new ArrayList<Edge<I, E>>();
+	/** If true, do not do anymore computation on this vertex. */
+	boolean halt = false;
+	/** List of incoming messages from the previous superstep */
+	private final List<M> msgList = new ArrayList<M>();
+	/** map context */
+	private static TaskAttemptContext context = null;
+	/** a delegate for hyracks stuff */
+	private VertexDelegate<I, V, E, M> delegate = new VertexDelegate<I, V, E, M>(
+			this);
+	/** this vertex is updated or not */
+	private boolean updated = false;
+	/** has outgoing messages */
+	private boolean hasMessage = false;
+	/** created new vertex */
+	private boolean createdNewLiveVertex = false;
 
-    /**
-     * use object pool for re-using objects
-     */
-    private List<Edge<I, E>> edgePool = new ArrayList<Edge<I, E>>();
-    private List<M> msgPool = new ArrayList<M>();
-    private List<V> valuePool = new ArrayList<V>();
-    private int usedEdge = 0;
-    private int usedMessage = 0;
-    private int usedValue = 0;
+	/**
+	 * use object pool for re-using objects
+	 */
+	private List<Edge<I, E>> edgePool = new ArrayList<Edge<I, E>>();
+	private List<M> msgPool = new ArrayList<M>();
+	private List<V> valuePool = new ArrayList<V>();
+	private int usedEdge = 0;
+	private int usedMessage = 0;
+	private int usedValue = 0;
 
-    /**
-     * The key method that users need to implement
-     * 
-     * @param msgIterator
-     *            an iterator of incoming messages
-     */
-    public abstract void compute(Iterator<M> msgIterator);
+	/**
+	 * The key method that users need to implement
+	 * 
+	 * @param msgIterator
+	 *            an iterator of incoming messages
+	 */
+	public abstract void compute(Iterator<M> msgIterator);
 
-    /**
-     * Add an edge for the vertex.
-     * 
-     * @param targetVertexId
-     * @param edgeValue
-     * @return successful or not
-     */
-    public final boolean addEdge(I targetVertexId, E edgeValue) {
-        Edge<I, E> edge = this.allocateEdge();
-        edge.setDestVertexId(targetVertexId);
-        edge.setEdgeValue(edgeValue);
-        destEdgeList.add(edge);
-        return true;
-    }
+	/**
+	 * Add an edge for the vertex.
+	 * 
+	 * @param targetVertexId
+	 * @param edgeValue
+	 * @return successful or not
+	 */
+	public final boolean addEdge(I targetVertexId, E edgeValue) {
+		Edge<I, E> edge = this.allocateEdge();
+		edge.setDestVertexId(targetVertexId);
+		edge.setEdgeValue(edgeValue);
+		destEdgeList.add(edge);
+		updated = true;
+		return true;
+	}
 
-    /**
-     * Initialize a new vertex
-     * 
-     * @param vertexId
-     * @param vertexValue
-     * @param edges
-     * @param messages
-     */
-    public void initialize(I vertexId, V vertexValue, Map<I, E> edges, List<M> messages) {
-        if (vertexId != null) {
-            setVertexId(vertexId);
-        }
-        if (vertexValue != null) {
-            setVertexValue(vertexValue);
-        }
-        destEdgeList.clear();
-        if (edges != null && !edges.isEmpty()) {
-            for (Map.Entry<I, E> entry : edges.entrySet()) {
-                destEdgeList.add(new Edge<I, E>(entry.getKey(), entry.getValue()));
-            }
-        }
-        if (messages != null && !messages.isEmpty()) {
-            msgList.addAll(messages);
-        }
-    }
+	/**
+	 * Initialize a new vertex
+	 * 
+	 * @param vertexId
+	 * @param vertexValue
+	 * @param edges
+	 * @param messages
+	 */
+	public void initialize(I vertexId, V vertexValue, Map<I, E> edges,
+			List<M> messages) {
+		if (vertexId != null) {
+			setVertexId(vertexId);
+		}
+		if (vertexValue != null) {
+			setVertexValue(vertexValue);
+		}
+		destEdgeList.clear();
+		if (edges != null && !edges.isEmpty()) {
+			for (Map.Entry<I, E> entry : edges.entrySet()) {
+				destEdgeList.add(new Edge<I, E>(entry.getKey(), entry
+						.getValue()));
+			}
+		}
+		if (messages != null && !messages.isEmpty()) {
+			msgList.addAll(messages);
+		}
+	}
 
-    /**
-     * reset a vertex object: clear its internal states
-     */
-    public void reset() {
-        usedEdge = 0;
-        usedMessage = 0;
-        usedValue = 0;
-    }
+	/**
+	 * reset a vertex object: clear its internal states
+	 */
+	public void reset() {
+		usedEdge = 0;
+		usedMessage = 0;
+		usedValue = 0;
+		updated = false;
+	}
 
-    /**
-     * Set the vertex id
-     * 
-     * @param vertexId
-     */
-    public final void setVertexId(I vertexId) {
-        this.vertexId = vertexId;
-        delegate.setVertexId(vertexId);
-    }
+	/**
+	 * Set the vertex id
+	 * 
+	 * @param vertexId
+	 */
+	public final void setVertexId(I vertexId) {
+		this.vertexId = vertexId;
+		delegate.setVertexId(vertexId);
+	}
 
-    /**
-     * Get the vertex id
-     * 
-     * @return vertex id
-     */
-    public final I getVertexId() {
-        return vertexId;
-    }
+	/**
+	 * Get the vertex id
+	 * 
+	 * @return vertex id
+	 */
+	public final I getVertexId() {
+		return vertexId;
+	}
 
-    /**
-     * Get the vertex value
-     * 
-     * @return the vertex value
-     */
-    public final V getVertexValue() {
-        return vertexValue;
-    }
+	/**
+	 * Get the vertex value
+	 * 
+	 * @return the vertex value
+	 */
+	public final V getVertexValue() {
+		return vertexValue;
+	}
 
-    /**
-     * Set the vertex value
-     * 
-     * @param vertexValue
-     */
-    public final void setVertexValue(V vertexValue) {
-        this.vertexValue = vertexValue;
-        this.updated = true;
-    }
+	/**
+	 * Set the vertex value
+	 * 
+	 * @param vertexValue
+	 */
+	public final void setVertexValue(V vertexValue) {
+		this.vertexValue = vertexValue;
+		this.updated = true;
+	}
 
-    /***
-     * Send a message to a specific vertex
-     * 
-     * @param id
-     *            the receiver vertex id
-     * @param msg
-     *            the message
-     */
-    public final void sendMsg(I id, M msg) {
-        if (msg == null) {
-            throw new IllegalArgumentException("sendMsg: Cannot send null message to " + id);
-        }
-        delegate.sendMsg(id, msg);
-        this.hasMessage = true;
-    }
+	/***
+	 * Send a message to a specific vertex
+	 * 
+	 * @param id
+	 *            the receiver vertex id
+	 * @param msg
+	 *            the message
+	 */
+	public final void sendMsg(I id, M msg) {
+		if (msg == null) {
+			throw new IllegalArgumentException(
+					"sendMsg: Cannot send null message to " + id);
+		}
+		delegate.sendMsg(id, msg);
+		this.hasMessage = true;
+	}
 
-    /**
-     * Send a message to all direct outgoing neighbors
-     * 
-     * @param msg
-     *            the message
-     */
-    public final void sendMsgToAllEdges(M msg) {
-        if (msg == null) {
-            throw new IllegalArgumentException("sendMsgToAllEdges: Cannot send null message to all edges");
-        }
-        for (Edge<I, E> edge : destEdgeList) {
-            sendMsg(edge.getDestVertexId(), msg);
-        }
-    }
+	/**
+	 * Send a message to all direct outgoing neighbors
+	 * 
+	 * @param msg
+	 *            the message
+	 */
+	public final void sendMsgToAllEdges(M msg) {
+		if (msg == null) {
+			throw new IllegalArgumentException(
+					"sendMsgToAllEdges: Cannot send null message to all edges");
+		}
+		for (Edge<I, E> edge : destEdgeList) {
+			sendMsg(edge.getDestVertexId(), msg);
+		}
+	}
 
-    /**
-     * Vote to halt. Once all vertex vote to halt and no more messages, a
-     * Pregelix job will terminate.
-     */
-    public final void voteToHalt() {
-        halt = true;
-    }
+	/**
+	 * Vote to halt. Once all vertex vote to halt and no more messages, a
+	 * Pregelix job will terminate.
+	 */
+	public final void voteToHalt() {
+		halt = true;
+		updated = true;
+	}
 
-    /**
-     * @return the vertex is halted (true) or not (false)
-     */
-    public final boolean isHalted() {
-        return halt;
-    }
+	/**
+	 * @return the vertex is halted (true) or not (false)
+	 */
+	public final boolean isHalted() {
+		return halt;
+	}
 
-    @Override
-    final public void readFields(DataInput in) throws IOException {
-        reset();
-        if (vertexId == null)
-            vertexId = BspUtils.<I> createVertexIndex(getContext().getConfiguration());
-        vertexId.readFields(in);
-        delegate.setVertexId(vertexId);
-        boolean hasVertexValue = in.readBoolean();
+	@Override
+	final public void readFields(DataInput in) throws IOException {
+		reset();
+		if (vertexId == null)
+			vertexId = BspUtils.<I> createVertexIndex(getContext()
+					.getConfiguration());
+		vertexId.readFields(in);
+		delegate.setVertexId(vertexId);
+		boolean hasVertexValue = in.readBoolean();
 
-        if (hasVertexValue) {
-            vertexValue = allocateValue();
-            vertexValue.readFields(in);
-            delegate.setVertex(this);
-        }
-        destEdgeList.clear();
-        long edgeMapSize = SerDeUtils.readVLong(in);
-        for (long i = 0; i < edgeMapSize; ++i) {
-            Edge<I, E> edge = allocateEdge();
-            edge.setConf(getContext().getConfiguration());
-            edge.readFields(in);
-            addEdge(edge);
-        }
-        msgList.clear();
-        long msgListSize = SerDeUtils.readVLong(in);
-        for (long i = 0; i < msgListSize; ++i) {
-            M msg = allocateMessage();
-            msg.readFields(in);
-            msgList.add(msg);
-        }
-        halt = in.readBoolean();
-        updated = false;
-        hasMessage = false;
-        createdNewLiveVertex = false;
-    }
+		if (hasVertexValue) {
+			vertexValue = allocateValue();
+			vertexValue.readFields(in);
+			delegate.setVertex(this);
+		}
+		destEdgeList.clear();
+		long edgeMapSize = SerDeUtils.readVLong(in);
+		for (long i = 0; i < edgeMapSize; ++i) {
+			Edge<I, E> edge = allocateEdge();
+			edge.setConf(getContext().getConfiguration());
+			edge.readFields(in);
+			addEdge(edge);
+		}
+		msgList.clear();
+		long msgListSize = SerDeUtils.readVLong(in);
+		for (long i = 0; i < msgListSize; ++i) {
+			M msg = allocateMessage();
+			msg.readFields(in);
+			msgList.add(msg);
+		}
+		halt = in.readBoolean();
+		updated = false;
+		hasMessage = false;
+		createdNewLiveVertex = false;
+	}
 
-    @Override
-    public void write(DataOutput out) throws IOException {
-        vertexId.write(out);
-        out.writeBoolean(vertexValue != null);
-        if (vertexValue != null) {
-            vertexValue.write(out);
-        }
-        SerDeUtils.writeVLong(out, destEdgeList.size());
-        for (Edge<I, E> edge : destEdgeList) {
-            edge.write(out);
-        }
-        SerDeUtils.writeVLong(out, msgList.size());
-        for (M msg : msgList) {
-            msg.write(out);
-        }
-        out.writeBoolean(halt);
-    }
+	@Override
+	public void write(DataOutput out) throws IOException {
+		vertexId.write(out);
+		out.writeBoolean(vertexValue != null);
+		if (vertexValue != null) {
+			vertexValue.write(out);
+		}
+		SerDeUtils.writeVLong(out, destEdgeList.size());
+		for (Edge<I, E> edge : destEdgeList) {
+			edge.write(out);
+		}
+		SerDeUtils.writeVLong(out, msgList.size());
+		for (M msg : msgList) {
+			msg.write(out);
+		}
+		out.writeBoolean(halt);
+	}
 
-    /**
-     * Get the list of incoming messages
-     * 
-     * @return the list of messages
-     */
-    public List<M> getMsgList() {
-        return msgList;
-    }
+	/**
+	 * Get the list of incoming messages
+	 * 
+	 * @return the list of messages
+	 */
+	public List<M> getMsgList() {
+		return msgList;
+	}
 
-    /**
-     * Get outgoing edge list
-     * 
-     * @return a list of outgoing edges
-     */
-    public List<Edge<I, E>> getEdges() {
-        return this.destEdgeList;
-    }
+	/**
+	 * Get outgoing edge list
+	 * 
+	 * @return a list of outgoing edges
+	 */
+	public List<Edge<I, E>> getEdges() {
+		return this.destEdgeList;
+	}
 
-    @Override
-    @SuppressWarnings("unchecked")
-    public String toString() {
-        Collections.sort(destEdgeList);
-        StringBuffer edgeBuffer = new StringBuffer();
-        edgeBuffer.append("(");
-        for (Edge<I, E> edge : destEdgeList) {
-            edgeBuffer.append(edge.getDestVertexId()).append(",");
-        }
-        edgeBuffer.append(")");
-        return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue() + ", edges=" + edgeBuffer + ")";
-    }
+	@Override
+	@SuppressWarnings("unchecked")
+	public String toString() {
+		Collections.sort(destEdgeList);
+		StringBuffer edgeBuffer = new StringBuffer();
+		edgeBuffer.append("(");
+		for (Edge<I, E> edge : destEdgeList) {
+			edgeBuffer.append(edge.getDestVertexId()).append(",");
+		}
+		edgeBuffer.append(")");
+		return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue()
+				+ ", edges=" + edgeBuffer + ")";
+	}
 
-    /**
-     * Get the number of outgoing edges
-     * 
-     * @return the number of outging edges
-     */
-    public int getNumOutEdges() {
-        return destEdgeList.size();
-    }
+	/**
+	 * Get the number of outgoing edges
+	 * 
+	 * @return the number of outging edges
+	 */
+	public int getNumOutEdges() {
+		return destEdgeList.size();
+	}
 
-    /**
-     * Pregelix internal use only
-     * 
-     * @param writers
-     */
-    public void setOutputWriters(List<IFrameWriter> writers) {
-        delegate.setOutputWriters(writers);
-    }
+	/**
+	 * Pregelix internal use only
+	 * 
+	 * @param writers
+	 */
+	public void setOutputWriters(List<IFrameWriter> writers) {
+		delegate.setOutputWriters(writers);
+	}
 
-    /**
-     * Pregelix internal use only
-     * 
-     * @param writers
-     */
-    public void setOutputAppenders(List<FrameTupleAppender> appenders) {
-        delegate.setOutputAppenders(appenders);
-    }
+	/**
+	 * Pregelix internal use only
+	 * 
+	 * @param writers
+	 */
+	public void setOutputAppenders(List<FrameTupleAppender> appenders) {
+		delegate.setOutputAppenders(appenders);
+	}
 
-    /**
-     * Pregelix internal use only
-     * 
-     * @param writers
-     */
-    public void setOutputTupleBuilders(List<ArrayTupleBuilder> tbs) {
-        delegate.setOutputTupleBuilders(tbs);
-    }
+	/**
+	 * Pregelix internal use only
+	 * 
+	 * @param writers
+	 */
+	public void setOutputTupleBuilders(List<ArrayTupleBuilder> tbs) {
+		delegate.setOutputTupleBuilders(tbs);
+	}
 
-    /**
-     * Pregelix internal use only
-     * 
-     * @param writers
-     */
-    public void finishCompute() throws IOException {
-        delegate.finishCompute();
-    }
+	/**
+	 * Pregelix internal use only
+	 * 
+	 * @param writers
+	 */
+	public void finishCompute() throws IOException {
+		delegate.finishCompute();
+	}
 
-    /**
-     * Pregelix internal use only
-     */
-    public boolean hasUpdate() {
-        return this.updated;
-    }
+	/**
+	 * Pregelix internal use only
+	 */
+	public boolean hasUpdate() {
+		return this.updated;
+	}
 
-    /**
-     * Pregelix internal use only
-     */
-    public boolean hasMessage() {
-        return this.hasMessage;
-    }
+	/**
+	 * Pregelix internal use only
+	 */
+	public boolean hasMessage() {
+		return this.hasMessage;
+	}
 
-    /**
-     * Pregelix internal use only
-     */
-    public boolean createdNewLiveVertex() {
-        return this.createdNewLiveVertex;
-    }
+	/**
+	 * Pregelix internal use only
+	 */
+	public boolean createdNewLiveVertex() {
+		return this.createdNewLiveVertex;
+	}
 
-    /**
-     * sort the edges
-     */
-    @SuppressWarnings("unchecked")
-    public void sortEdges() {
-        Collections.sort(destEdgeList);
-    }
+	/**
+	 * sort the edges
+	 */
+	@SuppressWarnings("unchecked")
+	public void sortEdges() {
+		updated = true;
+		Collections.sort(destEdgeList);
+	}
 
-    /**
-     * Allocate a new edge from the edge pool
-     */
-    private Edge<I, E> allocateEdge() {
-        Edge<I, E> edge;
-        if (usedEdge < edgePool.size()) {
-            edge = edgePool.get(usedEdge);
-            usedEdge++;
-        } else {
-            edge = new Edge<I, E>();
-            edgePool.add(edge);
-            usedEdge++;
-        }
-        return edge;
-    }
+	/**
+	 * Allocate a new edge from the edge pool
+	 */
+	private Edge<I, E> allocateEdge() {
+		Edge<I, E> edge;
+		if (usedEdge < edgePool.size()) {
+			edge = edgePool.get(usedEdge);
+			usedEdge++;
+		} else {
+			edge = new Edge<I, E>();
+			edgePool.add(edge);
+			usedEdge++;
+		}
+		return edge;
+	}
 
-    /**
-     * Allocate a new message from the message pool
-     */
-    private M allocateMessage() {
-        M message;
-        if (usedMessage < msgPool.size()) {
-            message = msgPool.get(usedEdge);
-            usedMessage++;
-        } else {
-            message = BspUtils.<M> createMessageValue(getContext().getConfiguration());
-            msgPool.add(message);
-            usedMessage++;
-        }
-        return message;
-    }
+	/**
+	 * Allocate a new message from the message pool
+	 */
+	private M allocateMessage() {
+		M message;
+		if (usedMessage < msgPool.size()) {
+			message = msgPool.get(usedEdge);
+			usedMessage++;
+		} else {
+			message = BspUtils.<M> createMessageValue(getContext()
+					.getConfiguration());
+			msgPool.add(message);
+			usedMessage++;
+		}
+		return message;
+	}
 
-    /**
-     * Set the global superstep for all the vertices (internal use)
-     * 
-     * @param superstep
-     *            New superstep
-     */
-    public static final void setSuperstep(long superstep) {
-        Vertex.superstep = superstep;
-    }
+	/**
+	 * Set the global superstep for all the vertices (internal use)
+	 * 
+	 * @param superstep
+	 *            New superstep
+	 */
+	public static final void setSuperstep(long superstep) {
+		Vertex.superstep = superstep;
+	}
 
-    /**
-     * Add an outgoing edge into the vertex
-     * 
-     * @param edge
-     *            the edge to be added
-     * @return true if the edge list changed as a result of this call
-     */
-    public boolean addEdge(Edge<I, E> edge) {
-        edge.setConf(getContext().getConfiguration());
-        return destEdgeList.add(edge);
-    }
+	/**
+	 * Add an outgoing edge into the vertex
+	 * 
+	 * @param edge
+	 *            the edge to be added
+	 * @return true if the edge list changed as a result of this call
+	 */
+	public boolean addEdge(Edge<I, E> edge) {
+		edge.setConf(getContext().getConfiguration());
+		updated = true;
+		return destEdgeList.add(edge);
+	}
 
-    /**
-     * remove an outgoing edge in the graph
-     * 
-     * @param edge
-     *            the edge to be removed
-     * @return true if the edge is in the edge list of the vertex
-     */
-    public boolean removeEdge(Edge<I, E> edge) {
-        return destEdgeList.remove(edge);
-    }
+	/**
+	 * remove an outgoing edge in the graph
+	 * 
+	 * @param edge
+	 *            the edge to be removed
+	 * @return true if the edge is in the edge list of the vertex
+	 */
+	public boolean removeEdge(Edge<I, E> edge) {
+		updated = true;
+		return destEdgeList.remove(edge);
+	}
 
-    /**
-     * Add a new vertex into the graph
-     * 
-     * @param vertexId
-     *            the vertex id
-     * @param vertex
-     *            the vertex
-     */
-    public final void addVertex(I vertexId, Vertex vertex) {
-        createdNewLiveVertex |= !vertex.isHalted();
-        delegate.addVertex(vertexId, vertex);
-    }
+	/**
+	 * Add a new vertex into the graph
+	 * 
+	 * @param vertexId
+	 *            the vertex id
+	 * @param vertex
+	 *            the vertex
+	 */
+	public final void addVertex(I vertexId, Vertex vertex) {
+		createdNewLiveVertex |= !vertex.isHalted();
+		delegate.addVertex(vertexId, vertex);
+	}
 
-    /**
-     * Delete a vertex from id
-     * 
-     * @param vertexId
-     *            the vertex id
-     */
-    public final void deleteVertex(I vertexId) {
-        delegate.deleteVertex(vertexId);
-    }
+	/**
+	 * Delete a vertex from id
+	 * 
+	 * @param vertexId
+	 *            the vertex id
+	 */
+	public final void deleteVertex(I vertexId) {
+		delegate.deleteVertex(vertexId);
+	}
 
-    /**
-     * Allocate a vertex value from the object pool
-     * 
-     * @return a vertex value instance
-     */
-    private V allocateValue() {
-        V value;
-        if (usedValue < valuePool.size()) {
-            value = valuePool.get(usedValue);
-            usedValue++;
-        } else {
-            value = BspUtils.<V> createVertexValue(getContext().getConfiguration());
-            valuePool.add(value);
-            usedValue++;
-        }
-        return value;
-    }
+	public void activate() {
+		if (halt)
+			this.updated = true;
+		this.halt = false;
+	}
 
-    /**
-     * Get the current global superstep number
-     * 
-     * @return the current superstep number
-     */
-    public static final long getSuperstep() {
-        return superstep;
-    }
+	/**
+	 * Allocate a vertex value from the object pool
+	 * 
+	 * @return a vertex value instance
+	 */
+	private V allocateValue() {
+		V value;
+		if (usedValue < valuePool.size()) {
+			value = valuePool.get(usedValue);
+			usedValue++;
+		} else {
+			value = BspUtils.<V> createVertexValue(getContext()
+					.getConfiguration());
+			valuePool.add(value);
+			usedValue++;
+		}
+		return value;
+	}
 
-    /**
-     * Set the total number of vertices from the last superstep.
-     * 
-     * @param numVertices
-     *            Aggregate vertices in the last superstep
-     */
-    public static final void setNumVertices(long numVertices) {
-        Vertex.numVertices = numVertices;
-    }
+	/**
+	 * Get the current global superstep number
+	 * 
+	 * @return the current superstep number
+	 */
+	public static final long getSuperstep() {
+		return superstep;
+	}
 
-    /**
-     * Get the number of vertexes in the graph
-     * 
-     * @return the number of vertexes in the graph
-     */
-    public static final long getNumVertices() {
-        return numVertices;
-    }
+	/**
+	 * Set the total number of vertices from the last superstep.
+	 * 
+	 * @param numVertices
+	 *            Aggregate vertices in the last superstep
+	 */
+	public static final void setNumVertices(long numVertices) {
+		Vertex.numVertices = numVertices;
+	}
 
-    /**
-     * Set the total number of edges from the last superstep.
-     * 
-     * @param numEdges
-     *            Aggregate edges in the last superstep
-     */
-    public static void setNumEdges(long numEdges) {
-        Vertex.numEdges = numEdges;
-    }
+	/**
+	 * Get the number of vertexes in the graph
+	 * 
+	 * @return the number of vertexes in the graph
+	 */
+	public static final long getNumVertices() {
+		return numVertices;
+	}
 
-    /**
-     * Get the number of edges from this graph
-     * 
-     * @return the number of edges in the graph
-     */
-    public static final long getNumEdges() {
-        return numEdges;
-    }
+	/**
+	 * Set the total number of edges from the last superstep.
+	 * 
+	 * @param numEdges
+	 *            Aggregate edges in the last superstep
+	 */
+	public static void setNumEdges(long numEdges) {
+		Vertex.numEdges = numEdges;
+	}
 
-    /**
-     * Pregelix internal use only
-     */
-    public static final TaskAttemptContext getContext() {
-        return context;
-    }
+	/**
+	 * Get the number of edges from this graph
+	 * 
+	 * @return the number of edges in the graph
+	 */
+	public static final long getNumEdges() {
+		return numEdges;
+	}
 
-    /**
-     * Pregelix internal use only
-     * 
-     * @param context
-     */
-    public static final void setContext(TaskAttemptContext context) {
-        Vertex.context = context;
-    }
+	/**
+	 * Pregelix internal use only
+	 */
+	public static final TaskAttemptContext getContext() {
+		return context;
+	}
+
+	/**
+	 * Pregelix internal use only
+	 * 
+	 * @param context
+	 */
+	public static final void setContext(TaskAttemptContext context) {
+		Vertex.context = context;
+	}
 
 }
diff --git a/pregelix/pregelix-core/pom.xml b/pregelix/pregelix-core/pom.xml
index 5238068..576758b 100644
--- a/pregelix/pregelix-core/pom.xml
+++ b/pregelix/pregelix-core/pom.xml
@@ -1,5 +1,4 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 	<modelVersion>4.0.0</modelVersion>
 	<artifactId>pregelix-core</artifactId>
 	<packaging>jar</packaging>
@@ -8,7 +7,7 @@
 	<parent>
 		<groupId>edu.uci.ics.hyracks</groupId>
 		<artifactId>pregelix</artifactId>
-		<version>0.2.3-SNAPSHOT</version>
+		<version>0.2.4-SNAPSHOT</version>
 	</parent>
 
 
@@ -195,84 +194,84 @@
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>pregelix-api</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>pregelix-dataflow-std</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>pregelix-dataflow</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-dataflow-std</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>pregelix-runtime</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-api</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-dataflow-common</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-data-std</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-storage-am-common</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-storage-am-btree</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-control-cc</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-control-nc</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
@@ -286,7 +285,7 @@
 		<dependency>
 			<groupId>edu.uci.ics.hyracks.examples</groupId>
 			<artifactId>hyracks-integration-tests</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<scope>test</scope>
 		</dependency>
 		<dependency>
@@ -306,7 +305,7 @@
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-ipc</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index f07a246..3a4c41b 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -75,11 +75,17 @@
         try {
             /** add hadoop configurations */
             URL hadoopCore = job.getClass().getClassLoader().getResource("core-site.xml");
-            job.getConfiguration().addResource(hadoopCore);
+            if (hadoopCore != null) {
+                job.getConfiguration().addResource(hadoopCore);
+            }
             URL hadoopMapRed = job.getClass().getClassLoader().getResource("mapred-site.xml");
-            job.getConfiguration().addResource(hadoopMapRed);
+            if (hadoopMapRed != null) {
+                job.getConfiguration().addResource(hadoopMapRed);
+            }
             URL hadoopHdfs = job.getClass().getClassLoader().getResource("hdfs-site.xml");
-            job.getConfiguration().addResource(hadoopHdfs);
+            if (hadoopHdfs != null) {
+                job.getConfiguration().addResource(hadoopHdfs);
+            }
             ClusterConfig.loadClusterConfig(ipAddress, port);
 
             LOG.info("job started");
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
index 2a2e2bf..cd2a864 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.pregelix.core.util;
 
+import java.io.File;
 import java.util.EnumSet;
 
 import edu.uci.ics.hyracks.api.client.HyracksConnection;
@@ -44,7 +45,7 @@
     private static NodeControllerService nc2;
     private static IHyracksClientConnection hcc;
 
-    public static void init() throws Exception {
+    public static void init(String topologyFilePath) throws Exception {
         CCConfig ccConfig = new CCConfig();
         ccConfig.clientNetIpAddress = CC_HOST;
         ccConfig.clusterNetIpAddress = CC_HOST;
@@ -53,6 +54,7 @@
         ccConfig.defaultMaxJobAttempts = 0;
         ccConfig.jobHistorySize = 0;
         ccConfig.profileDumpPeriod = -1;
+        ccConfig.clusterTopologyDefinition = new File(topologyFilePath);
 
         // cluster controller
         cc = new ClusterControllerService(ccConfig);
@@ -64,6 +66,7 @@
         ncConfig1.clusterNetIPAddress = "localhost";
         ncConfig1.ccPort = TEST_HYRACKS_CC_PORT;
         ncConfig1.dataIPAddress = "127.0.0.1";
+        ncConfig1.datasetIPAddress = "127.0.0.1";
         ncConfig1.nodeId = NC1_ID;
         nc1 = new NodeControllerService(ncConfig1);
         nc1.start();
@@ -73,6 +76,7 @@
         ncConfig2.clusterNetIPAddress = "localhost";
         ncConfig2.ccPort = TEST_HYRACKS_CC_PORT;
         ncConfig2.dataIPAddress = "127.0.0.1";
+        ncConfig2.datasetIPAddress = "127.0.0.1";
         ncConfig2.nodeId = NC2_ID;
         nc2 = new NodeControllerService(ncConfig2);
         nc2.start();
diff --git a/pregelix/pregelix-core/src/main/resources/conf/topology-template.xml b/pregelix/pregelix-core/src/main/resources/conf/topology-template.xml
new file mode 100755
index 0000000..4710706
--- /dev/null
+++ b/pregelix/pregelix-core/src/main/resources/conf/topology-template.xml
@@ -0,0 +1,7 @@
+<cluster-topology>
+    <network-switch name="Global">
+        <network-switch name="local">
+            <terminal name="127.0.0.1"/>
+        </network-switch>
+    </network-switch>
+</cluster-topology>
\ No newline at end of file
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/getip.sh b/pregelix/pregelix-core/src/main/resources/scripts/getip.sh
index e0cdf73..a691c0f 100755
--- a/pregelix/pregelix-core/src/main/resources/scripts/getip.sh
+++ b/pregelix/pregelix-core/src/main/resources/scripts/getip.sh
@@ -6,6 +6,10 @@
 then
         #Get IP Address
         IPADDR=`/sbin/ifconfig eth0 | grep "inet " | awk '{print $2}' | cut -f 2 -d ':'`
+    	if [ "$IPADDR" = "" ]
+        then
+		IPADDR=`/sbin/ifconfig em1 | grep "inet " | awk '{print $2}' | cut -f 2 -d ':'`
+        fi 
 	if [ "$IPADDR" = "" ]
         then
 		IPADDR=`/sbin/ifconfig lo | grep "inet " | awk '{print $2}' | cut -f 2 -d ':'`
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/pregelix b/pregelix/pregelix-core/src/main/resources/scripts/pregelix
index 6997078..b1a2f74 100644
--- a/pregelix/pregelix-core/src/main/resources/scripts/pregelix
+++ b/pregelix/pregelix-core/src/main/resources/scripts/pregelix
@@ -91,7 +91,7 @@
   REPO="$BASEDIR"/lib
 fi
 
-CLASSPATH=$CLASSPATH_PREFIX:"$HADOOP_HOME"/conf:"$BASEDIR"/etc:$1
+CLASSPATH=$CLASSPATH_PREFIX:"$HADOOP_HOME"/conf:/etc/hadoop/conf:"$BASEDIR"/etc:$1
 
 # For Cygwin, switch paths to Windows format before running java
 if $cygwin; then
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/startcc.sh b/pregelix/pregelix-core/src/main/resources/scripts/startcc.sh
index fe2551d..133b604 100644
--- a/pregelix/pregelix-core/src/main/resources/scripts/startcc.sh
+++ b/pregelix/pregelix-core/src/main/resources/scripts/startcc.sh
@@ -20,6 +20,12 @@
 export JAVA_HOME=$JAVA_HOME
 export JAVA_OPTS=$CCJAVA_OPTS
 
-#Launch hyracks cc script
+
 chmod -R 755 $HYRACKS_HOME
-$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyrackscc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 3 &> $CCLOGS_DIR/cc.log &
+if [ -f "conf/topology.xml"  ]; then
+#Launch hyracks cc script with topology
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyrackscc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 0 -cluster-topology "conf/topology.xml" &> $CCLOGS_DIR/cc.log &
+else
+#Launch hyracks cc script without toplogy
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyrackscc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 0 &> $CCLOGS_DIR/cc.log &
+fi
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/startnc.sh b/pregelix/pregelix-core/src/main/resources/scripts/startnc.sh
index 6e0f90e..b059aad 100644
--- a/pregelix/pregelix-core/src/main/resources/scripts/startnc.sh
+++ b/pregelix/pregelix-core/src/main/resources/scripts/startnc.sh
@@ -46,4 +46,4 @@
 cd $NCTMP_DIR
 
 #Launch hyracks nc
-$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR  -data-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR  -data-ip-address $IPADDR -result-ip-address $IPADDR  -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/stopnc.sh b/pregelix/pregelix-core/src/main/resources/scripts/stopnc.sh
index 03ce4e7..35c4794 100644
--- a/pregelix/pregelix-core/src/main/resources/scripts/stopnc.sh
+++ b/pregelix/pregelix-core/src/main/resources/scripts/stopnc.sh
@@ -5,6 +5,10 @@
 PID=`ps -ef|grep ${USER}|grep java|grep 'Dapp.name=hyracksnc'|awk '{print $2}'`
 
 if [ "$PID" == "" ]; then
+  PID=`ps -ef|grep ${USER}|grep java|grep 'hyracks'|awk '{print $2}'`
+fi
+
+if [ "$PID" == "" ]; then
   USERID=`id | sed 's/^uid=//;s/(.*$//'`
   PID=`ps -ef|grep ${USERID}|grep java|grep 'Dapp.name=hyracksnc'|awk '{print $2}'`
 fi
diff --git a/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java b/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
index 97659d4..f7cadf6 100644
--- a/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
+++ b/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
@@ -102,7 +102,7 @@
         ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
         ClusterConfig.setClusterPropertiesPath(PATH_TO_CLUSTER_PROPERTIES);
         cleanupStores();
-        PregelixHyracksIntegrationUtil.init();
+        PregelixHyracksIntegrationUtil.init("src/test/resources/topology.xml");
         PregelixHyracksIntegrationUtil.createApp(HYRACKS_APP_NAME);
 
         FileUtils.forceMkdir(new File(EXPECT_RESULT_DIR));
diff --git a/pregelix/pregelix-core/src/test/resources/topology.xml b/pregelix/pregelix-core/src/test/resources/topology.xml
new file mode 100755
index 0000000..4710706
--- /dev/null
+++ b/pregelix/pregelix-core/src/test/resources/topology.xml
@@ -0,0 +1,7 @@
+<cluster-topology>
+    <network-switch name="Global">
+        <network-switch name="local">
+            <terminal name="127.0.0.1"/>
+        </network-switch>
+    </network-switch>
+</cluster-topology>
\ No newline at end of file
diff --git a/pregelix/pregelix-dataflow-std-base/pom.xml b/pregelix/pregelix-dataflow-std-base/pom.xml
index caa3222..eeaa6c9 100644
--- a/pregelix/pregelix-dataflow-std-base/pom.xml
+++ b/pregelix/pregelix-dataflow-std-base/pom.xml
@@ -7,7 +7,7 @@
 	<parent>
     		<groupId>edu.uci.ics.hyracks</groupId>
     		<artifactId>pregelix</artifactId>
-    		<version>0.2.3-SNAPSHOT</version>
+    		<version>0.2.4-SNAPSHOT</version>
   	</parent>
 
 
@@ -73,14 +73,14 @@
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>pregelix-api</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-api</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
diff --git a/pregelix/pregelix-dataflow-std/pom.xml b/pregelix/pregelix-dataflow-std/pom.xml
index c5a0913..889c876 100644
--- a/pregelix/pregelix-dataflow-std/pom.xml
+++ b/pregelix/pregelix-dataflow-std/pom.xml
@@ -1,5 +1,4 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 	<modelVersion>4.0.0</modelVersion>
 	<artifactId>pregelix-dataflow-std</artifactId>
 	<packaging>jar</packaging>
@@ -8,7 +7,7 @@
 	<parent>
 		<groupId>edu.uci.ics.hyracks</groupId>
 		<artifactId>pregelix</artifactId>
-		<version>0.2.3-SNAPSHOT</version>
+		<version>0.2.4-SNAPSHOT</version>
 	</parent>
 
 
@@ -74,77 +73,77 @@
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>pregelix-dataflow-std-base</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-dataflow-std</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-api</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-dataflow-common</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-data-std</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-hdfs-core</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-storage-am-common</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-storage-am-btree</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-control-cc</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-control-nc</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-ipc</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
diff --git a/pregelix/pregelix-dataflow/pom.xml b/pregelix/pregelix-dataflow/pom.xml
index d3f396e..ffc000b 100644
--- a/pregelix/pregelix-dataflow/pom.xml
+++ b/pregelix/pregelix-dataflow/pom.xml
@@ -1,5 +1,4 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 	<modelVersion>4.0.0</modelVersion>
 	<artifactId>pregelix-dataflow</artifactId>
 	<packaging>jar</packaging>
@@ -8,7 +7,7 @@
 	<parent>
 		<groupId>edu.uci.ics.hyracks</groupId>
 		<artifactId>pregelix</artifactId>
-		<version>0.2.3-SNAPSHOT</version>
+		<version>0.2.4-SNAPSHOT</version>
 	</parent>
 
 
@@ -74,68 +73,68 @@
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>pregelix-api</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>pregelix-dataflow-std-base</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-api</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-dataflow-common</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-data-std</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-storage-am-common</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-storage-am-btree</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-control-cc</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-control-nc</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-ipc</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
diff --git a/pregelix/pregelix-dist/pom.xml b/pregelix/pregelix-dist/pom.xml
index 847e843..e9aef12 100644
--- a/pregelix/pregelix-dist/pom.xml
+++ b/pregelix/pregelix-dist/pom.xml
@@ -4,7 +4,7 @@
 	<parent>
 		<groupId>edu.uci.ics.hyracks</groupId>
 		<artifactId>pregelix</artifactId>
-		<version>0.2.3-SNAPSHOT</version>
+		<version>0.2.4-SNAPSHOT</version>
 	</parent>
 	<artifactId>pregelix-dist</artifactId>
 	<name>pregelix-dist</name>
@@ -69,13 +69,13 @@
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>pregelix-core</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>pregelix-example</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 		</dependency>
 	</dependencies>
 </project>
diff --git a/pregelix/pregelix-example/pom.xml b/pregelix/pregelix-example/pom.xml
index e643331..beefee2 100644
--- a/pregelix/pregelix-example/pom.xml
+++ b/pregelix/pregelix-example/pom.xml
@@ -7,7 +7,7 @@
 	<parent>
 		<groupId>edu.uci.ics.hyracks</groupId>
 		<artifactId>pregelix</artifactId>
-		<version>0.2.3-SNAPSHOT</version>
+		<version>0.2.4-SNAPSHOT</version>
 	</parent>
 
 	<build>
@@ -104,7 +104,7 @@
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>pregelix-core</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java
index 37f03a5..321b5b2 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java
@@ -76,7 +76,7 @@
         ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
         ClusterConfig.setClusterPropertiesPath(PATH_TO_CLUSTER_PROPERTIES);
         cleanupStores();
-        PregelixHyracksIntegrationUtil.init();
+        PregelixHyracksIntegrationUtil.init("src/test/resources/topology.xml");
         PregelixHyracksIntegrationUtil.createApp(HYRACKS_APP_NAME);
         LOGGER.info("Hyracks mini-cluster started");
         startHDFS();
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java
index 79a5c3c..fa98ebd 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java
@@ -75,7 +75,7 @@
 		ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
 		ClusterConfig.setClusterPropertiesPath(PATH_TO_CLUSTER_PROPERTIES);
 		cleanupStores();
-		PregelixHyracksIntegrationUtil.init();
+		PregelixHyracksIntegrationUtil.init("src/test/resources/topology.xml");
 		PregelixHyracksIntegrationUtil.createApp(HYRACKS_APP_NAME);
 		LOGGER.info("Hyracks mini-cluster started");
 		FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
diff --git a/pregelix/pregelix-example/src/test/resources/topology.xml b/pregelix/pregelix-example/src/test/resources/topology.xml
new file mode 100755
index 0000000..2a6c380
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/topology.xml
@@ -0,0 +1,7 @@
+<cluster-topology>
+    <network-switch name="Global">
+        <network-switch name="local">
+            <terminal name="127.1.0.1"/>
+        </network-switch>
+    </network-switch>
+</cluster-topology>
\ No newline at end of file
diff --git a/pregelix/pregelix-runtime/pom.xml b/pregelix/pregelix-runtime/pom.xml
index e75f98b..bce7b12 100644
--- a/pregelix/pregelix-runtime/pom.xml
+++ b/pregelix/pregelix-runtime/pom.xml
@@ -7,7 +7,7 @@
 	<parent>
     		<groupId>edu.uci.ics.hyracks</groupId>
     		<artifactId>pregelix</artifactId>
-    		<version>0.2.3-SNAPSHOT</version>
+    		<version>0.2.4-SNAPSHOT</version>
   	</parent>
 
 
@@ -73,82 +73,82 @@
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>pregelix-api</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>pregelix-dataflow-std</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>pregelix-dataflow</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-dataflow-std</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-api</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-dataflow-common</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-data-std</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-storage-am-common</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-storage-am-btree</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-control-cc</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-control-nc</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-ipc</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
+			<version>0.2.4-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index f7958d9..e3c5a48 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -179,6 +179,7 @@
                     return;
 
                 try {
+                	vertex.activate();
                     vertex.compute(msgIterator);
                     vertex.finishCompute();
                 } catch (IOException e) {