1. fix the node failure scenario in job scheduler;  2. add fault-tolerance support and tests in pregelix
diff --git a/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index 30e16b5..be419c2 100644
--- a/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -560,7 +560,7 @@
     public static String getVertexCheckpointPath(Configuration conf, long lastSuperStep) {
         return "/tmp/ckpoint/" + BspUtils.getJobId(conf) + "/vertex/" + lastSuperStep;
     }
-    
+
     /**
      * Get the path for message checkpointing
      * 
@@ -569,6 +569,18 @@
      * @return the path for message checkpointing
      */
     public static String getMessageCheckpointPath(Configuration conf, long lastSuperStep) {
-        return "/tmp/ckpoint/" + BspUtils.getJobId(conf) + "/message/" + lastSuperStep;
+        String path = "/tmp/ckpoint/" + BspUtils.getJobId(conf) + "/message/" + lastSuperStep;
+        return path;
+    }
+
+    /**
+     * Get the path for message checkpointing
+     * 
+     * @param conf
+     * @param lastSuperStep
+     * @return the path for message checkpointing
+     */
+    public static String getSecondaryIndexCheckpointPath(Configuration conf, long lastSuperStep) {
+        return "/tmp/ckpoint/" + BspUtils.getJobId(conf) + "/secondaryindex/" + lastSuperStep;
     }
 }
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 3a0f20d..61c3653 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -50,6 +50,7 @@
 import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSingleSort;
 import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSort;
 import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.util.ExceptionUtilities;
 import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
 
 @SuppressWarnings("rawtypes")
@@ -99,7 +100,8 @@
             IntWritable lastSnapshotSuperstep = new IntWritable(0);
             boolean failed = false;
             int retryCount = 0;
