avoid btree update code path when the target entry has sufficient space
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexInputFormat.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexInputFormat.java
new file mode 100644
index 0000000..503b521
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexInputFormat.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.io.internal;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexInputFormat;
+import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+
+/**
+ * @author yingyib
+ */
+@SuppressWarnings("rawtypes")
+public class InternalVertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable>
+        extends VertexInputFormat<I, V, E, M> {
+    /** Uses the SequenceFileInputFormat to do everything */
+    protected SequenceFileInputFormat sequenceInputFormat = new SequenceFileInputFormat();
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public List<InputSplit> getSplits(JobContext context, int numWorkers) throws IOException, InterruptedException {
+        return sequenceInputFormat.getSplits(context);
+    }
+
+    @Override
+    public VertexReader<I, V, E, M> createVertexReader(InputSplit split, TaskAttemptContext context) throws IOException {
+        return new VertexReader<I, V, E, M>() {
+
+            @Override
+            public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException,
+                    InterruptedException {
+
+            }
+
+            @Override
+            public boolean nextVertex() throws IOException, InterruptedException {
+                return false;
+            }
+
+            @Override
+            public Vertex<I, V, E, M> getCurrentVertex() throws IOException, InterruptedException {
+                return null;
+            }
+
+            @Override
+            public void close() throws IOException {
+
+            }
+
+            @Override
+            public float getProgress() throws IOException, InterruptedException {
+                return 0;
+            }
+
+        };
+    }
+
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexOutputFormat.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexOutputFormat.java
new file mode 100644
index 0000000..0581658
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexOutputFormat.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.io.internal;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+
+/**
+ * @author yingyib
+ */
+@SuppressWarnings("rawtypes")
+public class InternalVertexOutputFormat<I extends WritableComparable, V extends Writable, E extends Writable> extends
+        VertexOutputFormat<I, V, E> {
+
+    @Override
+    public VertexWriter<I, V, E> createVertexWriter(TaskAttemptContext context) throws IOException,
+            InterruptedException {
+        return new VertexWriter<I, V, E>() {
+
+            @Override
+            public void initialize(TaskAttemptContext context) throws IOException, InterruptedException {
+
+            }
+
+            @Override
+            public void writeVertex(Vertex<I, V, E, ?> vertex) throws IOException, InterruptedException {
+
+            }
+
+            @Override
+            public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+
+            }
+
+        };
+    }
+
+    @Override
+    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
+
+    }
+
+    @Override
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
+        return new OutputCommitter() {
+
+            @Override
+            public void abortTask(TaskAttemptContext arg0) throws IOException {
+                // TODO Auto-generated method stub
+                
+            }
+
+            @Override
+            public void cleanupJob(JobContext arg0) throws IOException {
+                // TODO Auto-generated method stub
+                
+            }
+
+            @Override
+            public void commitTask(TaskAttemptContext arg0) throws IOException {
+                // TODO Auto-generated method stub
+                
+            }
+
+            @Override
+            public boolean needsTaskCommit(TaskAttemptContext arg0) throws IOException {
+                return false;
+            }
+
+            @Override
+            public void setupJob(JobContext arg0) throws IOException {
+                
+            }
+
+            @Override
+            public void setupTask(TaskAttemptContext arg0) throws IOException {
+                
+            }
+
+        };
+    }
+
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index dae7818..d2d5cf0 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -201,4 +201,9 @@
     final public void setLSMStorage(boolean variableSizedUpdateHeavyFlag) {
         getConfiguration().setBoolean(UPDATE_INTENSIVE, variableSizedUpdateHeavyFlag);
     }
+
+    @Override
+    public String toString() {
+        return getJobName();
+    }
 }
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 5c8cb33..1b4f226 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -93,32 +93,48 @@
             DeploymentId deploymentId = prepareJobs(ipAddress, port);
             LOG.info("job started");
 
