Merge branch 'yingyi/fullstack_fix' of https://code.google.com/p/hyracks into yingyi/fullstack_fix
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveKeyValueParser.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveKeyValueParser.java
index 1bf4abe..833daf4 100644
--- a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveKeyValueParser.java
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveKeyValueParser.java
@@ -166,7 +166,7 @@
     }
 
     @Override
-    public void parse(K key, V value, IFrameWriter writer) throws HyracksDataException {
+    public void parse(K key, V value, IFrameWriter writer, String fileString) throws HyracksDataException {
         try {
             tb.reset();
             if (parser != null) {
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParser.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParser.java
index 1852a6f..57c20e0 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParser.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParser.java
@@ -38,18 +38,13 @@
     public void open(IFrameWriter writer) throws HyracksDataException;
 
     /**
-     * Parse a key-value pair returned by HDFS record reader to a tuple.
-     * when the parsers' internal buffer is full, it can flush the buffer to the writer
-     * 
      * @param key
-     *            The key returned from Hadoop's InputReader.
      * @param value
-     *            The value returned from Hadoop's InputReader.
      * @param writer
-     *            The hyracks writer for outputting data.
+     * @param fileString the string representation of the input split that produced this key-value pair
      * @throws HyracksDataException
      */
-    public void parse(K key, V value, IFrameWriter writer) throws HyracksDataException;
+    public void parse(K key, V value, IFrameWriter writer, String fileString) throws HyracksDataException;
 
     /**
      * Flush the residual tuples in the internal buffer to the writer.
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriterFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriterFactory.java
index 674873d..7e6e4dc 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriterFactory.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriterFactory.java
@@ -30,6 +30,10 @@
      *            the IHyracksTaskContext
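+     * @param partition
+     *            the index of the partition this writer is created for
+     * @param nPartition
+     *            the total number of partitions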
      * @return a tuple writer instance
      */
-    public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException;
+    public ITupleWriter getTupleWriter(IHyracksTaskContext ctx, int partition, int nPartition) throws HyracksDataException;
 
 }
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
index 2cff534..a45992c 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
@@ -128,7 +128,7 @@
                             Object key = reader.createKey();
                             Object value = reader.createValue();
                             while (reader.next(key, value) == true) {
-                                parser.parse(key, value, writer);
+                                parser.parse(key, value, writer, inputSplits[i].toString());
                             }
                         }
                     }
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
index 432849b..4e48e9b 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
@@ -89,7 +89,7 @@
                 String outputDirPath = FileOutputFormat.getOutputPath(conf).toString();
                 String fileName = outputDirPath + File.separator + "part-" + partition;
 
-                tupleWriter = tupleWriterFactory.getTupleWriter(ctx);
+                tupleWriter = tupleWriterFactory.getTupleWriter(ctx, partition, nPartitions);
                 try {
                     FileSystem dfs = FileSystem.get(conf);
                     dos = dfs.create(new Path(fileName), true);
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextKeyValueParserFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextKeyValueParserFactory.java
index fbac95b..92cde9d 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextKeyValueParserFactory.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextKeyValueParserFactory.java
@@ -49,7 +49,8 @@
             }
 
             @Override
-            public void parse(LongWritable key, Text value, IFrameWriter writer) throws HyracksDataException {
+            public void parse(LongWritable key, Text value, IFrameWriter writer, String fileString)
+                    throws HyracksDataException {
                 tb.reset();
                 tb.addField(value.getBytes(), 0, value.getLength());
                 if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextTupleWriterFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextTupleWriterFactory.java
index 92a427e..60be1f7 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextTupleWriterFactory.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextTupleWriterFactory.java
@@ -27,7 +27,7 @@
     private static final long serialVersionUID = 1L;
 
     @Override
-    public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) {
+    public ITupleWriter getTupleWriter(IHyracksTaskContext ctx, int partition, int nPartition) {
         return new ITupleWriter() {
             private byte newLine = "\n".getBytes()[0];
 
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
index 3f01d77..43ca4ac 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
@@ -144,7 +144,8 @@
                             RecordReader reader = inputFormat.createRecordReader(inputSplits.get(i), context);
                             reader.initialize(inputSplits.get(i), context);
                             while (reader.nextKeyValue() == true) {
-                                parser.parse(reader.getCurrentKey(), reader.getCurrentValue(), writer);
+                                parser.parse(reader.getCurrentKey(), reader.getCurrentValue(), writer,
+                                        inputSplits.get(i).toString());
                             }
                         }
                     }
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
index 77b8c7e..068cdfc 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
@@ -88,7 +88,7 @@
                 String outputPath = FileOutputFormat.getOutputPath(conf).toString();
                 String fileName = outputPath + File.separator + "part-" + partition;
 
-                tupleWriter = tupleWriterFactory.getTupleWriter(ctx);
+                tupleWriter = tupleWriterFactory.getTupleWriter(ctx, partition, nPartitions);
                 try {
                     FileSystem dfs = FileSystem.get(conf.getConfiguration());
                     dos = dfs.create(new Path(fileName), true);
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexInputFormat.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexInputFormat.java
index 503b521..22d3b27 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexInputFormat.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexInputFormat.java
@@ -21,6 +21,7 @@
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 
@@ -36,7 +37,7 @@
 public class InternalVertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable>
         extends VertexInputFormat<I, V, E, M> {
     /** Uses the SequenceFileInputFormat to do everything */
-    protected SequenceFileInputFormat sequenceInputFormat = new SequenceFileInputFormat();
+    private SequenceFileInputFormat sequenceInputFormat = new SequenceFileInputFormat();
 
     @SuppressWarnings("unchecked")
     @Override
@@ -45,28 +46,31 @@
     }
 
     @Override
-    public VertexReader<I, V, E, M> createVertexReader(InputSplit split, TaskAttemptContext context) throws IOException {
+    public VertexReader<I, V, E, M> createVertexReader(final InputSplit split, final TaskAttemptContext context)
+            throws IOException {
         return new VertexReader<I, V, E, M>() {
+            RecordReader recordReader = sequenceInputFormat.createRecordReader(split, context);
 
             @Override
             public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException,
                     InterruptedException {
-
+                recordReader.initialize(inputSplit, context);
             }
 
             @Override
             public boolean nextVertex() throws IOException, InterruptedException {
-                return false;
+                return recordReader.nextKeyValue();
             }
 
+            @SuppressWarnings("unchecked")
             @Override
             public Vertex<I, V, E, M> getCurrentVertex() throws IOException, InterruptedException {
-                return null;
+                return (Vertex<I, V, E, M>) recordReader.getCurrentValue();
             }
 
             @Override
             public void close() throws IOException {
-
+                recordReader.close();
             }
 
             @Override
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexOutputFormat.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexOutputFormat.java
index 0581658..b603037 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexOutputFormat.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexOutputFormat.java
@@ -16,11 +16,14 @@
 
 import java.io.IOException;
 
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 
 import edu.uci.ics.pregelix.api.graph.Vertex;
 import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
@@ -32,25 +35,29 @@
 @SuppressWarnings("rawtypes")
 public class InternalVertexOutputFormat<I extends WritableComparable, V extends Writable, E extends Writable> extends
         VertexOutputFormat<I, V, E> {
+    private SequenceFileOutputFormat sequenceOutputFormat = new SequenceFileOutputFormat();
 
     @Override
-    public VertexWriter<I, V, E> createVertexWriter(TaskAttemptContext context) throws IOException,
+    public VertexWriter<I, V, E> createVertexWriter(final TaskAttemptContext context) throws IOException,
             InterruptedException {
         return new VertexWriter<I, V, E>() {
+            private RecordWriter recordWriter = sequenceOutputFormat.getRecordWriter(context);
+            private NullWritable key = NullWritable.get();
 
             @Override
             public void initialize(TaskAttemptContext context) throws IOException, InterruptedException {
 
             }
 
+            @SuppressWarnings("unchecked")
             @Override
             public void writeVertex(Vertex<I, V, E, ?> vertex) throws IOException, InterruptedException {
-
+                recordWriter.write(key, vertex);
             }
 
             @Override
             public void close(TaskAttemptContext context) throws IOException, InterruptedException {
-
+                recordWriter.close(context);
             }
 
         };
@@ -68,19 +75,19 @@
             @Override
             public void abortTask(TaskAttemptContext arg0) throws IOException {
                 // TODO Auto-generated method stub
-                
+
             }
 
             @Override
             public void cleanupJob(JobContext arg0) throws IOException {
                 // TODO Auto-generated method stub
-                
+
             }
 
             @Override
             public void commitTask(TaskAttemptContext arg0) throws IOException {
                 // TODO Auto-generated method stub
-                
+
             }
 
             @Override
@@ -90,12 +97,12 @@
 
             @Override
             public void setupJob(JobContext arg0) throws IOException {
-                
+
             }
 
             @Override
             public void setupTask(TaskAttemptContext arg0) throws IOException {
-                
+
             }
 
         };
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/ICheckpointHook.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/ICheckpointHook.java
new file mode 100644
index 0000000..9d6eb5a
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/ICheckpointHook.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.job;
+
+/**
+ * @author yingyib
+ */
+public interface ICheckpointHook {
+
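+    /**
+     * @param superstep
+     *            the superstep that has just completed successfully
+     * @return true to take a checkpoint after this superstep, false otherwise
+     */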
+    public boolean checkpoint(int superstep);
+
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index d2d5cf0..1e0d87a 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -74,6 +74,18 @@
     public static final String FRAME_SIZE = "pregelix.framesize";
     /** update intensive */
     public static final String UPDATE_INTENSIVE = "pregelix.updateIntensive";
+    /** the checkpoint hook class */
+    public static final String CKP_CLASS = "pregelix.checkpointHook";
+
+    /**
+     * Construct a Pregelix job from an existing configuration
+     * 
+     * @param conf
+     * @throws IOException
+     */
+    public PregelixJob(Configuration conf) throws IOException {
+        super(conf);
+    }
 
     /**
      * Constructor that will instantiate the configuration
@@ -202,6 +214,17 @@
         getConfiguration().setBoolean(UPDATE_INTENSIVE, variableSizedUpdateHeavyFlag);
     }
 
+    /**
+     * Users can provide an ICheckpointHook implementation to specify when to checkpoint
+     * 
+     * @param ckpClass
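+     *            the ICheckpointHook implementation class, e.g., ConservativeCheckpointHook
+     *            (checkpoints every 5 supersteps) or DefaultCheckpointHook (never checkpoints)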
+     */
+    final public void setCheckpointHook(Class<?> ckpClass) {
+        getConfiguration().setClass(CKP_CLASS, ckpClass, ICheckpointHook.class);
+    }
+
     @Override
     public String toString() {
         return getJobName();
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index ff4ee91..51a9ce3 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -29,6 +29,7 @@
 import edu.uci.ics.pregelix.api.io.VertexInputFormat;
 import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
 import edu.uci.ics.pregelix.api.io.WritableSizable;
+import edu.uci.ics.pregelix.api.job.ICheckpointHook;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 
 /**
@@ -461,6 +462,24 @@
     }
 
     /**
+     * Create a checkpoint hook
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return an instantiated checkpoint hook
+     */
+    public static ICheckpointHook createCheckpointHook(Configuration conf) {
+        Class<? extends ICheckpointHook> ckpClass = getCheckpointHookClass(conf);
+        try {
+            return ckpClass.newInstance();
+        } catch (InstantiationException e) {
+            throw new IllegalArgumentException("createVertexPartitioner: Failed to instantiate", e);
+        } catch (IllegalAccessException e) {
+            throw new IllegalArgumentException("createVertexPartitioner: Illegally accessed", e);
+        }
+    }
+
+    /**
      * Get the user's subclassed vertex partitioner class.
      * 
      * @param conf
@@ -475,6 +494,20 @@
     }
 
     /**
+     * Get the user's subclassed checkpoint hook class.
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return The user-defined checkpoint hook class
+     */
+    @SuppressWarnings("unchecked")
+    public static <V extends ICheckpointHook> Class<V> getCheckpointHookClass(Configuration conf) {
+        if (conf == null)
+            conf = defaultConf;
+        return (Class<V>) conf.getClass(PregelixJob.CKP_CLASS, DefaultCheckpointHook.class, ICheckpointHook.class);
+    }
+
+    /**
      * Get the job configuration parameter whether the vertex states will increase dynamically
      * 
      * @param conf
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ConservativeCheckpointHook.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ConservativeCheckpointHook.java
new file mode 100644
index 0000000..6a4a660
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ConservativeCheckpointHook.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.util;
+
+import edu.uci.ics.pregelix.api.job.ICheckpointHook;
+
+/**
+ * A conservative checkpoint hook that checkpoints every 5 supersteps
+ * 
+ * @author yingyib
+ */
+public class ConservativeCheckpointHook implements ICheckpointHook {
+
+    @Override
+    public boolean checkpoint(int superstep) {
+        if (superstep % 5 == 0) {
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultCheckpointHook.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultCheckpointHook.java
new file mode 100644
index 0000000..c37c4ab
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultCheckpointHook.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.util;
+
+import edu.uci.ics.pregelix.api.job.ICheckpointHook;
+
+/**
+ * The default checkpoint hook, which never checkpoints.
+ * 
+ * @author yingyib
+ */
+public class DefaultCheckpointHook implements ICheckpointHook {
+
+    @Override
+    public boolean checkpoint(int superstep) {
+        return false;
+    }
+
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ResetableByteArrayInputStream.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ResetableByteArrayInputStream.java
index f9cad9f..c7febc1 100755
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ResetableByteArrayInputStream.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ResetableByteArrayInputStream.java
@@ -15,11 +15,8 @@
 package edu.uci.ics.pregelix.api.util;
 
 import java.io.InputStream;
-import java.util.logging.Level;
-import java.util.logging.Logger;
 
 public class ResetableByteArrayInputStream extends InputStream {
-    private static final Logger LOGGER = Logger.getLogger(ResetableByteArrayInputStream.class.getName());
 
     private byte[] data;
     private int position;
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/IJobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/IJobGen.java
index 2d58902..6bb0dea 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/IJobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/IJobGen.java
@@ -26,4 +26,10 @@
 
     public JobSpecification generateJob(int iteration) throws HyracksException;
 
+    public JobSpecification[] generateCheckpointing(int lastSuccessfulIteration) throws HyracksException;
+
+    public JobSpecification[] generateLoadingCheckpoint(int lastCheckpointedIteration) throws HyracksException;
+
+    public JobSpecification generateClearState() throws HyracksException;
+
 }
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 8b82bf2..82f94b9 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -29,6 +29,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
@@ -39,6 +40,7 @@
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.pregelix.api.job.ICheckpointHook;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.core.base.IDriver;
@@ -93,25 +95,29 @@
             DeploymentId deploymentId = prepareJobs(ipAddress, port);
             LOG.info("job started");
 
-            int lastSnapshotJobIndex = 0;
-            int lastSnapshotSuperstep = 0;
+            IntWritable lastSnapshotJobIndex = new IntWritable(0);
+            IntWritable lastSnapshotSuperstep = new IntWritable(0);
             boolean failed = false;
             int retryCount = 0;
-            int maxRetryCount = 3;
+            int maxRetryCount = 1;
 
             do {
                 try {
-                    for (int i = lastSnapshotJobIndex; i < jobs.size(); i++) {
+                    for (int i = lastSnapshotJobIndex.get(); i < jobs.size(); i++) {
                         lastJob = currentJob;
                         currentJob = jobs.get(i);
 
                         /** add hadoop configurations */
                         addHadoopConfiguration(currentJob, ipAddress, port);
+                        ICheckpointHook ckpHook = BspUtils.createCheckpointHook(currentJob.getConfiguration());
 
                         /** load the data */
                         if (i == 0 || compatible(lastJob, currentJob)) {
                             if (i != 0) {
                                 finishJobs(jobGen, deploymentId);
+                                /** invalidate/clear checkpoint */
+                                lastSnapshotJobIndex.set(0);
+                                lastSnapshotSuperstep.set(0);
                             }
                             jobGen = selectJobGen(planChoice, currentJob);
                             loadData(currentJob, jobGen, deploymentId);
@@ -120,12 +126,15 @@
                         }
 
                         /** run loop-body jobs */
-                        runLoopBody(deploymentId, currentJob, jobGen, lastSnapshotSuperstep);
+                        runLoopBody(deploymentId, currentJob, jobGen, i, lastSnapshotJobIndex, lastSnapshotSuperstep,
+                                ckpHook);
                         runClearState(deploymentId, jobGen);
                     }
 
                     /** finish the jobs */
                     finishJobs(jobGen, deploymentId);
+                    /** clear checkpoints if any */
+                    jobGen.clearCheckpoints();
                     hcc.unDeployBinary(deploymentId);
                 } catch (IOException ioe) {
                     /** disk failures */
@@ -242,12 +251,13 @@
         ClusterConfig.loadClusterConfig(ipAddress, port);
     }
 
-    private void runLoopBody(DeploymentId deploymentId, PregelixJob job, JobGen jobGen, int snapshotSuperstep)
-            throws Exception {
-        if (snapshotSuperstep > 0) {
-            /** reload the snapshot */
+    private void runLoopBody(DeploymentId deploymentId, PregelixJob job, JobGen jobGen, int currentJobIndex,
+            IntWritable snapshotJobIndex, IntWritable snapshotSuperstep, ICheckpointHook ckpHook) throws Exception {
+        if (snapshotJobIndex.get() >= 0 && snapshotSuperstep.get() > 0) {
+            /** reload the checkpoint */
+            runLoadCheckpoint(deploymentId, jobGen, snapshotSuperstep.get());
         }
-        int i = snapshotSuperstep + 1;
+        int i = snapshotSuperstep.get() + 1;
         boolean terminate = false;
         long start, end, time;
         do {
@@ -258,10 +268,34 @@
             LOG.info(job + ": iteration " + i + " finished " + time + "ms");
             terminate = IterationUtils.readTerminationState(job.getConfiguration(), jobGen.getJobId())
                     || IterationUtils.readForceTerminationState(job.getConfiguration(), jobGen.getJobId());
+            if (ckpHook.checkpoint(i)) {
+                runCheckpoint(deploymentId, jobGen, i);
+                snapshotSuperstep.set(i);
+                snapshotJobIndex.set(currentJobIndex);
+            }
             i++;
         } while (!terminate);
     }
 
+    private void runCheckpoint(DeploymentId deploymentId, JobGen jobGen, int lastSuccessfulIteration) throws Exception {
+        try {
+            JobSpecification[] ckpJobs = jobGen.generateCheckpointing(lastSuccessfulIteration);
+            runJobArray(deploymentId, ckpJobs);
+        } catch (Exception e) {
+            throw e;
+        }
+    }
+
+    private void runLoadCheckpoint(DeploymentId deploymentId, JobGen jobGen, int checkPointedIteration)
+            throws Exception {
+        try {
+            JobSpecification[] ckpJobs = jobGen.generateLoadingCheckpoint(checkPointedIteration);
+            runJobArray(deploymentId, ckpJobs);
+        } catch (Exception e) {
+            throw e;
+        }
+    }
+
     private void runCreate(DeploymentId deploymentId, JobGen jobGen) throws Exception {
         try {
             JobSpecification treeCreateSpec = jobGen.generateCreatingJob();
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index 931ecc3..8c36ea5 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -15,8 +15,14 @@
 
 package edu.uci.ics.pregelix.core.jobgen;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.DataOutput;
+import java.io.DataOutputStream;
 import java.io.File;
+import java.io.IOException;
 import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.List;
@@ -24,10 +30,15 @@
 import java.util.logging.Logger;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -50,6 +61,10 @@
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+import edu.uci.ics.hyracks.hdfs2.dataflow.ConfFactory;
+import edu.uci.ics.hyracks.hdfs2.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs2.dataflow.HDFSWriteOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
@@ -67,9 +82,12 @@
 import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceFactoryProvider;
 import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
 import edu.uci.ics.pregelix.api.graph.MessageCombiner;
+import edu.uci.ics.pregelix.api.graph.MsgList;
 import edu.uci.ics.pregelix.api.graph.Vertex;
 import edu.uci.ics.pregelix.api.graph.VertexPartitioner;
 import edu.uci.ics.pregelix.api.io.VertexInputFormat;
+import edu.uci.ics.pregelix.api.io.internal.InternalVertexInputFormat;
+import edu.uci.ics.pregelix.api.io.internal.InternalVertexOutputFormat;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.api.util.ReflectionUtils;
@@ -80,7 +98,13 @@
 import edu.uci.ics.pregelix.core.runtime.touchpoint.WritableComparingBinaryComparatorFactory;
 import edu.uci.ics.pregelix.core.util.DataflowUtils;
 import edu.uci.ics.pregelix.dataflow.ClearStateOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.EmptyTupleSourceOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.HDFSFileWriteOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.KeyValueParserFactory;
+import edu.uci.ics.pregelix.dataflow.KeyValueWriterFactory;
+import edu.uci.ics.pregelix.dataflow.MaterializingReadOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.VertexFileScanOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.VertexWriteOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
@@ -111,12 +135,20 @@
     protected static final String SECONDARY_INDEX_ODD = "secondary1";
     protected static final String SECONDARY_INDEX_EVEN = "secondary2";
 
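+    /** the HDFS path where vertex checkpoints of this job are written */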
+    private String vertexCheckpointPath;
+
     public JobGen(PregelixJob job) {
+        init(job);
+    }
+
+    private void init(PregelixJob job) {
         conf = job.getConfiguration();
         pregelixJob = job;
         initJobConfiguration();
         job.setJobId(jobId);
 
+        vertexCheckpointPath = "/tmp/ckpoint/" + jobId + "/vertex";
         // set the frame size to be the one user specified if the user did
         // specify.
         int specifiedFrameSize = BspUtils.getFrameSize(job.getConfiguration());
@@ -130,10 +161,7 @@
     }
 
     public void reset(PregelixJob job) {
-        conf = job.getConfiguration();
-        pregelixJob = job;
-        initJobConfiguration();
-        job.setJobId(jobId);
+        init(job);
     }
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
@@ -195,74 +223,6 @@
         return spec;
     }
 
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    @Override
-    public JobSpecification generateLoadingJob() throws HyracksException {
-        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
-        Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
-        JobSpecification spec = new JobSpecification();
-        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
-
-        /**
-         * the graph file scan operator and use count constraint first, will use
-         * absolute constraint later
-         */
-        VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
-        List<InputSplit> splits = new ArrayList<InputSplit>();
-        try {
-            splits = inputFormat.getSplits(pregelixJob, fileSplitProvider.getFileSplits().length);
-            LOGGER.info("number of splits: " + splits.size());
-            for (InputSplit split : splits)
-                LOGGER.info(split.toString());
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
-                vertexIdClass.getName(), vertexClass.getName());
-        IConfigurationFactory confFactory = new ConfigurationFactory(conf);
-        String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
-        VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(spec, recordDescriptor, splits,
-                readSchedule, confFactory);
-        ClusterConfig.setLocationConstraint(spec, scanner);
-
-        /**
-         * construct sort operator
-         */
-        int[] sortFields = new int[1];
-        sortFields[0] = 0;
-        INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
-        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-        comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
-                .getClass());
-        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, sortFields,
-                nkmFactory, comparatorFactories, recordDescriptor);
-        ClusterConfig.setLocationConstraint(spec, sorter);
-
-        /**
-         * construct tree bulk load operator
-         */
-        int[] fieldPermutation = new int[2];
-        fieldPermutation[0] = 0;
-        fieldPermutation[1] = 1;
-        ITypeTraits[] typeTraits = new ITypeTraits[2];
-        typeTraits[0] = new TypeTraits(false);
-        typeTraits[1] = new TypeTraits(false);
-        TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
-                storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
-                sortFields, fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, true, 0, false,
-                getIndexDataflowHelperFactory(), NoOpOperationCallbackFactory.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
-
-        /**
-         * connect operator descriptors
-         */
-        ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
-        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0, sorter, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, btreeBulkLoad, 0);
-        spec.setFrameSize(frameSize);
-        return spec;
-    }
-
     @Override
     public JobSpecification generateJob(int iteration) throws HyracksException {
         if (iteration <= 0)
@@ -398,16 +358,194 @@
          */
         int[] sortFields = new int[1];
         sortFields[0] = 0;
-        ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
+        //ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
         spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
-        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, hashPartitionComputerFactory, sortFields,
-                comparatorFactories), scanner, 0, writer, 0);
+        //spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, hashPartitionComputerFactory, sortFields,
+        //        comparatorFactories), scanner, 0, writer, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, writer, 0);
+        spec.setFrameSize(frameSize);
+        return spec;
+    }
+
+    public JobSpecification scanIndexWriteGraph() throws HyracksException {
+        JobSpecification spec = scanIndexWriteToHDFS(conf);
+        return spec;
+    }
+
+    @Override
+    public JobSpecification[] generateCheckpointing(int lastSuccessfulIteration) throws HyracksException {
+        try {
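+            /** checkpoint the vertices first, then the plan-specific runtime state (message lists) */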
+            PregelixJob tmpJob = this.createCloneJob("Vertex checkpointing for job " + jobId, pregelixJob);
+            tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
+            FileOutputFormat.setOutputPath(tmpJob, new Path(vertexCheckpointPath));
+            tmpJob.setOutputKeyClass(NullWritable.class);
+            tmpJob.setOutputValueClass(BspUtils.getVertexClass(tmpJob.getConfiguration()));
+            JobSpecification vertexCkpSpec = scanIndexWriteToHDFS(tmpJob.getConfiguration());
+            JobSpecification[] stateCkpSpecs = generateStateCheckpointing(lastSuccessfulIteration);
+            JobSpecification[] specs = new JobSpecification[1 + stateCkpSpecs.length];
+            specs[0] = vertexCkpSpec;
+            for (int i = 1; i < specs.length; i++) {
+                specs[i] = stateCkpSpecs[i - 1];
+            }
+            return specs;
+        } catch (Exception e) {
+            throw new HyracksException(e);
+        }
+    }
+
+    @Override
+    public JobSpecification generateLoadingJob() throws HyracksException {
+        JobSpecification spec = loadHDFSData(conf);
+        return spec;
+    }
+
+    public void clearCheckpoints() throws IOException {
+        FileSystem dfs = FileSystem.get(conf);
+        // clear the checkpoint directory
+        dfs.delete(new Path("/tmp/ckpoint/" + jobId), true);
+    }
+
+    @Override
+    public JobSpecification[] generateLoadingCheckpoint(int lastCheckpointedIteration) throws HyracksException {
+        try {
+            PregelixJob tmpJob = this.createCloneJob("Vertex checkpoint loading for job " + jobId, pregelixJob);
+            tmpJob.setVertexInputFormatClass(InternalVertexInputFormat.class);
+            FileInputFormat.setInputPaths(tmpJob, new Path(vertexCheckpointPath));
+            JobSpecification vertexLoadSpec = loadHDFSData(tmpJob.getConfiguration());
+            JobSpecification[] stateLoadSpecs = generateStateCheckpointLoading(lastCheckpointedIteration, pregelixJob);
+            JobSpecification[] specs = new JobSpecification[1 + stateLoadSpecs.length];
+            specs[0] = vertexLoadSpec;
+            for (int i = 1; i < specs.length; i++) {
+                specs[i] = stateLoadSpecs[i - 1];
+            }
+            return specs;
+        } catch (Exception e) {
+            throw new HyracksException(e);
+        }
+    }
+
+    /***
+     * generate a "clear state" job
+     */
+    public JobSpecification generateClearState() throws HyracksException {
+        JobSpecification spec = new JobSpecification();
+        ClearStateOperatorDescriptor clearState = new ClearStateOperatorDescriptor(spec, jobId);
+        ClusterConfig.setLocationConstraint(spec, clearState);
+        spec.addRoot(clearState);
+        return spec;
+    }
+
+    /***
+     * drop the index
+     * 
+     * @return JobSpecification
+     * @throws HyracksException
+     */
+    protected JobSpecification dropIndex(String indexName) throws HyracksException {
+        JobSpecification spec = new JobSpecification();
+
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, indexName);
+        IndexDropOperatorDescriptor drop = new IndexDropOperatorDescriptor(spec, storageManagerInterface,
+                lcManagerProvider, fileSplitProvider, getIndexDataflowHelperFactory());
+
+        ClusterConfig.setLocationConstraint(spec, drop);
+        spec.addRoot(drop);
+        spec.setFrameSize(frameSize);
+        return spec;
+    }
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    protected ITuplePartitionComputerFactory getVertexPartitionComputerFactory() {
+        IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+        Class<? extends VertexPartitioner> partitionerClazz = BspUtils.getVertexPartitionerClass(conf);
+        if (partitionerClazz != null) {
+            return new VertexPartitionComputerFactory(confFactory);
+        } else {
+            return new VertexIdPartitionComputerFactory(new WritableSerializerDeserializerFactory(
+                    BspUtils.getVertexIndexClass(conf)));
+        }
+    }
+
+    protected IIndexDataflowHelperFactory getIndexDataflowHelperFactory() {
+        if (BspUtils.useLSM(conf)) {
+            return new LSMBTreeDataflowHelperFactory(new VirtualBufferCacheProvider(), new ConstantMergePolicyProvider(
+                    3), NoOpOperationTrackerProvider.INSTANCE, SynchronousSchedulerProvider.INSTANCE,
+                    NoOpIOOperationCallback.INSTANCE, 0.01);
+        } else {
+            return new BTreeDataflowHelperFactory();
+        }
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    private JobSpecification loadHDFSData(Configuration conf) throws HyracksException, HyracksDataException {
+        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+        Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+        JobSpecification spec = new JobSpecification();
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+
+        /**
+         * the graph file scan operator and use count constraint first, will use
+         * absolute constraint later
+         */
+        VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
+        List<InputSplit> splits = new ArrayList<InputSplit>();
+        try {
+            splits = inputFormat.getSplits(pregelixJob, fileSplitProvider.getFileSplits().length);
+            LOGGER.info("number of splits: " + splits.size());
+            for (InputSplit split : splits)
+                LOGGER.info(split.toString());
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), vertexClass.getName());
+        IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+        String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
+        VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(spec, recordDescriptor, splits,
+                readSchedule, confFactory);
+        ClusterConfig.setLocationConstraint(spec, scanner);
+
+        /**
+         * construct sort operator
+         */
+        int[] sortFields = new int[1];
+        sortFields[0] = 0;
+        INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+                .getClass());
+        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, sortFields,
+                nkmFactory, comparatorFactories, recordDescriptor);
+        ClusterConfig.setLocationConstraint(spec, sorter);
+
+        /**
+         * construct tree bulk load operator
+         */
+        int[] fieldPermutation = new int[2];
+        fieldPermutation[0] = 0;
+        fieldPermutation[1] = 1;
+        ITypeTraits[] typeTraits = new ITypeTraits[2];
+        typeTraits[0] = new TypeTraits(false);
+        typeTraits[1] = new TypeTraits(false);
+        TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
+                storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
+                sortFields, fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, true, 0, false,
+                getIndexDataflowHelperFactory(), NoOpOperationCallbackFactory.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+
+        /**
+         * connect operator descriptors
+         */
+        ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0, sorter, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, btreeBulkLoad, 0);
         spec.setFrameSize(frameSize);
         return spec;
     }
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
-    public JobSpecification scanIndexWriteGraph() throws HyracksException {
+    private JobSpecification scanIndexWriteToHDFS(Configuration conf) throws HyracksDataException, HyracksException {
         Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
         Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
         JobSpecification spec = new JobSpecification();
@@ -464,56 +601,118 @@
         return spec;
     }
 
-    /***
-     * generate a "clear state" job
-     */
-    public JobSpecification generateClearState() throws HyracksException {
-        JobSpecification spec = new JobSpecification();
-        ClearStateOperatorDescriptor clearState = new ClearStateOperatorDescriptor(spec, jobId);
-        ClusterConfig.setLocationConstraint(spec, clearState);
-        spec.addRoot(clearState);
-        return spec;
+    protected PregelixJob createCloneJob(String newJobName, PregelixJob oldJob) throws HyracksException {
+        try {
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            DataOutput dos = new DataOutputStream(bos);
+            oldJob.getConfiguration().write(dos);
+            PregelixJob newJob = new PregelixJob(newJobName);
+            DataInput dis = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
+            newJob.getConfiguration().readFields(dis);
+            return newJob;
+        } catch (IOException e) {
+            throw new HyracksException(e);
+        }
     }
 
-    /***
-     * drop the sindex
-     * 
-     * @return JobSpecification
-     * @throws HyracksException
-     */
-    protected JobSpecification dropIndex(String indexName) throws HyracksException {
+    /** generate the plan-specific state checkpointing jobs */
+    protected JobSpecification[] generateStateCheckpointing(int lastSuccessfulIteration) throws HyracksException {
+        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
         JobSpecification spec = new JobSpecification();
 
-        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, indexName);
-        IndexDropOperatorDescriptor drop = new IndexDropOperatorDescriptor(spec, storageManagerInterface,
-                lcManagerProvider, fileSplitProvider, getIndexDataflowHelperFactory());
+        /**
+         * the record descriptor for the materialized (vertex id, message list) state
+         */
+        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                MsgList.class.getName());
 
-        ClusterConfig.setLocationConstraint(spec, drop);
-        spec.addRoot(drop);
+        /**
+         * construct empty tuple operator
+         */
+        EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+
+        /**
+         * construct the materializing read operator
+         */
+        MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, materializeRead);
+
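+        /**
+         * construct the HDFS write operator that persists the materialized message state
+         */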
+        String checkpointPath = "/tmp/ckpoint/" + jobId + "/message";
+        PregelixJob tmpJob = createCloneJob("State checkpointing for job " + jobId, pregelixJob);
+        tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
+        FileOutputFormat.setOutputPath(tmpJob, new Path(checkpointPath));
+        tmpJob.setOutputKeyClass(vertexIdClass);
+        tmpJob.setOutputValueClass(MsgList.class);
+
+        ITupleWriterFactory writerFactory = new KeyValueWriterFactory(new ConfFactory(tmpJob));
+        HDFSWriteOperatorDescriptor hdfsWriter = new HDFSWriteOperatorDescriptor(spec, tmpJob, writerFactory);
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, materializeRead, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), materializeRead, 0, hdfsWriter, 0);
         spec.setFrameSize(frameSize);
-        return spec;
+        return new JobSpecification[] { spec };
     }
 
+    /** load the plan-specific state checkpoints */
     @SuppressWarnings({ "unchecked", "rawtypes" })
-    protected ITuplePartitionComputerFactory getVertexPartitionComputerFactory() {
-        IConfigurationFactory confFactory = new ConfigurationFactory(conf);
-        Class<? extends VertexPartitioner> partitionerClazz = BspUtils.getVertexPartitionerClass(conf);
-        if (partitionerClazz != null) {
-            return new VertexPartitionComputerFactory(confFactory);
-        } else {
-            return new VertexIdPartitionComputerFactory(new WritableSerializerDeserializerFactory(
-                    BspUtils.getVertexIndexClass(conf)));
+    protected JobSpecification[] generateStateCheckpointLoading(int lastCheckpointedIteration, PregelixJob job)
+            throws HyracksException {
+        String checkpointPath = "/tmp/ckpoint/" + jobId + "/message";
+        PregelixJob tmpJob = createCloneJob("State checkpoint loading for job " + jobId, job);
+        tmpJob.setVertexInputFormatClass(InternalVertexInputFormat.class);
+        try {
+            FileInputFormat.setInputPaths(tmpJob, checkpointPath);
+        } catch (IOException e) {
+            throw new HyracksException(e);
         }
-    }
+        Configuration conf = job.getConfiguration();
+        Class vertexIdClass = BspUtils.getVertexIndexClass(conf);
+        JobSpecification spec = new JobSpecification();
 
-    protected IIndexDataflowHelperFactory getIndexDataflowHelperFactory() {
-        if (BspUtils.useLSM(conf)) {
-            return new LSMBTreeDataflowHelperFactory(new VirtualBufferCacheProvider(), new ConstantMergePolicyProvider(
-                    3), NoOpOperationTrackerProvider.INSTANCE, SynchronousSchedulerProvider.INSTANCE,
-                    NoOpIOOperationCallback.INSTANCE, 0.01);
-        } else {
-            return new BTreeDataflowHelperFactory();
+        /***
+         * HDFS read operator
+         */
+        VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
+        List<InputSplit> splits = new ArrayList<InputSplit>();
+        try {
+            splits = inputFormat.getSplits(tmpJob, ClusterConfig.getLocationConstraint().length);
+            LOGGER.info("number of splits: " + splits.size());
+            for (InputSplit split : splits)
+                LOGGER.info(split.toString());
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
         }
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), MsgList.class.getName());
+        String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
+        HDFSReadOperatorDescriptor scanner = new HDFSReadOperatorDescriptor(spec, recordDescriptor, tmpJob, splits,
+                readSchedule, new KeyValueParserFactory());
+        ClusterConfig.setLocationConstraint(spec, scanner);
+
+        /**
+         * construct the materializing write operator
+         */
+        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec,
+                recordDescriptor);
+        ClusterConfig.setLocationConstraint(spec, materialize);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink);
+
+        /**
+         * connect operator descriptors
+         */
+        ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0,
+                materialize, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, emptySink, 0);
+        spec.setFrameSize(frameSize);
+        return new JobSpecification[] { spec };
     }
 
     /** generate non-first iteration job */
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index c144ddd..58384b2 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -15,28 +15,51 @@
 
 package edu.uci.ics.pregelix.core.jobgen;
 
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Logger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.VLongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs2.dataflow.ConfFactory;
+import edu.uci.ics.hyracks.hdfs2.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs2.dataflow.HDFSWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
 import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.io.VertexInputFormat;
+import edu.uci.ics.pregelix.api.io.internal.InternalVertexInputFormat;
+import edu.uci.ics.pregelix.api.io.internal.InternalVertexOutputFormat;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.core.data.TypeTraits;
@@ -48,6 +71,8 @@
 import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.EmptyTupleSourceOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.FinalAggregateOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.KeyValueParserFactory;
+import edu.uci.ics.pregelix.dataflow.KeyValueWriterFactory;
 import edu.uci.ics.pregelix.dataflow.MaterializingReadOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
@@ -69,6 +94,7 @@
 import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
 
 public class JobGenInnerJoin extends JobGen {
+    private static final Logger LOGGER = Logger.getLogger(JobGenInnerJoin.class.getName());
 
     public JobGenInnerJoin(PregelixJob job) {
         super(job);
@@ -496,6 +522,171 @@
         return spec;
     }
 
+    /** generate plan specific state checkpointing */
+    @Override
+    protected JobSpecification[] generateStateCheckpointing(int lastSuccessfulIteration) throws HyracksException {
+        JobSpecification[] msgCkpSpecs = super.generateStateCheckpointing(lastSuccessfulIteration);
+        PregelixJob tmpJob = this.createCloneJob("Vertex checkpointing for job " + jobId, pregelixJob);
+        tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
+
+        /** generate secondary index checkpoint */
+        String checkpointPath = "/tmp/ckpoint/" + jobId + "/secondary";
+        FileOutputFormat.setOutputPath(tmpJob, new Path(checkpointPath));
+        tmpJob.setOutputKeyClass(BspUtils.getVertexIndexClass(tmpJob.getConfiguration()));
+        tmpJob.setOutputValueClass(MsgList.class);
+        JobSpecification secondaryBTreeCkp = generateSecondaryBTreeCheckpoint(lastSuccessfulIteration, tmpJob);
+
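+        /** concatenate the message checkpoint specs with the secondary B-tree checkpoint spec */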
+        JobSpecification[] specs = new JobSpecification[msgCkpSpecs.length + 1];
+        for (int i = 0; i < msgCkpSpecs.length; i++) {
+            specs[i] = msgCkpSpecs[i];
+        }
+        specs[specs.length - 1] = secondaryBTreeCkp;
+        return specs;
+    }
+
+    /**
+     * generate plan specific checkpoint loading
+     */
+    @Override
+    protected JobSpecification[] generateStateCheckpointLoading(int lastSuccessfulIteration, PregelixJob job)
+            throws HyracksException {
+        JobSpecification[] msgCkpSpecs = super.generateStateCheckpointLoading(lastSuccessfulIteration, job);
+        PregelixJob tmpJob = this.createCloneJob("Vertex checkpoint loading for job " + jobId, pregelixJob);
+        tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
+
+        /** generate secondary index checkpoint load */
+        JobSpecification secondaryBTreeCkpLoad = generateSecondaryBTreeCheckpointLoad(lastSuccessfulIteration, tmpJob);
+        JobSpecification[] specs = new JobSpecification[msgCkpSpecs.length + 1];
+        for (int i = 0; i < msgCkpSpecs.length; i++) {
+            specs[i] = msgCkpSpecs[i];
+        }
+        specs[specs.length - 1] = secondaryBTreeCkpLoad;
+        return specs;
+    }
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    private JobSpecification generateSecondaryBTreeCheckpointLoad(int lastSuccessfulIteration, PregelixJob job)
+            throws HyracksException {
+        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(job.getConfiguration());
+        JobSpecification spec = new JobSpecification();
+
+        String checkpointPath = "/tmp/ckpoint/" + jobId + "/secondary";
+        PregelixJob tmpJob = createCloneJob("State checkpoint loading for job " + jobId, job);
+        tmpJob.setVertexInputFormatClass(InternalVertexInputFormat.class);
+        try {
+            FileInputFormat.setInputPaths(tmpJob, checkpointPath);
+        } catch (IOException e) {
+            throw new HyracksException(e);
+        }
+        Configuration conf = job.getConfiguration();
+
+        /***
+         * construct HDFS read operator
+         */
+        VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
+        List<InputSplit> splits = new ArrayList<InputSplit>();
+        try {
+            splits = inputFormat.getSplits(tmpJob, ClusterConfig.getLocationConstraint().length);
+            LOGGER.info("number of splits: " + splits.size());
+            for (InputSplit split : splits) {
+                LOGGER.info(split.toString());
+            }
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), MsgList.class.getName());
+        String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
+        HDFSReadOperatorDescriptor scanner = new HDFSReadOperatorDescriptor(spec, recordDescriptor, tmpJob, splits,
+                readSchedule, new KeyValueParserFactory());
+        ClusterConfig.setLocationConstraint(spec, scanner);
+
+        /**
+         * construct bulk-load index operator
+         */
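+        /** the records scanned from the checkpoint files are bulk-loaded back into the secondary B-tree */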
+        ITypeTraits[] typeTraits = new ITypeTraits[2];
+        typeTraits[0] = new TypeTraits(false);
+        typeTraits[1] = new TypeTraits(false);
+        int[] fieldPermutation = new int[] { 0, 1 };
+        IBinaryComparatorFactory[] indexCmpFactories = new IBinaryComparatorFactory[1];
+        indexCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(lastSuccessfulIteration + 1, WritableComparator
+                .get(vertexIdClass).getClass());
+        String writeFile = lastSuccessfulIteration % 2 == 0 ? SECONDARY_INDEX_EVEN : SECONDARY_INDEX_ODD;
+        IFileSplitProvider secondaryFileSplitProviderWrite = ClusterConfig.getFileSplitProvider(jobId, writeFile);
+        TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
+                storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderWrite, typeTraits,
+                indexCmpFactories, fieldPermutation, new int[] { 0 }, DEFAULT_BTREE_FILL_FACTOR,
+                getIndexDataflowHelperFactory());
+        ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+
+        /**
+         * connect operator descriptors
+         */
+        ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0,
+                btreeBulkLoad, 0);
+        spec.setFrameSize(frameSize);
+
+        return spec;
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    private JobSpecification generateSecondaryBTreeCheckpoint(int lastSuccessfulIteration, PregelixJob job)
+            throws HyracksException {
+        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(job.getConfiguration());
+        Class<? extends Writable> msgListClass = MsgList.class;
+        String readFile = lastSuccessfulIteration % 2 == 0 ? SECONDARY_INDEX_EVEN : SECONDARY_INDEX_ODD;
+        IFileSplitProvider secondaryFileSplitProviderRead = ClusterConfig.getFileSplitProvider(jobId, readFile);
+        JobSpecification spec = new JobSpecification();
+        /**
+         * construct empty tuple operator
+         */
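+        /** the single constant tuple built below only serves to trigger the B-tree search that follows */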
+        ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
+        DataOutput dos = tb.getDataOutput();
+        tb.reset();
+        UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
+        tb.addFieldEndOffset();
+        ISerializerDeserializer[] keyRecDescSers = { UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE };
+        RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+        ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(spec,
+                keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
+        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+
+        /**
+         * construct btree search operator
+         */
+        ConfFactory confFactory = new ConfFactory(job);
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), msgListClass.getName());
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+                .getClass());
+
+        ITypeTraits[] typeTraits = new ITypeTraits[2];
+        typeTraits[0] = new TypeTraits(false);
+        typeTraits[1] = new TypeTraits(false);
+
+        BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
+                storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderRead, typeTraits,
+                comparatorFactories, null, null, null, true, true, getIndexDataflowHelperFactory(), false,
+                NoOpOperationCallbackFactory.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, scanner);
+
+        /**
+         * construct write file operator
+         */
+        HDFSWriteOperatorDescriptor writer = new HDFSWriteOperatorDescriptor(spec, job, new KeyValueWriterFactory(
+                confFactory));
+        ClusterConfig.setLocationConstraint(spec, writer);
+
+        /**
+         * connect operator descriptors
+         */
+        spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, writer, 0);
+        spec.setFrameSize(frameSize);
+        return spec;
+    }
+
     @Override
     public JobSpecification[] generateCleanup() throws HyracksException {
         JobSpecification[] cleanups = new JobSpecification[3];
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
index db18e53..ea6cc8a 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
@@ -228,4 +228,16 @@
     public static Scheduler getHdfsScheduler() {
         return hdfsScheduler;
     }
+
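+    /**
+     * @return one location (NC id) entry per store directory on every NC, i.e., one entry
+     *         for each data partition in the cluster
+     */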
+    public static String[] getLocationConstraint() throws HyracksException {
+        int count = 0;
+        String[] locations = new String[NCs.length * stores.length];
+        for (String nc : NCs) {
+            for (int i = 0; i < stores.length; i++) {
+                locations[count] = nc;
+                count++;
+            }
+        }
+        return locations;
+    }
 }
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/KeyValueParserFactory.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/KeyValueParserFactory.java
new file mode 100644
index 0000000..a4a53e1
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/KeyValueParserFactory.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow;
+
+import java.io.DataOutput;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.io.Writable;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;
+
+/**
+ * @author yingyib
+ */
+public class KeyValueParserFactory<K extends Writable, V extends Writable> implements IKeyValueParserFactory<K, V> {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public IKeyValueParser<K, V> createKeyValueParser(IHyracksTaskContext ctx) throws HyracksDataException {
+        final ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
+        final DataOutput dos = tb.getDataOutput();
+        final ByteBuffer buffer = ctx.allocateFrame();
+        final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+        appender.reset(buffer, true);
+
+        return new IKeyValueParser<K, V>() {
+
+            @Override
+            public void open(IFrameWriter writer) throws HyracksDataException {
+
+            }
+
+            @Override
+            public void parse(K key, V value, IFrameWriter writer, String fileString) throws HyracksDataException {
+                try {
+                    tb.reset();
+                    key.write(dos);
+                    tb.addFieldEndOffset();
+                    value.write(dos);
+                    tb.addFieldEndOffset();
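+                    // if the current frame is full, flush it to the writer and retry the append on the empty frame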
+                    if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                        FrameUtils.flushFrame(buffer, writer);
+                        appender.reset(buffer, true);
+                        if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                            throw new HyracksDataException("tuple is too large to fit into a single frame");
+                        }
+                    }
+                } catch (Exception e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            @Override
+            public void close(IFrameWriter writer) throws HyracksDataException {
+                FrameUtils.flushFrame(buffer, writer);
+            }
+
+        };
+    }
+}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/KeyValueWriterFactory.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/KeyValueWriterFactory.java
new file mode 100644
index 0000000..fd407be
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/KeyValueWriterFactory.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.common.util.ReflectionUtils;
+import edu.uci.ics.hyracks.hdfs.ContextFactory;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+import edu.uci.ics.hyracks.hdfs2.dataflow.ConfFactory;
+import edu.uci.ics.pregelix.api.util.ResetableByteArrayInputStream;
+
+/**
+ * @author yingyib
+ */
+@SuppressWarnings("rawtypes")
+public class KeyValueWriterFactory implements ITupleWriterFactory {
+    private static final long serialVersionUID = 1L;
+    private ConfFactory confFactory;
+
+    public KeyValueWriterFactory(ConfFactory confFactory) {
+        this.confFactory = confFactory;
+    }
+
+    @Override
+    public ITupleWriter getTupleWriter(IHyracksTaskContext ctx, final int partition, final int nPartition)
+            throws HyracksDataException {
+        return new ITupleWriter() {
+            private SequenceFileOutputFormat sequenceOutputFormat = new SequenceFileOutputFormat();
+            private Writable key;
+            private Writable value;
+            private ResetableByteArrayInputStream bis = new ResetableByteArrayInputStream();
+            private DataInput dis = new DataInputStream(bis);
+            private RecordWriter recordWriter;
+            private ContextFactory ctxFactory = new ContextFactory();
+            private TaskAttemptContext context;
+
+            @Override
+            public void open(DataOutput output) throws HyracksDataException {
+                try {
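+                    // create a per-partition Hadoop task attempt context and open a SequenceFile record writer for it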
+                    Job job = confFactory.getConf();
+                    context = ctxFactory.createContext(job.getConfiguration(), partition);
+                    recordWriter = sequenceOutputFormat.getRecordWriter(context);
+                    Class<?> keyClass = context.getOutputKeyClass();
+                    Class<?> valClass = context.getOutputValueClass();
+                    key = (Writable) ReflectionUtils.createInstance(keyClass);
+                    value = (Writable) ReflectionUtils.createInstance(valClass);
+                } catch (Exception e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            @SuppressWarnings("unchecked")
+            @Override
+            public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
+                try {
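+                    // deserialize the two tuple fields back into Writable key/value objects and hand them to the record writer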
+                    byte[] data = tuple.getFieldData(0);
+                    int fieldStart = tuple.getFieldStart(0);
+                    bis.setByteArray(data, fieldStart);
+                    key.readFields(dis);
+                    data = tuple.getFieldData(1);
+                    fieldStart = tuple.getFieldStart(1);
+                    bis.setByteArray(data, fieldStart);
+                    value.readFields(dis);
+                    recordWriter.write(key, value);
+                } catch (Exception e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            @Override
+            public void close(DataOutput output) throws HyracksDataException {
+                try {
+                    recordWriter.close(context);
+                } catch (Exception e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+        };
+    }
+}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingWriteOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingWriteOperatorDescriptor.java
index 48ed806..00dcbd1 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingWriteOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingWriteOperatorDescriptor.java
@@ -68,6 +68,8 @@
 
                 @Override
                 public void open() throws HyracksDataException {
+                    /** remove last iteration's state */
+                    IterationUtils.removeIterationState(ctx, partition);
                     state = new MaterializerTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
                             partition));
                     INCApplicationContext appContext = ctx.getJobletContext().getApplicationContext();
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
index 75f8ed8..603a464 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
@@ -52,6 +52,11 @@
         RuntimeContext context = (RuntimeContext) appContext.getApplicationObject();
         Map<StateKey, IStateObject> map = context.getAppStateStore();
         IStateObject state = map.get(new StateKey(lastId, partition));
+        while (state == null) {
+            /** in case the last job is a checkpointing job */
+            lastId = new JobId(lastId.getId() - 1);
+            state = map.get(new StateKey(lastId, partition));
+        }
         return state;
     }
 
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
index cb7fd6d..f6857fe 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook;
 import edu.uci.ics.pregelix.api.util.DefaultVertexPartitioner;
 import edu.uci.ics.pregelix.example.ConnectedComponentsVertex;
 import edu.uci.ics.pregelix.example.ConnectedComponentsVertex.SimpleConnectedComponentsVertexOutputFormat;
@@ -76,6 +77,7 @@
         FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
         job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+        job.setCheckpointHook(ConservativeCheckpointHook.class);
         job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
     }
 
@@ -90,6 +92,7 @@
         FileInputFormat.setInputPaths(job, HDFS_INPUTPATH2);
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH2));
         job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
+        job.setCheckpointHook(ConservativeCheckpointHook.class);
         job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
     }
 
@@ -116,6 +119,7 @@
         FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
         job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+        job.setCheckpointHook(ConservativeCheckpointHook.class);
         job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
     }
 
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/FailureVertexTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/FailureVertexTest.java
index aa0dfdd..5a2636a 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/FailureVertexTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/FailureVertexTest.java
@@ -37,8 +37,8 @@
  */
 public class FailureVertexTest {
 
-    private static String HDFS_INPUTPATH2 = "data/webmapcomplex";
-    private static String HDFS_OUTPUTPAH2 = "actual/resultcomplex";
+    private static String INPUT_PATH = "data/webmapcomplex";
+    private static String OUTPUT_PATH = "actual/resultcomplex";
 
     @Test
     public void test() throws Exception {
@@ -52,8 +52,8 @@
             job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
             job.setDynamicVertexValueSize(true);
 
-            FileInputFormat.setInputPaths(job, HDFS_INPUTPATH2);
-            FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH2));
+            FileInputFormat.setInputPaths(job, INPUT_PATH);
+            FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
             job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
 
             Driver driver = new Driver(FailureVertex.class);
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/JobConcatenationTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/JobConcatenationTest.java
index 0d6f863..d2995f1 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/JobConcatenationTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/JobConcatenationTest.java
@@ -24,6 +24,7 @@
 import org.junit.Test;
 
 import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook;
 import edu.uci.ics.pregelix.core.driver.Driver;
 import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
 import edu.uci.ics.pregelix.example.PageRankVertex;
@@ -55,6 +56,7 @@
             job1.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
             FileInputFormat.setInputPaths(job1, INPUTPATH);
             job1.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+            job1.setCheckpointHook(ConservativeCheckpointHook.class);
 
             PregelixJob job2 = new PregelixJob(PageRankVertex.class.getName());
             job2.setVertexClass(PageRankVertex.class);
@@ -64,6 +66,7 @@
             job2.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
             FileOutputFormat.setOutputPath(job2, new Path(OUTPUTPAH));
             job2.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+            job2.setCheckpointHook(ConservativeCheckpointHook.class);
 
             jobs.add(job1);
             jobs.add(job2);
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
index 9e1e0b0..b50b02a 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
@@ -84,6 +84,7 @@
 <property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
 <property><name>mapred.queue.names</name><value>default</value></property>
 <property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>pregelix.checkpointHook</name><value>edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook</value></property>
 <property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
 <property><name>mapred.job.tracker</name><value>local</value></property>
 <property><name>io.skip.checksum.errors</name><value>false</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
index c4366d7..217fbba 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
@@ -84,6 +84,7 @@
 <property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
 <property><name>mapred.queue.names</name><value>default</value></property>
 <property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>pregelix.checkpointHook</name><value>edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook</value></property>
 <property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
 <property><name>mapred.job.tracker</name><value>local</value></property>
 <property><name>io.skip.checksum.errors</name><value>false</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml
index ac0d508..636b055 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml
@@ -84,6 +84,7 @@
 <property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
 <property><name>mapred.queue.names</name><value>default</value></property>
 <property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>pregelix.checkpointHook</name><value>edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook</value></property>
 <property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
 <property><name>mapred.job.tracker</name><value>local</value></property>
 <property><name>io.skip.checksum.errors</name><value>false</value></property>