-            int maxRetryCount = 1;
+            int maxRetryCount = 3;
+            jobGen = selectJobGen(planChoice, currentJob);
 
             do {
                 try {
@@ -112,14 +114,14 @@
                         ICheckpointHook ckpHook = BspUtils.createCheckpointHook(currentJob.getConfiguration());
 
                         /** load the data */
-                        if (i == 0 || compatible(lastJob, currentJob)) {
+                        if ((i == 0 || compatible(lastJob, currentJob)) && !failed) {
                             if (i != 0) {
                                 finishJobs(jobGen, deploymentId);
                                 /** invalidate/clear checkpoint */
                                 lastSnapshotJobIndex.set(0);
                                 lastSnapshotSuperstep.set(0);
                             }
-                            jobGen = selectJobGen(planChoice, currentJob);
+                            jobGen.reset(currentJob);
                             loadData(currentJob, jobGen, deploymentId);
                         } else {
                             jobGen.reset(currentJob);
@@ -137,12 +139,16 @@
                     /** clear checkpoints if any */
                     jobGen.clearCheckpoints();
                     hcc.unDeployBinary(deploymentId);
-                } catch (IOException ioe) {
+                } catch (Exception e1) {
                     /** disk failures */
                     //restart from snapshot
-                    failed = true;
-                    retryCount++;
-                    throw new HyracksException(ioe);
+                    /** node failures */
+                    if (ExceptionUtilities.recoverable(e1)) {
+                        failed = true;
+                        retryCount++;
+                    } else {
+                        throw e1;
+                    }
                 }
             } while (failed && retryCount < maxRetryCount);
             LOG.info("job finished");
@@ -256,8 +262,13 @@
             throws Exception {
         if (doRecovery) {
             /** reload the checkpoint */
-            runLoadCheckpoint(deploymentId, jobGen, snapshotSuperstep.get());
-
+            if (snapshotSuperstep.get() > 0) {
+                runClearState(deploymentId, jobGen);
+                runLoadCheckpoint(deploymentId, jobGen, snapshotSuperstep.get());
+            } else {
+                runClearState(deploymentId, jobGen);
+                loadData(job, jobGen, deploymentId);
+            }
         }
         int i = doRecovery ? snapshotSuperstep.get() + 1 : 1;
         boolean terminate = false;
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index 13fd8be..91d7e22 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -380,9 +380,11 @@
             FileSystem dfs = FileSystem.get(tmpJob.getConfiguration());
             dfs.delete(new Path(BspUtils.getVertexCheckpointPath(conf, lastSuccessfulIteration)), true);
             JobSpecification vertexCkpSpec = scanIndexWriteToHDFS(tmpJob.getConfiguration());
+
             dfs.delete(new Path(BspUtils.getMessageCheckpointPath(conf, lastSuccessfulIteration)), true);
             JobSpecification[] stateCkpSpecs = generateStateCheckpointing(lastSuccessfulIteration);
             JobSpecification[] specs = new JobSpecification[1 + stateCkpSpecs.length];
+
             specs[0] = vertexCkpSpec;
             for (int i = 1; i < specs.length; i++) {
                 specs[i] = stateCkpSpecs[i - 1];
@@ -395,7 +397,7 @@
 
     @Override
     public JobSpecification generateLoadingJob() throws HyracksException {
-        JobSpecification spec = loadHDFSData(conf);
+        JobSpecification spec = loadHDFSData(pregelixJob);
         return spec;
     }
 
@@ -412,14 +414,20 @@
             tmpJob.setVertexInputFormatClass(InternalVertexInputFormat.class);
             FileInputFormat.setInputPaths(tmpJob,
                     new Path(BspUtils.getVertexCheckpointPath(conf, lastCheckpointedIteration)));
-            FileSystem dfs = FileSystem.get(conf);
-            dfs.delete(new Path(BspUtils.getVertexCheckpointPath(conf, lastCheckpointedIteration)), true);
-            JobSpecification vertexLoadSpec = loadHDFSData(tmpJob.getConfiguration());
+            JobSpecification[] cleanVertices = generateCleanup();
+            JobSpecification createIndex = generateCreatingJob();
+            JobSpecification vertexLoadSpec = loadHDFSData(tmpJob);
             JobSpecification[] stateLoadSpecs = generateStateCheckpointLoading(lastCheckpointedIteration, tmpJob);
-            JobSpecification[] specs = new JobSpecification[1 + stateLoadSpecs.length];
-            specs[0] = vertexLoadSpec;
-            for (int i = 1; i < specs.length; i++) {
-                specs[i] = stateLoadSpecs[i - 1];
+            JobSpecification[] specs = new JobSpecification[cleanVertices.length + 2 + stateLoadSpecs.length];
+
+            int i = 0;
+            for (; i < cleanVertices.length; i++) {
+                specs[i] = cleanVertices[i];
+            }
+            specs[i++] = createIndex;
+            specs[i++] = vertexLoadSpec;
+            for (; i < specs.length; i++) {
+                specs[i] = stateLoadSpecs[i - cleanVertices.length - 2];
             }
             return specs;
         } catch (Exception e) {
@@ -480,7 +488,8 @@
     }
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
-    private JobSpecification loadHDFSData(Configuration conf) throws HyracksException, HyracksDataException {
+    private JobSpecification loadHDFSData(PregelixJob job) throws HyracksException, HyracksDataException {
+        Configuration conf = job.getConfiguration();
         Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
         Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
         JobSpecification spec = new JobSpecification();
@@ -493,7 +502,7 @@
         VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
         List<InputSplit> splits = new ArrayList<InputSplit>();
         try {
-            splits = inputFormat.getSplits(pregelixJob, fileSplitProvider.getFileSplits().length);
+            splits = inputFormat.getSplits(job, fileSplitProvider.getFileSplits().length);
             LOGGER.info("number of splits: " + splits.size());
             for (InputSplit split : splits)
                 LOGGER.info(split.toString());
@@ -638,7 +647,8 @@
         /**
          * construct the materializing write operator
          */
-        MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal);
+        MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
+                false);
         ClusterConfig.setLocationConstraint(spec, materializeRead);
 
         String checkpointPath = BspUtils.getMessageCheckpointPath(conf, lastSuccessfulIteration);;
@@ -661,7 +671,7 @@
     @SuppressWarnings({ "unchecked", "rawtypes" })
     protected JobSpecification[] generateStateCheckpointLoading(int lastCheckpointedIteration, PregelixJob job)
             throws HyracksException {
-        String checkpointPath = BspUtils.getMessageCheckpointPath(conf, lastCheckpointedIteration);
+        String checkpointPath = BspUtils.getMessageCheckpointPath(job.getConfiguration(), lastCheckpointedIteration);
         PregelixJob tmpJob = createCloneJob("State checkpoint loading for job " + jobId, job);
         tmpJob.setVertexInputFormatClass(InternalVertexInputFormat.class);
         try {
@@ -669,7 +679,7 @@
         } catch (IOException e) {
             throw new HyracksException(e);
         }
-        Configuration conf = job.getConfiguration();
+        Configuration conf = tmpJob.getConfiguration();
         Class vertexIdClass = BspUtils.getVertexIndexClass(conf);
         JobSpecification spec = new JobSpecification();
 
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 41887c0..7c1df0f 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -344,7 +344,8 @@
         /**
          * construct the materializing write operator
          */
-        MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal);
+        MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
+                true);
         ClusterConfig.setLocationConstraint(spec, materializeRead);
 
         /**
@@ -529,7 +530,7 @@
         tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
 
         /** generate secondary index checkpoint */
-        String checkpointPath = "/tmp/ckpoint/" + jobId + "/secondary/" + lastSuccessfulIteration;
+        String checkpointPath = BspUtils.getSecondaryIndexCheckpointPath(conf, lastSuccessfulIteration);
         FileOutputFormat.setOutputPath(tmpJob, new Path(checkpointPath));
         tmpJob.setOutputKeyClass(BspUtils.getVertexIndexClass(tmpJob.getConfiguration()));
         tmpJob.setOutputValueClass(MsgList.class);
@@ -569,7 +570,7 @@
         Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(job.getConfiguration());
         JobSpecification spec = new JobSpecification();
 
-        String checkpointPath = "/tmp/ckpoint/" + jobId + "/secondary/" + lastSuccessfulIteration;;
+        String checkpointPath = BspUtils.getSecondaryIndexCheckpointPath(conf, lastSuccessfulIteration);
         PregelixJob tmpJob = createCloneJob("State checkpoint loading for job " + jobId, job);
         tmpJob.setVertexInputFormatClass(InternalVertexInputFormat.class);
         try {
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index c29ea18..287b797 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -299,7 +299,8 @@
         /**
          * construct the materializing write operator
          */
-        MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal);
+        MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
+                true);
         ClusterConfig.setLocationConstraint(spec, materializeRead);
 
         /**
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index dc61971..3b3c9e7 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -288,7 +288,8 @@
         /**
          * construct the materializing write operator
          */
-        MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal);
+        MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
+                true);
         ClusterConfig.setLocationConstraint(spec, materializeRead);
 
         /**
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index 34f723f..e334095 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -302,7 +302,8 @@
         /**
          * construct the materializing write operator
          */
-        MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal);
+        MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
+                true);
         ClusterConfig.setLocationConstraint(spec, materializeRead);
 
         /**
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/ExceptionUtilities.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/ExceptionUtilities.java
new file mode 100644
index 0000000..4b1bf94
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/ExceptionUtilities.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.util;
+
+/**
+ * The util to analysis exceptions
+ * 
+ * @author yingyib
+ */
+public class ExceptionUtilities {
+
+    /**
+     * Check whether a exception is recoverable or not
+     * 
+     * @param exception
+     * @return true or false
+     */
+    public static boolean recoverable(Exception exception) {
+        String message = exception.getMessage();
+        if (exception instanceof InterruptedException || (message.contains("Node") && message.contains("not live"))
+                || message.contains("Failure occurred on input")) {
+            return true;
+        }
+        Throwable cause = exception;
+        while ((cause = cause.getCause()) != null) {
+            if (cause instanceof InterruptedException) {
+                return true;
+            }
+        }
+        return false;
+    }
+}
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
index e1795de..f4b26cb 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
@@ -61,8 +61,8 @@
         ccConfig.defaultMaxJobAttempts = 0;
         ccConfig.jobHistorySize = 1;
         ccConfig.profileDumpPeriod = -1;
