fix the issue found by Genomix P4 algorithm
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index b345e01..c0825dd 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -37,7 +37,6 @@
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -98,8 +97,6 @@
 import edu.uci.ics.pregelix.core.data.TypeTraits;
 import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
 import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
-import edu.uci.ics.pregelix.core.runtime.touchpoint.RawBinaryComparatorFactory;
-import edu.uci.ics.pregelix.core.runtime.touchpoint.RawNormalizedKeyComputerFactory;
 import edu.uci.ics.pregelix.core.util.DataflowUtils;
 import edu.uci.ics.pregelix.dataflow.ClearStateOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
@@ -208,12 +205,13 @@
 
     @Override
     public JobSpecification generateCreatingJob() throws HyracksException {
+        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
         JobSpecification spec = new JobSpecification();
         ITypeTraits[] typeTraits = new ITypeTraits[2];
         typeTraits[0] = new TypeTraits(false);
         typeTraits[1] = new TypeTraits(false);
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-        comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
+        comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(0, vertexIdClass);
 
         int[] keyFields = new int[1];
         keyFields[0] = 0;
@@ -271,7 +269,7 @@
         sortFields[0] = 0;
         INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-        comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
+        comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(0, vertexIdClass);;
         ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameLimit, sortFields,
                 nkmFactory, comparatorFactories, recordDescriptor);
         ClusterConfig.setLocationConstraint(spec, sorter);
@@ -330,7 +328,7 @@
         RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
                 vertexIdClass.getName(), vertexClass.getName());
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-        comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
+        comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(0, vertexIdClass);;
         IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
         ITypeTraits[] typeTraits = new ITypeTraits[2];
         typeTraits[0] = new TypeTraits(false);
@@ -529,9 +527,9 @@
          */
         int[] sortFields = new int[1];
         sortFields[0] = 0;
-        INormalizedKeyComputerFactory nkmFactory = RawNormalizedKeyComputerFactory.INSTANCE;
+        INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-        comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
+        comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(0, vertexIdClass);;
         ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, sortFields,
                 nkmFactory, comparatorFactories, recordDescriptor);
         ClusterConfig.setLocationConstraint(spec, sorter);
@@ -590,7 +588,7 @@
         RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
                 vertexIdClass.getName(), vertexClass.getName());
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-        comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
+        comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(0, vertexIdClass);;
         IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
 
         ITypeTraits[] typeTraits = new ITypeTraits[2];
@@ -607,8 +605,7 @@
             int[] keyFields = new int[] { 0 };
             INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getFinalNormalizedKeyComputerFactory(conf);
             IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
-            sortCmpFactories[0] = JobGenUtil.getFinalBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
-                    .getClass());
+            sortCmpFactories[0] = JobGenUtil.getFinalBinaryComparatorFactory(vertexIdClass);
             sort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields, nkmFactory, sortCmpFactories,
                     recordDescriptor);
             ClusterConfig.setLocationConstraint(spec, scanner);
@@ -734,8 +731,7 @@
         int[] keyFields = new int[] { 0 };
         INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
         IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
-        sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(lastCheckpointedIteration,
-                WritableComparator.get(vertexIdClass).getClass());
+        sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(lastCheckpointedIteration, vertexIdClass);
         ExternalSortOperatorDescriptor sort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, recordDescriptor, Algorithm.QUICK_SORT);
         ClusterConfig.setLocationConstraint(spec, sort);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 103c1b6..1208556 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -63,7 +63,6 @@
 import edu.uci.ics.pregelix.core.data.TypeTraits;
 import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
 import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
-import edu.uci.ics.pregelix.core.runtime.touchpoint.RawBinaryComparatorFactory;
 import edu.uci.ics.pregelix.core.util.DataflowUtils;
 import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
 import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
@@ -129,7 +128,7 @@
         RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
                 vertexIdClass.getName(), vertexClass.getName());
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-        comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
+        comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
 
         ITypeTraits[] typeTraits = new ITypeTraits[2];
         typeTraits[0] = new TypeTraits(false);
