Merge branch 'master' into yingyi/fullstack_fix
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index 0b8346b..fd6360a 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -27,6 +27,9 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.json.JSONException;
+import org.json.JSONObject;
+
 import edu.uci.ics.hyracks.api.constraints.Constraint;
 import edu.uci.ics.hyracks.api.constraints.expressions.LValueConstraintExpression;
 import edu.uci.ics.hyracks.api.constraints.expressions.PartitionLocationExpression;
@@ -45,6 +48,7 @@
 import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.NodeControllerState;
+import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
 import edu.uci.ics.hyracks.control.cc.job.ActivityClusterPlan;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
 import edu.uci.ics.hyracks.control.cc.job.Task;
@@ -458,13 +462,14 @@
     private void abortJob(List<Exception> exceptions) {
         Set<TaskCluster> inProgressTaskClustersCopy = new HashSet<TaskCluster>(inProgressTaskClusters);
         for (TaskCluster tc : inProgressTaskClustersCopy) {
-            abortTaskCluster(findLastTaskClusterAttempt(tc));
+            abortTaskCluster(findLastTaskClusterAttempt(tc), TaskClusterAttempt.TaskClusterStatus.ABORTED);
         }
         assert inProgressTaskClusters.isEmpty();
         ccs.getWorkQueue().schedule(new JobCleanupWork(ccs, jobRun.getJobId(), JobStatus.FAILURE, exceptions));
     }
 
-    private void abortTaskCluster(TaskClusterAttempt tcAttempt) {
+    private void abortTaskCluster(TaskClusterAttempt tcAttempt,
+            TaskClusterAttempt.TaskClusterStatus failedOrAbortedStatus) {
         LOGGER.fine("Aborting task cluster: " + tcAttempt.getAttempt());
         Set<TaskAttemptId> abortTaskIds = new HashSet<TaskAttemptId>();
         Map<String, List<TaskAttemptId>> abortTaskAttemptMap = new HashMap<String, List<TaskAttemptId>>();
@@ -477,11 +482,13 @@
                 ta.setStatus(TaskAttempt.TaskStatus.ABORTED, null);
                 ta.setEndTime(System.currentTimeMillis());
                 List<TaskAttemptId> abortTaskAttempts = abortTaskAttemptMap.get(ta.getNodeId());
-                if (abortTaskAttempts == null) {
+                if (status == TaskAttempt.TaskStatus.RUNNING && abortTaskAttempts == null) {
                     abortTaskAttempts = new ArrayList<TaskAttemptId>();
                     abortTaskAttemptMap.put(ta.getNodeId(), abortTaskAttempts);
                 }
-                abortTaskAttempts.add(taId);
+                if (status == TaskAttempt.TaskStatus.RUNNING) {
+                    abortTaskAttempts.add(taId);
+                }
             }
         }
         final JobId jobId = jobRun.getJobId();
@@ -505,6 +512,9 @@
         PartitionMatchMaker pmm = jobRun.getPartitionMatchMaker();
         pmm.removeUncommittedPartitions(tc.getProducedPartitions(), abortTaskIds);
         pmm.removePartitionRequests(tc.getRequiredPartitions(), abortTaskIds);
+
+        tcAttempt.setStatus(failedOrAbortedStatus);
+        tcAttempt.setEndTime(System.currentTimeMillis());
     }
 
     private void abortDoomedTaskClusters() throws HyracksException {
@@ -519,9 +529,7 @@
         for (TaskCluster tc : doomedTaskClusters) {
             TaskClusterAttempt tca = findLastTaskClusterAttempt(tc);
             if (tca != null) {
-                abortTaskCluster(tca);
-                tca.setEndTime(System.currentTimeMillis());
-                tca.setStatus(TaskClusterAttempt.TaskClusterStatus.ABORTED);
+                abortTaskCluster(tca, TaskClusterAttempt.TaskClusterStatus.ABORTED);
             }
         }
     }
