fix the checkpoint writing for vertex checkpointing and message checkpointing
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index d68ad2c..30e16b5 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -549,4 +549,26 @@
     public static String getGlobalAggregateSpillingDirName(Configuration conf, long superStep) {
         return "/tmp/pregelix/agg/" + conf.get(PregelixJob.JOB_ID) + "/" + superStep;
     }
+
+    /**
+     * Get the path for vertex checkpointing
+     * 
+     * @param conf
+     * @param lastSuperStep
+     * @return the path for vertex checkpointing
+     */
+    public static String getVertexCheckpointPath(Configuration conf, long lastSuperStep) {
+        return "/tmp/ckpoint/" + BspUtils.getJobId(conf) + "/vertex/" + lastSuperStep;
+    }
+    
+    /**
+     * Get the path for message checkpointing
+     * 
+     * @param conf
+     * @param lastSuperStep
+     * @return the path for message checkpointing
+     */
+    public static String getMessageCheckpointPath(Configuration conf, long lastSuperStep) {
+        return "/tmp/ckpoint/" + BspUtils.getJobId(conf) + "/message/" + lastSuperStep;
+    }
 }
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 7bd2cf8..3a0f20d7e 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -272,8 +272,8 @@
                     || IterationUtils.readForceTerminationState(job.getConfiguration(), jobGen.getJobId());
             if (ckpHook.checkpoint(i)) {
                 runCheckpoint(deploymentId, jobGen, i);
-                snapshotSuperstep.set(i);
                 snapshotJobIndex.set(currentJobIndex);
+                snapshotSuperstep.set(i);
             }
             i++;
         } while (!terminate);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index 36723a6..13fd8be 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -137,8 +137,6 @@
     protected static final String SECONDARY_INDEX_ODD = "secondary1";
     protected static final String SECONDARY_INDEX_EVEN = "secondary2";
 
-    private String vertexCheckpointPath;
-
     public JobGen(PregelixJob job) {
         init(job);
     }
@@ -148,10 +146,7 @@
         pregelixJob = job;
         initJobConfiguration();
         job.setJobId(jobId);