@@ -314,7 +313,7 @@
         RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
                 vertexIdClass.getName(), messageValueClass.getName());
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-        comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
+        comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);;
         RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
                 MsgList.class.getName());
         RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
@@ -595,8 +594,7 @@
         int[] keyFields = new int[] { 0 };
         INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
         IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
-        sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(lastSuccessfulIteration,
-                WritableComparator.get(vertexIdClass).getClass());
+        sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(lastSuccessfulIteration, vertexIdClass);
         ExternalSortOperatorDescriptor sort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, recordDescriptor, Algorithm.QUICK_SORT);
         ClusterConfig.setLocationConstraint(spec, sort);
@@ -665,7 +663,7 @@
         RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
                 vertexIdClass.getName(), msgListClass.getName());
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-        comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
+        comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(0, vertexIdClass);;
 
         ITypeTraits[] typeTraits = new ITypeTraits[2];
         typeTraits[0] = new TypeTraits(false);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index 308f422..2e83d2d 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -43,7 +43,6 @@
 import edu.uci.ics.pregelix.core.data.TypeTraits;
 import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
 import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
-import edu.uci.ics.pregelix.core.runtime.touchpoint.RawBinaryComparatorFactory;
 import edu.uci.ics.pregelix.core.util.DataflowUtils;
 import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
 import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
@@ -101,7 +100,7 @@
         RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
                 vertexIdClass.getName(), vertexClass.getName());
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-        comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
+        comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);;
         IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
 
         ITypeTraits[] typeTraits = new ITypeTraits[2];
@@ -134,8 +133,7 @@
         int[] keyFields = new int[] { 0 };
         INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
         IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
-        sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
-                .getClass());
+        sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
         ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
         ClusterConfig.setLocationConstraint(spec, localSort);
@@ -272,7 +270,7 @@
         RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
                 vertexIdClass.getName(), messageValueClass.getName());
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-        comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
+        comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
         RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
                 MsgList.class.getName());
         RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
@@ -332,8 +330,7 @@
          */
         INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
         IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
-        sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
-                .getClass());
+        sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
         ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
         ClusterConfig.setLocationConstraint(spec, localSort);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index 718a271..09c14a0 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -42,7 +42,6 @@
 import edu.uci.ics.pregelix.core.data.TypeTraits;
 import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
 import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
-import edu.uci.ics.pregelix.core.runtime.touchpoint.RawBinaryComparatorFactory;
 import edu.uci.ics.pregelix.core.util.DataflowUtils;
 import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
 import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
@@ -100,7 +99,7 @@
         RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
                 vertexIdClass.getName(), vertexClass.getName());
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-        comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
+        comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
         IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
 
         ITypeTraits[] typeTraits = new ITypeTraits[2];
@@ -261,7 +260,7 @@
         RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
                 vertexIdClass.getName(), messageValueClass.getName());
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-        comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
+        comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
         RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
                 MsgList.class.getName());
         RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index 75635c9..379147c 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -42,7 +42,6 @@
 import edu.uci.ics.pregelix.core.data.TypeTraits;
 import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
 import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
-import edu.uci.ics.pregelix.core.runtime.touchpoint.RawBinaryComparatorFactory;
 import edu.uci.ics.pregelix.core.util.DataflowUtils;
 import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
 import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
@@ -100,7 +99,7 @@
         RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
                 vertexIdClass.getName(), vertexClass.getName());
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-        comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
+        comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
         IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
 
         ITypeTraits[] typeTraits = new ITypeTraits[2];
@@ -275,7 +274,7 @@
         RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
                 vertexIdClass.getName(), messageValueClass.getName());
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-        comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
+        comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
         RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
                 MsgList.class.getName());
         RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenUtil.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenUtil.java