-            for (int i = 0; i < jobs.size(); i++) {
-                lastJob = currentJob;
-                currentJob = jobs.get(i);
+            int lastSnapshotJobIndex = 0;
+            int lastSnapshotSuperstep = 0;
+            boolean failed = false;
+            int retryCount = 0;
+            int maxRetryCount = 3;
 
-                /** add hadoop configurations */
-                addHadoopConfiguration(currentJob, ipAddress, port);
+            do {
+                try {
+                    for (int i = lastSnapshotJobIndex; i < jobs.size(); i++) {
+                        lastJob = currentJob;
+                        currentJob = jobs.get(i);
 
-                /** load the data */
-                if (i == 0 || compatible(lastJob, currentJob)) {
-                    if (i != 0) {
-                        finishJobs(jobGen, deploymentId);
+                        /** add hadoop configurations */
+                        addHadoopConfiguration(currentJob, ipAddress, port);
+
+                        /** load the data */
+                        if (i == 0 || compatible(lastJob, currentJob)) {
+                            if (i != 0) {
+                                finishJobs(jobGen, deploymentId);
+                            }
+                            jobGen = selectJobGen(planChoice, currentJob);
+                            loadData(currentJob, jobGen, deploymentId);
+                        } else {
+                            jobGen.reset(currentJob);
+                        }
+
+                        /** run loop-body jobs */
+                        runLoopBody(deploymentId, currentJob, jobGen, lastSnapshotSuperstep);
+                        runClearState(deploymentId, jobGen);
                     }
-                    jobGen = selectJobGen(planChoice, currentJob);
-                    loadData(currentJob, jobGen, deploymentId);
-                } else {
-                    jobGen.reset(currentJob);
+
+                    /** finish the jobs */
+                    finishJobs(jobGen, deploymentId);
+                    hcc.unDeployBinary(deploymentId);
+                } catch (IOException ioe) {
+                    /** disk failures */
+                    //restart from snapshot
+                    failed = true;
+                    retryCount++;
+                    ioe.printStackTrace();
                 }
-
-                /** run loop-body jobs */
-                runLoopBody(deploymentId, currentJob, jobGen);
-                runClearState(deploymentId, jobGen);
-            }
-
-            /** finish the jobs */
-            finishJobs(jobGen, deploymentId);
-            hcc.unDeployBinary(deploymentId);
+            } while (failed && retryCount < maxRetryCount);
             LOG.info("job finished");
         } catch (Exception e) {
             throw new HyracksException(e);
@@ -225,8 +241,12 @@
         ClusterConfig.loadClusterConfig(ipAddress, port);
     }
 
-    private void runLoopBody(DeploymentId deploymentId, PregelixJob job, JobGen jobGen) throws Exception {
-        int i = 1;
+    private void runLoopBody(DeploymentId deploymentId, PregelixJob job, JobGen jobGen, int snapshotSuperstep)
+            throws Exception {
+        if (snapshotSuperstep > 0) {
+            /** reload the snapshot */
+        }
+        int i = snapshotSuperstep + 1;
         boolean terminate = false;
         long start, end, time;
         do {
@@ -234,7 +254,7 @@
             runLoopBodyIteration(deploymentId, jobGen, i);
             end = System.currentTimeMillis();
             time = end - start;
-            LOG.info("iteration " + i + " finished " + time + "ms");
+            LOG.info(job + ": iteration " + i + " finished " + time + "ms");
             terminate = IterationUtils.readTerminationState(job.getConfiguration(), jobGen.getJobId())
                     || IterationUtils.readForceTerminationState(job.getConfiguration(), jobGen.getJobId());
             i++;
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java
index 8e731ba..392f728 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java
@@ -33,8 +33,8 @@
             int srcStart = fieldEndOffsets[0];
             int srcLen = fieldEndOffsets[1] - fieldEndOffsets[0]; // the updated vertex size
             int frSize = frameTuple.getFieldLength(1); // the vertex binary size in the leaf page
-            if (srcLen == frSize) {
-                //doing in-place update if the vertex size is fixed, save the "real update" overhead
+            if (srcLen <= frSize) {
+                //doing in-place update if the vertex size is not larger than the original size, save the "real update" overhead
                 System.arraycopy(cloneUpdateTb.getByteArray(), srcStart, frameTuple.getFieldData(1),
                         frameTuple.getFieldStart(1), srcLen);
                 cloneUpdateTb.reset();
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java
index 85f3959..2fa1a4b 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java
@@ -49,7 +49,7 @@
                 int availableBefore = bbis.available();
                 Object instance = recordDescriptor.getFields()[i].deserialize(di);
                 int availableAfter = bbis.available();
-                if (availableBefore - availableAfter != len) {
+                if (availableBefore - availableAfter > len) {
                     throw new IllegalStateException(ERROR_MSG);
                 }
 
@@ -64,11 +64,12 @@
     public Object[] deserializeRecord(IFrameTupleAccessor left, int tIndex, ITupleReference right)
             throws HyracksDataException {
         try {
+            /** skip vertex id field in deserialization */
             byte[] data = left.getBuffer().array();
             int tStart = left.getTupleStartOffset(tIndex) + left.getFieldSlotsLength();
             int leftFieldCount = left.getFieldCount();
             int fStart = tStart;
-            for (int i = 0; i < leftFieldCount; ++i) {
+            for (int i = 1; i < leftFieldCount; ++i) {
                 /**
                  * reset the input
                  */
@@ -82,13 +83,14 @@
                 int availableBefore = bbis.available();
                 Object instance = recordDescriptor.getFields()[i].deserialize(di);
                 int availableAfter = bbis.available();
-                if (availableBefore - availableAfter != fieldLength) {
+                if (availableBefore - availableAfter > fieldLength) {
                     throw new IllegalStateException(ERROR_MSG);
 
                 }
                 record[i] = instance;
             }
-            for (int i = leftFieldCount; i < record.length; ++i) {
+            /** skip vertex id field in deserialization */
+            for (int i = leftFieldCount + 1; i < record.length; ++i) {
                 byte[] rightData = right.getFieldData(i - leftFieldCount);
                 int rightOffset = right.getFieldStart(i - leftFieldCount);
                 int len = right.getFieldLength(i - leftFieldCount);
@@ -97,7 +99,7 @@
                 int availableBefore = bbis.available();
                 Object instance = recordDescriptor.getFields()[i].deserialize(di);
                 int availableAfter = bbis.available();
-                if (availableBefore - availableAfter != len) {
+                if (availableBefore - availableAfter > len) {
                     throw new IllegalStateException(ERROR_MSG);
                 }
                 record[i] = instance;
@@ -113,12 +115,13 @@
             byte[] data = tb.getByteArray();
             int[] offset = tb.getFieldEndOffsets();
             int start = 0;
-            for (int i = 0; i < offset.length; ++i) {
+            /** skip vertex id fields in deserialization */
+            for (int i = 1; i < offset.length; ++i) {
                 /**
                  * reset the input
                  */
+                start = offset[i - 1];
                 bbis.setByteArray(data, start);
-                start = offset[i];
                 int fieldLength = i == 0 ? offset[0] : offset[i] - offset[i - 1];
 
                 /**
@@ -127,12 +130,13 @@
                 int availableBefore = bbis.available();
                 Object instance = recordDescriptor.getFields()[i].deserialize(di);
                 int availableAfter = bbis.available();
-                if (availableBefore - availableAfter != fieldLength) {
+                if (availableBefore - availableAfter > fieldLength) {
                     throw new IllegalStateException(ERROR_MSG);
                 }
                 record[i] = instance;
             }
-            for (int i = offset.length; i < record.length; ++i) {
+            /** skip vertex id fields in deserialization */
+            for (int i = offset.length + 1; i < record.length; ++i) {
                 byte[] rightData = right.getFieldData(i - offset.length);
                 int rightOffset = right.getFieldStart(i - offset.length);
                 bbis.setByteArray(rightData, rightOffset);
@@ -141,7 +145,7 @@
                 int availableBefore = bbis.available();
                 Object instance = recordDescriptor.getFields()[i].deserialize(di);
                 int availableAfter = bbis.available();
-                if (availableBefore - availableAfter != fieldLength) {
+                if (availableBefore - availableAfter > fieldLength) {
                     throw new IllegalStateException(ERROR_MSG);
                 }
                 record[i] = instance;