Merge branch 'master' into yingyi/fullstack_fix
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
index 56200e4..4638118 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
@@ -111,6 +111,11 @@
             BitSet sourceBitmap);
 
     /**
+     * Indicates whether the connector is an all-producers-to-all-consumers connector (every producer partition feeds every consumer partition).
+     */
+    public boolean allProducersToAllConsumers();
+
+    /**
      * Gets the display name.
      */
     public String getDisplayName();
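
The new flag lets callers skip per-producer partition queries when a connector routes every producer to every consumer. A minimal caller-side sketch of the contract (the names conn, nProducers, nConsumers, and producerIndex are illustrative, not part of this change):

    BitSet targets = new BitSet();
    if (conn.allProducersToAllConsumers()) {
        // every producer feeds every consumer; no per-producer query needed
        targets.set(0, nConsumers);
    } else {
        conn.indicateTargetPartitions(nProducers, nConsumers, producerIndex, targets);
    }

The ActivityClusterPlanner change further down uses exactly this split to avoid redundant bitmap scans.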
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/exceptions/HyracksDataException.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/exceptions/HyracksDataException.java
index 6390abf..aab59c8 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/exceptions/HyracksDataException.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/exceptions/HyracksDataException.java
@@ -17,6 +17,8 @@
 public class HyracksDataException extends HyracksException {
     private static final long serialVersionUID = 1L;
 
+    private String nodeId;
+
     public HyracksDataException() {
     }
 
@@ -24,11 +26,19 @@
         super(message);
     }
 
+    public HyracksDataException(Throwable cause) {
+        super(cause);
+    }
+
     public HyracksDataException(String message, Throwable cause) {
         super(message, cause);
     }
 
-    public HyracksDataException(Throwable cause) {
-        super(cause);
+    public void setNodeId(String nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    public String getNodeId() {
+        return nodeId;
     }
 }
\ No newline at end of file
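
The new nodeId field makes remote failures attributable to the machine they occurred on. A sketch of the intended round trip, given some caught Throwable cause (the node id "nc1" is illustrative; in this change the wrapping is actually done centrally by ExceptionUtils.setNodeIds, below):

    // On the failing node controller:
    HyracksDataException wrapped = new HyracksDataException(cause);
    wrapped.setNodeId("nc1");
    // Later, on the cluster controller or client side:
    String failedOn = wrapped.getNodeId(); // -> "nc1"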
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
index 67ba2b6..3ee036d 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
@@ -33,276 +33,281 @@
 import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatSchema.GarbageCollectorInfo;
 
 public class NodeControllerState {
-    private static final int RRD_SIZE = 720;
+	private static final int RRD_SIZE = 720;
 
-    private final INodeController nodeController;
+	private final INodeController nodeController;
 
-    private final NCConfig ncConfig;
+	private final NCConfig ncConfig;
 
-    private final NetworkAddress dataPort;
+	private final NetworkAddress dataPort;
 
-    private final NetworkAddress datasetPort;
+	private final NetworkAddress datasetPort;
 
-    private final Set<JobId> activeJobIds;
+	private final Set<JobId> activeJobIds;
 
-    private final String osName;
+	private final String osName;
 
-    private final String arch;
+	private final String arch;
 
-    private final String osVersion;
+	private final String osVersion;
 
-    private final int nProcessors;
+	private final int nProcessors;
 
-    private final String vmName;
+	private final String vmName;
 
-    private final String vmVersion;
+	private final String vmVersion;
 
-    private final String vmVendor;
+	private final String vmVendor;
 
-    private final String classpath;
+	private final String classpath;
 
-    private final String libraryPath;
+	private final String libraryPath;
 
-    private final String bootClasspath;
+	private final String bootClasspath;
 
-    private final List<String> inputArguments;
+	private final List<String> inputArguments;
 
-    private final Map<String, String> systemProperties;
+	private final Map<String, String> systemProperties;
 
-    private final HeartbeatSchema hbSchema;
+	private final HeartbeatSchema hbSchema;
 
-    private final long[] hbTime;
+	private final long[] hbTime;
 
-    private final long[] heapInitSize;
+	private final long[] heapInitSize;
 
-    private final long[] heapUsedSize;
+	private final long[] heapUsedSize;
 
-    private final long[] heapCommittedSize;
+	private final long[] heapCommittedSize;
 
-    private final long[] heapMaxSize;
+	private final long[] heapMaxSize;
 
-    private final long[] nonheapInitSize;
+	private final long[] nonheapInitSize;
 
-    private final long[] nonheapUsedSize;
+	private final long[] nonheapUsedSize;
 
-    private final long[] nonheapCommittedSize;
+	private final long[] nonheapCommittedSize;
 
-    private final long[] nonheapMaxSize;
+	private final long[] nonheapMaxSize;
 
-    private final int[] threadCount;
+	private final int[] threadCount;
 
-    private final int[] peakThreadCount;
+	private final int[] peakThreadCount;
 
-    private final double[] systemLoadAverage;
+	private final double[] systemLoadAverage;
 
-    private final String[] gcNames;
+	private final String[] gcNames;
 
-    private final long[][] gcCollectionCounts;
+	private final long[][] gcCollectionCounts;
 
-    private final long[][] gcCollectionTimes;
+	private final long[][] gcCollectionTimes;
 
-    private final long[] netPayloadBytesRead;
+	private final long[] netPayloadBytesRead;
 
-    private final long[] netPayloadBytesWritten;
+	private final long[] netPayloadBytesWritten;
 
-    private final long[] netSignalingBytesRead;
+	private final long[] netSignalingBytesRead;
 
-    private final long[] netSignalingBytesWritten;
+	private final long[] netSignalingBytesWritten;
 
-    private final long[] datasetNetPayloadBytesRead;
+	private final long[] datasetNetPayloadBytesRead;
 
-    private final long[] datasetNetPayloadBytesWritten;
+	private final long[] datasetNetPayloadBytesWritten;
 
-    private final long[] datasetNetSignalingBytesRead;
+	private final long[] datasetNetSignalingBytesRead;
 
-    private final long[] datasetNetSignalingBytesWritten;
+	private final long[] datasetNetSignalingBytesWritten;
 
-    private final long[] ipcMessagesSent;
+	private final long[] ipcMessagesSent;
 
-    private final long[] ipcMessageBytesSent;
+	private final long[] ipcMessageBytesSent;
 
-    private final long[] ipcMessagesReceived;
+	private final long[] ipcMessagesReceived;
 
-    private final long[] ipcMessageBytesReceived;
+	private final long[] ipcMessageBytesReceived;
 
-    private int rrdPtr;
+	private int rrdPtr;
 
-    private int lastHeartbeatDuration;
+	private int lastHeartbeatDuration;
 
-    public NodeControllerState(INodeController nodeController, NodeRegistration reg) {
-        this.nodeController = nodeController;
-        ncConfig = reg.getNCConfig();
-        dataPort = reg.getDataPort();
-        datasetPort = reg.getDatasetPort();
-        activeJobIds = new HashSet<JobId>();
+	public NodeControllerState(INodeController nodeController,
+			NodeRegistration reg) {
+		this.nodeController = nodeController;
+		ncConfig = reg.getNCConfig();
+		dataPort = reg.getDataPort();
+		datasetPort = reg.getDatasetPort();
+		activeJobIds = new HashSet<JobId>();
 
-        osName = reg.getOSName();
-        arch = reg.getArch();
-        osVersion = reg.getOSVersion();
-        nProcessors = reg.getNProcessors();
-        vmName = reg.getVmName();
-        vmVersion = reg.getVmVersion();
-        vmVendor = reg.getVmVendor();
-        classpath = reg.getClasspath();
-        libraryPath = reg.getLibraryPath();
-        bootClasspath = reg.getBootClasspath();
-        inputArguments = reg.getInputArguments();
-        systemProperties = reg.getSystemProperties();
+		osName = reg.getOSName();
+		arch = reg.getArch();
+		osVersion = reg.getOSVersion();
+		nProcessors = reg.getNProcessors();
+		vmName = reg.getVmName();
+		vmVersion = reg.getVmVersion();
+		vmVendor = reg.getVmVendor();
+		classpath = reg.getClasspath();
+		libraryPath = reg.getLibraryPath();
+		bootClasspath = reg.getBootClasspath();
+		inputArguments = reg.getInputArguments();
+		systemProperties = reg.getSystemProperties();
 
-        hbSchema = reg.getHeartbeatSchema();
+		hbSchema = reg.getHeartbeatSchema();
 
-        hbTime = new long[RRD_SIZE];
-        heapInitSize = new long[RRD_SIZE];
-        heapUsedSize = new long[RRD_SIZE];
-        heapCommittedSize = new long[RRD_SIZE];
-        heapMaxSize = new long[RRD_SIZE];
-        nonheapInitSize = new long[RRD_SIZE];
-        nonheapUsedSize = new long[RRD_SIZE];
-        nonheapCommittedSize = new long[RRD_SIZE];
-        nonheapMaxSize = new long[RRD_SIZE];
-        threadCount = new int[RRD_SIZE];
-        peakThreadCount = new int[RRD_SIZE];
-        systemLoadAverage = new double[RRD_SIZE];
-        GarbageCollectorInfo[] gcInfos = hbSchema.getGarbageCollectorInfos();
-        int gcN = gcInfos.length;
-        gcNames = new String[gcN];
-        for (int i = 0; i < gcN; ++i) {
-            gcNames[i] = gcInfos[i].getName();
-        }
-        gcCollectionCounts = new long[gcN][RRD_SIZE];
-        gcCollectionTimes = new long[gcN][RRD_SIZE];
-        netPayloadBytesRead = new long[RRD_SIZE];
-        netPayloadBytesWritten = new long[RRD_SIZE];
-        netSignalingBytesRead = new long[RRD_SIZE];
-        netSignalingBytesWritten = new long[RRD_SIZE];
-        datasetNetPayloadBytesRead = new long[RRD_SIZE];
-        datasetNetPayloadBytesWritten = new long[RRD_SIZE];
-        datasetNetSignalingBytesRead = new long[RRD_SIZE];
-        datasetNetSignalingBytesWritten = new long[RRD_SIZE];
-        ipcMessagesSent = new long[RRD_SIZE];
-        ipcMessageBytesSent = new long[RRD_SIZE];
-        ipcMessagesReceived = new long[RRD_SIZE];
-        ipcMessageBytesReceived = new long[RRD_SIZE];
+		hbTime = new long[RRD_SIZE];
+		heapInitSize = new long[RRD_SIZE];
+		heapUsedSize = new long[RRD_SIZE];
+		heapCommittedSize = new long[RRD_SIZE];
+		heapMaxSize = new long[RRD_SIZE];
+		nonheapInitSize = new long[RRD_SIZE];
+		nonheapUsedSize = new long[RRD_SIZE];
+		nonheapCommittedSize = new long[RRD_SIZE];
+		nonheapMaxSize = new long[RRD_SIZE];
+		threadCount = new int[RRD_SIZE];
+		peakThreadCount = new int[RRD_SIZE];
+		systemLoadAverage = new double[RRD_SIZE];
+		GarbageCollectorInfo[] gcInfos = hbSchema.getGarbageCollectorInfos();
+		int gcN = gcInfos.length;
+		gcNames = new String[gcN];
+		for (int i = 0; i < gcN; ++i) {
+			gcNames[i] = gcInfos[i].getName();
+		}
+		gcCollectionCounts = new long[gcN][RRD_SIZE];
+		gcCollectionTimes = new long[gcN][RRD_SIZE];
+		netPayloadBytesRead = new long[RRD_SIZE];
+		netPayloadBytesWritten = new long[RRD_SIZE];
+		netSignalingBytesRead = new long[RRD_SIZE];
+		netSignalingBytesWritten = new long[RRD_SIZE];
+		datasetNetPayloadBytesRead = new long[RRD_SIZE];
+		datasetNetPayloadBytesWritten = new long[RRD_SIZE];
+		datasetNetSignalingBytesRead = new long[RRD_SIZE];
+		datasetNetSignalingBytesWritten = new long[RRD_SIZE];
+		ipcMessagesSent = new long[RRD_SIZE];
+		ipcMessageBytesSent = new long[RRD_SIZE];
+		ipcMessagesReceived = new long[RRD_SIZE];
+		ipcMessageBytesReceived = new long[RRD_SIZE];
 
-        rrdPtr = 0;
-    }
+		rrdPtr = 0;
+	}
 
-    public void notifyHeartbeat(HeartbeatData hbData) {
-        lastHeartbeatDuration = 0;
-
-        hbTime[rrdPtr] = System.currentTimeMillis();
-        heapInitSize[rrdPtr] = hbData.heapInitSize;
-        heapUsedSize[rrdPtr] = hbData.heapUsedSize;
-        heapCommittedSize[rrdPtr] = hbData.heapCommittedSize;
-        heapMaxSize[rrdPtr] = hbData.heapMaxSize;
-        nonheapInitSize[rrdPtr] = hbData.nonheapInitSize;
-        nonheapUsedSize[rrdPtr] = hbData.nonheapUsedSize;
-        nonheapCommittedSize[rrdPtr] = hbData.nonheapCommittedSize;
-        nonheapMaxSize[rrdPtr] = hbData.nonheapMaxSize;
-        threadCount[rrdPtr] = hbData.threadCount;
-        peakThreadCount[rrdPtr] = hbData.peakThreadCount;
-        systemLoadAverage[rrdPtr] = hbData.systemLoadAverage;
-        int gcN = hbSchema.getGarbageCollectorInfos().length;
-        for (int i = 0; i < gcN; ++i) {
-            gcCollectionCounts[i][rrdPtr] = hbData.gcCollectionCounts[i];
-            gcCollectionTimes[i][rrdPtr] = hbData.gcCollectionTimes[i];
-        }
-        netPayloadBytesRead[rrdPtr] = hbData.netPayloadBytesRead;
-        netPayloadBytesWritten[rrdPtr] = hbData.netPayloadBytesWritten;
-        netSignalingBytesRead[rrdPtr] = hbData.netSignalingBytesRead;
-        netSignalingBytesWritten[rrdPtr] = hbData.netSignalingBytesWritten;
-        datasetNetPayloadBytesRead[rrdPtr] = hbData.datasetNetPayloadBytesRead;
-        datasetNetPayloadBytesWritten[rrdPtr] = hbData.datasetNetPayloadBytesWritten;
-        datasetNetSignalingBytesRead[rrdPtr] = hbData.datasetNetSignalingBytesRead;
-        datasetNetSignalingBytesWritten[rrdPtr] = hbData.datasetNetSignalingBytesWritten;
-        ipcMessagesSent[rrdPtr] = hbData.ipcMessagesSent;
-        ipcMessageBytesSent[rrdPtr] = hbData.ipcMessageBytesSent;
-        ipcMessagesReceived[rrdPtr] = hbData.ipcMessagesReceived;
-        ipcMessageBytesReceived[rrdPtr] = hbData.ipcMessageBytesReceived;
-        rrdPtr = (rrdPtr + 1) % RRD_SIZE;
-    }
+	public void notifyHeartbeat(HeartbeatData hbData) {
+		lastHeartbeatDuration = 0;
+		hbTime[rrdPtr] = System.currentTimeMillis();
+		if (hbData != null) {
+			heapInitSize[rrdPtr] = hbData.heapInitSize;
+			heapUsedSize[rrdPtr] = hbData.heapUsedSize;
+			heapCommittedSize[rrdPtr] = hbData.heapCommittedSize;
+			heapMaxSize[rrdPtr] = hbData.heapMaxSize;
+			nonheapInitSize[rrdPtr] = hbData.nonheapInitSize;
+			nonheapUsedSize[rrdPtr] = hbData.nonheapUsedSize;
+			nonheapCommittedSize[rrdPtr] = hbData.nonheapCommittedSize;
+			nonheapMaxSize[rrdPtr] = hbData.nonheapMaxSize;
+			threadCount[rrdPtr] = hbData.threadCount;
+			peakThreadCount[rrdPtr] = hbData.peakThreadCount;
+			systemLoadAverage[rrdPtr] = hbData.systemLoadAverage;
+			int gcN = hbSchema.getGarbageCollectorInfos().length;
+			for (int i = 0; i < gcN; ++i) {
+				gcCollectionCounts[i][rrdPtr] = hbData.gcCollectionCounts[i];
+				gcCollectionTimes[i][rrdPtr] = hbData.gcCollectionTimes[i];
+			}
+			netPayloadBytesRead[rrdPtr] = hbData.netPayloadBytesRead;
+			netPayloadBytesWritten[rrdPtr] = hbData.netPayloadBytesWritten;
+			netSignalingBytesRead[rrdPtr] = hbData.netSignalingBytesRead;
+			netSignalingBytesWritten[rrdPtr] = hbData.netSignalingBytesWritten;
+			datasetNetPayloadBytesRead[rrdPtr] = hbData.datasetNetPayloadBytesRead;
+			datasetNetPayloadBytesWritten[rrdPtr] = hbData.datasetNetPayloadBytesWritten;
+			datasetNetSignalingBytesRead[rrdPtr] = hbData.datasetNetSignalingBytesRead;
+			datasetNetSignalingBytesWritten[rrdPtr] = hbData.datasetNetSignalingBytesWritten;
+			ipcMessagesSent[rrdPtr] = hbData.ipcMessagesSent;
+			ipcMessageBytesSent[rrdPtr] = hbData.ipcMessageBytesSent;
+			ipcMessagesReceived[rrdPtr] = hbData.ipcMessagesReceived;
+			ipcMessageBytesReceived[rrdPtr] = hbData.ipcMessageBytesReceived;
+		}
+		rrdPtr = (rrdPtr + 1) % RRD_SIZE;
+	}
 
-    public int incrementLastHeartbeatDuration() {
-        return lastHeartbeatDuration++;
-    }
+	public int incrementLastHeartbeatDuration() {
+		return lastHeartbeatDuration++;
+	}
 
-    public int getLastHeartbeatDuration() {
-        return lastHeartbeatDuration;
-    }
+	public int getLastHeartbeatDuration() {
+		return lastHeartbeatDuration;
+	}
 
-    public INodeController getNodeController() {
-        return nodeController;
-    }
+	public INodeController getNodeController() {
+		return nodeController;
+	}
 
-    public NCConfig getNCConfig() {
-        return ncConfig;
-    }
+	public NCConfig getNCConfig() {
+		return ncConfig;
+	}
 
-    public Set<JobId> getActiveJobIds() {
-        return activeJobIds;
-    }
+	public Set<JobId> getActiveJobIds() {
+		return activeJobIds;
+	}
 
-    public NetworkAddress getDataPort() {
-        return dataPort;
-    }
+	public NetworkAddress getDataPort() {
+		return dataPort;
+	}
 
-    public NetworkAddress getDatasetPort() {
-        return datasetPort;
-    }
+	public NetworkAddress getDatasetPort() {
+		return datasetPort;
+	}
 
-    public JSONObject toSummaryJSON() throws JSONException {
-        JSONObject o = new JSONObject();
-        o.put("node-id", ncConfig.nodeId);
-        o.put("heap-used", heapUsedSize[(rrdPtr + RRD_SIZE - 1) % RRD_SIZE]);
-        o.put("system-load-average", systemLoadAverage[(rrdPtr + RRD_SIZE - 1) % RRD_SIZE]);
+	public JSONObject toSummaryJSON() throws JSONException {
+		JSONObject o = new JSONObject();
+		o.put("node-id", ncConfig.nodeId);
+		o.put("heap-used", heapUsedSize[(rrdPtr + RRD_SIZE - 1) % RRD_SIZE]);
+		o.put("system-load-average", systemLoadAverage[(rrdPtr + RRD_SIZE - 1)
+				% RRD_SIZE]);
 
-        return o;
-    }
+		return o;
+	}
 
-    public JSONObject toDetailedJSON() throws JSONException {
-        JSONObject o = new JSONObject();
+	public JSONObject toDetailedJSON() throws JSONException {
+		JSONObject o = new JSONObject();
 
-        o.put("node-id", ncConfig.nodeId);
-        o.put("os-name", osName);
-        o.put("arch", arch);
-        o.put("os-version", osVersion);
-        o.put("num-processors", nProcessors);
-        o.put("vm-name", vmName);
-        o.put("vm-version", vmVersion);
-        o.put("vm-vendor", vmVendor);
-        o.put("classpath", classpath);
-        o.put("library-path", libraryPath);
-        o.put("boot-classpath", bootClasspath);
-        o.put("input-arguments", new JSONArray(inputArguments));
-        o.put("rrd-ptr", rrdPtr);
-        o.put("heartbeat-times", hbTime);
-        o.put("heap-init-sizes", heapInitSize);
-        o.put("heap-used-sizes", heapUsedSize);
-        o.put("heap-committed-sizes", heapCommittedSize);
-        o.put("heap-max-sizes", heapMaxSize);
-        o.put("nonheap-init-sizes", nonheapInitSize);
-        o.put("nonheap-used-sizes", nonheapUsedSize);
-        o.put("nonheap-committed-sizes", nonheapCommittedSize);
-        o.put("nonheap-max-sizes", nonheapMaxSize);
-        o.put("thread-counts", threadCount);
-        o.put("peak-thread-counts", peakThreadCount);
-        o.put("system-load-averages", systemLoadAverage);
-        o.put("gc-names", gcNames);
-        o.put("gc-collection-counts", gcCollectionCounts);
-        o.put("gc-collection-times", gcCollectionTimes);
-        o.put("net-payload-bytes-read", netPayloadBytesRead);
-        o.put("net-payload-bytes-written", netPayloadBytesWritten);
-        o.put("net-signaling-bytes-read", netSignalingBytesRead);
-        o.put("net-signaling-bytes-written", netSignalingBytesWritten);
-        o.put("dataset-net-payload-bytes-read", datasetNetPayloadBytesRead);
-        o.put("dataset-net-payload-bytes-written", datasetNetPayloadBytesWritten);
-        o.put("dataset-net-signaling-bytes-read", datasetNetSignalingBytesRead);
-        o.put("dataset-net-signaling-bytes-written", datasetNetSignalingBytesWritten);
-        o.put("ipc-messages-sent", ipcMessagesSent);
-        o.put("ipc-message-bytes-sent", ipcMessageBytesSent);
-        o.put("ipc-messages-received", ipcMessagesReceived);
-        o.put("ipc-message-bytes-received", ipcMessageBytesReceived);
+		o.put("node-id", ncConfig.nodeId);
+		o.put("os-name", osName);
+		o.put("arch", arch);
+		o.put("os-version", osVersion);
+		o.put("num-processors", nProcessors);
+		o.put("vm-name", vmName);
+		o.put("vm-version", vmVersion);
+		o.put("vm-vendor", vmVendor);
+		o.put("classpath", classpath);
+		o.put("library-path", libraryPath);
+		o.put("boot-classpath", bootClasspath);
+		o.put("input-arguments", new JSONArray(inputArguments));
+		o.put("rrd-ptr", rrdPtr);
+		o.put("heartbeat-times", hbTime);
+		o.put("heap-init-sizes", heapInitSize);
+		o.put("heap-used-sizes", heapUsedSize);
+		o.put("heap-committed-sizes", heapCommittedSize);
+		o.put("heap-max-sizes", heapMaxSize);
+		o.put("nonheap-init-sizes", nonheapInitSize);
+		o.put("nonheap-used-sizes", nonheapUsedSize);
+		o.put("nonheap-committed-sizes", nonheapCommittedSize);
+		o.put("nonheap-max-sizes", nonheapMaxSize);
+		o.put("thread-counts", threadCount);
+		o.put("peak-thread-counts", peakThreadCount);
+		o.put("system-load-averages", systemLoadAverage);
+		o.put("gc-names", gcNames);
+		o.put("gc-collection-counts", gcCollectionCounts);
+		o.put("gc-collection-times", gcCollectionTimes);
+		o.put("net-payload-bytes-read", netPayloadBytesRead);
+		o.put("net-payload-bytes-written", netPayloadBytesWritten);
+		o.put("net-signaling-bytes-read", netSignalingBytesRead);
+		o.put("net-signaling-bytes-written", netSignalingBytesWritten);
+		o.put("dataset-net-payload-bytes-read", datasetNetPayloadBytesRead);
+		o.put("dataset-net-payload-bytes-written",
+				datasetNetPayloadBytesWritten);
+		o.put("dataset-net-signaling-bytes-read", datasetNetSignalingBytesRead);
+		o.put("dataset-net-signaling-bytes-written",
+				datasetNetSignalingBytesWritten);
+		o.put("ipc-messages-sent", ipcMessagesSent);
+		o.put("ipc-message-bytes-sent", ipcMessageBytesSent);
+		o.put("ipc-messages-received", ipcMessagesReceived);
+		o.put("ipc-message-bytes-received", ipcMessageBytesReceived);
 
-        return o;
-    }
+		return o;
+	}
 }
\ No newline at end of file
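
Aside from the space-to-tab churn, the functional change in this file is the null guard in notifyHeartbeat: a null HeartbeatData now advances the heartbeat clock without touching the stats arrays. Those arrays implement a fixed-size round-robin database; rrdPtr wraps modulo RRD_SIZE, so each array always holds the last 720 samples, and the newest sample sits one slot behind the pointer, which is how toSummaryJSON reads it. The pattern in isolation:

    class RoundRobinSamples {
        private final long[] samples = new long[720];  // RRD_SIZE
        private int ptr = 0;                           // next slot to overwrite

        void record(long value) {
            samples[ptr] = value;
            ptr = (ptr + 1) % samples.length;          // wrap; the oldest sample is dropped
        }

        long latest() {                                // as read in toSummaryJSON
            return samples[(ptr + samples.length - 1) % samples.length];
        }
    }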
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
index 2166620..bae0eb5 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
@@ -46,6 +46,7 @@
 import edu.uci.ics.hyracks.control.cc.scheduler.ActivityPartitionDetails;
 import edu.uci.ics.hyracks.control.cc.scheduler.JobScheduler;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
+import edu.uci.ics.hyracks.control.common.utils.ExceptionUtils;
 
 public class JobRun implements IJobStatusConditionVariable {
     private final DeploymentId deploymentId;
@@ -347,7 +348,7 @@
                                 taskAttempt.put("end-time", ta.getEndTime());
                                 List<Exception> exceptions = ta.getExceptions();
                                 if (exceptions != null && !exceptions.isEmpty()) {
-                                    List<Exception> filteredExceptions = ExceptionFilterUtils
+                                    List<Exception> filteredExceptions = ExceptionUtils
                                             .getActualExceptions(exceptions);
                                     for (Exception exception : filteredExceptions) {
                                         StringWriter exceptionWriter = new StringWriter();
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
index 4d2ad6b..3863eda 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
@@ -190,18 +190,29 @@
                     ActivityId ac2 = ac.getConsumerActivity(cdId);
                     Task[] ac2TaskStates = activityPlanMap.get(ac2).getTasks();
                     int nConsumers = ac2TaskStates.length;
-                    for (int i = 0; i < nProducers; ++i) {
-                        c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
-                        List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = taskConnectivity.get(ac1TaskStates[i]
-                                .getTaskId());
-                        if (cInfoList == null) {
-                            cInfoList = new ArrayList<Pair<TaskId, ConnectorDescriptorId>>();
-                            taskConnectivity.put(ac1TaskStates[i].getTaskId(), cInfoList);
-                        }
-                        for (int j = targetBitmap.nextSetBit(0); j >= 0; j = targetBitmap.nextSetBit(j + 1)) {
+                    if (c.allProducersToAllConsumers()) {
+                        List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = new ArrayList<Pair<TaskId, ConnectorDescriptorId>>();
+                        for (int j = 0; j < nConsumers; j++) {
                             TaskId targetTID = ac2TaskStates[j].getTaskId();
                             cInfoList.add(Pair.<TaskId, ConnectorDescriptorId> of(targetTID, cdId));
                         }
+                        for (int i = 0; i < nProducers; ++i) {
+                            taskConnectivity.put(ac1TaskStates[i].getTaskId(), cInfoList);
+                        }
+                    } else {
+                        for (int i = 0; i < nProducers; ++i) {
+                            c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
+                            List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = taskConnectivity.get(ac1TaskStates[i]
+                                    .getTaskId());
+                            if (cInfoList == null) {
+                                cInfoList = new ArrayList<Pair<TaskId, ConnectorDescriptorId>>();
+                                taskConnectivity.put(ac1TaskStates[i].getTaskId(), cInfoList);
+                            }
+                            for (int j = targetBitmap.nextSetBit(0); j >= 0; j = targetBitmap.nextSetBit(j + 1)) {
+                                TaskId targetTID = ac2TaskStates[j].getTaskId();
+                                cInfoList.add(Pair.<TaskId, ConnectorDescriptorId> of(targetTID, cdId));
+                            }
+                        }
                     }
                 }
             }
@@ -341,9 +352,15 @@
                     int nConsumers = ac2TaskStates.length;
 
                     int[] fanouts = new int[nProducers];
-                    for (int i = 0; i < nProducers; ++i) {
-                        c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
-                        fanouts[i] = targetBitmap.cardinality();
+                    if (c.allProducersToAllConsumers()) {
+                        for (int i = 0; i < nProducers; ++i) {
+                            fanouts[i] = nConsumers;
+                        }
+                    } else {
+                        for (int i = 0; i < nProducers; ++i) {
+                            c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
+                            fanouts[i] = targetBitmap.cardinality();
+                        }
                     }
                     IConnectorPolicy cp = assignConnectorPolicy(ac, c, nProducers, nConsumers, fanouts);
                     cPolicyMap.put(cdId, cp);
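
Both hunks rely on the same equivalence: for an all-producers-to-all-consumers connector, indicateTargetPartitions would set every consumer bit for every producer, so the per-producer bitmap scans are redundant. The first hunk builds one target list and shares it across all producers; the second sets each fanout directly. What the connector would have reported for any producer (nConsumers as in the code above):

    BitSet targetBitmap = new BitSet();
    targetBitmap.set(0, nConsumers);                  // every consumer bit set
    assert targetBitmap.cardinality() == nConsumers;  // hence fanouts[i] = nConsumers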
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index 0b8346b..fd6360a 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -27,6 +27,9 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.json.JSONException;
+import org.json.JSONObject;
+
 import edu.uci.ics.hyracks.api.constraints.Constraint;
 import edu.uci.ics.hyracks.api.constraints.expressions.LValueConstraintExpression;
 import edu.uci.ics.hyracks.api.constraints.expressions.PartitionLocationExpression;
@@ -45,6 +48,7 @@
 import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.NodeControllerState;
+import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
 import edu.uci.ics.hyracks.control.cc.job.ActivityClusterPlan;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
 import edu.uci.ics.hyracks.control.cc.job.Task;
@@ -458,13 +462,14 @@
     private void abortJob(List<Exception> exceptions) {
         Set<TaskCluster> inProgressTaskClustersCopy = new HashSet<TaskCluster>(inProgressTaskClusters);
         for (TaskCluster tc : inProgressTaskClustersCopy) {
-            abortTaskCluster(findLastTaskClusterAttempt(tc));
+            abortTaskCluster(findLastTaskClusterAttempt(tc), TaskClusterAttempt.TaskClusterStatus.ABORTED);
         }
         assert inProgressTaskClusters.isEmpty();
         ccs.getWorkQueue().schedule(new JobCleanupWork(ccs, jobRun.getJobId(), JobStatus.FAILURE, exceptions));
     }
 
-    private void abortTaskCluster(TaskClusterAttempt tcAttempt) {
+    private void abortTaskCluster(TaskClusterAttempt tcAttempt,
+            TaskClusterAttempt.TaskClusterStatus failedOrAbortedStatus) {
         LOGGER.fine("Aborting task cluster: " + tcAttempt.getAttempt());
         Set<TaskAttemptId> abortTaskIds = new HashSet<TaskAttemptId>();
         Map<String, List<TaskAttemptId>> abortTaskAttemptMap = new HashMap<String, List<TaskAttemptId>>();
@@ -477,11 +482,13 @@
                 ta.setStatus(TaskAttempt.TaskStatus.ABORTED, null);
                 ta.setEndTime(System.currentTimeMillis());
                 List<TaskAttemptId> abortTaskAttempts = abortTaskAttemptMap.get(ta.getNodeId());
-                if (abortTaskAttempts == null) {
+                if (status == TaskAttempt.TaskStatus.RUNNING && abortTaskAttempts == null) {
                     abortTaskAttempts = new ArrayList<TaskAttemptId>();
                     abortTaskAttemptMap.put(ta.getNodeId(), abortTaskAttempts);
                 }
-                abortTaskAttempts.add(taId);
+                if (status == TaskAttempt.TaskStatus.RUNNING) {
+                    abortTaskAttempts.add(taId);
+                }
             }
         }
         final JobId jobId = jobRun.getJobId();
@@ -505,6 +512,9 @@
         PartitionMatchMaker pmm = jobRun.getPartitionMatchMaker();
         pmm.removeUncommittedPartitions(tc.getProducedPartitions(), abortTaskIds);
         pmm.removePartitionRequests(tc.getRequiredPartitions(), abortTaskIds);
+
+        tcAttempt.setStatus(failedOrAbortedStatus);
+        tcAttempt.setEndTime(System.currentTimeMillis());
     }
 
     private void abortDoomedTaskClusters() throws HyracksException {
@@ -519,9 +529,7 @@
         for (TaskCluster tc : doomedTaskClusters) {
             TaskClusterAttempt tca = findLastTaskClusterAttempt(tc);
             if (tca != null) {
-                abortTaskCluster(tca);
-                tca.setEndTime(System.currentTimeMillis());
-                tca.setStatus(TaskClusterAttempt.TaskClusterStatus.ABORTED);
+                abortTaskCluster(tca, TaskClusterAttempt.TaskClusterStatus.ABORTED);
             }
         }
     }
@@ -608,9 +616,7 @@
             if (lastAttempt != null && taId.getAttempt() == lastAttempt.getAttempt()) {
                 LOGGER.fine("Marking TaskAttempt " + ta.getTaskAttemptId() + " as failed");
                 ta.setStatus(TaskAttempt.TaskStatus.FAILED, exceptions);
-                abortTaskCluster(lastAttempt);
-                lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.FAILED);
-                lastAttempt.setEndTime(System.currentTimeMillis());
+                abortTaskCluster(lastAttempt, TaskClusterAttempt.TaskClusterStatus.FAILED);
                 abortDoomedTaskClusters();
                 if (lastAttempt.getAttempt() >= jobRun.getActivityClusterGraph().getMaxReattempts()) {
                     abortJob(exceptions);
@@ -635,32 +641,41 @@
     public void notifyNodeFailures(Set<String> deadNodes) {
         try {
             jobRun.getPartitionMatchMaker().notifyNodeFailures(deadNodes);
+            jobRun.getParticipatingNodeIds().removeAll(deadNodes);
+            jobRun.getCleanupPendingNodeIds().removeAll(deadNodes);
+            if (jobRun.getPendingStatus() != null && jobRun.getCleanupPendingNodeIds().isEmpty()) {
+                finishJob(jobRun);
+                return;
+            }
             for (ActivityCluster ac : jobRun.getActivityClusterGraph().getActivityClusterMap().values()) {
-                TaskCluster[] taskClusters = getActivityClusterPlan(ac).getTaskClusters();
-                if (taskClusters != null) {
-                    for (TaskCluster tc : taskClusters) {
-                        TaskClusterAttempt lastTaskClusterAttempt = findLastTaskClusterAttempt(tc);
-                        if (lastTaskClusterAttempt != null
-                                && (lastTaskClusterAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED || lastTaskClusterAttempt
-                                        .getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING)) {
-                            boolean abort = false;
-                            for (TaskAttempt ta : lastTaskClusterAttempt.getTaskAttempts().values()) {
-                                assert (ta.getStatus() == TaskAttempt.TaskStatus.COMPLETED || ta.getStatus() == TaskAttempt.TaskStatus.RUNNING);
-                                if (deadNodes.contains(ta.getNodeId())) {
-                                    ta.setStatus(
-                                            TaskAttempt.TaskStatus.FAILED,
-                                            Collections.singletonList(new Exception("Node " + ta.getNodeId()
-                                                    + " failed")));
-                                    ta.setEndTime(System.currentTimeMillis());
-                                    abort = true;
+                if (isPlanned(ac)) {
+                    TaskCluster[] taskClusters = getActivityClusterPlan(ac).getTaskClusters();
+                    if (taskClusters != null) {
+                        for (TaskCluster tc : taskClusters) {
+                            TaskClusterAttempt lastTaskClusterAttempt = findLastTaskClusterAttempt(tc);
+                            if (lastTaskClusterAttempt != null
+                                    && (lastTaskClusterAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED || lastTaskClusterAttempt
+                                            .getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING)) {
+                                boolean abort = false;
+                                for (TaskAttempt ta : lastTaskClusterAttempt.getTaskAttempts().values()) {
+                                    assert (ta.getStatus() == TaskAttempt.TaskStatus.COMPLETED || ta.getStatus() == TaskAttempt.TaskStatus.RUNNING);
+                                    if (deadNodes.contains(ta.getNodeId())) {
+                                        ta.setStatus(
+                                                TaskAttempt.TaskStatus.FAILED,
+                                                Collections.singletonList(new Exception("Node " + ta.getNodeId()
+                                                        + " failed")));
+                                        ta.setEndTime(System.currentTimeMillis());
+                                        abort = true;
+                                    }
+                                }
+                                if (abort) {
+                                    abortTaskCluster(lastTaskClusterAttempt,
+                                            TaskClusterAttempt.TaskClusterStatus.ABORTED);
                                 }
                             }
-                            if (abort) {
-                                abortTaskCluster(lastTaskClusterAttempt);
-                            }
                         }
+                        abortDoomedTaskClusters();
                     }
-                    abortDoomedTaskClusters();
                 }
             }
             startRunnableActivityClusters();
@@ -668,4 +683,37 @@
             abortJob(Collections.singletonList(e));
         }
     }
+
+    private void finishJob(final JobRun run) {
+        JobId jobId = run.getJobId();
+        CCApplicationContext appCtx = ccs.getApplicationContext();
+        if (appCtx != null) {
+            try {
+                appCtx.notifyJobFinish(jobId);
+            } catch (HyracksException e) {
+                e.printStackTrace();
+            }
+        }
+        run.setStatus(run.getPendingStatus(), run.getPendingExceptions());
+        ccs.getActiveRunMap().remove(jobId);
+        ccs.getRunMapArchive().put(jobId, run);
+        ccs.getRunHistory().put(jobId, run.getExceptions());
+        try {
+            ccs.getJobLogFile().log(createJobLogObject(run));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private JSONObject createJobLogObject(final JobRun run) {
+        JSONObject jobLogObject = new JSONObject();
+        try {
+            ActivityClusterGraph acg = run.getActivityClusterGraph();
+            jobLogObject.put("activity-cluster-graph", acg.toJSON());
+            jobLogObject.put("job-run", run.toJSON());
+        } catch (JSONException e) {
+            throw new RuntimeException(e);
+        }
+        return jobLogObject;
+    }
 }
\ No newline at end of file
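
Two behavioral points in this file are easy to miss. First, abortTaskCluster now stamps the terminal status and end time itself, so its call sites no longer duplicate (and can no longer disagree on) those two calls. Second, notifyNodeFailures gains a fast path: dead nodes are removed from the participating and cleanup-pending sets, and if cleanup was only waiting on nodes that just died, the job is finalized immediately. A condensed sketch of that fast path (names abbreviated from the code above):

    participating.removeAll(deadNodes);   // dead nodes can no longer take part
    cleanupPending.removeAll(deadNodes);  // ...or acknowledge cleanup
    if (pendingStatus != null && cleanupPending.isEmpty()) {
        finishJob(run);                   // nothing left to wait for
    }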
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationMessageWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationMessageWork.java
index bc58d1e..5109078 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationMessageWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationMessageWork.java
@@ -21,6 +21,7 @@
 import edu.uci.ics.hyracks.api.deployment.DeploymentId;
 import edu.uci.ics.hyracks.api.messages.IMessage;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.work.utils.HeartbeatUtils;
 import edu.uci.ics.hyracks.control.common.deployment.DeploymentUtils;
 import edu.uci.ics.hyracks.control.common.work.AbstractWork;
 
@@ -57,6 +58,7 @@
             LOGGER.log(Level.WARNING, "Error in stats reporting", e);
             throw new RuntimeException(e);
         }
+        HeartbeatUtils.notifyHeartbeat(ccs, nodeId, null);
     }
 
     @Override
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
index 6e8ddf0..22c3348 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.hyracks.control.cc.work;
 
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.logging.Logger;
@@ -48,11 +49,16 @@
 
     @Override
     public void run() {
+        LOGGER.warning("Cleanup for JobRun with id: " + jobId);
         final JobRun run = ccs.getActiveRunMap().get(jobId);
         if (run == null) {
             LOGGER.warning("Unable to find JobRun with id: " + jobId);
             return;
         }
+        if (run.getPendingStatus() != null && run.getCleanupPendingNodeIds().isEmpty()) {
+            finishJob(run);
+            return;
+        }
         if (run.getPendingStatus() != null) {
             LOGGER.warning("Ignoring duplicate cleanup for JobRun with id: " + jobId);
             return;
@@ -63,33 +69,47 @@
             run.setPendingStatus(status, exceptions);
         }
         if (targetNodes != null && !targetNodes.isEmpty()) {
+            Set<String> toDelete = new HashSet<String>();
             for (String n : targetNodes) {
                 NodeControllerState ncs = ccs.getNodeMap().get(n);
                 try {
-                    ncs.getNodeController().cleanUpJoblet(jobId, status);
+                    if (ncs == null) {
+                        toDelete.add(n);
+                    } else {
+                        ncs.getNodeController().cleanUpJoblet(jobId, status);
+                    }
                 } catch (Exception e) {
                     e.printStackTrace();
                 }
             }
+            targetNodes.removeAll(toDelete);
+            run.getCleanupPendingNodeIds().removeAll(toDelete);
+            if (run.getCleanupPendingNodeIds().isEmpty()) {
+                finishJob(run);
+            }
         } else {
-            CCApplicationContext appCtx = ccs.getApplicationContext();
-            if (appCtx != null) {
-                try {
-                    appCtx.notifyJobFinish(jobId);
-                } catch (HyracksException e) {
-                    e.printStackTrace();
-                }
-            }
-            run.setStatus(run.getPendingStatus(), run.getPendingExceptions());
-            ccs.getActiveRunMap().remove(jobId);
-            ccs.getRunMapArchive().put(jobId, run);
-            ccs.getRunHistory().put(jobId, run.getExceptions());
+            finishJob(run);
+        }
+    }
+
+    private void finishJob(final JobRun run) {
+        CCApplicationContext appCtx = ccs.getApplicationContext();
+        if (appCtx != null) {
             try {
-                ccs.getJobLogFile().log(createJobLogObject(run));
-            } catch (Exception e) {
-                throw new RuntimeException(e);
+                appCtx.notifyJobFinish(jobId);
+            } catch (HyracksException e) {
+                e.printStackTrace();
             }
         }
+        run.setStatus(run.getPendingStatus(), run.getPendingExceptions());
+        ccs.getActiveRunMap().remove(jobId);
+        ccs.getRunMapArchive().put(jobId, run);
+        ccs.getRunHistory().put(jobId, run.getExceptions());
+        try {
+            ccs.getJobLogFile().log(createJobLogObject(run));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
     }
 
     private JSONObject createJobLogObject(final JobRun run) {
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
index 5eb851a..66186ca 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
@@ -28,6 +28,7 @@
 import edu.uci.ics.hyracks.control.cc.NodeControllerState;
 import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
+import edu.uci.ics.hyracks.control.cc.work.utils.HeartbeatUtils;
 import edu.uci.ics.hyracks.control.common.work.AbstractWork;
 
 public class JobletCleanupNotificationWork extends AbstractWork {
@@ -45,6 +46,7 @@
 
     @Override
     public void run() {
+        HeartbeatUtils.notifyHeartbeat(ccs, nodeId, null);
         final JobRun run = ccs.getActiveRunMap().get(jobId);
         Set<String> cleanupPendingNodes = run.getCleanupPendingNodeIds();
         if (!cleanupPendingNodes.remove(nodeId)) {
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NodeHeartbeatWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NodeHeartbeatWork.java
index 970a45d..eef6b96 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NodeHeartbeatWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NodeHeartbeatWork.java
@@ -14,11 +14,10 @@
  */
 package edu.uci.ics.hyracks.control.cc.work;
 
-import java.util.Map;
 import java.util.logging.Level;
 
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.NodeControllerState;
+import edu.uci.ics.hyracks.control.cc.work.utils.HeartbeatUtils;
 import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
 import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
 
@@ -35,11 +34,7 @@
 
     @Override
     protected void doRun() throws Exception {
-        Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
-        NodeControllerState state = nodeMap.get(nodeId);
-        if (state != null) {
-            state.notifyHeartbeat(hbData);
-        }
+        HeartbeatUtils.notifyHeartbeat(ccs, nodeId, hbData);
     }
 
     @Override
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NotifyDeployBinaryWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NotifyDeployBinaryWork.java
index c4c8873..e400fa8 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NotifyDeployBinaryWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NotifyDeployBinaryWork.java
@@ -17,6 +17,7 @@
 
 import edu.uci.ics.hyracks.api.deployment.DeploymentId;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.work.utils.HeartbeatUtils;
 import edu.uci.ics.hyracks.control.common.deployment.DeploymentRun;
 import edu.uci.ics.hyracks.control.common.deployment.DeploymentStatus;
 import edu.uci.ics.hyracks.control.common.work.AbstractWork;
@@ -47,6 +48,7 @@
         /** triggered remotely by a NC to notify that the NC is deployed */
         DeploymentRun dRun = ccs.getDeploymentRun(deploymentId);
         dRun.notifyDeploymentStatus(nodeId, deploymentStatus);
+        HeartbeatUtils.notifyHeartbeat(ccs, nodeId, null);
     }
 
 }
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskCompleteWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskCompleteWork.java
index 8e5050c..77cddf3 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskCompleteWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskCompleteWork.java
@@ -22,6 +22,7 @@
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
 import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
+import edu.uci.ics.hyracks.control.cc.work.utils.HeartbeatUtils;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.JobletProfile;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
@@ -50,6 +51,7 @@
                 jobletProfile.getTaskProfiles().put(taId, statistics);
             }
             run.getScheduler().notifyTaskComplete(ta);
+            HeartbeatUtils.notifyHeartbeat(ccs, nodeId, null);
         } catch (HyracksException e) {
             e.printStackTrace();
         }
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
index e00025d..3578fd62 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
@@ -22,6 +22,7 @@
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
 import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
+import edu.uci.ics.hyracks.control.cc.work.utils.HeartbeatUtils;
 
 public class TaskFailureWork extends AbstractTaskLifecycleWork {
     private final List<Exception> exceptions;
@@ -38,6 +39,7 @@
         ccs.getDatasetDirectoryService().reportJobFailure(jobId, exceptions);
         ActivityCluster ac = ta.getTask().getTaskCluster().getActivityCluster();
         run.getScheduler().notifyTaskFailure(ta, ac, exceptions);
+        HeartbeatUtils.notifyHeartbeat(ccs, nodeId, null);
     }
 
     @Override
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/utils/HeartbeatUtils.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/utils/HeartbeatUtils.java
new file mode 100644
index 0000000..51bbc16
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/utils/HeartbeatUtils.java
@@ -0,0 +1,18 @@
+package edu.uci.ics.hyracks.control.cc.work.utils;
+
+import java.util.Map;
+
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.NodeControllerState;
+import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
+
+public class HeartbeatUtils {
+
+    public static void notifyHeartbeat(ClusterControllerService ccs, String nodeId, HeartbeatData hbData) {
+        Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
+        NodeControllerState state = nodeMap.get(nodeId);
+        if (state != null) {
+            state.notifyHeartbeat(hbData);
+        }
+    }
+}
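
This helper is the logic previously inlined in NodeHeartbeatWork (see below), now shared by every CC work item that originates from an NC message: application messages, deploy notifications, task completion and failure, and joblet cleanup. Passing null for hbData refreshes the node's liveness clock without overwriting its stats, which is why notifyHeartbeat in NodeControllerState gained the null guard above. A call site then reduces to:

    // Any NC-originated message is proof of life; count it as a heartbeat.
    HeartbeatUtils.notifyHeartbeat(ccs, nodeId, null);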
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java
index 69b560c..5a6d849 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java
@@ -55,6 +55,7 @@
             } catch (InterruptedException e) {
                 LOGGER.severe("Result cleaner thread interrupted, but we continue running it.");
                 // There isn't much we can do really here
+                break; // the interrupt was an explicit shutdown signal from another thread, so stop the sweeper
             }
         }
 
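
The added break turns an interrupt into a shutdown signal rather than a condition to log and ignore. The resulting loop shape, as a standalone sketch (sleepTime and sweep() stand in for the sweeper's actual field and cleanup step):

    while (true) {
        try {
            Thread.sleep(sleepTime);
            sweep();           // free expired result states
        } catch (InterruptedException e) {
            break;             // deliberate interrupt from another thread: shut down
        }
    }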
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ExceptionFilterUtils.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/utils/ExceptionUtils.java
similarity index 69%
rename from hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ExceptionFilterUtils.java
rename to hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/utils/ExceptionUtils.java
index 44f3bec..cbdc6e5 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ExceptionFilterUtils.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/utils/ExceptionUtils.java
@@ -12,15 +12,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.control.cc.job;
+package edu.uci.ics.hyracks.control.common.utils;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
 /**
  * @author yingyib
  */
-public class ExceptionFilterUtils {
+public class ExceptionUtils {
 
     public static List<Exception> getActualExceptions(List<Exception> allExceptions) {
         List<Exception> exceptions = new ArrayList<Exception>();
@@ -32,6 +35,17 @@
         return exceptions;
     }
 
+    public static void setNodeIds(Collection<Exception> exceptions, String nodeId) {
+        List<Exception> newExceptions = new ArrayList<Exception>();
+        for (Exception e : exceptions) {
+            HyracksDataException newException = new HyracksDataException(e);
+            newException.setNodeId(nodeId);
+            newExceptions.add(newException);
+        }
+        exceptions.clear();
+        exceptions.addAll(newExceptions);
+    }
+
     private static boolean possibleRootCause(Throwable exception) {
         Throwable cause = exception;
         while ((cause = cause.getCause()) != null) {
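
setNodeIds rewraps each exception in a HyracksDataException stamped with the node id; the producing side is the Task.java change below. A consuming-side sketch, assuming the exceptions were stamped this way:

    for (Exception e : ExceptionUtils.getActualExceptions(exceptions)) {
        if (e instanceof HyracksDataException) {
            // the wrapper records where the failure happened
            String failedNode = ((HyracksDataException) e).getNodeId();
        }
    }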
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java
index f12c981..58e12cf 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java
@@ -87,6 +87,7 @@
     private class WorkerThread extends Thread {
         WorkerThread() {
             setDaemon(true);
+            setPriority(MAX_PRIORITY);
         }
 
         @Override
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index 53e5a01..09eeab6 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -49,6 +49,7 @@
 import edu.uci.ics.hyracks.control.common.job.profiling.counters.Counter;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.PartitionProfile;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
+import edu.uci.ics.hyracks.control.common.utils.ExceptionUtils;
 import edu.uci.ics.hyracks.control.nc.io.IOManager;
 import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory;
 import edu.uci.ics.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
@@ -243,6 +244,7 @@
                                 addPendingThread(thread);
                                 String oldName = thread.getName();
                                 thread.setName(displayName + ":" + taskAttemptId + ":" + cIdx);
+                                thread.setPriority(Thread.MIN_PRIORITY);
                                 try {
                                     pushFrames(collector, writer);
                                 } catch (HyracksDataException e) {
@@ -277,6 +279,7 @@
         }
         if (!exceptions.isEmpty()) {
             NodeControllerService ncs = joblet.getNodeController();
+            ExceptionUtils.setNodeIds(exceptions, ncs.getId());
             ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, this, exceptions));
         }
     }
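
Together with the WorkQueue change above, this sets up a deliberate priority split: control-plane worker threads run at MAX_PRIORITY while per-task operator threads run at MIN_PRIORITY, so heartbeats, aborts, and cleanup traffic still get CPU time when operators saturate the machine. In outline (thread names illustrative):

    workQueueThread.setPriority(Thread.MAX_PRIORITY); // control plane: WorkQueue.WorkerThread
    operatorThread.setPriority(Thread.MIN_PRIORITY);  // data plane: per-task thread in Task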
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java
index 30b2482..df4d296 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java
@@ -38,4 +38,9 @@
         sourceBitmap.clear();
         sourceBitmap.set(0, nProducerPartitions);
     }
+
+    @Override
+    public boolean allProducersToAllConsumers() {
+        return true;
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
index 466fead..20a0ed1 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
@@ -82,4 +82,9 @@
         sourceBitmap.clear();
         sourceBitmap.set(consumerIndex);
     }
+
+    @Override
+    public boolean allProducersToAllConsumers() {
+        return false;
+    }
 }
\ No newline at end of file
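
The new predicate lets callers distinguish shuffle-style connectors, where every producer feeds every consumer, from pipelined one-to-one connectors: the M-to-N base class answers true, the one-to-one connector false. A hedged sketch of how a consumer of the API might use it; this is illustrative, not actual Hyracks scheduler code:

```java
import java.util.BitSet;

import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;

// Illustrative: a recovery planner can widen the restart scope for full-shuffle
// connectors, because one lost producer taints the input of every consumer.
public class RestartScope {
    public static BitSet consumersToRestart(IConnectorDescriptor conn, int failedProducer, int nConsumers) {
        BitSet restart = new BitSet(nConsumers);
        if (conn.allProducersToAllConsumers()) {
            restart.set(0, nConsumers);  // full shuffle: restart every consumer
        } else {
            restart.set(failedProducer); // one-to-one: only the paired consumer
        }
        return restart;
    }
}
```
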
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java
index 75553e1..85f80ac 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java
@@ -23,6 +23,7 @@
 import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.topology.ClusterTopology;
 import edu.uci.ics.hyracks.hdfs.api.INcCollectionBuilder;
 
 /**
@@ -54,7 +55,20 @@
     public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
         scheduler = new edu.uci.ics.hyracks.hdfs.scheduler.Scheduler(ncNameToNcInfos);
     }
-    
+
+    /**
+     * The constructor of the scheduler.
+     * 
+     * @param ncNameToNcInfos
+     *            the mapping from nc names to nc infos
+     * @param topology
+     *            the hyracks cluster topology
+     * @throws HyracksException
+     */
+    public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, ClusterTopology topology) throws HyracksException {
+        scheduler = new edu.uci.ics.hyracks.hdfs.scheduler.Scheduler(ncNameToNcInfos, topology);
+    }
+
     /**
      * The constructor of the scheduler.
      * 
@@ -62,7 +76,8 @@
      *            the mapping from nc names to nc infos
      * @throws HyracksException
      */
-    public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, INcCollectionBuilder builder) throws HyracksException {
+    public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, INcCollectionBuilder builder)
+            throws HyracksException {
         scheduler = new edu.uci.ics.hyracks.hdfs.scheduler.Scheduler(ncNameToNcInfos, builder);
     }
 
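
With the new constructor, the HDFS scheduler can exploit the cluster topology when assigning file splits to node controllers; the ClusterConfig hunk below switches to exactly this form. A usage sketch (host and port are placeholders):

```java
import java.util.Map;

import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.topology.ClusterTopology;
import edu.uci.ics.hyracks.hdfs2.scheduler.Scheduler;

// Sketch: build a topology-aware HDFS scheduler from a live CC connection.
public class SchedulerSetup {
    public static Scheduler build() throws Exception {
        IHyracksClientConnection hcc = new HyracksConnection("cc-host", 1098);
        Map<String, NodeControllerInfo> ncInfos = hcc.getNodeControllerInfos();
        ClusterTopology topology = hcc.getClusterTopology();
        return new Scheduler(ncInfos, topology); // topology-aware split assignment
    }
}
```
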
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractTreeIndex.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractTreeIndex.java
index a85a174..c14f23a 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractTreeIndex.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractTreeIndex.java
@@ -317,7 +317,11 @@
 
                 if (!releasedLatches) {
                     for (int i = 0; i < nodeFrontiers.size(); i++) {
-                        nodeFrontiers.get(i).page.releaseWriteLatch();
+                        try {
+                            nodeFrontiers.get(i).page.releaseWriteLatch();
+                        } catch (Exception e) {
+                            // ignore the IllegalMonitorStateException thrown when the latch was already released
+                        }
                         bufferCache.unpin(nodeFrontiers.get(i).page);
                     }
                 }
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index 1e0d87a..a28d059 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -76,6 +76,8 @@
     public static final String UPDATE_INTENSIVE = "pregelix.updateIntensive";
     /** the check point hook */
     public static final String CKP_CLASS = "pregelix.checkpointHook";
+    /** the number of recovery attempts already made for the job */
+    public static final String RECOVERY_COUNT = "pregelix.recoveryCount";
 
     /**
      * Construct a Pregelix job from an existing configuration
@@ -222,6 +224,15 @@
     final public void setCheckpointHook(Class<?> ckpClass) {
         getConfiguration().setClass(CKP_CLASS, ckpClass, ICheckpointHook.class);
     }
+    
+    /**
+     * Set the number of recovery attempts that have already been made for this job
+     * 
+     * @param recoveryCount
+     */
+    final public void setRecoveryCount(int recoveryCount) {
+        getConfiguration().setInt(RECOVERY_COUNT, recoveryCount);
+    }
 
     @Override
     public String toString() {
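
Together with the new RECOVERY_COUNT key, a client can now tell the runtime which retry attempt a run belongs to. A usage sketch, assuming PregelixJob's single-argument job-name constructor; the job name is a placeholder:

```java
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook;

// Sketch: configure checkpointing plus an explicit recovery-attempt counter.
public class JobSetup {
    public static PregelixJob configure() throws Exception {
        PregelixJob job = new PregelixJob("PageRank");
        job.setCheckpointHook(ConservativeCheckpointHook.class); // checkpoint cadence
        job.setRecoveryCount(0); // first attempt; the Driver bumps this per retry
        return job;
    }
}
```
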
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index d68ad2c..e3950c8 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -549,4 +549,47 @@
     public static String getGlobalAggregateSpillingDirName(Configuration conf, long superStep) {
         return "/tmp/pregelix/agg/" + conf.get(PregelixJob.JOB_ID) + "/" + superStep;
     }
+
+    /**
+     * Get the path for vertex checkpointing
+     * 
+     * @param conf
+     * @param lastSuperStep
+     * @return the path for vertex checkpointing
+     */
+    public static String getVertexCheckpointPath(Configuration conf, long lastSuperStep) {
+        return "/tmp/ckpoint/" + BspUtils.getJobId(conf) + "/vertex/" + lastSuperStep;
+    }
+
+    /**
+     * Get the path for message checkpointing
+     * 
+     * @param conf
+     * @param lastSuperStep
+     * @return the path for message checkpointing
+     */
+    public static String getMessageCheckpointPath(Configuration conf, long lastSuperStep) {
+        return "/tmp/ckpoint/" + BspUtils.getJobId(conf) + "/message/" + lastSuperStep;
+    }
+
+    /**
+     * Get the path for secondary index checkpointing
+     * 
+     * @param conf
+     * @param lastSuperStep
+     * @return the path for secondary index checkpointing
+     */
+    public static String getSecondaryIndexCheckpointPath(Configuration conf, long lastSuperStep) {
+        return "/tmp/ckpoint/" + BspUtils.getJobId(conf) + "/secondaryindex/" + lastSuperStep;
+    }
+
+    /**
+     * Get the recovery count
+     * 
+     * @param conf
+     * @return the recovery count
+     */
+    public static int getRecoveryCount(Configuration conf) {
+        return conf.getInt(PregelixJob.RECOVERY_COUNT, 0);
+    }
 }
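
The three helpers centralize the checkpoint directory layout that was previously hard-coded in the job generators. A small sketch of the layout they imply; the job id "job_1" and superstep 42 are illustrative:

```java
import org.apache.hadoop.conf.Configuration;

import edu.uci.ics.pregelix.api.util.BspUtils;

// With job id "job_1" and superstep 42 these resolve to
//   /tmp/ckpoint/job_1/vertex/42
//   /tmp/ckpoint/job_1/message/42
//   /tmp/ckpoint/job_1/secondaryindex/42
public class CheckpointPaths {
    public static void print(Configuration conf) {
        System.out.println(BspUtils.getVertexCheckpointPath(conf, 42));
        System.out.println(BspUtils.getMessageCheckpointPath(conf, 42));
        System.out.println(BspUtils.getSecondaryIndexCheckpointPath(conf, 42));
    }
}
```
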
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ConservativeCheckpointHook.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ConservativeCheckpointHook.java
index 6a4a660..3455233 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ConservativeCheckpointHook.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ConservativeCheckpointHook.java
@@ -25,7 +25,7 @@
 
     @Override
     public boolean checkpoint(int superstep) {
-        if (superstep % 5 == 0) {
+        if (superstep % 100 == 0) {
             return true;
         } else {
             return false;
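
Raising the cadence from every 5 to every 100 supersteps trades a longer replay after a failure for far less checkpoint I/O during normal runs. A custom cadence is a one-method class; a sketch (the ICheckpointHook package is not shown in this diff, so the implements clause is omitted):

```java
// Sketch of a custom cadence: the real class would implement ICheckpointHook;
// checkpoint(int) below matches the method the framework calls each superstep.
public class EveryFiftyStepsCheckpointHook {
    public boolean checkpoint(int superstep) {
        return superstep > 0 && superstep % 50 == 0; // every 50 supersteps
    }
}
```
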
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 7bd2cf8..5530144 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -23,7 +23,9 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -50,6 +52,7 @@
 import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSingleSort;
 import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSort;
 import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.util.ExceptionUtilities;
 import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
 
 @SuppressWarnings("rawtypes")
@@ -89,6 +92,7 @@
             this.profiling = profiling;
             PregelixJob currentJob = jobs.get(0);
             PregelixJob lastJob = currentJob;
+            addHadoopConfiguration(currentJob, ipAddress, port, true);
             JobGen jobGen = null;
 
             /** prepare job -- deploy jars */
@@ -99,27 +103,29 @@
             IntWritable lastSnapshotSuperstep = new IntWritable(0);
             boolean failed = false;
             int retryCount = 0;
-            int maxRetryCount = 1;
+            int maxRetryCount = 3;
+            jobGen = selectJobGen(planChoice, currentJob);
 
             do {
                 try {
                     for (int i = lastSnapshotJobIndex.get(); i < jobs.size(); i++) {
                         lastJob = currentJob;
                         currentJob = jobs.get(i);
+                        currentJob.setRecoveryCount(retryCount);
 
                         /** add hadoop configurations */
-                        addHadoopConfiguration(currentJob, ipAddress, port);
+                        addHadoopConfiguration(currentJob, ipAddress, port, failed);
                         ICheckpointHook ckpHook = BspUtils.createCheckpointHook(currentJob.getConfiguration());
 
                         /** load the data */
-                        if (i == 0 || compatible(lastJob, currentJob)) {
+                        if ((i == 0 || compatible(lastJob, currentJob)) && !failed) {
                             if (i != 0) {
                                 finishJobs(jobGen, deploymentId);
                                 /** invalidate/clear checkpoint */
                                 lastSnapshotJobIndex.set(0);
                                 lastSnapshotSuperstep.set(0);
                             }
-                            jobGen = selectJobGen(planChoice, currentJob);
+                            jobGen.reset(currentJob);
                             loadData(currentJob, jobGen, deploymentId);
                         } else {
                             jobGen.reset(currentJob);
@@ -137,12 +143,16 @@
                     /** clear checkpoints if any */
                     jobGen.clearCheckpoints();
                     hcc.unDeployBinary(deploymentId);
-                } catch (IOException ioe) {
-                    /** disk failures */
-                    //restart from snapshot
-                    failed = true;
-                    retryCount++;
-                    throw new HyracksException(ioe);
+                } catch (Exception e1) {
+                    Set<String> blackListNodes = new HashSet<String>();
+                    /** disk failures or node failures */
+                    if (ExceptionUtilities.recoverable(e1, blackListNodes)) {
+                        ClusterConfig.addToBlackListNodes(blackListNodes);
+                        failed = true;
+                        retryCount++;
+                    } else {
+                        throw e1;
+                    }
                 }
             } while (failed && retryCount < maxRetryCount);
             LOG.info("job finished");
@@ -222,9 +232,9 @@
     }
 
     private DeploymentId prepareJobs(String ipAddress, int port) throws Exception {
-        if (hcc == null)
+        if (hcc == null) {
             hcc = new HyracksConnection(ipAddress, port);
-
+        }
         URLClassLoader classLoader = (URLClassLoader) exampleClass.getClassLoader();
         List<File> jars = new ArrayList<File>();
         URL[] urls = classLoader.getURLs();
@@ -235,7 +245,8 @@
         return deploymentId;
     }
 
-    private void addHadoopConfiguration(PregelixJob job, String ipAddress, int port) throws HyracksException {
+    private void addHadoopConfiguration(PregelixJob job, String ipAddress, int port, boolean loadClusterConfig)
+            throws HyracksException {
         URL hadoopCore = job.getClass().getClassLoader().getResource("core-site.xml");
         if (hadoopCore != null) {
             job.getConfiguration().addResource(hadoopCore);
@@ -248,7 +259,9 @@
         if (hadoopHdfs != null) {
             job.getConfiguration().addResource(hadoopHdfs);
         }
-        ClusterConfig.loadClusterConfig(ipAddress, port);
+        if (loadClusterConfig) {
+            ClusterConfig.loadClusterConfig(ipAddress, port);
+        }
     }
 
     private void runLoopBody(DeploymentId deploymentId, PregelixJob job, JobGen jobGen, int currentJobIndex,
@@ -256,8 +269,13 @@
             throws Exception {
         if (doRecovery) {
             /** reload the checkpoint */
-            runLoadCheckpoint(deploymentId, jobGen, snapshotSuperstep.get());
-
+            if (snapshotSuperstep.get() > 0) {
+                runClearState(deploymentId, jobGen);
+                runLoadCheckpoint(deploymentId, jobGen, snapshotSuperstep.get());
+            } else {
+                runClearState(deploymentId, jobGen);
+                loadData(job, jobGen, deploymentId);
+            }
         }
         int i = doRecovery ? snapshotSuperstep.get() + 1 : 1;
         boolean terminate = false;
@@ -272,8 +290,8 @@
                     || IterationUtils.readForceTerminationState(job.getConfiguration(), jobGen.getJobId());
             if (ckpHook.checkpoint(i)) {
                 runCheckpoint(deploymentId, jobGen, i);
-                snapshotSuperstep.set(i);
                 snapshotJobIndex.set(currentJobIndex);
+                snapshotSuperstep.set(i);
             }
             i++;
         } while (!terminate);
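
The Driver changes above add the retry loop at the heart of the fault-tolerance work: any recoverable failure (interrupts, dead-node messages, disk I/O traced through the IOManager) blacklists the offending nodes and replays from the last snapshot, up to maxRetryCount attempts. The control flow reduces to this skeleton; runJobsFromSnapshot is a placeholder for the real load/recover/superstep logic:

```java
import java.util.HashSet;
import java.util.Set;

import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
import edu.uci.ics.pregelix.core.util.ExceptionUtilities;

// Skeleton of the Driver retry loop, with the job execution stubbed out.
public class RetryLoop {
    public static void runWithRetries(int maxRetryCount) throws Exception {
        boolean failed = false;
        int retryCount = 0;
        do {
            try {
                runJobsFromSnapshot(failed); // 'failed' => reload the last checkpoint
                failed = false;              // clean completion
            } catch (Exception e) {
                Set<String> blackList = new HashSet<String>();
                if (ExceptionUtilities.recoverable(e, blackList)) {
                    ClusterConfig.addToBlackListNodes(blackList); // avoid bad nodes
                    failed = true;
                    retryCount++;
                } else {
                    throw e; // unrecoverable: surface to the caller
                }
            }
        } while (failed && retryCount < maxRetryCount);
    }

    private static void runJobsFromSnapshot(boolean doRecovery) throws Exception {
        // placeholder for loading data / reloading checkpoints / running supersteps
    }
}
```
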
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index 36723a6..4863378 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -36,9 +36,12 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -61,10 +64,7 @@
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
-import edu.uci.ics.hyracks.hdfs2.dataflow.ConfFactory;
 import edu.uci.ics.hyracks.hdfs2.dataflow.HDFSReadOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs2.dataflow.HDFSWriteOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
@@ -102,10 +102,10 @@
 import edu.uci.ics.pregelix.dataflow.EmptyTupleSourceOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.HDFSFileWriteOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.KeyValueParserFactory;
-import edu.uci.ics.pregelix.dataflow.KeyValueWriterFactory;
 import edu.uci.ics.pregelix.dataflow.MaterializingReadOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.VertexFileScanOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.VertexFileWriteOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.VertexWriteOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
 import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
@@ -137,8 +137,6 @@
     protected static final String SECONDARY_INDEX_ODD = "secondary1";
     protected static final String SECONDARY_INDEX_EVEN = "secondary2";
 
-    private String vertexCheckpointPath;
-
     public JobGen(PregelixJob job) {
         init(job);
     }
@@ -148,10 +146,7 @@
         pregelixJob = job;
         initJobConfiguration();
         job.setJobId(jobId);
-
-        vertexCheckpointPath = "/tmp/ckpoint/" + jobId + "/vertex";
-        // set the frame size to be the one user specified if the user did
-        // specify.
+        // set the frame size to be the one user specified if the user did specify.
         int specifiedFrameSize = BspUtils.getFrameSize(job.getConfiguration());
         if (specifiedFrameSize > 0) {
             frameSize = specifiedFrameSize;
@@ -376,15 +371,21 @@
     @Override
     public JobSpecification[] generateCheckpointing(int lastSuccessfulIteration) throws HyracksException {
         try {
-
             PregelixJob tmpJob = this.createCloneJob("Vertex checkpointing for job " + jobId, pregelixJob);
             tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
-            FileOutputFormat.setOutputPath(tmpJob, new Path(vertexCheckpointPath + "/" + lastSuccessfulIteration));
+            FileOutputFormat.setOutputPath(tmpJob,
+                    new Path(BspUtils.getVertexCheckpointPath(conf, lastSuccessfulIteration)));
             tmpJob.setOutputKeyClass(NullWritable.class);
             tmpJob.setOutputValueClass(BspUtils.getVertexClass(tmpJob.getConfiguration()));
+            FileSystem dfs = FileSystem.get(tmpJob.getConfiguration());
+
+            dfs.delete(new Path(BspUtils.getVertexCheckpointPath(conf, lastSuccessfulIteration)), true);
             JobSpecification vertexCkpSpec = scanIndexWriteToHDFS(tmpJob.getConfiguration());
+
+            dfs.delete(new Path(BspUtils.getMessageCheckpointPath(conf, lastSuccessfulIteration)), true);
             JobSpecification[] stateCkpSpecs = generateStateCheckpointing(lastSuccessfulIteration);
             JobSpecification[] specs = new JobSpecification[1 + stateCkpSpecs.length];
+
             specs[0] = vertexCkpSpec;
             for (int i = 1; i < specs.length; i++) {
                 specs[i] = stateCkpSpecs[i - 1];
@@ -397,7 +398,7 @@
 
     @Override
     public JobSpecification generateLoadingJob() throws HyracksException {
-        JobSpecification spec = loadHDFSData(conf);
+        JobSpecification spec = loadHDFSData(pregelixJob);
         return spec;
     }
 
@@ -412,13 +413,22 @@
         try {
             PregelixJob tmpJob = this.createCloneJob("Vertex checkpoint loading for job " + jobId, pregelixJob);
             tmpJob.setVertexInputFormatClass(InternalVertexInputFormat.class);
-            FileInputFormat.setInputPaths(tmpJob, new Path(vertexCheckpointPath + "/" + lastCheckpointedIteration));
-            JobSpecification vertexLoadSpec = loadHDFSData(tmpJob.getConfiguration());
+            FileInputFormat.setInputPaths(tmpJob,
+                    new Path(BspUtils.getVertexCheckpointPath(conf, lastCheckpointedIteration)));
+            JobSpecification[] cleanVertices = generateCleanup();
+            JobSpecification createIndex = generateCreatingJob();
+            JobSpecification vertexLoadSpec = loadHDFSData(tmpJob);
             JobSpecification[] stateLoadSpecs = generateStateCheckpointLoading(lastCheckpointedIteration, tmpJob);
-            JobSpecification[] specs = new JobSpecification[1 + stateLoadSpecs.length];
-            specs[0] = vertexLoadSpec;
-            for (int i = 1; i < specs.length; i++) {
-                specs[i] = stateLoadSpecs[i - 1];
+            JobSpecification[] specs = new JobSpecification[cleanVertices.length + 2 + stateLoadSpecs.length];
+
+            int i = 0;
+            for (; i < cleanVertices.length; i++) {
+                specs[i] = cleanVertices[i];
+            }
+            specs[i++] = createIndex;
+            specs[i++] = vertexLoadSpec;
+            for (; i < specs.length; i++) {
+                specs[i] = stateLoadSpecs[i - cleanVertices.length - 2];
             }
             return specs;
         } catch (Exception e) {
@@ -479,7 +489,8 @@
     }
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
-    private JobSpecification loadHDFSData(Configuration conf) throws HyracksException, HyracksDataException {
+    private JobSpecification loadHDFSData(PregelixJob job) throws HyracksException, HyracksDataException {
+        Configuration conf = job.getConfiguration();
         Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
         Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
         JobSpecification spec = new JobSpecification();
@@ -492,7 +503,7 @@
         VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
         List<InputSplit> splits = new ArrayList<InputSplit>();
         try {
-            splits = inputFormat.getSplits(pregelixJob, fileSplitProvider.getFileSplits().length);
+            splits = inputFormat.getSplits(job, fileSplitProvider.getFileSplits().length);
             LOGGER.info("number of splits: " + splits.size());
             for (InputSplit split : splits)
                 LOGGER.info(split.toString());
@@ -591,7 +602,8 @@
          */
         IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
                 vertexIdClass.getName(), vertexClass.getName());
-        HDFSFileWriteOperatorDescriptor writer = new HDFSFileWriteOperatorDescriptor(spec, confFactory, inputRdFactory);
+        VertexFileWriteOperatorDescriptor writer = new VertexFileWriteOperatorDescriptor(spec, confFactory,
+                inputRdFactory);
         ClusterConfig.setLocationConstraint(spec, writer);
 
         /**
@@ -637,18 +649,21 @@
         /**
          * construct the materializing write operator
          */
-        MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal);
+        MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
+                false);
         ClusterConfig.setLocationConstraint(spec, materializeRead);
 
-        String checkpointPath = "/tmp/ckpoint/" + jobId + "/message/" + lastSuccessfulIteration;
+        String checkpointPath = BspUtils.getMessageCheckpointPath(conf, lastSuccessfulIteration);
         PregelixJob tmpJob = createCloneJob("State checkpointing for job " + jobId, pregelixJob);
-        tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
+        tmpJob.setOutputFormatClass(SequenceFileOutputFormat.class);
         FileOutputFormat.setOutputPath(tmpJob, new Path(checkpointPath));
         tmpJob.setOutputKeyClass(vertexIdClass);
         tmpJob.setOutputValueClass(MsgList.class);
 
-        ITupleWriterFactory writerFactory = new KeyValueWriterFactory(new ConfFactory(tmpJob));
-        HDFSWriteOperatorDescriptor hdfsWriter = new HDFSWriteOperatorDescriptor(spec, tmpJob, writerFactory);
+        IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                vertexIdClass.getName(), MsgList.class.getName());
+        HDFSFileWriteOperatorDescriptor hdfsWriter = new HDFSFileWriteOperatorDescriptor(spec, tmpJob, inputRdFactory);
+        ClusterConfig.setLocationConstraint(spec, hdfsWriter);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, materializeRead, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), materializeRead, 0, hdfsWriter, 0);
@@ -660,25 +675,26 @@
     @SuppressWarnings({ "unchecked", "rawtypes" })
     protected JobSpecification[] generateStateCheckpointLoading(int lastCheckpointedIteration, PregelixJob job)
             throws HyracksException {
-        String checkpointPath = "/tmp/ckpoint/" + jobId + "/message/" + lastCheckpointedIteration;
+        String checkpointPath = BspUtils.getMessageCheckpointPath(job.getConfiguration(), lastCheckpointedIteration);
         PregelixJob tmpJob = createCloneJob("State checkpoint loading for job " + jobId, job);
-        tmpJob.setVertexInputFormatClass(InternalVertexInputFormat.class);
+        tmpJob.setInputFormatClass(SequenceFileInputFormat.class);
         try {
             FileInputFormat.setInputPaths(tmpJob, checkpointPath);
         } catch (IOException e) {
             throw new HyracksException(e);
         }
-        Configuration conf = job.getConfiguration();
+        Configuration conf = tmpJob.getConfiguration();
         Class vertexIdClass = BspUtils.getVertexIndexClass(conf);
         JobSpecification spec = new JobSpecification();
 
         /***
          * HDFS read operator
          */
-        VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
         List<InputSplit> splits = new ArrayList<InputSplit>();
         try {
-            splits = inputFormat.getSplits(tmpJob, ClusterConfig.getLocationConstraint().length);
+            InputFormat inputFormat = org.apache.hadoop.util.ReflectionUtils.newInstance(job.getInputFormatClass(),
+                    job.getConfiguration());
+            splits = inputFormat.getSplits(tmpJob);
             LOGGER.info("number of splits: " + splits.size());
             for (InputSplit split : splits)
                 LOGGER.info(split.toString());
@@ -692,6 +708,16 @@
                 readSchedule, new KeyValueParserFactory());
         ClusterConfig.setLocationConstraint(spec, scanner);
 
+        /** construct the sort operator to sort message states */
+        int[] keyFields = new int[] { 0 };
+        INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
+        IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
+        sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(lastCheckpointedIteration,
+                WritableComparator.get(vertexIdClass).getClass());
+        ExternalSortOperatorDescriptor sort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
+                nkmFactory, sortCmpFactories, recordDescriptor);
+        ClusterConfig.setLocationConstraint(spec, sort);
+
         /**
          * construct the materializing write operator
          */
@@ -701,7 +727,7 @@
 
         /** construct runtime hook */
         RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
-                new RecoveryRuntimeHookFactory(jobId, lastCheckpointedIteration + 1, new ConfigurationFactory(
+                new RecoveryRuntimeHookFactory(jobId, lastCheckpointedIteration, new ConfigurationFactory(
                         pregelixJob.getConfiguration())));
         ClusterConfig.setLocationConstraint(spec, postSuperStep);
 
@@ -714,7 +740,8 @@
          */
         ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
         spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0,
-                materialize, 0);
+                sort, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), sort, 0, materialize, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);
         spec.setFrameSize(frameSize);
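
The checkpoint-loading path above switches from Pregelix's VertexInputFormat to the job's configured InputFormat (a SequenceFileInputFormat), and inserts an external sort before the materializing write because the M-to-N repartitioning connector does not deliver tuples in vertex-id order. A sketch of the split-enumeration idiom the hunk adopts:

```java
import java.util.List;

import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.ReflectionUtils;

// Sketch: enumerate splits through the job's configured InputFormat instead of
// a Pregelix VertexInputFormat, as the checkpoint-loading hunks above now do.
public class SplitEnumerator {
    @SuppressWarnings({ "rawtypes", "unchecked" })
    public static List<InputSplit> enumerate(Job job) throws Exception {
        InputFormat inputFormat = ReflectionUtils.newInstance(job.getInputFormatClass(),
                job.getConfiguration());
        return inputFormat.getSplits(job);
    }
}
```
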
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 41887c0..9389f62 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -21,15 +21,17 @@
 import java.util.List;
 import java.util.logging.Logger;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.VLongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -49,17 +51,12 @@
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs2.dataflow.ConfFactory;
 import edu.uci.ics.hyracks.hdfs2.dataflow.HDFSReadOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs2.dataflow.HDFSWriteOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
 import edu.uci.ics.pregelix.api.graph.MsgList;
-import edu.uci.ics.pregelix.api.io.VertexInputFormat;
-import edu.uci.ics.pregelix.api.io.internal.InternalVertexInputFormat;
-import edu.uci.ics.pregelix.api.io.internal.InternalVertexOutputFormat;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.core.data.TypeTraits;
@@ -71,8 +68,8 @@
 import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.EmptyTupleSourceOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.FinalAggregateOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.HDFSFileWriteOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.KeyValueParserFactory;
-import edu.uci.ics.pregelix.dataflow.KeyValueWriterFactory;
 import edu.uci.ics.pregelix.dataflow.MaterializingReadOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
@@ -344,7 +341,8 @@
         /**
          * construct the materializing write operator
          */
-        MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal);
+        MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
+                true);
         ClusterConfig.setLocationConstraint(spec, materializeRead);
 
         /**
@@ -525,14 +523,10 @@
     /** generate plan specific state checkpointing */
     protected JobSpecification[] generateStateCheckpointing(int lastSuccessfulIteration) throws HyracksException {
         JobSpecification[] msgCkpSpecs = super.generateStateCheckpointing(lastSuccessfulIteration);
-        PregelixJob tmpJob = this.createCloneJob("Vertex checkpointing for job " + jobId, pregelixJob);
-        tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
 
         /** generate secondary index checkpoint */
-        String checkpointPath = "/tmp/ckpoint/" + jobId + "/secondary/" + lastSuccessfulIteration;
-        FileOutputFormat.setOutputPath(tmpJob, new Path(checkpointPath));
-        tmpJob.setOutputKeyClass(BspUtils.getVertexIndexClass(tmpJob.getConfiguration()));
-        tmpJob.setOutputValueClass(MsgList.class);
+        PregelixJob tmpJob = this.createCloneJob("Secondary index checkpointing for job " + jobId, pregelixJob);
+
         JobSpecification secondaryBTreeCkp = generateSecondaryBTreeCheckpoint(lastSuccessfulIteration, tmpJob);
 
         JobSpecification[] specs = new JobSpecification[msgCkpSpecs.length + 1];
@@ -549,11 +543,12 @@
     @Override
     protected JobSpecification[] generateStateCheckpointLoading(int lastSuccessfulIteration, PregelixJob job)
             throws HyracksException {
-        JobSpecification[] msgCkpSpecs = generateStateCheckpointLoading(lastSuccessfulIteration, job);
-        PregelixJob tmpJob = this.createCloneJob("Vertex checkpointing for job " + jobId, pregelixJob);
-        tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
+        /** generate message checkpoint load */
+        JobSpecification[] msgCkpSpecs = super.generateStateCheckpointLoading(lastSuccessfulIteration, job);
 
         /** generate secondary index checkpoint load */
+        PregelixJob tmpJob = this.createCloneJob("Secondary index checkpoint loading for job " + jobId, pregelixJob);
+        tmpJob.setOutputFormatClass(SequenceFileOutputFormat.class);
         JobSpecification secondaryBTreeCkpLoad = generateSecondaryBTreeCheckpointLoad(lastSuccessfulIteration, tmpJob);
         JobSpecification[] specs = new JobSpecification[msgCkpSpecs.length + 1];
         for (int i = 0; i < msgCkpSpecs.length; i++) {
@@ -569,23 +564,23 @@
         Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(job.getConfiguration());
         JobSpecification spec = new JobSpecification();
 
-        String checkpointPath = "/tmp/ckpoint/" + jobId + "/secondary/" + lastSuccessfulIteration;;
+        String checkpointPath = BspUtils.getSecondaryIndexCheckpointPath(conf, lastSuccessfulIteration);
         PregelixJob tmpJob = createCloneJob("State checkpoint loading for job " + jobId, job);
-        tmpJob.setVertexInputFormatClass(InternalVertexInputFormat.class);
+        tmpJob.setInputFormatClass(SequenceFileInputFormat.class);
         try {
             FileInputFormat.setInputPaths(tmpJob, checkpointPath);
         } catch (IOException e) {
             throw new HyracksException(e);
         }
-        Configuration conf = job.getConfiguration();
 
         /***
-         * construct HDFS read operator
+         * HDFS read operator
          */
-        VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
         List<InputSplit> splits = new ArrayList<InputSplit>();
         try {
-            splits = inputFormat.getSplits(tmpJob, ClusterConfig.getLocationConstraint().length);
+            InputFormat inputFormat = org.apache.hadoop.util.ReflectionUtils.newInstance(job.getInputFormatClass(),
+                    job.getConfiguration());
+            splits = inputFormat.getSplits(tmpJob);
             LOGGER.info("number of splits: " + splits.size());
             for (InputSplit split : splits)
                 LOGGER.info(split.toString());
@@ -599,6 +594,16 @@
                 readSchedule, new KeyValueParserFactory());
         ClusterConfig.setLocationConstraint(spec, scanner);
 
+        /** construct the sort operator to sort secondary index entries */
+        int[] keyFields = new int[] { 0 };
+        INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
+        IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
+        sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(lastSuccessfulIteration,
+                WritableComparator.get(vertexIdClass).getClass());
+        ExternalSortOperatorDescriptor sort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
+                nkmFactory, sortCmpFactories, recordDescriptor);
+        ClusterConfig.setLocationConstraint(spec, sort);
+
         /**
          * construct bulk-load index operator
          */
@@ -621,8 +626,8 @@
          * connect operator descriptors
          */
         ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
-        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0,
-                btreeBulkLoad, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0, sort, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), sort, 0, btreeBulkLoad, 0);
         spec.setFrameSize(frameSize);
 
         return spec;
@@ -631,6 +636,12 @@
     @SuppressWarnings({ "rawtypes", "unchecked" })
     private JobSpecification generateSecondaryBTreeCheckpoint(int lastSuccessfulIteration, PregelixJob job)
             throws HyracksException {
+        job.setOutputFormatClass(SequenceFileOutputFormat.class);
+        String checkpointPath = BspUtils.getSecondaryIndexCheckpointPath(conf, lastSuccessfulIteration);
+        FileOutputFormat.setOutputPath(job, new Path(checkpointPath));
+        job.setOutputKeyClass(BspUtils.getVertexIndexClass(job.getConfiguration()));
+        job.setOutputValueClass(MsgList.class);
+
         Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(job.getConfiguration());
         Class<? extends Writable> msgListClass = MsgList.class;
         String readFile = lastSuccessfulIteration % 2 == 0 ? SECONDARY_INDEX_EVEN : SECONDARY_INDEX_ODD;
@@ -654,7 +665,6 @@
         /**
          * construct btree search operator
          */
-        ConfFactory confFactory = new ConfFactory(job);
         RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
                 vertexIdClass.getName(), msgListClass.getName());
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
@@ -674,8 +684,9 @@
         /**
          * construct write file operator
          */
-        HDFSWriteOperatorDescriptor writer = new HDFSWriteOperatorDescriptor(spec, job, new KeyValueWriterFactory(
-                confFactory));
+        IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                vertexIdClass.getName(), MsgList.class.getName());
+        HDFSFileWriteOperatorDescriptor writer = new HDFSFileWriteOperatorDescriptor(spec, job, inputRdFactory);
         ClusterConfig.setLocationConstraint(spec, writer);
 
         /**
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index c29ea18..287b797 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -299,7 +299,8 @@
         /**
          * construct the materializing write operator
          */
-        MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal);
+        MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
+                true);
         ClusterConfig.setLocationConstraint(spec, materializeRead);
 
         /**
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index dc61971..3b3c9e7 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -288,7 +288,8 @@
         /**
          * construct the materializing write operator
          */
-        MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal);
+        MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
+                true);
         ClusterConfig.setLocationConstraint(spec, materializeRead);
 
         /**
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index 34f723f..e334095 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -302,7 +302,8 @@
         /**
          * construct the materializing write operator
          */
-        MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal);
+        MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
+                true);
         ClusterConfig.setLocationConstraint(spec, materializeRead);
 
         /**
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
index ea6cc8a..89fbdcd 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
@@ -20,12 +20,15 @@
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
+import java.util.Set;
 import java.util.TreeMap;
 
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -52,6 +55,8 @@
     private static Map<String, List<String>> ipToNcMapping;
     private static String[] stores;
     private static Scheduler hdfsScheduler;
+    private static Set<String> blackListNodes = new HashSet<String>();
+    private static IHyracksClientConnection hcc;
 
     /**
      * let tests set config path to be whatever
@@ -197,9 +202,19 @@
 
     public static void loadClusterConfig(String ipAddress, int port) throws HyracksException {
         try {
-            IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
+            if (hcc == null) {
+                hcc = new HyracksConnection(ipAddress, port);
+            }
             Map<String, NodeControllerInfo> ncNameToNcInfos = new TreeMap<String, NodeControllerInfo>();
             ncNameToNcInfos.putAll(hcc.getNodeControllerInfos());
+
+            /**
+             * remove black list nodes -- which had disk failures
+             */
+            for (String blackListNode : blackListNodes) {
+                ncNameToNcInfos.remove(blackListNode);
+            }
+
             NCs = new String[ncNameToNcInfos.size()];
             ipToNcMapping = new HashMap<String, List<String>>();
             int i = 0;
@@ -216,7 +231,7 @@
                 i++;
             }
 
-            hdfsScheduler = new Scheduler(ipAddress, port);
+            hdfsScheduler = new Scheduler(hcc.getNodeControllerInfos(), hcc.getClusterTopology());
         } catch (Exception e) {
             throw new IllegalStateException(e);
         }
@@ -240,4 +255,8 @@
         }
         return locations;
     }
+
+    public static void addToBlackListNodes(Collection<String> nodes) {
+        blackListNodes.addAll(nodes);
+    }
 }
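
Blacklisted nodes are removed from the NC map before location constraints are computed, so a retried job simply never schedules on a node that had a disk failure. A usage sketch mirroring the Driver; host, port, and node id are placeholders:

```java
import java.util.Collections;

import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;

// Sketch: after a recoverable failure attributed to node "nc7", exclude it and
// reload the config so new location constraints avoid it.
public class BlacklistDemo {
    public static void exclude() throws Exception {
        ClusterConfig.addToBlackListNodes(Collections.singleton("nc7"));
        ClusterConfig.loadClusterConfig("cc-host", 1098);
    }
}
```
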
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/ExceptionUtilities.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/ExceptionUtilities.java
new file mode 100644
index 0000000..a4c4501
--- /dev/null
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/ExceptionUtilities.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.util;
+
+import java.io.IOException;
+import java.util.Set;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * The utility to analyze exceptions
+ * 
+ * @author yingyib
+ */
+public class ExceptionUtilities {
+
+    /**
+     * Check whether an exception is recoverable or not
+     * 
+     * @param exception
+     * @param blackListNodes
+     *            collects the ids of nodes that should be blacklisted
+     * @return true if the failure is recoverable; false otherwise
+     */
+    public static boolean recoverable(Exception exception, Set<String> blackListNodes) {
+        // guard against exceptions that carry no message
+        String message = exception.getMessage() == null ? "" : exception.getMessage();
+
+        /***
+         * check interrupted exceptions and known node-failure messages
+         */
+        if (exception instanceof InterruptedException || (message.contains("Node") && message.contains("not live"))
+                || message.contains("Failure occurred on input")) {
+            return true;
+        }
+        Throwable cause = exception;
+        while ((cause = cause.getCause()) != null) {
+            if (cause instanceof InterruptedException) {
+                return true;
+            }
+        }
+
+        /***
+         * check io exception
+         */
+        cause = exception;
+        String blackListNode = null;
+        if (cause instanceof HyracksDataException) {
+            blackListNode = ((HyracksDataException) cause).getNodeId();
+        }
+        while ((cause = cause.getCause()) != null) {
+            if (cause instanceof IOException) {
+                if (containsIOManager(cause)) {
+                    if (blackListNode != null) {
+                        blackListNodes.add(blackListNode);
+                    }
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Check if the exception trace contains the IOManager, which indicates a disk failure
+     * 
+     * @param cause
+     * @return true if IOManager is in the trace; false otherwise.
+     */
+    private static boolean containsIOManager(Throwable cause) {
+        StackTraceElement[] traces = cause.getStackTrace();
+        for (StackTraceElement e : traces) {
+            if (e.getClassName().endsWith("IOManager")) {
+                return true;
+            }
+        }
+        return false;
+    }
+}
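
In short: interrupts, "Node ... not live" messages, and IOExceptions whose stack trace passes through the IOManager are treated as recoverable; everything else aborts the job. A small demonstration of the classification:

```java
import java.util.HashSet;

import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.pregelix.core.util.ExceptionUtilities;

// Sketch: interrupts and dead-node messages are retried; an arbitrary
// programming error is not.
public class RecoverableDemo {
    public static void main(String[] args) {
        System.out.println(ExceptionUtilities.recoverable(
                new InterruptedException(), new HashSet<String>()));                       // true
        System.out.println(ExceptionUtilities.recoverable(
                new HyracksDataException("Node nc1 is not live"), new HashSet<String>())); // true
        System.out.println(ExceptionUtilities.recoverable(
                new NullPointerException("boom"), new HashSet<String>()));                 // false
    }
}
```
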
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
index e1795de..98be8cd 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
@@ -61,8 +61,8 @@
         ccConfig.defaultMaxJobAttempts = 0;
         ccConfig.jobHistorySize = 1;
         ccConfig.profileDumpPeriod = -1;
-        //ccConfig.heartbeatPeriod = 5000;
-        //ccConfig.maxHeartbeatLapsePeriods = 1;
+        ccConfig.heartbeatPeriod = 1000;
+        ccConfig.maxHeartbeatLapsePeriods = 15;
 
         // cluster controller
         cc = new ClusterControllerService(ccConfig);
@@ -98,14 +98,22 @@
         ClusterConfig.loadClusterConfig(CC_HOST, TEST_HYRACKS_CC_CLIENT_PORT);
     }
 
-    public static void showDownNC1() throws Exception {
+    public static void startNC1() throws Exception {
+        nc1.start();
+    }
+
+    public static void shutdownNC1() throws Exception {
         nc1.stop();
     }
 
-    public static void showDownNC2() throws Exception {
+    public static void shutdownNC2() throws Exception {
         nc2.stop();
     }
 
+    public static void shutdownCC() throws Exception {
+        cc.stop();
+    }
+
     public static void deinit() throws Exception {
         nc2.stop();
         nc1.stop();
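
The renamed shutdown helpers (plus the new startNC1) make failure-injection tests possible: kill a node controller mid-job, let the driver recover on the surviving node, then bring the node back. A sketch of such a test flow; the job submission is a placeholder and the sleep is an illustrative delay:

```java
import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;

// Sketch of a failure-injection test enabled by the renamed helpers.
public class NodeFailureTest {
    public static void run() throws Exception {
        Thread jobThread = new Thread() {
            @Override
            public void run() {
                // placeholder: submit a Pregelix job through the Driver
            }
        };
        jobThread.start();
        Thread.sleep(2000);                           // let some supersteps finish
        PregelixHyracksIntegrationUtil.shutdownNC1(); // inject a node failure
        jobThread.join();                             // the driver should recover on nc2
        PregelixHyracksIntegrationUtil.startNC1();    // restore the node afterwards
    }
}
```
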
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
index 7221cb5..b22e468 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
@@ -230,8 +230,16 @@
 
     @Override
     public void fail() throws HyracksDataException {
-        for (IFrameWriter writer : writers)
+        try {
+            cursor.close();
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        } finally {
+            treeIndexOpHelper.close();
+        }
+        for (IFrameWriter writer : writers) {
             writer.fail();
+        }
     }
 
     @Override
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
index b21cd2a..0ecfd03 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
@@ -254,8 +254,16 @@
 
     @Override
     public void fail() throws HyracksDataException {
-        for (IFrameWriter writer : writers)
+        try {
+            cursor.close();
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        } finally {
+            treeIndexOpHelper.close();
+        }
+        for (IFrameWriter writer : writers) {
             writer.fail();
+        }
     }
 
     /** compare tuples */
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java
index dd6ee3c..e64e9cc 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java
@@ -245,6 +245,13 @@
 
     @Override
     public void fail() throws HyracksDataException {
+        try {
+            cursor.close();
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        } finally {
+            treeIndexOpHelper.close();
+        }
         writer.fail();
     }
 
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
index 3cebfb8..a9c787f 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
@@ -219,8 +219,16 @@
 
     @Override
     public void fail() throws HyracksDataException {
-        for (IFrameWriter writer : writers)
+        try {
+            cursor.close();
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        } finally {
+            treeIndexOpHelper.close();
+        }
+        for (IFrameWriter writer : writers) {
             writer.fail();
+        }
     }
 
     /** compare tuples */
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java
index bbe2764..86a211f 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java
@@ -205,6 +205,13 @@
 
     @Override
     public void fail() throws HyracksDataException {
+        try {
+            cursor.close();
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        } finally {
+            treeIndexOpHelper.close();
+        }
         writer.fail();
     }
 
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorNodePushable.java
index c4890e1..c985f64 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorNodePushable.java
@@ -100,6 +100,11 @@
 
     @Override
     public void fail() throws HyracksDataException {
-
+        try {
+            bulkLoader.end();
+        } catch (IndexException e) {
+            treeIndexOpHelper.close();
+            throw new HyracksDataException(e);
+        }
     }
 }
\ No newline at end of file
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java
index bd85e3e..de87909 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java
@@ -224,8 +224,16 @@
 
     @Override
     public void fail() throws HyracksDataException {
-        for (IFrameWriter writer : writers)
+        try {
+            cursor.close();
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        } finally {
+            treeIndexHelper.close();
+        }
+        for (IFrameWriter writer : writers) {
             writer.fail();
+        }
     }
 
     @Override
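
All of the index pushables in this series now follow the same fail() discipline: close the open cursor (wrapping any error), always release the tree-index helper, then propagate fail() downstream. Distilled into one self-contained sketch, with stand-ins for the real cursor and helper types:

```java
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;

// Distilled sketch of the fail() cleanup discipline applied above: close the
// cursor (releasing latches and pinned pages), always close the index helper,
// then fail downstream writers. Cursor/Helper stand in for the real types.
public class FailPattern {
    interface Cursor { void close() throws Exception; }
    interface Helper { void close(); }

    Cursor cursor;
    Helper treeIndexOpHelper;
    IFrameWriter[] writers;

    public void fail() throws HyracksDataException {
        try {
            cursor.close();
        } catch (Exception e) {
            throw new HyracksDataException(e);
        } finally {
            treeIndexOpHelper.close(); // release resources even when close() throws
        }
        for (IFrameWriter writer : writers) {
            writer.fail();
        }
    }
}
```
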
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
index 9a21680..a1177c8 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
@@ -19,42 +19,45 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameDeserializer;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import edu.uci.ics.hyracks.hdfs.ContextFactory;
-import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
-import edu.uci.ics.pregelix.api.io.VertexWriter;
-import edu.uci.ics.pregelix.api.util.BspUtils;
-import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.hyracks.hdfs2.dataflow.ConfFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
 
 public class HDFSFileWriteOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
     private static final long serialVersionUID = 1L;
-    private final IConfigurationFactory confFactory;
+    private final ConfFactory confFactory;
     private final IRecordDescriptorFactory inputRdFactory;
 
-    public HDFSFileWriteOperatorDescriptor(JobSpecification spec, IConfigurationFactory confFactory,
-            IRecordDescriptorFactory inputRdFactory) {
+    public HDFSFileWriteOperatorDescriptor(JobSpecification spec, Job conf, IRecordDescriptorFactory inputRdFactory)
+            throws HyracksException {
         super(spec, 1, 0);
-        this.confFactory = confFactory;
-        this.inputRdFactory = inputRdFactory;
+        try {
+            this.confFactory = new ConfFactory(conf);
+            this.inputRdFactory = inputRdFactory;
+        } catch (Exception e) {
+            throw new HyracksException(e);
+        }
     }
 
     @SuppressWarnings("rawtypes")
@@ -65,12 +68,12 @@
         return new AbstractUnaryInputSinkOperatorNodePushable() {
             private RecordDescriptor rd0;
             private FrameDeserializer frameDeserializer;
-            private Configuration conf;
-            private VertexWriter vertexWriter;
+            private Job job;
+            private RecordWriter recordWriter;
             private TaskAttemptContext context;
+            private ContextFactory ctxFactory = new ContextFactory();
             private String TEMP_DIR = "_temporary";
             private ClassLoader ctxCL;
-            private ContextFactory ctxFactory = new ContextFactory();
 
             @Override
             public void open() throws HyracksDataException {
@@ -79,16 +82,16 @@
                 frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
                 ctxCL = Thread.currentThread().getContextClassLoader();
                 Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
-                conf = confFactory.createConfiguration(ctx);
-
-                VertexOutputFormat outputFormat = BspUtils.createVertexOutputFormat(conf);
-                context = ctxFactory.createContext(conf, partition);
-                context.getConfiguration().setClassLoader(ctx.getJobletContext().getClassLoader());
+                job = confFactory.getConf();
                 try {
-                    vertexWriter = outputFormat.createVertexWriter(context);
+                    OutputFormat outputFormat = ReflectionUtils.newInstance(job.getOutputFormatClass(),
+                            job.getConfiguration());
+                    context = ctxFactory.createContext(job.getConfiguration(), partition);
+                    context.getConfiguration().setClassLoader(ctx.getJobletContext().getClassLoader());
+                    recordWriter = outputFormat.getRecordWriter(context);
                 } catch (InterruptedException e) {
                     throw new HyracksDataException(e);
-                } catch (IOException e) {
+                } catch (Exception e) {
                     throw new HyracksDataException(e);
                 }
             }
@@ -100,8 +103,9 @@
                 try {
                     while (!frameDeserializer.done()) {
                         Object[] tuple = frameDeserializer.deserializeRecord();
-                        Vertex value = (Vertex) tuple[1];
-                        vertexWriter.writeVertex(value);
+                        Object key = tuple[0];
+                        Object value = tuple[1];
+                        recordWriter.write(key, value);
                     }
                 } catch (InterruptedException e) {
                     throw new HyracksDataException(e);
@@ -118,7 +122,7 @@
             @Override
             public void close() throws HyracksDataException {
                 try {
-                    vertexWriter.close(context);
+                    recordWriter.close(context);
                     moveFilesToFinalPath();
                 } catch (InterruptedException e) {
                     throw new HyracksDataException(e);
@@ -129,9 +133,8 @@
 
             private void moveFilesToFinalPath() throws HyracksDataException {
                 try {
-                    JobContext job = ctxFactory.createJobContext(conf);
                     Path outputPath = FileOutputFormat.getOutputPath(job);
-                    FileSystem dfs = FileSystem.get(conf);
+                    FileSystem dfs = FileSystem.get(job.getConfiguration());
                     Path filePath = new Path(outputPath, "part-" + new Integer(partition).toString());
                     FileStatus[] results = findPartitionPaths(outputPath, dfs);
                     if (results.length >= 1) {
@@ -161,7 +164,7 @@
                 FileStatus[] tempPaths = dfs.listStatus(outputPath, new PathFilter() {
                     @Override
                     public boolean accept(Path dir) {
-                        return dir.getName().endsWith(TEMP_DIR);
+                        return dir.getName().endsWith(TEMP_DIR) && dir.getName().indexOf(".crc") < 0;
                     }
                 });
                 Path tempDir = tempPaths[0].getPath();
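
The rewrite above generalizes the HDFS writer from Pregelix's VertexOutputFormat to any Hadoop OutputFormat, instantiated reflectively from the job via ReflectionUtils. The Job itself cannot cross the wire directly because Hadoop's Configuration is not Serializable, which is what the ConfFactory wrapper solves. A minimal sketch of such a wrapper, assuming only Hadoop's Writable support on Configuration (the real ConfFactory may differ in detail):

    import java.io.*;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class SerializableJobFactory implements Serializable {
        private static final long serialVersionUID = 1L;
        private final byte[] confBytes; // Configuration shipped as Writable bytes

        public SerializableJobFactory(Job job) throws IOException {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            job.getConfiguration().write(new DataOutputStream(bos)); // Configuration implements Writable
            confBytes = bos.toByteArray();
        }

        public Job getJob() throws IOException {
            Configuration conf = new Configuration(false);
            conf.readFields(new DataInputStream(new ByteArrayInputStream(confBytes)));
            return new Job(conf); // rebuild the Job on the receiving node
        }
    }
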
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingReadOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingReadOperatorDescriptor.java
index ca8f190..b44b643 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingReadOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingReadOperatorDescriptor.java
@@ -30,9 +30,12 @@
 
 public class MaterializingReadOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
     private static final long serialVersionUID = 1L;
+    private final boolean removeIterationState;
 
-    public MaterializingReadOperatorDescriptor(JobSpecification spec, RecordDescriptor recordDescriptor) {
+    public MaterializingReadOperatorDescriptor(JobSpecification spec, RecordDescriptor recordDescriptor,
+            boolean removeIterationState) {
         super(spec, 1, 1);
+        this.removeIterationState = removeIterationState;
         recordDescriptors[0] = recordDescriptor;
     }
 
@@ -73,7 +76,7 @@
 
             @Override
             public void fail() throws HyracksDataException {
-
+                writer.fail();
             }
 
             @Override
@@ -81,7 +84,9 @@
                 /**
                  * remove last iteration's state
                  */
-                IterationUtils.removeIterationState(ctx, partition);
+                if (removeIterationState) {
+                    IterationUtils.removeIterationState(ctx, partition);
+                }
                 writer.close();
                 complete = true;
             }
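
The new removeIterationState flag lets several readers share one materialized iteration state: only the last reader in the plan asks for the state to be dropped. A hypothetical wiring, assuming spec and recordDescriptor are already in scope:

    // earlier reads of the same iteration keep the state around
    MaterializingReadOperatorDescriptor firstRead =
            new MaterializingReadOperatorDescriptor(spec, recordDescriptor, false);
    // the final read discards it
    MaterializingReadOperatorDescriptor lastRead =
            new MaterializingReadOperatorDescriptor(spec, recordDescriptor, true);

The fail() fix in the same hunk matters independently: forwarding writer.fail() keeps the downstream operator chain informed when the read side aborts.
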
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileWriteOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileWriteOperatorDescriptor.java
new file mode 100644
index 0000000..f3ec40e
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileWriteOperatorDescriptor.java
@@ -0,0 +1,192 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.hdfs.ContextFactory;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+
+public class VertexFileWriteOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+    private final IConfigurationFactory confFactory;
+    private final IRecordDescriptorFactory inputRdFactory;
+
+    public VertexFileWriteOperatorDescriptor(JobSpecification spec, IConfigurationFactory confFactory,
+            IRecordDescriptorFactory inputRdFactory) {
+        super(spec, 1, 0);
+        this.confFactory = confFactory;
+        this.inputRdFactory = inputRdFactory;
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+            throws HyracksDataException {
+        return new AbstractUnaryInputSinkOperatorNodePushable() {
+            private RecordDescriptor rd0;
+            private FrameDeserializer frameDeserializer;
+            private Configuration conf;
+            private VertexWriter vertexWriter;
+            private TaskAttemptContext context;
+            private String TEMP_DIR = "_temporary";
+            private ClassLoader ctxCL;
+            private ContextFactory ctxFactory = new ContextFactory();
+
+            @Override
+            public void open() throws HyracksDataException {
+                rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
+                        : inputRdFactory.createRecordDescriptor(ctx);
+                frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
+                ctxCL = Thread.currentThread().getContextClassLoader();
+                Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+                conf = confFactory.createConfiguration(ctx);
+
+                VertexOutputFormat outputFormat = BspUtils.createVertexOutputFormat(conf);
+                context = ctxFactory.createContext(conf, partition);
+                context.getConfiguration().setClassLoader(ctx.getJobletContext().getClassLoader());
+                try {
+                    vertexWriter = outputFormat.createVertexWriter(context);
+                } catch (InterruptedException e) {
+                    throw new HyracksDataException(e);
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            @SuppressWarnings("unchecked")
+            @Override
+            public void nextFrame(ByteBuffer frame) throws HyracksDataException {
+                frameDeserializer.reset(frame);
+                try {
+                    while (!frameDeserializer.done()) {
+                        Object[] tuple = frameDeserializer.deserializeRecord();
+                        Vertex value = (Vertex) tuple[1];
+                        vertexWriter.writeVertex(value);
+                    }
+                } catch (InterruptedException e) {
+                    throw new HyracksDataException(e);
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                Thread.currentThread().setContextClassLoader(ctxCL);
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                try {
+                    vertexWriter.close(context);
+                    moveFilesToFinalPath();
+                } catch (InterruptedException e) {
+                    throw new HyracksDataException(e);
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            private void moveFilesToFinalPath() throws HyracksDataException {
+                try {
+                    JobContext job = ctxFactory.createJobContext(conf);
+                    Path outputPath = FileOutputFormat.getOutputPath(job);
+                    FileSystem dfs = FileSystem.get(conf);
+                    Path filePath = new Path(outputPath, "part-" + new Integer(partition).toString());
+                    FileStatus[] results = findPartitionPaths(outputPath, dfs);
+                    if (results.length >= 1) {
+                        /**
+                         * for Hadoop-0.20.2
+                         */
+                        renameFile(dfs, filePath, results);
+                    } else {
+                        /**
+                         * for Hadoop-0.23.1
+                         */
+                        int jobId = job.getJobID().getId();
+                        outputPath = new Path(outputPath.toString() + File.separator + TEMP_DIR + File.separator
+                                + jobId);
+                        results = findPartitionPaths(outputPath, dfs);
+                        renameFile(dfs, filePath, results);
+                    }
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                } finally {
+                    Thread.currentThread().setContextClassLoader(ctxCL);
+                }
+            }
+
+            private FileStatus[] findPartitionPaths(Path outputPath, FileSystem dfs) throws FileNotFoundException,
+                    IOException {
+                FileStatus[] tempPaths = dfs.listStatus(outputPath, new PathFilter() {
+                    @Override
+                    public boolean accept(Path dir) {
+                        return dir.getName().endsWith(TEMP_DIR) && dir.getName().indexOf(".crc") < 0;
+                    }
+                });
+                Path tempDir = tempPaths[0].getPath();
+                FileStatus[] results = dfs.listStatus(tempDir, new PathFilter() {
+                    @Override
+                    public boolean accept(Path dir) {
+                        return dir.getName().indexOf(context.getTaskAttemptID().toString()) >= 0
+                                && dir.getName().indexOf(".crc") < 0;
+                    }
+                });
+                return results;
+            }
+
+            private void renameFile(FileSystem dfs, Path filePath, FileStatus[] results) throws IOException,
+                    HyracksDataException, FileNotFoundException {
+                Path srcDir = results[0].getPath();
+                if (!dfs.exists(srcDir))
+                    throw new HyracksDataException("file " + srcDir.toString() + " does not exist!");
+
+                FileStatus[] srcFiles = dfs.listStatus(srcDir);
+                Path srcFile = srcFiles[0].getPath();
+                dfs.delete(filePath, true);
+                dfs.rename(srcFile, filePath);
+            }
+
+        };
+    }
+}
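
VertexFileWriteOperatorDescriptor preserves the original VertexOutputFormat-based path that HDFSFileWriteOperatorDescriptor used to implement, so both writers now coexist. A hedged sketch of how a plan generator might pick between them; hasVertexOutputFormat is an assumed helper here, not an existing API:

    IOperatorDescriptor writer;
    if (hasVertexOutputFormat(conf)) {
        // vertex-aware path: tuple[1] is cast to Vertex and written via VertexWriter
        writer = new VertexFileWriteOperatorDescriptor(spec, confFactory, inputRdFactory);
    } else {
        // generic path: raw key/value pairs go through the configured OutputFormat
        writer = new HDFSFileWriteOperatorDescriptor(spec, job, inputRdFactory);
    }
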
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
index bfe89ab..f3f7513 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
@@ -147,11 +147,15 @@
         return (RuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
     }
 
-    public synchronized void setVertexProperties(String jobId, long numVertices, long numEdges, int currentIteration) {
+    public synchronized void setVertexProperties(String jobId, long numVertices, long numEdges, long currentIteration) {
         Boolean toMove = jobIdToMove.get(jobId);
         if (toMove == null || toMove == true) {
             if (jobIdToSuperStep.get(jobId) == null) {
-                jobIdToSuperStep.put(jobId, 0L);
+                if (currentIteration <= 0) {
+                    jobIdToSuperStep.put(jobId, 0L);
+                } else {
+                    jobIdToSuperStep.put(jobId, currentIteration);
+                }
             }
 
             long superStep = jobIdToSuperStep.get(jobId);
@@ -175,6 +179,35 @@
         System.gc();
     }
 
+    public synchronized void recoverVertexProperties(String jobId, long numVertices, long numEdges,
+            long currentIteration) {
+        if (jobIdToSuperStep.get(jobId) == null) {
+            if (currentIteration <= 0) {
+                jobIdToSuperStep.put(jobId, 0L);
+            } else {
+                jobIdToSuperStep.put(jobId, currentIteration);
+            }
+        }
+
+        long superStep = jobIdToSuperStep.get(jobId);
+        List<FileReference> files = iterationToFiles.remove(superStep - 1);
+        if (files != null) {
+            for (FileReference fileRef : files)
+                fileRef.delete();
+        }
+
+        if (currentIteration > 0) {
+            Vertex.setSuperstep(currentIteration);
+        } else {
+            Vertex.setSuperstep(++superStep);
+        }
+        Vertex.setNumVertices(numVertices);
+        Vertex.setNumEdges(numEdges);
+        jobIdToSuperStep.put(jobId, superStep);
+        jobIdToMove.put(jobId, true);
+        LOGGER.info("recovered iteration " + Vertex.getSuperstep());
+    }
+
     public synchronized void endSuperStep(String pregelixJobId) {
         jobIdToMove.put(pregelixJobId, true);
         LOGGER.info("end iteration " + Vertex.getSuperstep());
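
recoverVertexProperties differs from setVertexProperties in that it ignores the jobIdToMove gate and trusts the checkpointed iteration when one exists. The superstep a recovered job resumes at reduces to a small rule; a compact restatement, with stored standing for the jobIdToSuperStep entry:

    // a positive checkpointed iteration wins; otherwise advance the stored
    // superstep by one, starting from zero when nothing is stored yet
    static long resolveRecoveredSuperstep(Long stored, long checkpointedIteration) {
        if (checkpointedIteration > 0) {
            return checkpointedIteration;
        }
        long superStep = (stored != null) ? stored : 0L;
        return superStep + 1;
    }

Note that in the checkpointed branch the method above writes the previously stored value back into jobIdToSuperStep rather than the checkpointed one, so a stale entry can survive recovery.
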
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
index 1cf81ac..02097bf 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
@@ -75,13 +75,21 @@
         context.endSuperStep(giraphJobId);
     }
 
-    public static void setProperties(String jobId, IHyracksTaskContext ctx, Configuration conf, int currentIteration) {
+    public static void setProperties(String jobId, IHyracksTaskContext ctx, Configuration conf, long currentIteration) {
         INCApplicationContext appContext = ctx.getJobletContext().getApplicationContext();
         RuntimeContext context = (RuntimeContext) appContext.getApplicationObject();
         context.setVertexProperties(jobId, conf.getLong(PregelixJob.NUM_VERTICE, -1),
                 conf.getLong(PregelixJob.NUM_EDGES, -1), currentIteration);
     }
 
+    public static void recoverProperties(String jobId, IHyracksTaskContext ctx, Configuration conf,
+            long currentIteration) {
+        INCApplicationContext appContext = ctx.getJobletContext().getApplicationContext();
+        RuntimeContext context = (RuntimeContext) appContext.getApplicationObject();
+        context.recoverVertexProperties(jobId, conf.getLong(PregelixJob.NUM_VERTICE, -1),
+                conf.getLong(PregelixJob.NUM_EDGES, -1), currentIteration);
+    }
+
     public static void writeTerminationState(Configuration conf, String jobId, boolean terminate)
             throws HyracksDataException {
         try {
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryConnectedComponentsTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryConnectedComponentsTest.java
new file mode 100644
index 0000000..efc7bcc
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryConnectedComponentsTest.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example;
+
+import java.io.File;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Test;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook;
+import edu.uci.ics.pregelix.api.util.DefaultVertexPartitioner;
+import edu.uci.ics.pregelix.core.driver.Driver;
+import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
+import edu.uci.ics.pregelix.example.ConnectedComponentsVertex.SimpleConnectedComponentsVertexOutputFormat;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.inputformat.TextConnectedComponentsInputFormat;
+import edu.uci.ics.pregelix.example.util.TestCluster;
+import edu.uci.ics.pregelix.example.util.TestUtils;
+
+/**
+ * @author yingyib
+ */
+public class FailureRecoveryConnectedComponentsTest {
+    private static String INPUTPATH = "data/webmapcomplex";
+    private static String OUTPUTPAH = "actual/result";
+    private static String EXPECTEDPATH = "src/test/resources/expected/ConnectedComponentsRealComplex2";
+
+    @Test
+    public void test() throws Exception {
+        TestCluster testCluster = new TestCluster();
+        try {
+            PregelixJob job = new PregelixJob(ConnectedComponentsVertex.class.getName());
+            job.setVertexClass(ConnectedComponentsVertex.class);
+            job.setVertexInputFormatClass(TextConnectedComponentsInputFormat.class);
+            job.setVertexOutputFormatClass(SimpleConnectedComponentsVertexOutputFormat.class);
+            job.setMessageCombinerClass(ConnectedComponentsVertex.SimpleMinCombiner.class);
+            job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+            job.setVertexPartitionerClass(DefaultVertexPartitioner.class);
+            job.setDynamicVertexValueSize(true);
+            FileInputFormat.setInputPaths(job, INPUTPATH);
+            FileOutputFormat.setOutputPath(job, new Path(OUTPUTPAH));
+            job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
+            job.setCheckpointHook(ConservativeCheckpointHook.class);
+
+            testCluster.setUp();
+            Driver driver = new Driver(ConnectedComponentsVertex.class);
+            Thread thread = new Thread(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        synchronized (this) {
+                            while (Vertex.getSuperstep() <= 5) {
+                                this.wait(200);
+                            }
+                            PregelixHyracksIntegrationUtil.shutdownNC1();
+                        }
+                    } catch (Exception e) {
+                        throw new IllegalStateException(e);
+                    }
+                }
+            });
+            thread.start();
+            driver.runJob(job, "127.0.0.1", PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT);
+
+            TestUtils.compareWithResultDir(new File(EXPECTEDPATH), new File(OUTPUTPAH));
+        } catch (Exception e) {
+            PregelixHyracksIntegrationUtil.shutdownNC2();
+            testCluster.cleanupHDFS();
+            throw e;
+        }
+    }
+
+}
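
All of the new failure-recovery tests share the same injection pattern: a watcher thread polls Vertex.getSuperstep() and kills node controller 1 once the job is past superstep 5, after which the driver is expected to recover and still match the expected output. Since nothing ever notifies the Runnable's monitor, wait(200) inside synchronized(this) behaves as a timed sleep; an equivalent watcher without the monitor:

    Thread watcher = new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                while (Vertex.getSuperstep() <= 5) {
                    Thread.sleep(200); // poll until the job is mid-flight
                }
                PregelixHyracksIntegrationUtil.shutdownNC1(); // inject the failure
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        }
    });
    watcher.start();
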
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryInnerJoinTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryInnerJoinTest.java
new file mode 100644
index 0000000..ff1e29f
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryInnerJoinTest.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example;
+
+import java.io.File;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Test;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook;
+import edu.uci.ics.pregelix.core.base.IDriver.Plan;
+import edu.uci.ics.pregelix.core.driver.Driver;
+import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
+import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexOutputFormat;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.util.TestCluster;
+import edu.uci.ics.pregelix.example.util.TestUtils;
+
+/**
+ * @author yingyib
+ */
+public class FailureRecoveryInnerJoinTest {
+    private static String INPUTPATH = "data/webmap";
+    private static String OUTPUTPAH = "actual/result";
+    private static String EXPECTEDPATH = "src/test/resources/expected/PageRankReal2";
+
+    @Test
+    public void test() throws Exception {
+        TestCluster testCluster = new TestCluster();
+
+        try {
+            PregelixJob job = new PregelixJob(PageRankVertex.class.getName());
+            job.setVertexClass(PageRankVertex.class);
+            job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+            job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
+            job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
+            job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+            FileInputFormat.setInputPaths(job, INPUTPATH);
+            FileOutputFormat.setOutputPath(job, new Path(OUTPUTPAH));
+            job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+            job.setCheckpointHook(ConservativeCheckpointHook.class);
+
+            testCluster.setUp();
+            Driver driver = new Driver(PageRankVertex.class);
+            Thread thread = new Thread(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        synchronized (this) {
+                            while (Vertex.getSuperstep() <= 5) {
+                                this.wait(200);
+                            }
+                            PregelixHyracksIntegrationUtil.shutdownNC1();
+                        }
+                    } catch (Exception e) {
+                        throw new IllegalStateException(e);
+                    }
+                }
+            });
+            thread.start();
+            driver.runJob(job, Plan.INNER_JOIN, "127.0.0.1",
+                    PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT, false);
+
+            TestUtils.compareWithResultDir(new File(EXPECTEDPATH), new File(OUTPUTPAH));
+        } catch (Exception e) {
+            PregelixHyracksIntegrationUtil.shutdownNC2();
+            testCluster.cleanupHDFS();
+            throw e;
+        }
+    }
+
+}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java
index dac087f..3fdaf15 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java
@@ -21,6 +21,7 @@
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.junit.Test;
 
+import edu.uci.ics.pregelix.api.graph.Vertex;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 import edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook;
 import edu.uci.ics.pregelix.core.driver.Driver;
@@ -37,7 +38,7 @@
 public class FailureRecoveryTest {
     private static String INPUTPATH = "data/webmap";
     private static String OUTPUTPAH = "actual/result";
-    private static String EXPECTEDPATH = "src/test/resources/expected/PageRankReal";
+    private static String EXPECTEDPATH = "src/test/resources/expected/PageRankReal2";
 
     @Test
     public void test() throws Exception {
@@ -57,29 +58,30 @@
 
             testCluster.setUp();
             Driver driver = new Driver(PageRankVertex.class);
-            //            Thread thread = new Thread(new Runnable() {
-            //
-            //                @Override
-            //                public void run() {
-            //                    try {
-            //                        synchronized (this) {
-            //                            this.wait(10000);
-            //                            PregelixHyracksIntegrationUtil.showDownNC1();
-            //                        }
-            //                    } catch (Exception e) {
-            //                        throw new IllegalStateException(e);
-            //                    }
-            //                }
-            //
-            //            });
-            //thread.start();
+            Thread thread = new Thread(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        synchronized (this) {
+                            while (Vertex.getSuperstep() <= 5) {
+                                this.wait(200);
+                            }
+                            PregelixHyracksIntegrationUtil.shutdownNC1();
+                        }
+                    } catch (Exception e) {
+                        throw new IllegalStateException(e);
+                    }
+                }
+            });
+            thread.start();
             driver.runJob(job, "127.0.0.1", PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT);
 
             TestUtils.compareWithResultDir(new File(EXPECTEDPATH), new File(OUTPUTPAH));
         } catch (Exception e) {
+            PregelixHyracksIntegrationUtil.shutdownNC2();
+            testCluster.cleanupHDFS();
             throw e;
-        } finally {
-            testCluster.tearDown();
         }
     }
 
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryWithoutCheckpointTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryWithoutCheckpointTest.java
new file mode 100644
index 0000000..e006ccd
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryWithoutCheckpointTest.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example;
+
+import java.io.File;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Test;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.core.driver.Driver;
+import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
+import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexOutputFormat;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.util.TestCluster;
+import edu.uci.ics.pregelix.example.util.TestUtils;
+
+/**
+ * @author yingyib
+ */
+public class FailureRecoveryWithoutCheckpointTest {
+    private static String INPUTPATH = "data/webmap";
+    private static String OUTPUTPAH = "actual/result";
+    private static String EXPECTEDPATH = "src/test/resources/expected/PageRankReal2";
+
+    @Test
+    public void test() throws Exception {
+        TestCluster testCluster = new TestCluster();
+
+        try {
+            PregelixJob job = new PregelixJob(PageRankVertex.class.getName());
+            job.setVertexClass(PageRankVertex.class);
+            job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+            job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
+            job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
+            job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+            FileInputFormat.setInputPaths(job, INPUTPATH);
+            FileOutputFormat.setOutputPath(job, new Path(OUTPUTPAH));
+            job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+
+            testCluster.setUp();
+            Driver driver = new Driver(PageRankVertex.class);
+            Thread thread = new Thread(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        synchronized (this) {
+                            while (Vertex.getSuperstep() <= 5) {
+                                this.wait(200);
+                            }
+                            PregelixHyracksIntegrationUtil.shutdownNC1();
+                        }
+                    } catch (Exception e) {
+                        throw new IllegalStateException(e);
+                    }
+                }
+            });
+            thread.start();
+            driver.runJob(job, "127.0.0.1", PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT);
+
+            TestUtils.compareWithResultDir(new File(EXPECTEDPATH), new File(OUTPUTPAH));
+        } catch (Exception e) {
+            PregelixHyracksIntegrationUtil.shutdownNC2();
+            testCluster.cleanupHDFS();
+            throw e;
+        }
+    }
+
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/FailureVertex.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureVertex.java
similarity index 100%
rename from pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/FailureVertex.java
rename to pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureVertex.java
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/JobConcatenationTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/JobConcatenationTest.java
index 5a485ba..dc7a28d 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/JobConcatenationTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/JobConcatenationTest.java
@@ -24,6 +24,7 @@
 import org.junit.Test;
 
 import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook;
 import edu.uci.ics.pregelix.core.driver.Driver;
 import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
 import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexOutputFormat;
@@ -55,7 +56,7 @@
             job1.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
             FileInputFormat.setInputPaths(job1, INPUTPATH);
             job1.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
-            //job1.setCheckpointHook(ConservativeCheckpointHook.class);
+            job1.setCheckpointHook(ConservativeCheckpointHook.class);
 
             PregelixJob job2 = new PregelixJob(PageRankVertex.class.getName());
             job2.setVertexClass(PageRankVertex.class);
@@ -65,7 +66,7 @@
             job2.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
             FileOutputFormat.setOutputPath(job2, new Path(OUTPUTPAH));
             job2.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
-            //job2.setCheckpointHook(ConservativeCheckpointHook.class);
+            job2.setCheckpointHook(ConservativeCheckpointHook.class);
 
             jobs.add(job1);
             jobs.add(job2);
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestCluster.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestCluster.java
index 40ea690..660d9eb 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestCluster.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestCluster.java
@@ -126,7 +126,7 @@
     /**
      * cleanup hdfs cluster
      */
-    private void cleanupHDFS() throws Exception {
+    public void cleanupHDFS() throws Exception {
         dfsCluster.shutdown();
     }
 
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex2/part-0 b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex2/part-0
new file mode 100755
index 0000000..2c975de
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex2/part-0
@@ -0,0 +1,10 @@
+0	0
+2	0
+4	0
+6	0
+8	0
+10	0
+12	0
+14	0
+16	0
+18	0
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex2/part-1 b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex2/part-1
new file mode 100755
index 0000000..6976bc1
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex2/part-1
@@ -0,0 +1,13 @@
+1	0
+3	0
+5	0
+7	0
+9	0
+11	0
+13	0
+15	0
+17	0
+19	0
+21	21
+25	25
+27	27
diff --git a/pregelix/pregelix-example/src/test/resources/expected/PageRankReal2/part-0 b/pregelix/pregelix-example/src/test/resources/expected/PageRankReal2/part-0
new file mode 100755
index 0000000..d135b86
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/PageRankReal2/part-0
@@ -0,0 +1,10 @@
+0	0.008290140026154316
+2	0.14646839195826472
+4	0.03976979906329426
+6	0.015736276824953852
+8	0.010628239626209894
+10	0.008290140026154316
+12	0.14646839195826472
+14	0.03976979906329426
+16	0.015736276824953852
+18	0.010628239626209894
diff --git a/pregelix/pregelix-example/src/test/resources/expected/PageRankReal2/part-1 b/pregelix/pregelix-example/src/test/resources/expected/PageRankReal2/part-1
new file mode 100755
index 0000000..d3badee
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/PageRankReal2/part-1
@@ -0,0 +1,10 @@
+1	0.15351528192471647
+3	0.08125113985998214
+5	0.0225041581462058
+7	0.012542224114863661
+9	0.009294348455354817
+11	0.15351528192471647
+13	0.08125113985998214
+15	0.0225041581462058
+17	0.012542224114863661
+19	0.009294348455354817
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RecoveryRuntimeHookFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RecoveryRuntimeHookFactory.java
index 35e7cd8..4720272 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RecoveryRuntimeHookFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RecoveryRuntimeHookFactory.java
@@ -30,11 +30,11 @@
  */
 public class RecoveryRuntimeHookFactory implements IRuntimeHookFactory {
     private static final long serialVersionUID = 1L;
-    private final int currentSuperStep;
+    private final long currentSuperStep;
     private String jobId;
     private IConfigurationFactory confFactory;
 
-    public RecoveryRuntimeHookFactory(String jobId, int currentSuperStep, IConfigurationFactory confFactory) {
+    public RecoveryRuntimeHookFactory(String jobId, long currentSuperStep, IConfigurationFactory confFactory) {
         this.currentSuperStep = currentSuperStep;
         this.jobId = jobId;
         this.confFactory = confFactory;
@@ -48,7 +48,7 @@
             public void configure(IHyracksTaskContext ctx) throws HyracksDataException {
                 IterationUtils.endSuperStep(jobId, ctx);
                 Configuration conf = confFactory.createConfiguration(ctx);
-                IterationUtils.setProperties(jobId, ctx, conf, currentSuperStep);
+                IterationUtils.recoverProperties(jobId, ctx, conf, currentSuperStep);
             }
 
         };
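
Widening currentSuperStep from int to long keeps the hook consistent with Vertex.getSuperstep() and with the IterationUtils signatures changed above, so a checkpointed superstep is never truncated. Hypothetical wiring during recovery-plan generation, assuming jobId and confFactory are in scope:

    long checkpointedSuperstep = Vertex.getSuperstep(); // restored from the checkpoint elsewhere
    IRuntimeHookFactory recoveryHook =
            new RecoveryRuntimeHookFactory(jobId, checkpointedSuperstep, confFactory);
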