merged r3236:3273 fullstack_asterix_stabilization --> fullstack_lsm_staging

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_lsm_staging@3278 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
index 371169f..52e6005 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
@@ -17,9 +17,10 @@
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IJobLifecycleListener;
 import edu.uci.ics.hyracks.api.job.JobId;
 
-public interface IDatasetDirectoryService {
+public interface IDatasetDirectoryService extends IJobLifecycleListener {
     public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, int partition,
             int nPartitions, NetworkAddress networkAddress);
 
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 05365d3..506a870 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -170,7 +170,7 @@
             }
         };
         sweeper = new DeadNodeSweeper();
-        datasetDirectoryService = new DatasetDirectoryService();
+        datasetDirectoryService = new DatasetDirectoryService(ccConfig.jobHistorySize);
         jobCounter = 0;
     }
 
@@ -205,6 +205,7 @@
 
     private void startApplication() throws Exception {
         appCtx = new CCApplicationContext(serverCtx, ccContext);
+        appCtx.addJobLifecycleListener(datasetDirectoryService);
         String className = ccConfig.appCCMainClass;
         if (className != null) {
             Class<?> c = Class.forName(className);
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
index 13ae426..cdcdf4c 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -15,7 +15,7 @@
 package edu.uci.ics.hyracks.control.cc.dataset;
 
 import java.util.Arrays;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
@@ -26,6 +26,8 @@
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.dataset.ResultSetMetaData;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import edu.uci.ics.hyracks.api.job.JobId;
 
 /**
@@ -38,14 +40,39 @@
 public class DatasetDirectoryService implements IDatasetDirectoryService {
     private final Map<JobId, DatasetJobRecord> jobResultLocations;
 
-    public DatasetDirectoryService() {
-        jobResultLocations = new HashMap<JobId, DatasetJobRecord>();
+    public DatasetDirectoryService(final int jobHistorySize) {
+        jobResultLocations = new LinkedHashMap<JobId, DatasetJobRecord>() {
+            private static final long serialVersionUID = 1L;
+
+            protected boolean removeEldestEntry(Map.Entry<JobId, DatasetJobRecord> eldest) {
+                return size() > jobHistorySize;
+            }
+        };
+    }
+
+    @Override
+    public synchronized void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf)
+            throws HyracksException {
+        // synchronized like the register/report methods, which share jobResultLocations
+        if (!jobResultLocations.containsKey(jobId)) {
+            jobResultLocations.put(jobId, new DatasetJobRecord());
+        }
+    }
+
+    @Override
+    public void notifyJobStart(JobId jobId) throws HyracksException {
+        // Auto-generated method stub
+    }
+
+    @Override
+    public void notifyJobFinish(JobId jobId) throws HyracksException {
+        // Auto-generated method stub
     }
 
     @Override
     public synchronized void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
             int partition, int nPartitions, NetworkAddress networkAddress) {
-        DatasetJobRecord djr = getDatasetJobRecord(jobId);
+        DatasetJobRecord djr = jobResultLocations.get(jobId);
 
         ResultSetMetaData resultSetMetaData = djr.get(rsId);
         if (resultSetMetaData == null) {
@@ -84,14 +111,14 @@
 
     @Override
     public synchronized void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) {
-        DatasetJobRecord djr = getDatasetJobRecord(jobId);
+        DatasetJobRecord djr = jobResultLocations.get(jobId);
         djr.fail();
         notifyAll();
     }
 
     @Override
     public synchronized void reportJobFailure(JobId jobId) {
-        DatasetJobRecord djr = getDatasetJobRecord(jobId);
+        DatasetJobRecord djr = jobResultLocations.get(jobId);
         djr.fail();
         notifyAll();
     }
@@ -158,7 +185,11 @@
      */
     private DatasetDirectoryRecord[] updatedRecords(JobId jobId, ResultSetId rsId, DatasetDirectoryRecord[] knownRecords)
             throws HyracksDataException {
-        DatasetJobRecord djr = getDatasetJobRecord(jobId);
+        DatasetJobRecord djr = jobResultLocations.get(jobId);
+
+        if (djr == null) {
+            throw new HyracksDataException("Requested JobId " + jobId + " doesn't exist");
+        }
 
         if (djr.getStatus() == Status.FAILED) {
             throw new HyracksDataException("Job failed.");
@@ -201,13 +232,4 @@
         }
         return null;
     }
-
-    private DatasetJobRecord getDatasetJobRecord(JobId jobId) {
-        DatasetJobRecord djr = jobResultLocations.get(jobId);
-        if (djr == null) {
-            djr = new DatasetJobRecord();
-            jobResultLocations.put(jobId, djr);
-        }
-        return djr;
-    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
index d1577bc..ec29592 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
@@ -51,6 +51,9 @@
     @Option(name = "-max-memory", usage = "Maximum memory usable at this Node Controller in bytes (default: -1 auto)")
     public int maxMemory = -1;
 
+    @Option(name = "-result-history-size", usage = "Limits the number of jobs whose results should be remembered by the system to the specified value. (default: 100)")
+    public int resultHistorySize = 100;
+
     @Option(name = "-result-manager-memory", usage = "Memory usable for result caching at this Node Controller in bytes (default: -1 auto)")
     public int resultManagerMemory = -1;
 
@@ -79,10 +82,12 @@
         cList.add(String.valueOf(nNetThreads));
         cList.add("-max-memory");
         cList.add(String.valueOf(maxMemory));
+        cList.add("-result-history-size");
+        cList.add(String.valueOf(resultHistorySize));
         cList.add("-result-manager-memory");
         cList.add(String.valueOf(resultManagerMemory));
 
-       if (appNCMainClass != null) {
+        if (appNCMainClass != null) {
             cList.add("-app-nc-main-class");
             cList.add(appNCMainClass);
         }
@@ -92,5 +97,5 @@
                 cList.add(appArg);
             }
         }
-   }
+    }
 }
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 10e0dba..e15c60e 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -150,7 +150,8 @@
         partitionManager = new PartitionManager(this);
         netManager = new NetworkManager(getIpAddress(ncConfig.dataIPAddress), partitionManager, ncConfig.nNetThreads);
 