@@ -608,9 +616,7 @@
             if (lastAttempt != null && taId.getAttempt() == lastAttempt.getAttempt()) {
                 LOGGER.fine("Marking TaskAttempt " + ta.getTaskAttemptId() + " as failed");
                 ta.setStatus(TaskAttempt.TaskStatus.FAILED, exceptions);
-                abortTaskCluster(lastAttempt);
-                lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.FAILED);
-                lastAttempt.setEndTime(System.currentTimeMillis());
+                abortTaskCluster(lastAttempt, TaskClusterAttempt.TaskClusterStatus.FAILED);
                 abortDoomedTaskClusters();
                 if (lastAttempt.getAttempt() >= jobRun.getActivityClusterGraph().getMaxReattempts()) {
                     abortJob(exceptions);
@@ -635,32 +641,41 @@
     public void notifyNodeFailures(Set<String> deadNodes) {
         try {
             jobRun.getPartitionMatchMaker().notifyNodeFailures(deadNodes);
+            jobRun.getParticipatingNodeIds().removeAll(deadNodes);
+            jobRun.getCleanupPendingNodeIds().removeAll(deadNodes);
+            if (jobRun.getPendingStatus() != null && jobRun.getCleanupPendingNodeIds().isEmpty()) {
+                finishJob(jobRun);
+                return;
+            }
             for (ActivityCluster ac : jobRun.getActivityClusterGraph().getActivityClusterMap().values()) {
-                TaskCluster[] taskClusters = getActivityClusterPlan(ac).getTaskClusters();
-                if (taskClusters != null) {
-                    for (TaskCluster tc : taskClusters) {
-                        TaskClusterAttempt lastTaskClusterAttempt = findLastTaskClusterAttempt(tc);
-                        if (lastTaskClusterAttempt != null
-                                && (lastTaskClusterAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED || lastTaskClusterAttempt
-                                        .getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING)) {
-                            boolean abort = false;
-                            for (TaskAttempt ta : lastTaskClusterAttempt.getTaskAttempts().values()) {
-                                assert (ta.getStatus() == TaskAttempt.TaskStatus.COMPLETED || ta.getStatus() == TaskAttempt.TaskStatus.RUNNING);
-                                if (deadNodes.contains(ta.getNodeId())) {
-                                    ta.setStatus(
-                                            TaskAttempt.TaskStatus.FAILED,
-                                            Collections.singletonList(new Exception("Node " + ta.getNodeId()
-                                                    + " failed")));
-                                    ta.setEndTime(System.currentTimeMillis());
-                                    abort = true;
+                if (isPlanned(ac)) {
+                    TaskCluster[] taskClusters = getActivityClusterPlan(ac).getTaskClusters();
+                    if (taskClusters != null) {
+                        for (TaskCluster tc : taskClusters) {
+                            TaskClusterAttempt lastTaskClusterAttempt = findLastTaskClusterAttempt(tc);
+                            if (lastTaskClusterAttempt != null
+                                    && (lastTaskClusterAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED || lastTaskClusterAttempt
+                                            .getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING)) {
+                                boolean abort = false;
+                                for (TaskAttempt ta : lastTaskClusterAttempt.getTaskAttempts().values()) {
+                                    assert (ta.getStatus() == TaskAttempt.TaskStatus.COMPLETED || ta.getStatus() == TaskAttempt.TaskStatus.RUNNING);
+                                    if (deadNodes.contains(ta.getNodeId())) {
+                                        ta.setStatus(
+                                                TaskAttempt.TaskStatus.FAILED,
+                                                Collections.singletonList(new Exception("Node " + ta.getNodeId()
+                                                        + " failed")));
+                                        ta.setEndTime(System.currentTimeMillis());
+                                        abort = true;
+                                    }
+                                }
+                                if (abort) {
+                                    abortTaskCluster(lastTaskClusterAttempt,
+                                            TaskClusterAttempt.TaskClusterStatus.ABORTED);
                                 }
                             }
-                            if (abort) {
-                                abortTaskCluster(lastTaskClusterAttempt);
-                            }
                         }
+                        abortDoomedTaskClusters();
                     }
-                    abortDoomedTaskClusters();
                 }
             }
             startRunnableActivityClusters();
@@ -668,4 +683,58 @@
             abortJob(Collections.singletonList(e));
         }
     }
+
+    /**
+     * Finalizes a job run. Notifies the application context (if one is set), promotes the run's
+     * pending status and exceptions to its final ones, moves the run from the active run map to
+     * the run archive and run history, and appends an entry to the job log file.
+     *
+     * @param run
+     *            the job run to finalize
+     * @throws RuntimeException
+     *             if the job log entry cannot be built or written
+     */
+    private void finishJob(final JobRun run) {
+        JobId jobId = run.getJobId();
+        CCApplicationContext appCtx = ccs.getApplicationContext();
+        if (appCtx != null) {
+            try {
+                appCtx.notifyJobFinish(jobId);
+            } catch (HyracksException e) {
+                // Best effort: a failed application callback must not stop job finalization.
+                LOGGER.log(Level.WARNING, "Failed to notify application context of finish of job " + jobId, e);
+            }
+        }
+        run.setStatus(run.getPendingStatus(), run.getPendingExceptions());
+        ccs.getActiveRunMap().remove(jobId);
+        ccs.getRunMapArchive().put(jobId, run);
+        ccs.getRunHistory().put(jobId, run.getExceptions());
+        try {
+            ccs.getJobLogFile().log(createJobLogObject(run));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Builds the JSON object written to the job log: the activity cluster graph under
+     * "activity-cluster-graph" and the run itself under "job-run".
+     *
+     * @param run
+     *            the job run to serialize
+     * @return the JSON log entry
+     * @throws RuntimeException
+     *             wrapping any JSONException raised while building the object
+     */
+    private JSONObject createJobLogObject(final JobRun run) {
+        JSONObject jobLogObject = new JSONObject();
+        try {
+            ActivityClusterGraph acg = run.getActivityClusterGraph();
+            jobLogObject.put("activity-cluster-graph", acg.toJSON());
+            jobLogObject.put("job-run", run.toJSON());
+        } catch (JSONException e) {
+            throw new RuntimeException(e);
+        }
+        return jobLogObject;
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
index 6e8ddf0..22c3348 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.hyracks.control.cc.work;
 
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.logging.Logger;
@@ -48,11 +49,16 @@
 
     @Override
     public void run() {
+        LOGGER.warning("Cleanup for JobRun with id: " + jobId);
         final JobRun run = ccs.getActiveRunMap().get(jobId);
         if (run == null) {
             LOGGER.warning("Unable to find JobRun with id: " + jobId);
             return;
         }
+        if (run.getPendingStatus() != null && run.getCleanupPendingNodeIds().isEmpty()) {
+            finishJob(run);
+            return;
+        }
         if (run.getPendingStatus() != null) {
             LOGGER.warning("Ignoring duplicate cleanup for JobRun with id: " + jobId);
             return;
@@ -63,33 +69,47 @@
             run.setPendingStatus(status, exceptions);
         }
         if (targetNodes != null && !targetNodes.isEmpty()) {
+            Set<String> toDelete = new HashSet<String>();
             for (String n : targetNodes) {
                 NodeControllerState ncs = ccs.getNodeMap().get(n);
                 try {
-                    ncs.getNodeController().cleanUpJoblet(jobId, status);
+                    if (ncs == null) {
+                        toDelete.add(n);
+                    } else {
+                        ncs.getNodeController().cleanUpJoblet(jobId, status);
+                    }
                 } catch (Exception e) {
                     e.printStackTrace();
                 }
             }
+            targetNodes.removeAll(toDelete);
+            run.getCleanupPendingNodeIds().removeAll(toDelete);
+            if (run.getCleanupPendingNodeIds().isEmpty()) {
+                finishJob(run);
+            }
         } else {
-            CCApplicationContext appCtx = ccs.getApplicationContext();
-            if (appCtx != null) {
-                try {
-                    appCtx.notifyJobFinish(jobId);
-                } catch (HyracksException e) {
-                    e.printStackTrace();
-                }
-            }
-            run.setStatus(run.getPendingStatus(), run.getPendingExceptions());
-            ccs.getActiveRunMap().remove(jobId);
-            ccs.getRunMapArchive().put(jobId, run);
-            ccs.getRunHistory().put(jobId, run.getExceptions());
+            finishJob(run);
+        }
+    }
+
+    private void finishJob(final JobRun run) {
+        CCApplicationContext appCtx = ccs.getApplicationContext();
+        if (appCtx != null) {
             try {
-                ccs.getJobLogFile().log(createJobLogObject(run));
-            } catch (Exception e) {
-                throw new RuntimeException(e);
+                appCtx.notifyJobFinish(jobId);
+            } catch (HyracksException e) {
+                e.printStackTrace();
             }
         }
+        run.setStatus(run.getPendingStatus(), run.getPendingExceptions());
+        ccs.getActiveRunMap().remove(jobId);
+        ccs.getRunMapArchive().put(jobId, run);
+        ccs.getRunHistory().put(jobId, run.getExceptions());
+        try {
+            ccs.getJobLogFile().log(createJobLogObject(run));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
     }
 
     private JSONObject createJobLogObject(final JobRun run) {
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index d68ad2c..be419c2 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -549,4 +549,38 @@
     public static String getGlobalAggregateSpillingDirName(Configuration conf, long superStep) {
         return "/tmp/pregelix/agg/" + conf.get(PregelixJob.JOB_ID) + "/" + superStep;
     }
+
+    /**
+     * Get the path for vertex checkpointing: /tmp/ckpoint/{job id}/vertex/{lastSuperStep}
+     * 
+     * @param conf the job configuration, from which the job id is read
+     * @param lastSuperStep the superstep number used as the last path component
+     * @return the path for vertex checkpointing
+     */
+    public static String getVertexCheckpointPath(Configuration conf, long lastSuperStep) {
+        return "/tmp/ckpoint/" + BspUtils.getJobId(conf) + "/vertex/" + lastSuperStep;
+    }
+
+    /**
+     * Get the path for message checkpointing: /tmp/ckpoint/{job id}/message/{lastSuperStep}
+     * 
+     * @param conf the job configuration, from which the job id is read
+     * @param lastSuperStep the superstep number used as the last path component
+     * @return the path for message checkpointing
+     */
+    public static String getMessageCheckpointPath(Configuration conf, long lastSuperStep) {
+        String path = "/tmp/ckpoint/" + BspUtils.getJobId(conf) + "/message/" + lastSuperStep;
+        return path;
+    }
+
+    /**
+     * Get the path for secondary index checkpointing: /tmp/ckpoint/{job id}/secondaryindex/{lastSuperStep}
+     * 
+     * @param conf the job configuration, from which the job id is read
+     * @param lastSuperStep the superstep number used as the last path component
+     * @return the path for secondary index checkpointing
+     */
+    public static String getSecondaryIndexCheckpointPath(Configuration conf, long lastSuperStep) {
+        return "/tmp/ckpoint/" + BspUtils.getJobId(conf) + "/secondaryindex/" + lastSuperStep;
+    }
 }
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 7bd2cf8..61c3653 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -50,6 +50,7 @@
 import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSingleSort;
 import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSort;
 import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.util.ExceptionUtilities;
 import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
 
 @SuppressWarnings("rawtypes")
@@ -99,7 +100,8 @@
             IntWritable lastSnapshotSuperstep = new IntWritable(0);
             boolean failed = false;
             int retryCount = 0;
-            int maxRetryCount = 1;
+            int maxRetryCount = 3;
+            jobGen = selectJobGen(planChoice, currentJob);
 
             do {
                 try {
@@ -112,14 +114,14 @@
                         ICheckpointHook ckpHook = BspUtils.createCheckpointHook(currentJob.getConfiguration());
 
                         /** load the data */
-                        if (i == 0 || compatible(lastJob, currentJob)) {
+                        if ((i == 0 || compatible(lastJob, currentJob)) && !failed) {
                             if (i != 0) {
                                 finishJobs(jobGen, deploymentId);
                                 /** invalidate/clear checkpoint */
                                 lastSnapshotJobIndex.set(0);
                                 lastSnapshotSuperstep.set(0);
                             }
-                            jobGen = selectJobGen(planChoice, currentJob);
+                            jobGen.reset(currentJob);
                             loadData(currentJob, jobGen, deploymentId);
                         } else {
                             jobGen.reset(currentJob);
@@ -137,12 +139,16 @@
                     /** clear checkpoints if any */
                     jobGen.clearCheckpoints();
                     hcc.unDeployBinary(deploymentId);
-                } catch (IOException ioe) {
+                } catch (Exception e1) {
                     /** disk failures */
                     //restart from snapshot
-                    failed = true;
-                    retryCount++;
-                    throw new HyracksException(ioe);
+                    /** node failures */
+                    if (ExceptionUtilities.recoverable(e1)) {
+                        failed = true;
+                        retryCount++;
+                    } else {
+                        throw e1;
+                    }
                 }
             } while (failed && retryCount < maxRetryCount);
             LOG.info("job finished");
@@ -256,8 +262,13 @@
             throws Exception {
         if (doRecovery) {
             /** reload the checkpoint */
-            runLoadCheckpoint(deploymentId, jobGen, snapshotSuperstep.get());
-
+            if (snapshotSuperstep.get() > 0) {
+                runClearState(deploymentId, jobGen);
+                runLoadCheckpoint(deploymentId, jobGen, snapshotSuperstep.get());
+            } else {
+                runClearState(deploymentId, jobGen);
+                loadData(job, jobGen, deploymentId);
+            }
         }
         int i = doRecovery ? snapshotSuperstep.get() + 1 : 1;
         boolean terminate = false;
@@ -272,8 +283,8 @@
                     || IterationUtils.readForceTerminationState(job.getConfiguration(), jobGen.getJobId());
             if (ckpHook.checkpoint(i)) {
                 runCheckpoint(deploymentId, jobGen, i);
-                snapshotSuperstep.set(i);
                 snapshotJobIndex.set(currentJobIndex);
+                snapshotSuperstep.set(i);
             }
             i++;
         } while (!terminate);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index 36723a6..91d7e22 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -137,8 +137,6 @@
     protected static final String SECONDARY_INDEX_ODD = "secondary1";
     protected static final String SECONDARY_INDEX_EVEN = "secondary2";
 
-    private String vertexCheckpointPath;
-
     public JobGen(PregelixJob job) {
         init(job);
     }
@@ -148,10 +146,7 @@
         pregelixJob = job;
         initJobConfiguration();
         job.setJobId(jobId);
-
-        vertexCheckpointPath = "/tmp/ckpoint/" + jobId + "/vertex";
-        // set the frame size to be the one user specified if the user did
-        // specify.
+        // use the user-specified frame size, if one was given.
         int specifiedFrameSize = BspUtils.getFrameSize(job.getConfiguration());
         if (specifiedFrameSize > 0) {
             frameSize = specifiedFrameSize;
@@ -376,15 +371,20 @@
     @Override
     public JobSpecification[] generateCheckpointing(int lastSuccessfulIteration) throws HyracksException {
         try {
-
             PregelixJob tmpJob = this.createCloneJob("Vertex checkpointing for job " + jobId, pregelixJob);
             tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
-            FileOutputFormat.setOutputPath(tmpJob, new Path(vertexCheckpointPath + "/" + lastSuccessfulIteration));
+            FileOutputFormat.setOutputPath(tmpJob,
+                    new Path(BspUtils.getVertexCheckpointPath(conf, lastSuccessfulIteration)));
             tmpJob.setOutputKeyClass(NullWritable.class);
             tmpJob.setOutputValueClass(BspUtils.getVertexClass(tmpJob.getConfiguration()));
+            FileSystem dfs = FileSystem.get(tmpJob.getConfiguration());
+            dfs.delete(new Path(BspUtils.getVertexCheckpointPath(conf, lastSuccessfulIteration)), true);
             JobSpecification vertexCkpSpec = scanIndexWriteToHDFS(tmpJob.getConfiguration());
+
+            dfs.delete(new Path(BspUtils.getMessageCheckpointPath(conf, lastSuccessfulIteration)), true);
             JobSpecification[] stateCkpSpecs = generateStateCheckpointing(lastSuccessfulIteration);
             JobSpecification[] specs = new JobSpecification[1 + stateCkpSpecs.length];
+
             specs[0] = vertexCkpSpec;
             for (int i = 1; i < specs.length; i++) {
                 specs[i] = stateCkpSpecs[i - 1];
@@ -397,7 +397,7 @@
 
     @Override
     public JobSpecification generateLoadingJob() throws HyracksException {
-        JobSpecification spec = loadHDFSData(conf);
+        JobSpecification spec = loadHDFSData(pregelixJob);
         return spec;
     }
 
@@ -412,13 +412,22 @@
         try {
             PregelixJob tmpJob = this.createCloneJob("Vertex checkpoint loading for job " + jobId, pregelixJob);
             tmpJob.setVertexInputFormatClass(InternalVertexInputFormat.class);
-            FileInputFormat.setInputPaths(tmpJob, new Path(vertexCheckpointPath + "/" + lastCheckpointedIteration));
-            JobSpecification vertexLoadSpec = loadHDFSData(tmpJob.getConfiguration());
+            FileInputFormat.setInputPaths(tmpJob,
+                    new Path(BspUtils.getVertexCheckpointPath(conf, lastCheckpointedIteration)));
+            JobSpecification[] cleanVertices = generateCleanup();
+            JobSpecification createIndex = generateCreatingJob();
+            JobSpecification vertexLoadSpec = loadHDFSData(tmpJob);
             JobSpecification[] stateLoadSpecs = generateStateCheckpointLoading(lastCheckpointedIteration, tmpJob);
-            JobSpecification[] specs = new JobSpecification[1 + stateLoadSpecs.length];
-            specs[0] = vertexLoadSpec;
-            for (int i = 1; i < specs.length; i++) {
-                specs[i] = stateLoadSpecs[i - 1];
+            JobSpecification[] specs = new JobSpecification[cleanVertices.length + 2 + stateLoadSpecs.length];
+
+            int i = 0;
+            for (; i < cleanVertices.length; i++) {
+                specs[i] = cleanVertices[i];
+            }
+            specs[i++] = createIndex;
+            specs[i++] = vertexLoadSpec;
+            for (; i < specs.length; i++) {
+                specs[i] = stateLoadSpecs[i - cleanVertices.length - 2];
             }
             return specs;
         } catch (Exception e) {
@@ -479,7 +488,8 @@
     }
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
-    private JobSpecification loadHDFSData(Configuration conf) throws HyracksException, HyracksDataException {
+    private JobSpecification loadHDFSData(PregelixJob job) throws HyracksException, HyracksDataException {
+        Configuration conf = job.getConfiguration();
         Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
         Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
         JobSpecification spec = new JobSpecification();
@@ -492,7 +502,7 @@
         VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
         List<InputSplit> splits = new ArrayList<InputSplit>();
         try {
-            splits = inputFormat.getSplits(pregelixJob, fileSplitProvider.getFileSplits().length);
+            splits = inputFormat.getSplits(job, fileSplitProvider.getFileSplits().length);
             LOGGER.info("number of splits: " + splits.size());
             for (InputSplit split : splits)
                 LOGGER.info(split.toString());
@@ -637,10 +647,11 @@
         /**
          * construct the materializing write operator
          */
-        MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal);
+        MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
+                false);
         ClusterConfig.setLocationConstraint(spec, materializeRead);
 
-        String checkpointPath = "/tmp/ckpoint/" + jobId + "/message/" + lastSuccessfulIteration;
+        String checkpointPath = BspUtils.getMessageCheckpointPath(conf, lastSuccessfulIteration);;
         PregelixJob tmpJob = createCloneJob("State checkpointing for job " + jobId, pregelixJob);
         tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
         FileOutputFormat.setOutputPath(tmpJob, new Path(checkpointPath));
@@ -660,7 +671,7 @@
     @SuppressWarnings({ "unchecked", "rawtypes" })
     protected JobSpecification[] generateStateCheckpointLoading(int lastCheckpointedIteration, PregelixJob job)
             throws HyracksException {
-        String checkpointPath = "/tmp/ckpoint/" + jobId + "/message/" + lastCheckpointedIteration;
+        String checkpointPath = BspUtils.getMessageCheckpointPath(job.getConfiguration(), lastCheckpointedIteration);
         PregelixJob tmpJob = createCloneJob("State checkpoint loading for job " + jobId, job);
         tmpJob.setVertexInputFormatClass(InternalVertexInputFormat.class);
         try {
@@ -668,7 +679,7 @@
         } catch (IOException e) {
             throw new HyracksException(e);
         }
-        Configuration conf = job.getConfiguration();
+        Configuration conf = tmpJob.getConfiguration();
         Class vertexIdClass = BspUtils.getVertexIndexClass(conf);
         JobSpecification spec = new JobSpecification();
 
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 41887c0..7c1df0ff 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -344,7 +344,8 @@
         /**
          * construct the materializing write operator
          */
-        MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal);
+        MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
+                true);
         ClusterConfig.setLocationConstraint(spec, materializeRead);
 
         /**
@@ -529,7 +530,7 @@
         tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
 
         /** generate secondary index checkpoint */
-        String checkpointPath = "/tmp/ckpoint/" + jobId + "/secondary/" + lastSuccessfulIteration;
+        String checkpointPath = BspUtils.getSecondaryIndexCheckpointPath(conf, lastSuccessfulIteration);
         FileOutputFormat.setOutputPath(tmpJob, new Path(checkpointPath));
         tmpJob.setOutputKeyClass(BspUtils.getVertexIndexClass(tmpJob.getConfiguration()));
         tmpJob.setOutputValueClass(MsgList.class);
@@ -569,7 +570,7 @@
         Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(job.getConfiguration());
         JobSpecification spec = new JobSpecification();
 
-        String checkpointPath = "/tmp/ckpoint/" + jobId + "/secondary/" + lastSuccessfulIteration;;
+        String checkpointPath = BspUtils.getSecondaryIndexCheckpointPath(conf, lastSuccessfulIteration);
         PregelixJob tmpJob = createCloneJob("State checkpoint loading for job " + jobId, job);
         tmpJob.setVertexInputFormatClass(InternalVertexInputFormat.class);
         try {
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index c29ea18..287b797 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -299,7 +299,8 @@
         /**
          * construct the materializing write operator
          */
-        MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal);
+        MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
+                true);
         ClusterConfig.setLocationConstraint(spec, materializeRead);
 
         /**
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index dc61971..3b3c9e7 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -288,7 +288,8 @@
         /**
          * construct the materializing write operator
          */
-        MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal);
+        MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
+                true);
         ClusterConfig.setLocationConstraint(spec, materializeRead);
 
         /**
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index 34f723f..e334095 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -302,7 +302,8 @@
         /**
          * construct the materializing write operator
          */
-        MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal);
+        MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
+                true);
         ClusterConfig.setLocationConstraint(spec, materializeRead);
 
         /**
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/ExceptionUtilities.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/ExceptionUtilities.java
new file mode 100644
index 0000000..4b1bf94
--- /dev/null
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/ExceptionUtilities.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.util;
+
+/**
+ * Utility for analyzing exceptions.
+ * 
+ * @author yingyib
+ */
+public class ExceptionUtilities {
+
+    /**
+     * Checks whether an exception is recoverable: it is, or is caused by, an
+     * {@link InterruptedException}, or its message reports a non-live node or an input failure.
+     * 
+     * @param exception the exception to classify; a null message is treated as matching nothing
+     * @return true if the exception is considered recoverable, false otherwise
+     */
+    public static boolean recoverable(Exception exception) {
+        String message = exception.getMessage();
+        if (message != null && ((message.contains("Node") && message.contains("not live"))
+                || message.contains("Failure occurred on input"))) {
+            return true;
+        }
+        for (Throwable cause = exception; cause != null; cause = cause.getCause()) {
+            if (cause instanceof InterruptedException) {
+                return true;
+            }
+        }
+        return false;
+    }
+}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
index e1795de..f4b26cb 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
@@ -61,8 +61,8 @@
         ccConfig.defaultMaxJobAttempts = 0;
         ccConfig.jobHistorySize = 1;
         ccConfig.profileDumpPeriod = -1;
-        //ccConfig.heartbeatPeriod = 5000;
-        //ccConfig.maxHeartbeatLapsePeriods = 1;
+        ccConfig.heartbeatPeriod = 5000;
+        ccConfig.maxHeartbeatLapsePeriods = 8;
 
         // cluster controller
         cc = new ClusterControllerService(ccConfig);
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
index 7221cb5..b22e468 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
@@ -230,8 +230,16 @@
 
     @Override
     public void fail() throws HyracksDataException {
-        for (IFrameWriter writer : writers)
+        try {
+            cursor.close();
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        } finally {
+            treeIndexOpHelper.close();
+        }
+        for (IFrameWriter writer : writers) {
             writer.fail();
+        }
     }
 
     @Override
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
index b21cd2a..0ecfd03 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
@@ -254,8 +254,16 @@
 
     @Override
     public void fail() throws HyracksDataException {
-        for (IFrameWriter writer : writers)
+        try {
+            cursor.close();
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        } finally {
+            treeIndexOpHelper.close();
+        }
+        for (IFrameWriter writer : writers) {
             writer.fail();
+        }
     }
 
     /** compare tuples */
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java
index dd6ee3c..e64e9cc 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java
@@ -245,6 +245,13 @@
 
     @Override
     public void fail() throws HyracksDataException {
+        try {
+            cursor.close();
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        } finally {
+            treeIndexOpHelper.close();
+        }
         writer.fail();
     }
 
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
index 3cebfb8..a9c787f 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
@@ -219,8 +219,16 @@
 
     @Override
     public void fail() throws HyracksDataException {
-        for (IFrameWriter writer : writers)
+        try {
+            cursor.close();
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        } finally {
+            treeIndexOpHelper.close();
+        }
+        for (IFrameWriter writer : writers) {
             writer.fail();
+        }
     }
 
     /** compare tuples */
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java
index bbe2764..86a211f 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java
@@ -205,6 +205,13 @@
 
     @Override
     public void fail() throws HyracksDataException {
+        try {
+            cursor.close();
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        } finally {
+            treeIndexOpHelper.close();
+        }
         writer.fail();
     }
 
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorNodePushable.java
index c4890e1..1f410c7 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorNodePushable.java
@@ -100,6 +100,12 @@
 
     @Override
     public void fail() throws HyracksDataException {
-
+        try {
+            bulkLoader.end();
+        } catch (IndexException e) {
+            throw new HyracksDataException(e);
+        } finally {
+            treeIndexOpHelper.close();
+        }
     }
 }
\ No newline at end of file
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java
index bd85e3e..de87909 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java
@@ -224,8 +224,16 @@
 
     @Override
     public void fail() throws HyracksDataException {
-        for (IFrameWriter writer : writers)
+        try {
+            cursor.close();
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        } finally {
+            treeIndexHelper.close();
+        }
+        for (IFrameWriter writer : writers) {
             writer.fail();
+        }
     }
 
     @Override
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
index 9a21680..d3a9890 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
@@ -161,7 +161,7 @@
                 FileStatus[] tempPaths = dfs.listStatus(outputPath, new PathFilter() {
                     @Override
                     public boolean accept(Path dir) {
-                        return dir.getName().endsWith(TEMP_DIR);
+                        return dir.getName().endsWith(TEMP_DIR) && dir.getName().indexOf(".crc") < 0;
                     }
                 });
                 Path tempDir = tempPaths[0].getPath();
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingReadOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingReadOperatorDescriptor.java
index ca8f190..b44b643 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingReadOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingReadOperatorDescriptor.java
@@ -30,9 +30,12 @@
 
 public class MaterializingReadOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
     private static final long serialVersionUID = 1L;
+    private final boolean removeIterationState;
 
-    public MaterializingReadOperatorDescriptor(JobSpecification spec, RecordDescriptor recordDescriptor) {
+    public MaterializingReadOperatorDescriptor(JobSpecification spec, RecordDescriptor recordDescriptor,
+            boolean removeIterationState) {
         super(spec, 1, 1);
+        this.removeIterationState = removeIterationState;
         recordDescriptors[0] = recordDescriptor;
     }
 
@@ -73,7 +76,7 @@
 
             @Override
             public void fail() throws HyracksDataException {
-
+                writer.fail();
             }
 
             @Override
@@ -81,7 +84,9 @@
                 /**
                  * remove last iteration's state
                  */
-                IterationUtils.removeIterationState(ctx, partition);
+                if (removeIterationState) {
+                    IterationUtils.removeIterationState(ctx, partition);
+                }
                 writer.close();
                 complete = true;
             }
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java
index dac087f..5294ace 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java
@@ -37,7 +37,7 @@
 public class FailureRecoveryTest {
     private static String INPUTPATH = "data/webmap";
     private static String OUTPUTPAH = "actual/result";
-    private static String EXPECTEDPATH = "src/test/resources/expected/PageRankReal";
+    private static String EXPECTEDPATH = "src/test/resources/expected/PageRankReal2";
 
     @Test
     public void test() throws Exception {
@@ -57,29 +57,28 @@
 
             testCluster.setUp();
             Driver driver = new Driver(PageRankVertex.class);
-            //            Thread thread = new Thread(new Runnable() {
-            //
-            //                @Override
-            //                public void run() {
-            //                    try {
-            //                        synchronized (this) {
-            //                            this.wait(10000);
-            //                            PregelixHyracksIntegrationUtil.showDownNC1();
-            //                        }
-            //                    } catch (Exception e) {
-            //                        throw new IllegalStateException(e);
-            //                    }
-            //                }
-            //
-            //            });
-            //thread.start();
+            Thread thread = new Thread(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        synchronized (this) {
+                            this.wait(15000);
+                            PregelixHyracksIntegrationUtil.showDownNC1();
+                        }
+                    } catch (Exception e) {
+                        throw new IllegalStateException(e);
+                    }
+                }
+            });
+            thread.start();
             driver.runJob(job, "127.0.0.1", PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT);
 
             TestUtils.compareWithResultDir(new File(EXPECTEDPATH), new File(OUTPUTPAH));
         } catch (Exception e) {
+            PregelixHyracksIntegrationUtil.showDownNC2();
+            testCluster.cleanupHDFS();
             throw e;
-        } finally {
-            testCluster.tearDown();
         }
     }
 
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/JobConcatenationTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/JobConcatenationTest.java
index 5a485ba..dc7a28d 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/JobConcatenationTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/JobConcatenationTest.java
@@ -24,6 +24,7 @@
 import org.junit.Test;
 
 import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook;
 import edu.uci.ics.pregelix.core.driver.Driver;
 import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
 import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexOutputFormat;
@@ -55,7 +56,7 @@
             job1.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
             FileInputFormat.setInputPaths(job1, INPUTPATH);
             job1.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
-            //job1.setCheckpointHook(ConservativeCheckpointHook.class);
+            job1.setCheckpointHook(ConservativeCheckpointHook.class);
 
             PregelixJob job2 = new PregelixJob(PageRankVertex.class.getName());
             job2.setVertexClass(PageRankVertex.class);
@@ -65,7 +66,7 @@
             job2.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
             FileOutputFormat.setOutputPath(job2, new Path(OUTPUTPAH));
             job2.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
-            //job2.setCheckpointHook(ConservativeCheckpointHook.class);
+            job2.setCheckpointHook(ConservativeCheckpointHook.class);
 
             jobs.add(job1);
             jobs.add(job2);
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestCluster.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestCluster.java
index 40ea690..660d9eb 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestCluster.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestCluster.java
@@ -126,7 +126,7 @@
     /**
      * cleanup hdfs cluster
      */
-    private void cleanupHDFS() throws Exception {
+    public void cleanupHDFS() throws Exception {
         dfsCluster.shutdown();
     }