index 97cea99..9f2a66d 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenUtil.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenUtil.java
@@ -16,6 +16,7 @@
 package edu.uci.ics.pregelix.core.jobgen;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparator;
 
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
@@ -76,8 +77,8 @@
      * @return
      */
     @SuppressWarnings("unchecked")
-    public static IBinaryComparatorFactory getFinalBinaryComparatorFactory(Class keyClass) {
-        return new WritableComparingBinaryComparatorFactory(keyClass);
+    public static IBinaryComparatorFactory getFinalBinaryComparatorFactory(Class vertexIdClass) {
+        return new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass).getClass());
     }
 
     /**
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
index b22e468..2332188 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
@@ -176,7 +176,7 @@
              * doing copy update
              */
             CopyUpdateUtil.copyUpdate(tempTupleReference, indexEntryTuple, updateBuffer, cloneUpdateTb, indexAccessor,
-                    cursor, rangePred);
+                    cursor, rangePred, false);
         }
     }
 
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
index 0ecfd03..3a36ab4 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
@@ -293,7 +293,7 @@
          * doing clone update
          */
         CopyUpdateUtil.copyUpdate(tempTupleReference, indexEntryTuple, updateBuffer, cloneUpdateTb, indexAccessor,
-                cursor, rangePred);
+                cursor, rangePred, true);
     }
 
     /** write result for outer case */
@@ -305,7 +305,7 @@
 
         //doing clone update
         CopyUpdateUtil.copyUpdate(tempTupleReference, frameTuple, updateBuffer, cloneUpdateTb, indexAccessor, cursor,
-                rangePred);
+                rangePred, true);
     }
 
     @Override
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
index a9c787f..8d97289 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
@@ -242,7 +242,7 @@
 
         //doing clone update
         CopyUpdateUtil.copyUpdate(tempTupleReference, frameTuple, updateBuffer, cloneUpdateTb, indexAccessor, cursor,
-                rangePred);
+                rangePred, true);
     }
 
     /** write the left result */