-        datasetPartitionManager = new DatasetPartitionManager(this, executor, ncConfig.resultManagerMemory);
+        datasetPartitionManager = new DatasetPartitionManager(this, executor, ncConfig.resultManagerMemory,
+                ncConfig.resultHistorySize);
         datasetNetworkManager = new DatasetNetworkManager(getIpAddress(ncConfig.datasetIPAddress),
                 datasetPartitionManager, ncConfig.nNetThreads);
 
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index 1cad54b..1e58b5c 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -14,7 +14,7 @@
  */
 package edu.uci.ics.hyracks.control.nc.dataset;
 
-import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.concurrent.Executor;
 
@@ -44,13 +44,26 @@
 
     private final DatasetMemoryManager datasetMemoryManager;
 
-    public DatasetPartitionManager(NodeControllerService ncs, Executor executor, int availableMemory) {
+    public DatasetPartitionManager(NodeControllerService ncs, Executor executor, int availableMemory,
+            final int resultHistorySize) {
         this.ncs = ncs;
         this.executor = executor;
-        partitionResultStateMap = new HashMap<JobId, ResultState[]>();
         deallocatableRegistry = new DefaultDeallocatableRegistry();
         fileFactory = new WorkspaceFileFactory(deallocatableRegistry, (IOManager) ncs.getRootContext().getIOManager());
         datasetMemoryManager = new DatasetMemoryManager(availableMemory);
+        partitionResultStateMap = new LinkedHashMap<JobId, ResultState[]>() {
+            private static final long serialVersionUID = 1L;
+
+            protected boolean removeEldestEntry(Map.Entry<JobId, ResultState[]> eldest) {
+                if (size() > resultHistorySize) {
+                    for (ResultState state : eldest.getValue()) {
+                        state.deinit();
+                    }
+                    return true;
+                }
+                return false;
+            }
+        };
     }
 
     @Override
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
index 3db3fd9..0f1d94c 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
@@ -64,6 +64,10 @@
         notifyAll();
     }
 
+    public synchronized void deinit() {
+        fileRef.delete();
+    }
+
     public ResultSetPartitionId getResultSetPartitionId() {
         return resultSetPartitionId;
     }
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
index cd49184..e51d4bc 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -49,525 +49,524 @@
  */
 @SuppressWarnings("rawtypes")
 public abstract class Vertex<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