-        //ccConfig.heartbeatPeriod = 5000;
-        //ccConfig.maxHeartbeatLapsePeriods = 1;
+        ccConfig.heartbeatPeriod = 5000;
+        ccConfig.maxHeartbeatLapsePeriods = 8;
 
         // cluster controller
         cc = new ClusterControllerService(ccConfig);
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
index 7221cb5..b22e468 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
@@ -230,8 +230,16 @@
 
     @Override
     public void fail() throws HyracksDataException {
-        for (IFrameWriter writer : writers)
+        try {
+            cursor.close();
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        } finally {
+            treeIndexOpHelper.close();
+        }
+        for (IFrameWriter writer : writers) {
             writer.fail();
+        }
     }
 
     @Override
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
index b21cd2a..0ecfd03 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
@@ -254,8 +254,16 @@
 
     @Override
     public void fail() throws HyracksDataException {
-        for (IFrameWriter writer : writers)
+        try {
+            cursor.close();
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        } finally {
+            treeIndexOpHelper.close();
+        }
+        for (IFrameWriter writer : writers) {
             writer.fail();
+        }
     }
 
     /** compare tuples */
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java
index dd6ee3c..e64e9cc 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java
@@ -245,6 +245,13 @@
 
     @Override
     public void fail() throws HyracksDataException {
+        try {
+            cursor.close();
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        } finally {
+            treeIndexOpHelper.close();
+        }
         writer.fail();
     }
 
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
index 3cebfb8..a9c787f 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
@@ -219,8 +219,16 @@
 
     @Override
     public void fail() throws HyracksDataException {
-        for (IFrameWriter writer : writers)
+        try {
+            cursor.close();
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        } finally {
+            treeIndexOpHelper.close();
+        }
+        for (IFrameWriter writer : writers) {
             writer.fail();
+        }
     }
 
     /** compare tuples */
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java
index bbe2764..86a211f 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java
@@ -205,6 +205,13 @@
 
     @Override
     public void fail() throws HyracksDataException {
+        try {
+            cursor.close();
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        } finally {
+            treeIndexOpHelper.close();
+        }
         writer.fail();
     }
 
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorNodePushable.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorNodePushable.java
index c4890e1..1f410c7 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorNodePushable.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorNodePushable.java
@@ -100,6 +100,12 @@
 
     @Override
     public void fail() throws HyracksDataException {
-
+        try {
+            bulkLoader.end();
+        } catch (IndexException e) {
+            throw new HyracksDataException(e);
+        } finally {
+            treeIndexOpHelper.close();
+        }
     }
 }