@@ -252,7 +252,7 @@
 
         //doing clone update
         CopyUpdateUtil.copyUpdate(tempTupleReference, frameTuple, updateBuffer, cloneUpdateTb, indexAccessor, cursor,
-                rangePred);
+                rangePred, true);
     }
 
     @Override
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java
index de87909..dbdbfa2 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java
@@ -175,7 +175,7 @@
 
             //doing clone update
             CopyUpdateUtil.copyUpdate(tempTupleReference, tuple, updateBuffer, cloneUpdateTb, indexAccessor, cursor,
-                    rangePred);
+                    rangePred, true);
         }
     }
 
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java
index 392f728..d2478ea 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java
@@ -27,13 +27,13 @@
 
     public static void copyUpdate(SearchKeyTupleReference tempTupleReference, ITupleReference frameTuple,
             UpdateBuffer updateBuffer, ArrayTupleBuilder cloneUpdateTb, IIndexAccessor indexAccessor,
-            IIndexCursor cursor, RangePredicate rangePred) throws HyracksDataException, IndexException {
+            IIndexCursor cursor, RangePredicate rangePred, boolean scan) throws HyracksDataException, IndexException {
         if (cloneUpdateTb.getSize() > 0) {
             int[] fieldEndOffsets = cloneUpdateTb.getFieldEndOffsets();
             int srcStart = fieldEndOffsets[0];
             int srcLen = fieldEndOffsets[1] - fieldEndOffsets[0]; // the updated vertex size
             int frSize = frameTuple.getFieldLength(1); // the vertex binary size in the leaf page
-            if (srcLen <= frSize) {
+            if (srcLen == frSize) {
                 //doing in-place update if the vertex size is not larger than the original size, save the "real update" overhead
                 System.arraycopy(cloneUpdateTb.getByteArray(), srcStart, frameTuple.getFieldData(1),
                         frameTuple.getFieldStart(1), srcLen);
@@ -53,9 +53,16 @@
                 }
                 //search again and recover the cursor
                 cursor.reset();
-                rangePred.setLowKey(tempTupleReference, false);
-                rangePred.setHighKey(null, true);
+                rangePred.setLowKey(tempTupleReference, true);
+                if (scan) {
+                    rangePred.setHighKey(null, true);
+                } else {
+                    rangePred.setHighKey(tempTupleReference, true);
+                }
                 indexAccessor.search(cursor, rangePred);
+                if (cursor.hasNext()) {
+                    cursor.next();
+                }
             }
             cloneUpdateTb.reset();
         }
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertex.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertex.java
index 998788a..e224617 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertex.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertex.java
@@ -26,7 +26,7 @@
  * @author yingyib
  */
 public class UpdateVertex extends Vertex<VLongWritable, Text, FloatWritable, VLongWritable> {
-    private final long MAX_VALUE_SIZE = 32768 / 2;
+    private final int MAX_VALUE_SIZE = 32768 / 2;
     private VLongWritable msg = new VLongWritable();
     private Text tempValue = new Text();
 
@@ -34,7 +34,7 @@
     public void compute(Iterator<VLongWritable> msgIterator) throws Exception {
         if (getSuperstep() == 1) {
             updateAndSendMsg();
-        } else if (getSuperstep() > 1 && getSuperstep() < 30) {
+        } else if (getSuperstep() > 1 && getSuperstep() < 2) {
             verifyVertexSize(msgIterator);
             updateAndSendMsg();
         } else {
@@ -43,21 +43,23 @@
     }
 
     private void verifyVertexSize(Iterator<VLongWritable> msgIterator) {
+        if (!msgIterator.hasNext()) {
+            throw new IllegalStateException("no message for vertex " + " " + getVertexId() + " " + getVertexValue());
+        }
         /**
          * verify the size
          */
-        int valueSize = getVertexValue().getLength();
+        int valueSize = getVertexValue().toString().toCharArray().length;
         long expectedValueSize = msgIterator.next().get();
         if (valueSize != expectedValueSize) {
-            throw new IllegalStateException("verte value size:" + valueSize + ", expected value size:"
-                    + expectedValueSize);
+            throw new IllegalStateException("vertex id: " + getVertexId() + " vertex value size:" + valueSize
+                    + ", expected value size:" + expectedValueSize);
         }
     }
 
     private void updateAndSendMsg() {
-        long newValueSize = (long) (Math.random()) % MAX_VALUE_SIZE;
-        msg.set(newValueSize);
-        char[] charArray = new char[(int) newValueSize];
+        int newValueSize = (int) Math.pow(Math.abs(getVertexId().get()), getSuperstep()) % MAX_VALUE_SIZE;
+        char[] charArray = new char[newValueSize];
         for (int i = 0; i < charArray.length; i++) {
             charArray[i] = 'a';
         }
@@ -71,6 +73,7 @@
          * send a self-message
          */
         msg.set(newValueSize);
+
         sendMsg(getVertexId(), msg);
         activate();
     }
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertexInputFormat.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertexInputFormat.java
index e28c9e2..076337a 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertexInputFormat.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertexInputFormat.java
@@ -15,45 +15,60 @@
 package edu.uci.ics.pregelix.example;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 
 import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexInputFormat;
 import edu.uci.ics.pregelix.api.io.VertexReader;
-import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat;
-import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat.TextVertexReader;
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.example.io.DoubleWritable;
 import edu.uci.ics.pregelix.example.io.VLongWritable;
 
-public class UpdateVertexInputFormat extends TextVertexInputFormat<VLongWritable, Text, FloatWritable, DoubleWritable> {
+public class UpdateVertexInputFormat extends VertexInputFormat<VLongWritable, Text, FloatWritable, DoubleWritable> {
 
     @Override
     public VertexReader<VLongWritable, Text, FloatWritable, DoubleWritable> createVertexReader(InputSplit split,
             TaskAttemptContext context) throws IOException {
-        return new UpdateVertexReader(textInputFormat.createRecordReader(split, context), context);
+        return new UpdateVertexReader(context);
+    }
+
+    @Override
+    public List<InputSplit> getSplits(JobContext context, int numWorkers) throws IOException, InterruptedException {
+        InputSplit split = new FileSplit(new Path("testdata"), 0, 0, new String[0]);
+        return Collections.singletonList(split);
     }
 }
 
 @SuppressWarnings("rawtypes")
-class UpdateVertexReader extends TextVertexReader<VLongWritable, Text, FloatWritable, DoubleWritable> {
+class UpdateVertexReader implements VertexReader<VLongWritable, Text, FloatWritable, DoubleWritable> {
 
-    private final static String separator = " ";
+    private final static int MAX_ID = 65536;
+    private final static int MIN_ID = -65536;
     private Vertex vertex;
     private VLongWritable vertexId = new VLongWritable();
+    private int currentId = MIN_ID;
+    private TaskAttemptContext context;
 
-    public UpdateVertexReader(RecordReader<LongWritable, Text> lineRecordReader, TaskAttemptContext context) {
-        super(lineRecordReader);
+    public UpdateVertexReader(TaskAttemptContext context) {
+        this.context = context;
+    }
+
+    private TaskAttemptContext getContext() {
+        return context;
     }
 
     @Override
     public boolean nextVertex() throws IOException, InterruptedException {
-        return getRecordReader().nextKeyValue();
+        return currentId < MAX_ID;
     }
 
     @SuppressWarnings("unchecked")
@@ -66,22 +81,32 @@
         vertex.getEdges().clear();
 
         vertex.reset();
-        Text line = getRecordReader().getCurrentValue();
-        String[] fields = line.toString().split(separator);
 
-        if (fields.length > 0) {
-            /**
-             * set the src vertex id
-             */
-            long src = Long.parseLong(fields[0]);
-            vertexId.set(src);
-            vertex.setVertexId(vertexId);
+        /**
+         * set the src vertex id
+         */
+        vertexId.set(currentId++);
+        vertex.setVertexId(vertexId);
 
-            /**
-             * set the vertex value
-             */
-            vertex.setVertexValue(new Text("aaa"));
-        }
+        /**
+         * set the vertex value
+         */
+        vertex.setVertexValue(new Text("aaa"));
         return vertex;
     }
+
+    @Override
+    public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
+
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+        return 0;
+    }
 }
\ No newline at end of file
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertexTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertexTest.java
index fe5f2d1..6adc5bb 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertexTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertexTest.java
@@ -14,14 +14,13 @@
  */
 package edu.uci.ics.pregelix.example;
 
-import junit.framework.Assert;
-
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.junit.Test;
 
 import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.core.base.IDriver.Plan;
 import edu.uci.ics.pregelix.core.driver.Driver;
 import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
 import edu.uci.ics.pregelix.example.util.TestCluster;
@@ -33,7 +32,7 @@
  */
 public class UpdateVertexTest {
 
-    private static String INPUT_PATH = "data/update";
+    private static String INPUT_PATH = "/data/webmap/";
     private static String OUTPUT_PATH = "actual/resultcomplex";
 
     @Test
@@ -52,9 +51,13 @@
 
             Driver driver = new Driver(UpdateVertex.class);
             testCluster.setUp();
-            driver.runJob(job, "127.0.0.1", PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT);
+
+            Plan[] plans = new Plan[] { Plan.INNER_JOIN, Plan.OUTER_JOIN };
+            for (Plan plan : plans) {
+                driver.runJob(job, plan, "127.0.0.1", PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT, false);
+            }
         } catch (Exception e) {
-            Assert.assertTrue(e.toString().contains("This job is going to fail"));
+            throw new IllegalStateException(e);
         } finally {
             testCluster.tearDown();
         }