-		implements Writable {
-	private static long superstep = 0;
-	/** Class-wide number of vertices */
-	private static long numVertices = -1;
-	/** Class-wide number of edges */
-	private static long numEdges = -1;
-	/** Vertex id */
-	private I vertexId = null;
-	/** Vertex value */
-	private V vertexValue = null;
-	/** Map of destination vertices and their edge values */
-	private final List<Edge<I, E>> destEdgeList = new ArrayList<Edge<I, E>>();
-	/** If true, do not do anymore computation on this vertex. */
-	boolean halt = false;
-	/** List of incoming messages from the previous superstep */
-	private final List<M> msgList = new ArrayList<M>();
-	/** map context */
-	private static TaskAttemptContext context = null;
-	/** a delegate for hyracks stuff */
-	private VertexDelegate<I, V, E, M> delegate = new VertexDelegate<I, V, E, M>(
-			this);
-	/** this vertex is updated or not */
-	private boolean updated = false;
-	/** has outgoing messages */
-	private boolean hasMessage = false;
-	/** created new vertex */
-	private boolean createdNewLiveVertex = false;
+        implements Writable {
+    private static long superstep = 0;
+    /** Class-wide number of vertices */
+    private static long numVertices = -1;
+    /** Class-wide number of edges */
+    private static long numEdges = -1;
+    /** Vertex id */
+    private I vertexId = null;
+    /** Vertex value */
+    private V vertexValue = null;
+    /** Map of destination vertices and their edge values */
+    private final List<Edge<I, E>> destEdgeList = new ArrayList<Edge<I, E>>();
+    /** If true, do not do anymore computation on this vertex. */
+    boolean halt = false;
+    /** List of incoming messages from the previous superstep */
+    private final List<M> msgList = new ArrayList<M>();
+    /** map context */
+    private static TaskAttemptContext context = null;
+    /** a delegate for hyracks stuff */
+    private VertexDelegate<I, V, E, M> delegate = new VertexDelegate<I, V, E, M>(this);
+    /** this vertex is updated or not */
+    private boolean updated = false;
+    /** has outgoing messages */
+    private boolean hasMessage = false;
+    /** created new vertex */
+    private boolean createdNewLiveVertex = false;
 
-	/**
-	 * use object pool for re-using objects
-	 */
-	private List<Edge<I, E>> edgePool = new ArrayList<Edge<I, E>>();
-	private List<M> msgPool = new ArrayList<M>();
-	private List<V> valuePool = new ArrayList<V>();
-	private int usedEdge = 0;
-	private int usedMessage = 0;
-	private int usedValue = 0;
+    /**
+     * use object pool for re-using objects
+     */
+    private List<Edge<I, E>> edgePool = new ArrayList<Edge<I, E>>();
+    private List<M> msgPool = new ArrayList<M>();
+    private List<V> valuePool = new ArrayList<V>();
+    private int usedEdge = 0;
+    private int usedMessage = 0;
+    private int usedValue = 0;
 
-	/**
-	 * The key method that users need to implement
-	 * 
-	 * @param msgIterator
-	 *            an iterator of incoming messages
-	 */
-	public abstract void compute(Iterator<M> msgIterator);
+    /**
+     * The key method that users need to implement
+     * 
+     * @param msgIterator
+     *            an iterator of incoming messages
+     */
+    public abstract void compute(Iterator<M> msgIterator);
 
-	/**
-	 * Add an edge for the vertex.
-	 * 
-	 * @param targetVertexId
-	 * @param edgeValue
-	 * @return successful or not
-	 */
-	public final boolean addEdge(I targetVertexId, E edgeValue) {
-		Edge<I, E> edge = this.allocateEdge();
-		edge.setDestVertexId(targetVertexId);
-		edge.setEdgeValue(edgeValue);
-		destEdgeList.add(edge);
-		updated = true;
-		return true;
-	}
+    /**
+     * Add an edge for the vertex.
+     * 
+     * @param targetVertexId
+     * @param edgeValue
+     * @return successful or not
+     */
+    public final boolean addEdge(I targetVertexId, E edgeValue) {
+        Edge<I, E> edge = this.allocateEdge();
+        edge.setDestVertexId(targetVertexId);
+        edge.setEdgeValue(edgeValue);
+        destEdgeList.add(edge);
+        updated = true;
+        return true;
+    }
 
-	/**
-	 * Initialize a new vertex
-	 * 
-	 * @param vertexId
-	 * @param vertexValue
-	 * @param edges
-	 * @param messages
-	 */
-	public void initialize(I vertexId, V vertexValue, Map<I, E> edges,
-			List<M> messages) {
-		if (vertexId != null) {
-			setVertexId(vertexId);
-		}
-		if (vertexValue != null) {
-			setVertexValue(vertexValue);
-		}
-		destEdgeList.clear();
-		if (edges != null && !edges.isEmpty()) {
-			for (Map.Entry<I, E> entry : edges.entrySet()) {
-				destEdgeList.add(new Edge<I, E>(entry.getKey(), entry
-						.getValue()));
-			}
-		}
-		if (messages != null && !messages.isEmpty()) {
-			msgList.addAll(messages);
-		}
-	}
+    /**
+     * Initialize a new vertex
+     * 
+     * @param vertexId
+     * @param vertexValue
+     * @param edges
+     * @param messages
+     */
+    public void initialize(I vertexId, V vertexValue, Map<I, E> edges, List<M> messages) {
+        if (vertexId != null) {
+            setVertexId(vertexId);
+        }
+        if (vertexValue != null) {
+            setVertexValue(vertexValue);
+        }
+        destEdgeList.clear();
+        if (edges != null && !edges.isEmpty()) {
+            for (Map.Entry<I, E> entry : edges.entrySet()) {
+                destEdgeList.add(new Edge<I, E>(entry.getKey(), entry.getValue()));
+            }
+        }
+        if (messages != null && !messages.isEmpty()) {
+            msgList.addAll(messages);
+        }
+    }
 
-	/**
-	 * reset a vertex object: clear its internal states
-	 */
-	public void reset() {
-		usedEdge = 0;
-		usedMessage = 0;
-		usedValue = 0;
-		updated = false;
-	}
+    /**
+     * reset a vertex object: clear its internal states
+     */
+    public void reset() {
+        usedEdge = 0;
+        usedMessage = 0;
+        usedValue = 0;
+        updated = false;
+    }
 
-	/**
-	 * Set the vertex id
-	 * 
-	 * @param vertexId
-	 */
-	public final void setVertexId(I vertexId) {
-		this.vertexId = vertexId;
-		delegate.setVertexId(vertexId);
-	}
+    /**
+     * Set the vertex id
+     * 
+     * @param vertexId
+     */
+    public final void setVertexId(I vertexId) {
+        this.vertexId = vertexId;
+        delegate.setVertexId(vertexId);
+    }
 
-	/**
-	 * Get the vertex id
-	 * 
-	 * @return vertex id
-	 */
-	public final I getVertexId() {
-		return vertexId;
-	}
+    /**
+     * Get the vertex id
+     * 
+     * @return vertex id
+     */
+    public final I getVertexId() {
+        return vertexId;
+    }
 
-	/**
-	 * Get the vertex value
-	 * 
-	 * @return the vertex value
-	 */
-	public final V getVertexValue() {
-		return vertexValue;
-	}
+    /**
+     * Get the vertex value
+     * 
+     * @return the vertex value
+     */
+    public final V getVertexValue() {
+        return vertexValue;
+    }
 
-	/**
-	 * Set the vertex value
-	 * 
-	 * @param vertexValue
-	 */
-	public final void setVertexValue(V vertexValue) {
-		this.vertexValue = vertexValue;
-		this.updated = true;
-	}
+    /**
+     * Set the vertex value
+     * 
+     * @param vertexValue
+     */
+    public final void setVertexValue(V vertexValue) {
+        this.vertexValue = vertexValue;
+        this.updated = true;
+    }
 
-	/***
-	 * Send a message to a specific vertex
-	 * 
-	 * @param id
-	 *            the receiver vertex id
-	 * @param msg
-	 *            the message
-	 */
-	public final void sendMsg(I id, M msg) {
-		if (msg == null) {
-			throw new IllegalArgumentException(
-					"sendMsg: Cannot send null message to " + id);
-		}
-		delegate.sendMsg(id, msg);
-		this.hasMessage = true;
-	}
+    /***
+     * Send a message to a specific vertex
+     * 
+     * @param id
+     *            the receiver vertex id
+     * @param msg
+     *            the message
+     */
+    public final void sendMsg(I id, M msg) {
+        if (msg == null) {
+            throw new IllegalArgumentException("sendMsg: Cannot send null message to " + id);
+        }
+        delegate.sendMsg(id, msg);
+        this.hasMessage = true;
+    }
 
-	/**
-	 * Send a message to all direct outgoing neighbors
-	 * 
-	 * @param msg
-	 *            the message
-	 */
-	public final void sendMsgToAllEdges(M msg) {
-		if (msg == null) {
-			throw new IllegalArgumentException(
-					"sendMsgToAllEdges: Cannot send null message to all edges");
-		}
-		for (Edge<I, E> edge : destEdgeList) {
-			sendMsg(edge.getDestVertexId(), msg);
-		}
-	}
+    /**
+     * Send a message to all direct outgoing neighbors
+     * 
+     * @param msg
+     *            the message
+     */
+    public final void sendMsgToAllEdges(M msg) {
+        if (msg == null) {
+            throw new IllegalArgumentException("sendMsgToAllEdges: Cannot send null message to all edges");
+        }
+        for (Edge<I, E> edge : destEdgeList) {
+            sendMsg(edge.getDestVertexId(), msg);
+        }
+    }
 
-	/**
-	 * Vote to halt. Once all vertex vote to halt and no more messages, a
-	 * Pregelix job will terminate.
-	 */
-	public final void voteToHalt() {
-		halt = true;
-		updated = true;
-	}
+    /**
+     * Vote to halt. Once all vertices have voted to halt and no messages
+     * remain, a Pregelix job will terminate.
+     */
+    public final void voteToHalt() {
+        halt = true;
+        updated = true;
+    }
 
-	/**
-	 * @return the vertex is halted (true) or not (false)
-	 */
-	public final boolean isHalted() {
-		return halt;
-	}
+    /**
+     * Activate a halted vertex such that it is alive again.
+     */
+    public final void activate() {
+        halt = false;
+        updated = true;
+    }
 
-	@Override
-	final public void readFields(DataInput in) throws IOException {
-		reset();
-		if (vertexId == null)
-			vertexId = BspUtils.<I> createVertexIndex(getContext()
-					.getConfiguration());
-		vertexId.readFields(in);
-		delegate.setVertexId(vertexId);
-		boolean hasVertexValue = in.readBoolean();
+    /**
+     * @return the vertex is halted (true) or not (false)
+     */
+    public final boolean isHalted() {
+        return halt;
+    }
 
-		if (hasVertexValue) {
-			vertexValue = allocateValue();
-			vertexValue.readFields(in);
-			delegate.setVertex(this);
-		}
-		destEdgeList.clear();
-		long edgeMapSize = SerDeUtils.readVLong(in);
-		for (long i = 0; i < edgeMapSize; ++i) {
-			Edge<I, E> edge = allocateEdge();
-			edge.setConf(getContext().getConfiguration());
-			edge.readFields(in);
-			addEdge(edge);
-		}
-		msgList.clear();
-		long msgListSize = SerDeUtils.readVLong(in);
-		for (long i = 0; i < msgListSize; ++i) {
-			M msg = allocateMessage();
-			msg.readFields(in);
-			msgList.add(msg);
-		}
-		halt = in.readBoolean();
-		updated = false;
-		hasMessage = false;
-		createdNewLiveVertex = false;
-	}
+    @Override
+    final public void readFields(DataInput in) throws IOException {
+        reset();
+        if (vertexId == null)
+            vertexId = BspUtils.<I> createVertexIndex(getContext().getConfiguration());
+        vertexId.readFields(in);
+        delegate.setVertexId(vertexId);
+        boolean hasVertexValue = in.readBoolean();
 
-	@Override
-	public void write(DataOutput out) throws IOException {
-		vertexId.write(out);
-		out.writeBoolean(vertexValue != null);
-		if (vertexValue != null) {
-			vertexValue.write(out);
-		}
-		SerDeUtils.writeVLong(out, destEdgeList.size());
-		for (Edge<I, E> edge : destEdgeList) {
-			edge.write(out);
-		}
-		SerDeUtils.writeVLong(out, msgList.size());
-		for (M msg : msgList) {
-			msg.write(out);
-		}
-		out.writeBoolean(halt);
-	}
+        if (hasVertexValue) {
+            vertexValue = allocateValue();
+            vertexValue.readFields(in);
+            delegate.setVertex(this);
+        }
+        destEdgeList.clear();
+        long edgeMapSize = SerDeUtils.readVLong(in);
+        for (long i = 0; i < edgeMapSize; ++i) {
+            Edge<I, E> edge = allocateEdge();
+            edge.setConf(getContext().getConfiguration());
+            edge.readFields(in);
+            addEdge(edge);
+        }
+        msgList.clear();
+        long msgListSize = SerDeUtils.readVLong(in);
+        for (long i = 0; i < msgListSize; ++i) {
+            M msg = allocateMessage();
+            msg.readFields(in);
+            msgList.add(msg);
+        }
+        halt = in.readBoolean();
+        updated = false;
+        hasMessage = false;
+        createdNewLiveVertex = false;
+    }
 
-	/**
-	 * Get the list of incoming messages
-	 * 
-	 * @return the list of messages
-	 */
-	public List<M> getMsgList() {
-		return msgList;
-	}
+    @Override
+    public void write(DataOutput out) throws IOException {
+        vertexId.write(out);
+        out.writeBoolean(vertexValue != null);
+        if (vertexValue != null) {
+            vertexValue.write(out);
+        }
+        SerDeUtils.writeVLong(out, destEdgeList.size());
+        for (Edge<I, E> edge : destEdgeList) {
+            edge.write(out);
+        }
+        SerDeUtils.writeVLong(out, msgList.size());
+        for (M msg : msgList) {
+            msg.write(out);
+        }
+        out.writeBoolean(halt);
+    }
 
-	/**
-	 * Get outgoing edge list
-	 * 
-	 * @return a list of outgoing edges
-	 */
-	public List<Edge<I, E>> getEdges() {
-		return this.destEdgeList;
-	}
+    /**
+     * Get the list of incoming messages
+     * 
+     * @return the list of messages
+     */
+    public List<M> getMsgList() {
+        return msgList;
+    }
 
-	@Override
-	@SuppressWarnings("unchecked")
-	public String toString() {
-		Collections.sort(destEdgeList);
-		StringBuffer edgeBuffer = new StringBuffer();
-		edgeBuffer.append("(");
-		for (Edge<I, E> edge : destEdgeList) {
-			edgeBuffer.append(edge.getDestVertexId()).append(",");
-		}
-		edgeBuffer.append(")");
-		return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue()
-				+ ", edges=" + edgeBuffer + ")";
-	}
+    /**
+     * Get outgoing edge list
+     * 
+     * @return a list of outgoing edges
+     */
+    public List<Edge<I, E>> getEdges() {
+        return this.destEdgeList;
+    }
 
-	/**
-	 * Get the number of outgoing edges
-	 * 
-	 * @return the number of outging edges
-	 */
-	public int getNumOutEdges() {
-		return destEdgeList.size();
-	}
+    @Override
+    @SuppressWarnings("unchecked")
+    public String toString() {
+        Collections.sort(destEdgeList);
+        StringBuffer edgeBuffer = new StringBuffer();
+        edgeBuffer.append("(");
+        for (Edge<I, E> edge : destEdgeList) {
+            edgeBuffer.append(edge.getDestVertexId()).append(",");
+        }
+        edgeBuffer.append(")");
+        return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue() + ", edges=" + edgeBuffer + ")";
+    }
 
-	/**
-	 * Pregelix internal use only
-	 * 
-	 * @param writers
-	 */
-	public void setOutputWriters(List<IFrameWriter> writers) {
-		delegate.setOutputWriters(writers);
-	}
+    /**
+     * Get the number of outgoing edges
+     * 
+     * @return the number of outgoing edges
+     */
+    public int getNumOutEdges() {
+        return destEdgeList.size();
+    }
 
-	/**
-	 * Pregelix internal use only
-	 * 
-	 * @param writers
-	 */
-	public void setOutputAppenders(List<FrameTupleAppender> appenders) {
-		delegate.setOutputAppenders(appenders);
-	}
+    /**
+     * Pregelix internal use only
+     * 
+     * @param writers
+     */
+    public void setOutputWriters(List<IFrameWriter> writers) {
+        delegate.setOutputWriters(writers);
+    }
 
-	/**
-	 * Pregelix internal use only
-	 * 
-	 * @param writers
-	 */
-	public void setOutputTupleBuilders(List<ArrayTupleBuilder> tbs) {
-		delegate.setOutputTupleBuilders(tbs);
-	}
+    /**
+     * Pregelix internal use only
+     * 
+     * @param appenders
+     */
+    public void setOutputAppenders(List<FrameTupleAppender> appenders) {
+        delegate.setOutputAppenders(appenders);
+    }
 
-	/**
-	 * Pregelix internal use only
-	 * 
-	 * @param writers
-	 */
-	public void finishCompute() throws IOException {
-		delegate.finishCompute();
-	}
+    /**
+     * Pregelix internal use only
+     * 
+     * @param tbs
+     */
+    public void setOutputTupleBuilders(List<ArrayTupleBuilder> tbs) {
+        delegate.setOutputTupleBuilders(tbs);
+    }
 
-	/**
-	 * Pregelix internal use only
-	 */
-	public boolean hasUpdate() {
-		return this.updated;
-	}
+    /**
+     * Pregelix internal use only
+     * 
+     * @throws IOException
+     */
+    public void finishCompute() throws IOException {
+        delegate.finishCompute();
+    }
 
-	/**
-	 * Pregelix internal use only
-	 */
-	public boolean hasMessage() {
-		return this.hasMessage;
-	}
+    /**
+     * Pregelix internal use only
+     */
+    public boolean hasUpdate() {
+        return this.updated;
+    }
 
-	/**
-	 * Pregelix internal use only
-	 */
-	public boolean createdNewLiveVertex() {
-		return this.createdNewLiveVertex;
-	}
+    /**
+     * Pregelix internal use only
+     */
+    public boolean hasMessage() {
+        return this.hasMessage;
+    }
 
-	/**
-	 * sort the edges
-	 */
-	@SuppressWarnings("unchecked")
-	public void sortEdges() {
-		updated = true;
-		Collections.sort(destEdgeList);
-	}
+    /**
+     * Pregelix internal use only
+     */
+    public boolean createdNewLiveVertex() {
+        return this.createdNewLiveVertex;
+    }
 
-	/**
-	 * Allocate a new edge from the edge pool
-	 */
-	private Edge<I, E> allocateEdge() {
-		Edge<I, E> edge;
-		if (usedEdge < edgePool.size()) {
-			edge = edgePool.get(usedEdge);
-			usedEdge++;
-		} else {
-			edge = new Edge<I, E>();
-			edgePool.add(edge);
-			usedEdge++;
-		}
-		return edge;
-	}
+    /**
+     * sort the edges
+     */
+    @SuppressWarnings("unchecked")
+    public void sortEdges() {
+        updated = true;
+        Collections.sort(destEdgeList);
+    }
 
-	/**
-	 * Allocate a new message from the message pool
-	 */
-	private M allocateMessage() {
-		M message;
-		if (usedMessage < msgPool.size()) {
-			message = msgPool.get(usedEdge);
-			usedMessage++;
-		} else {
-			message = BspUtils.<M> createMessageValue(getContext()
-					.getConfiguration());
-			msgPool.add(message);
-			usedMessage++;
-		}
-		return message;
-	}
+    /**
+     * Allocate a new edge from the edge pool
+     */
+    private Edge<I, E> allocateEdge() {
+        Edge<I, E> edge;
+        if (usedEdge < edgePool.size()) {
+            edge = edgePool.get(usedEdge);
+            usedEdge++;
+        } else {
+            edge = new Edge<I, E>();
+            edgePool.add(edge);
+            usedEdge++;
+        }
+        return edge;
+    }
 
-	/**
-	 * Set the global superstep for all the vertices (internal use)
-	 * 
-	 * @param superstep
-	 *            New superstep
-	 */
-	public static final void setSuperstep(long superstep) {
-		Vertex.superstep = superstep;
-	}
+    /**
+     * Allocate a message, reusing a pooled instance when one is still unused
+     */
+    private M allocateMessage() {
+        M message;
+        if (usedMessage < msgPool.size()) {
+            // index with usedMessage (not usedEdge): the message pool has its own cursor
+            message = msgPool.get(usedMessage);
+        } else {
+            message = BspUtils.<M> createMessageValue(getContext().getConfiguration());
+            msgPool.add(message);
+        }
+        usedMessage++;
+        return message;
+    }
 
-	/**
-	 * Add an outgoing edge into the vertex
-	 * 
-	 * @param edge
-	 *            the edge to be added
-	 * @return true if the edge list changed as a result of this call
-	 */
-	public boolean addEdge(Edge<I, E> edge) {
-		edge.setConf(getContext().getConfiguration());
-		updated = true;
-		return destEdgeList.add(edge);
-	}
+    /**
+     * Set the global superstep, held in a static field shared by all the vertices (internal use)
+     *
+     * @param superstep
+     *            the new superstep number
+     */
+    public static final void setSuperstep(long superstep) {
+        Vertex.superstep = superstep;
+    }
 
-	/**
-	 * remove an outgoing edge in the graph
-	 * 
-	 * @param edge
-	 *            the edge to be removed
-	 * @return true if the edge is in the edge list of the vertex
-	 */
-	public boolean removeEdge(Edge<I, E> edge) {
-		updated = true;
-		return destEdgeList.remove(edge);
-	}
+    /**
+     * Add an outgoing edge to this vertex after handing it the configuration of the current context
+     *
+     * @param edge
+     *            the edge to be added
+     * @return true if the edge list changed as a result of this call
+     */
+    public boolean addEdge(Edge<I, E> edge) {
+        edge.setConf(getContext().getConfiguration());
+        updated = true;
+        return destEdgeList.add(edge);
+    }
 
-	/**
-	 * Add a new vertex into the graph
-	 * 
-	 * @param vertexId
-	 *            the vertex id
-	 * @param vertex
-	 *            the vertex
-	 */
-	public final void addVertex(I vertexId, Vertex vertex) {
-		createdNewLiveVertex |= !vertex.isHalted();
-		delegate.addVertex(vertexId, vertex);
-	}
+    /**
+     * Remove an outgoing edge from this vertex; the updated flag is set whether or not the edge is found
+     *
+     * @param edge
+     *            the edge to be removed
+     * @return true if the edge was in the edge list of the vertex
+     */
+    public boolean removeEdge(Edge<I, E> edge) {
+        updated = true;
+        return destEdgeList.remove(edge);
+    }
 
-	/**
-	 * Delete a vertex from id
-	 * 
-	 * @param vertexId
-	 *            the vertex id
-	 */
-	public final void deleteVertex(I vertexId) {
-		delegate.deleteVertex(vertexId);
-	}
+    /**
+     * Add a new vertex into the graph via the delegate, noting whether the added vertex is live (not halted)
+     *
+     * @param vertexId
+     *            the vertex id
+     * @param vertex
+     *            the vertex
+     */
+    public final void addVertex(I vertexId, Vertex vertex) {
+        createdNewLiveVertex |= !vertex.isHalted();
+        delegate.addVertex(vertexId, vertex);
+    }
 
-	/**
-	 * Allocate a vertex value from the object pool
-	 * 
-	 * @return a vertex value instance
-	 */
-	private V allocateValue() {
-		V value;
-		if (usedValue < valuePool.size()) {
-			value = valuePool.get(usedValue);
-			usedValue++;
-		} else {
-			value = BspUtils.<V> createVertexValue(getContext()
-					.getConfiguration());
-			valuePool.add(value);
-			usedValue++;
-		}
-		return value;
-	}
+    /**
+     * Delete the vertex with the given id by forwarding the request to the delegate
+     *
+     * @param vertexId
+     *            the id of the vertex to delete
+     */
+    public final void deleteVertex(I vertexId) {
+        delegate.deleteVertex(vertexId);
+    }
 
-	/**
-	 * Get the current global superstep number
-	 * 
-	 * @return the current superstep number
-	 */
-	public static final long getSuperstep() {
-		return superstep;
-	}
+    /**
+     * Allocate a vertex value from the object pool, taking the pooled one at the usedValue cursor or growing the pool
+     *
+     * @return a vertex value instance
+     */
+    private V allocateValue() {
+        V value;
+        if (usedValue < valuePool.size()) {
+            value = valuePool.get(usedValue); // the pool still holds a value at the cursor position
+            usedValue++;
+        } else {
+            value = BspUtils.<V> createVertexValue(getContext().getConfiguration()); // pool exhausted: create one
+            valuePool.add(value);
+            usedValue++;
+        }
+        return value;
+    }
 
-	/**
-	 * Set the total number of vertices from the last superstep.
-	 * 
-	 * @param numVertices
-	 *            Aggregate vertices in the last superstep
-	 */
-	public static final void setNumVertices(long numVertices) {
-		Vertex.numVertices = numVertices;
-	}
+    /**
+     * Get the current global superstep number from the static field written by setSuperstep
+     *
+     * @return the current superstep number
+     */
+    public static final long getSuperstep() {
+        return superstep;
+    }
 
-	/**
-	 * Get the number of vertexes in the graph
-	 * 
-	 * @return the number of vertexes in the graph
-	 */
-	public static final long getNumVertices() {
-		return numVertices;
-	}
+    /**
+     * Set the total number of vertices from the last superstep; the value is held in a static field.
+     *
+     * @param numVertices
+     *            Aggregate vertices in the last superstep
+     */
+    public static final void setNumVertices(long numVertices) {
+        Vertex.numVertices = numVertices;
+    }
 
-	/**
-	 * Set the total number of edges from the last superstep.
-	 * 
-	 * @param numEdges
-	 *            Aggregate edges in the last superstep
-	 */
-	public static void setNumEdges(long numEdges) {
-		Vertex.numEdges = numEdges;
-	}
+    /**
+     * Get the number of vertices in the graph, as held in the static field written by setNumVertices
+     *
+     * @return the number of vertices in the graph
+     */
+    public static final long getNumVertices() {
+        return numVertices;
+    }
 
-	/**
-	 * Get the number of edges from this graph
-	 * 
-	 * @return the number of edges in the graph
-	 */
-	public static final long getNumEdges() {
-		return numEdges;
-	}
+    /**
+     * Set the total number of edges from the last superstep; the value is held in a static field.
+     *
+     * @param numEdges
+     *            Aggregate edges in the last superstep
+     */
+    public static void setNumEdges(long numEdges) {
+        Vertex.numEdges = numEdges;
+    }
 
-	/**
-	 * Pregelix internal use only
-	 */
-	public static final TaskAttemptContext getContext() {
-		return context;
-	}
+    /**
+     * Get the number of edges in the graph, as held in the static field written by setNumEdges
+     *
+     * @return the number of edges in the graph
+     */
+    public static final long getNumEdges() {
+        return numEdges;
+    }
 
-	/**
-	 * Pregelix internal use only
-	 * 
-	 * @param context
-	 */
-	public static final void setContext(TaskAttemptContext context) {
-		Vertex.context = context;
-	}
+    /**
+     * Pregelix internal use only: returns the task attempt context held in the static context field
+     */
+    public static final TaskAttemptContext getContext() {
+        return context;
+    }
+
+    /**
+     * Pregelix internal use only: stores the task attempt context in the static context field
+     *
+     * @param context the task attempt context to store
+     */
+    public static final void setContext(TaskAttemptContext context) {
+        Vertex.context = context;
+    }
 
 }
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index f7958d9..0c09757 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -175,8 +175,12 @@
                 ArrayListWritable msgContentList = (ArrayListWritable) tuple[1];
                 msgContentList.reset(msgIterator);
 
-                if (!msgIterator.hasNext() && vertex.isHalted())
+                if (!msgIterator.hasNext() && vertex.isHalted()) {
                     return;
+                }
+                if (vertex.isHalted()) {
+                    vertex.activate();
+                }
 
                 try {
                     vertex.compute(msgIterator);
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
index 0cf64a0..1bf6a2b 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
@@ -176,8 +176,12 @@
                 vertex.setOutputAppenders(appenders);
                 vertex.setOutputTupleBuilders(tbs);
 
-                if (!msgIterator.hasNext() && vertex.isHalted())
+                if (!msgIterator.hasNext() && vertex.isHalted()) {
                     return;
+                }
+                if (vertex.isHalted()) {
+                    vertex.activate();
+                }
 
                 try {
                     vertex.compute(msgIterator);