-
-        vertexCheckpointPath = "/tmp/ckpoint/" + jobId + "/vertex";
-        // set the frame size to be the one user specified if the user did
-        // specify.
+        // set the frame size to be the one user specified if the user did specify.
         int specifiedFrameSize = BspUtils.getFrameSize(job.getConfiguration());
         if (specifiedFrameSize > 0) {
             frameSize = specifiedFrameSize;
@@ -376,13 +371,16 @@
     @Override
     public JobSpecification[] generateCheckpointing(int lastSuccessfulIteration) throws HyracksException {
         try {
-
             PregelixJob tmpJob = this.createCloneJob("Vertex checkpointing for job " + jobId, pregelixJob);
             tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
-            FileOutputFormat.setOutputPath(tmpJob, new Path(vertexCheckpointPath + "/" + lastSuccessfulIteration));
+            FileOutputFormat.setOutputPath(tmpJob,
+                    new Path(BspUtils.getVertexCheckpointPath(conf, lastSuccessfulIteration)));
             tmpJob.setOutputKeyClass(NullWritable.class);
             tmpJob.setOutputValueClass(BspUtils.getVertexClass(tmpJob.getConfiguration()));
+            FileSystem dfs = FileSystem.get(tmpJob.getConfiguration());
+            dfs.delete(new Path(BspUtils.getVertexCheckpointPath(conf, lastSuccessfulIteration)), true);
             JobSpecification vertexCkpSpec = scanIndexWriteToHDFS(tmpJob.getConfiguration());
+            dfs.delete(new Path(BspUtils.getMessageCheckpointPath(conf, lastSuccessfulIteration)), true);
             JobSpecification[] stateCkpSpecs = generateStateCheckpointing(lastSuccessfulIteration);
             JobSpecification[] specs = new JobSpecification[1 + stateCkpSpecs.length];
             specs[0] = vertexCkpSpec;
@@ -412,7 +410,10 @@
         try {
             PregelixJob tmpJob = this.createCloneJob("Vertex checkpoint loading for job " + jobId, pregelixJob);
             tmpJob.setVertexInputFormatClass(InternalVertexInputFormat.class);
-            FileInputFormat.setInputPaths(tmpJob, new Path(vertexCheckpointPath + "/" + lastCheckpointedIteration));
+            FileInputFormat.setInputPaths(tmpJob,
+                    new Path(BspUtils.getVertexCheckpointPath(conf, lastCheckpointedIteration)));
+            FileSystem dfs = FileSystem.get(conf);
+            dfs.delete(new Path(BspUtils.getVertexCheckpointPath(conf, lastCheckpointedIteration)), true);
             JobSpecification vertexLoadSpec = loadHDFSData(tmpJob.getConfiguration());
             JobSpecification[] stateLoadSpecs = generateStateCheckpointLoading(lastCheckpointedIteration, tmpJob);
             JobSpecification[] specs = new JobSpecification[1 + stateLoadSpecs.length];
@@ -640,7 +641,7 @@
         MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal);
         ClusterConfig.setLocationConstraint(spec, materializeRead);
 
-        String checkpointPath = "/tmp/ckpoint/" + jobId + "/message/" + lastSuccessfulIteration;
+        String checkpointPath = BspUtils.getMessageCheckpointPath(conf, lastSuccessfulIteration);;
         PregelixJob tmpJob = createCloneJob("State checkpointing for job " + jobId, pregelixJob);
         tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
         FileOutputFormat.setOutputPath(tmpJob, new Path(checkpointPath));
@@ -660,7 +661,7 @@
     @SuppressWarnings({ "unchecked", "rawtypes" })
     protected JobSpecification[] generateStateCheckpointLoading(int lastCheckpointedIteration, PregelixJob job)
             throws HyracksException {
-        String checkpointPath = "/tmp/ckpoint/" + jobId + "/message/" + lastCheckpointedIteration;
+        String checkpointPath = BspUtils.getMessageCheckpointPath(conf, lastCheckpointedIteration);
         PregelixJob tmpJob = createCloneJob("State checkpoint loading for job " + jobId, job);
         tmpJob.setVertexInputFormatClass(InternalVertexInputFormat.class);
         try {
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
index 9a21680..d3a9890 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
@@ -161,7 +161,7 @@
                 FileStatus[] tempPaths = dfs.listStatus(outputPath, new PathFilter() {
                     @Override
                     public boolean accept(Path dir) {
-                        return dir.getName().endsWith(TEMP_DIR);
+                        return dir.getName().endsWith(TEMP_DIR) && dir.getName().indexOf(".crc") < 0;
                     }
                 });
                 Path tempDir = tempPaths[0].getPath();
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/JobConcatenationTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/JobConcatenationTest.java
index 5a485ba..dc7a28d 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/JobConcatenationTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/JobConcatenationTest.java
@@ -24,6 +24,7 @@
 import org.junit.Test;
 
 import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook;
 import edu.uci.ics.pregelix.core.driver.Driver;
 import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
 import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexOutputFormat;
@@ -55,7 +56,7 @@
             job1.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
             FileInputFormat.setInputPaths(job1, INPUTPATH);
             job1.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
-            //job1.setCheckpointHook(ConservativeCheckpointHook.class);
+            job1.setCheckpointHook(ConservativeCheckpointHook.class);
 
             PregelixJob job2 = new PregelixJob(PageRankVertex.class.getName());
             job2.setVertexClass(PageRankVertex.class);
@@ -65,7 +66,7 @@
             job2.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
             FileOutputFormat.setOutputPath(job2, new Path(OUTPUTPAH));
             job2.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
-            //job2.setCheckpointHook(ConservativeCheckpointHook.class);
+            job2.setCheckpointHook(ConservativeCheckpointHook.class);
 
             jobs.add(job1);
             jobs.add(job2);