\ No newline at end of file
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java
index bd85e3e..de87909 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java
@@ -224,8 +224,16 @@
 
     @Override
     public void fail() throws HyracksDataException {
-        for (IFrameWriter writer : writers)
+        try {
+            cursor.close();
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        } finally {
+            treeIndexHelper.close();
+        }
+        for (IFrameWriter writer : writers) {
             writer.fail();
+        }
     }
 
     @Override
diff --git a/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingReadOperatorDescriptor.java b/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingReadOperatorDescriptor.java
index ca8f190..b44b643 100644
--- a/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingReadOperatorDescriptor.java
+++ b/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingReadOperatorDescriptor.java
@@ -30,9 +30,12 @@
 
 public class MaterializingReadOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
     private static final long serialVersionUID = 1L;
+    private final boolean removeIterationState;
 
-    public MaterializingReadOperatorDescriptor(JobSpecification spec, RecordDescriptor recordDescriptor) {
+    public MaterializingReadOperatorDescriptor(JobSpecification spec, RecordDescriptor recordDescriptor,
+            boolean removeIterationState) {
         super(spec, 1, 1);
+        this.removeIterationState = removeIterationState;
         recordDescriptors[0] = recordDescriptor;
     }
 
@@ -73,7 +76,7 @@
 
             @Override
             public void fail() throws HyracksDataException {
-
+                writer.fail();
             }
 
             @Override
@@ -81,7 +84,9 @@
                 /**
                  * remove last iteration's state
                  */
-                IterationUtils.removeIterationState(ctx, partition);
+                if (removeIterationState) {
+                    IterationUtils.removeIterationState(ctx, partition);
+                }
                 writer.close();
                 complete = true;
             }
diff --git a/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java b/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java
index dac087f..5294ace 100644
--- a/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java
+++ b/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java
@@ -37,7 +37,7 @@
 public class FailureRecoveryTest {
     private static String INPUTPATH = "data/webmap";
     private static String OUTPUTPAH = "actual/result";
-    private static String EXPECTEDPATH = "src/test/resources/expected/PageRankReal";
+    private static String EXPECTEDPATH = "src/test/resources/expected/PageRankReal2";
 
     @Test
     public void test() throws Exception {
@@ -57,29 +57,28 @@
 
             testCluster.setUp();
             Driver driver = new Driver(PageRankVertex.class);
-            //            Thread thread = new Thread(new Runnable() {
-            //
-            //                @Override
-            //                public void run() {
-            //                    try {
-            //                        synchronized (this) {
-            //                            this.wait(10000);
-            //                            PregelixHyracksIntegrationUtil.showDownNC1();
-            //                        }
-            //                    } catch (Exception e) {
-            //                        throw new IllegalStateException(e);
-            //                    }
-            //                }
-            //
-            //            });
-            //thread.start();
+            Thread thread = new Thread(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        synchronized (this) {
+                            this.wait(15000);
+                            PregelixHyracksIntegrationUtil.showDownNC1();
+                        }
+                    } catch (Exception e) {
+                        throw new IllegalStateException(e);
+                    }
+                }
+            });
+            thread.start();
             driver.runJob(job, "127.0.0.1", PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT);
 
             TestUtils.compareWithResultDir(new File(EXPECTEDPATH), new File(OUTPUTPAH));
         } catch (Exception e) {
+            PregelixHyracksIntegrationUtil.showDownNC2();
+            testCluster.cleanupHDFS();
             throw e;
-        } finally {
-            testCluster.tearDown();
         }
     }
 
diff --git a/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestCluster.java b/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestCluster.java
index 40ea690..660d9eb 100644
--- a/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestCluster.java
+++ b/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestCluster.java
@@ -126,7 +126,7 @@
     /**
      * cleanup hdfs cluster
      */
-    private void cleanupHDFS() throws Exception {
+    public void cleanupHDFS() throws Exception {
         dfsCluster.shutdown();
     }