Merge branch 'master' into yingyi/fullstack_fix
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSerializerDeserializerContainer.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSerializerDeserializerContainer.java
index 0ce2346..57c9133 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSerializerDeserializerContainer.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSerializerDeserializerContainer.java
@@ -25,7 +25,7 @@
      * @param deploymentId
      * @return
      */
-    public IJobSerializerDeserializer getJobSerializerDeerializer(DeploymentId deploymentId);
+    public IJobSerializerDeserializer getJobSerializerDeserializer(DeploymentId deploymentId);
 
     /**
      * Add a deployment with the job serializer deserializer
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializerContainer.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializerContainer.java
index 35a1e8b..f2d8fff 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializerContainer.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializerContainer.java
@@ -15,18 +15,18 @@
 
 package edu.uci.ics.hyracks.api.job;
 
-import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import edu.uci.ics.hyracks.api.deployment.DeploymentId;
 
 public class JobSerializerDeserializerContainer implements IJobSerializerDeserializerContainer {
 
     private IJobSerializerDeserializer defaultJobSerDe = new JobSerializerDeserializer();
-    private Map<DeploymentId, IJobSerializerDeserializer> jobSerializerDeserializerMap = new HashMap<DeploymentId, IJobSerializerDeserializer>();
+    private Map<DeploymentId, IJobSerializerDeserializer> jobSerializerDeserializerMap = new ConcurrentHashMap<DeploymentId, IJobSerializerDeserializer>();
 
     @Override
-    public synchronized IJobSerializerDeserializer getJobSerializerDeerializer(DeploymentId deploymentId) {
+    public synchronized IJobSerializerDeserializer getJobSerializerDeserializer(DeploymentId deploymentId) {
         if (deploymentId == null) {
             return defaultJobSerDe;
         }
@@ -44,4 +44,9 @@
         jobSerializerDeserializerMap.remove(deploymentId);
     }
 
+    @Override
+    public String toString() {
+        return jobSerializerDeserializerMap.toString();
+    }
+
 }
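
A minimal usage sketch of the renamed lookup, assuming only the signatures visible in this patch; the null fallback mirrors the pattern DeploymentUtils uses below:

    // Hedged sketch: resolve a per-deployment serializer, falling back to
    // plain Java serialization when the deployment is unknown
    // (exception handling elided).
    IJobSerializerDeserializerContainer container = new JobSerializerDeserializerContainer();
    IJobSerializerDeserializer serDe = container.getJobSerializerDeserializer(deploymentId);
    Object job = (serDe == null)
            ? JavaSerializationUtils.deserialize(bytes)  // system classpath
            : serDe.deserialize(bytes);                  // deployment-scoped classloader

With the map now a ConcurrentHashMap, the synchronized modifier on the getter is arguably redundant for plain reads, though it remains harmless.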
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java
index 3a35c25..9be3818 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java
@@ -31,7 +31,6 @@
  * This is the IJobSerializerDeserializer implementation for jobs with dynamically deployed jars.
  * 
  * @author yingyib
- *
  */
 public class ClassLoaderJobSerializerDeserializer implements IJobSerializerDeserializer {
 
@@ -99,4 +98,9 @@
     public ClassLoader getClassLoader() throws HyracksException {
         return classLoader;
     }
+
+    @Override
+    public String toString() {
+        return classLoader.toString();
+    }
 }
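
ClassLoaderJobSerializerDeserializer resolves job classes against a deployment-specific classloader rather than the system classpath. The standard JDK mechanism an implementation like this typically builds on is an ObjectInputStream whose resolveClass is routed through the target loader; a self-contained sketch, not taken from this patch:

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectStreamClass;

    // Hedged sketch: route serialized class-name resolution through the
    // deployment's classloader.
    class ClassLoaderAwareObjectInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        ClassLoaderAwareObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
            return Class.forName(desc.getName(), false, loader);
        }
    }

The new toString() delegating to the classloader also lets the container's toString() above print which loader serves each deployment, which helps when debugging exactly the mismatch checked in Vertex.readFields below.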
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
index 0677e2e..0724a01 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
@@ -89,7 +89,7 @@
      */
     public static void deploy(DeploymentId deploymentId, List<URL> urls, IJobSerializerDeserializerContainer container,
             ServerContext ctx, boolean isNC) throws HyracksException {
-        IJobSerializerDeserializer jobSerDe = container.getJobSerializerDeerializer(deploymentId);
+        IJobSerializerDeserializer jobSerDe = container.getJobSerializerDeserializer(deploymentId);
         if (jobSerDe == null) {
             jobSerDe = new ClassLoaderJobSerializerDeserializer();
             container.addJobSerializerDeserializer(deploymentId, jobSerDe);
@@ -116,7 +116,7 @@
         try {
             IJobSerializerDeserializerContainer jobSerDeContainer = appCtx.getJobSerializerDeserializerContainer();
             IJobSerializerDeserializer jobSerDe = deploymentId == null ? null : jobSerDeContainer
-                    .getJobSerializerDeerializer(deploymentId);
+                    .getJobSerializerDeserializer(deploymentId);
             Object obj = jobSerDe == null ? JavaSerializationUtils.deserialize(bytes) : jobSerDe.deserialize(bytes);
             return obj;
         } catch (Exception e) {
@@ -138,7 +138,7 @@
         try {
             IJobSerializerDeserializerContainer jobSerDeContainer = appCtx.getJobSerializerDeserializerContainer();
             IJobSerializerDeserializer jobSerDe = deploymentId == null ? null : jobSerDeContainer
-                    .getJobSerializerDeerializer(deploymentId);
+                    .getJobSerializerDeserializer(deploymentId);
             Class<?> cl = jobSerDe == null ? JavaSerializationUtils.loadClass(className) : jobSerDe
                     .loadClass(className);
             return cl;
@@ -160,7 +160,7 @@
         try {
             IJobSerializerDeserializerContainer jobSerDeContainer = appCtx.getJobSerializerDeserializerContainer();
             IJobSerializerDeserializer jobSerDe = deploymentId == null ? null : jobSerDeContainer
-                    .getJobSerializerDeerializer(deploymentId);
+                    .getJobSerializerDeserializer(deploymentId);
             ClassLoader cl = jobSerDe == null ? DeploymentUtils.class.getClassLoader() : jobSerDe.getClassLoader();
             return cl;
         } catch (Exception e) {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 56b6654..586c1a6 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -327,4 +327,8 @@
             throw new RuntimeException(e);
         }
     }
+
+    public DeploymentId getDeploymentId() {
+        return deploymentId;
+    }
 }
\ No newline at end of file
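
The new getter exposes the joblet's deployment id so task-side code can thread it into the DeploymentUtils entry points renamed above. A plausible call site, assuming the (bytes, deploymentId, appCtx) parameter order those methods use:

    // Hedged usage sketch (call site assumed, not part of this patch):
    Object state = DeploymentUtils.deserialize(bytes, joblet.getDeploymentId(), appCtx);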
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/ICachedPageInternal.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/ICachedPageInternal.java
index 22c02d1..b2ba638 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/ICachedPageInternal.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/ICachedPageInternal.java
@@ -20,4 +20,6 @@
     public Object getReplacementStrategyObject();
 
     public boolean pinIfGoodVictim();
+
+    public void markDirty();
 }
\ No newline at end of file
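
markDirty() joins the internal cached-page contract. A typical buffer-cache implementation records the clean-to-dirty transition with an atomic flag so concurrent writers mark the page exactly once; a fragment-level sketch under that assumption (the real CachedPage may differ):

    // Hedged sketch: idempotent, thread-safe dirty marking inside a page class.
    private final java.util.concurrent.atomic.AtomicBoolean dirty =
            new java.util.concurrent.atomic.AtomicBoolean(false);

    @Override
    public void markDirty() {
        dirty.compareAndSet(false, true);
    }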
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
index 26cb8d0..28d424a 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -258,8 +258,13 @@
     @Override
     final public void readFields(DataInput in) throws IOException {
         reset();
-        if (vertexId == null)
+        if (vertexId == null) {
+            if (getContext().getConfiguration().getClassLoader() != this.getClass().getClassLoader()) {
+                throw new IllegalStateException("mismatched classloader: "
+                        + getContext().getConfiguration().getClassLoader() + " and " + this.getClass().getClassLoader());
+            }
             vertexId = BspUtils.<I> createVertexIndex(getContext().getConfiguration());
+        }
         vertexId.readFields(in);
         delegate.setVertexId(vertexId);
         boolean hasVertexValue = in.readBoolean();
@@ -576,15 +581,6 @@
         return context;
     }
 
-    /**
-     * Pregelix internal use only
-     * 
-     * @param context
-     */
-    public static final void setContext(TaskAttemptContext context) {
-        Vertex.context = context;
-    }
-
     @Override
     public int hashCode() {
         return vertexId.hashCode();
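
The new guard in readFields fails fast when the Configuration's classloader differs from the one that loaded the Vertex subclass, instead of letting BspUtils.createVertexIndex silently instantiate a type from the wrong loader; removing the static setContext mutator closes one path by which such a stale context could leak across jobs. The failure mode the guard catches, as a standalone illustration (jar path and class name hypothetical):

    import java.net.URL;
    import java.net.URLClassLoader;

    public class ClassLoaderMismatchDemo {
        public static void main(String[] args) throws Exception {
            URL[] jar = { new URL("file:///tmp/app.jar") };  // hypothetical jar
            ClassLoader a = new URLClassLoader(jar, null);
            ClassLoader b = new URLClassLoader(jar, null);
            Class<?> ca = a.loadClass("example.VertexId");   // hypothetical class
            Class<?> cb = b.loadClass("example.VertexId");
            // Same bytes, two loaders: distinct, cast-incompatible classes.
            System.out.println(ca == cb);                    // prints false
        }
    }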
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index 6549c52..15aa7c4 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -182,6 +182,15 @@
     }
 
     /**
+     * Set whether the vertex value size is fixed
+     * 
+     * @param fixedSize
+     */
+    final public void setFixedVertexValueSize(boolean fixedSize) {
+        getConfiguration().setBoolean(INCREASE_STATE_LENGTH, !fixedSize);
+    }
+
+    /**
      * Set the frame size for a job
      * 
      * @param frameSize
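
Usage of the new setter: a job whose vertex values never grow can opt out of the dynamic state-length machinery; note the boolean is stored inverted into INCREASE_STATE_LENGTH:

    // Sketch; job name hypothetical, constructor may throw IOException.
    PregelixJob job = new PregelixJob("shortest-paths");
    job.setFixedVertexValueSize(true);  // stores INCREASE_STATE_LENGTH = false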
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index 4ee1deb..e79d4d2 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -37,11 +37,6 @@
  * them.
  */
 public class BspUtils {
-    private static Configuration defaultConf = null;
-
-    public static void setDefaultConfiguration(Configuration conf) {
-        defaultConf = conf;
-    }
 
     /**
      * Get the user's subclassed {@link VertexInputFormat}.
@@ -209,8 +204,6 @@
      */
     @SuppressWarnings("unchecked")
     public static <I extends Writable> Class<I> getVertexIndexClass(Configuration conf) {
-        if (conf == null)
-            conf = defaultConf;
         return (Class<I>) conf.getClass(PregelixJob.VERTEX_INDEX_CLASS, WritableComparable.class);
     }
 
@@ -302,8 +295,6 @@
      */
     @SuppressWarnings("unchecked")
     public static <M extends WritableSizable> Class<M> getMessageValueClass(Configuration conf) {
-        if (conf == null)
-            conf = defaultConf;
         return (Class<M>) conf.getClass(PregelixJob.MESSAGE_VALUE_CLASS, Writable.class);
     }
 
@@ -316,8 +307,6 @@
      */
     @SuppressWarnings("unchecked")
     public static <M extends Writable> Class<M> getPartialAggregateValueClass(Configuration conf) {
-        if (conf == null)
-            conf = defaultConf;
         return (Class<M>) conf.getClass(PregelixJob.PARTIAL_AGGREGATE_VALUE_CLASS, Writable.class);
     }
 
@@ -330,8 +319,6 @@
      */
     @SuppressWarnings("unchecked")
     public static <M extends Writable> Class<M> getPartialCombineValueClass(Configuration conf) {
-        if (conf == null)
-            conf = defaultConf;
         return (Class<M>) conf.getClass(PregelixJob.PARTIAL_COMBINE_VALUE_CLASS, Writable.class);
     }
 
@@ -344,8 +331,6 @@
      */
     @SuppressWarnings("unchecked")
     public static Class<? extends NormalizedKeyComputer> getNormalizedKeyComputerClass(Configuration conf) {
-        if (conf == null)
-            conf = defaultConf;
         return (Class<? extends NormalizedKeyComputer>) conf.getClass(PregelixJob.NMK_COMPUTER_CLASS,
                 NormalizedKeyComputer.class);
     }
@@ -359,8 +344,6 @@
      */
     @SuppressWarnings("unchecked")
     public static <M extends Writable> Class<M> getFinalAggregateValueClass(Configuration conf) {
-        if (conf == null)
-            conf = defaultConf;
         return (Class<M>) conf.getClass(PregelixJob.FINAL_AGGREGATE_VALUE_CLASS, Writable.class);
     }
 
@@ -488,8 +471,6 @@
      */
     @SuppressWarnings({ "unchecked", "rawtypes" })
     public static <V extends VertexPartitioner> Class<V> getVertexPartitionerClass(Configuration conf) {
-        if (conf == null)
-            conf = defaultConf;
         return (Class<V>) conf.getClass(PregelixJob.PARTITIONER_CLASS, null, VertexPartitioner.class);
     }
 
@@ -502,8 +483,6 @@
      */
     @SuppressWarnings("unchecked")
     public static <V extends ICheckpointHook> Class<V> getCheckpointHookClass(Configuration conf) {
-        if (conf == null)
-            conf = defaultConf;
         return (Class<V>) conf.getClass(PregelixJob.CKP_CLASS, DefaultCheckpointHook.class, ICheckpointHook.class);
     }
 
@@ -515,7 +494,7 @@
      * @return the boolean setting of the parameter, by default it is false
      */
     public static boolean getDynamicVertexValueSize(Configuration conf) {
-        return conf.getBoolean(PregelixJob.INCREASE_STATE_LENGTH, false);
+        return conf.getBoolean(PregelixJob.INCREASE_STATE_LENGTH, true);
     }
 
     /**
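
Two behavioral points in this BspUtils hunk: the static defaultConf fallback is removed, so every accessor now requires an explicit Configuration (eliminating another piece of cross-job static state), and the default of getDynamicVertexValueSize flips from false to true. The net effect, sketched:

    Configuration conf = job.getConfiguration();
    // Dynamic vertex value sizes are now the default; fixed-size jobs must
    // opt out via setFixedVertexValueSize(true).
    boolean dynamic = BspUtils.getDynamicVertexValueSize(conf);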
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index b345e01..8d9eeac 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -37,7 +37,6 @@
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -98,8 +97,6 @@
 import edu.uci.ics.pregelix.core.data.TypeTraits;
 import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
 import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
-import edu.uci.ics.pregelix.core.runtime.touchpoint.RawBinaryComparatorFactory;
-import edu.uci.ics.pregelix.core.runtime.touchpoint.RawNormalizedKeyComputerFactory;
 import edu.uci.ics.pregelix.core.util.DataflowUtils;
 import edu.uci.ics.pregelix.dataflow.ClearStateOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
@@ -134,7 +131,7 @@
     protected PregelixJob pregelixJob;
     protected IIndexLifecycleManagerProvider lcManagerProvider = IndexLifeCycleManagerProvider.INSTANCE;
     protected IStorageManagerInterface storageManagerInterface = StorageManagerInterface.INSTANCE;
-    protected String jobId = new UUID(System.currentTimeMillis(), System.nanoTime()).toString();
+    protected String jobId = UUID.randomUUID().toString();
     protected int frameSize = ClusterConfig.getFrameSize();
     protected int maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
 
@@ -208,12 +205,13 @@
 
     @Override
     public JobSpecification generateCreatingJob() throws HyracksException {
+        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
         JobSpecification spec = new JobSpecification();
         ITypeTraits[] typeTraits = new ITypeTraits[2];
         typeTraits[0] = new TypeTraits(false);
         typeTraits[1] = new TypeTraits(false);
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-        comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
+        comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(0, vertexIdClass);
 
         int[] keyFields = new int[1];
         keyFields[0] = 0;
@@ -256,7 +254,7 @@
         } catch (Exception e) {
             throw new HyracksDataException(e);
         }
-        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
                 vertexIdClass.getName(), vertexClass.getName());
         IConfigurationFactory confFactory = new ConfigurationFactory(conf);
         String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
@@ -271,7 +269,7 @@
         sortFields[0] = 0;
         INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-        comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
+        comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(0, vertexIdClass);
         ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameLimit, sortFields,
                 nkmFactory, comparatorFactories, recordDescriptor);
         ClusterConfig.setLocationConstraint(spec, sorter);
@@ -285,7 +283,7 @@
         IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
         IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
         IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
-                vertexIdClass.getName(), vertexClass.getName());
+                conf, vertexIdClass.getName(), vertexClass.getName());
         VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, inputRdFactory,
                 resultFileSplitProvider, preHookFactory, null);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { "nc1" });
@@ -327,10 +325,10 @@
          * construct btree search operator
          */
         IConfigurationFactory confFactory = new ConfigurationFactory(conf);
-        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
                 vertexIdClass.getName(), vertexClass.getName());
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-        comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
+        comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(0, vertexIdClass);
         IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
         ITypeTraits[] typeTraits = new ITypeTraits[2];
         typeTraits[0] = new TypeTraits(false);
@@ -349,7 +347,7 @@
         IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
         IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
         IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
-                vertexIdClass.getName(), vertexClass.getName());
+                conf, vertexIdClass.getName(), vertexClass.getName());
         VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, inputRdFactory,
                 resultFileSplitProvider, preHookFactory, null);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { "nc1" });
@@ -480,7 +478,7 @@
             return new VertexPartitionComputerFactory(confFactory);
         } else {
             return new VertexIdPartitionComputerFactory(new WritableSerializerDeserializerFactory(
-                    BspUtils.getVertexIndexClass(conf)));
+                    BspUtils.getVertexIndexClass(conf)), confFactory);
         }
     }
 
@@ -516,7 +514,7 @@
         } catch (Exception e) {
             throw new HyracksDataException(e);
         }
-        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
                 vertexIdClass.getName(), vertexClass.getName());
         IConfigurationFactory confFactory = new ConfigurationFactory(conf);
         String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
@@ -529,9 +527,9 @@
          */
         int[] sortFields = new int[1];
         sortFields[0] = 0;
-        INormalizedKeyComputerFactory nkmFactory = RawNormalizedKeyComputerFactory.INSTANCE;
+        INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-        comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
+        comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(0, vertexIdClass);
         ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, sortFields,
                 nkmFactory, comparatorFactories, recordDescriptor);
         ClusterConfig.setLocationConstraint(spec, sorter);
@@ -587,10 +585,10 @@
          * construct btree search operator
          */
         IConfigurationFactory confFactory = new ConfigurationFactory(conf);
-        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
                 vertexIdClass.getName(), vertexClass.getName());
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-        comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
+        comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(0, vertexIdClass);
         IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
 
         ITypeTraits[] typeTraits = new ITypeTraits[2];
@@ -607,8 +605,7 @@
             int[] keyFields = new int[] { 0 };
             INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getFinalNormalizedKeyComputerFactory(conf);
             IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
-            sortCmpFactories[0] = JobGenUtil.getFinalBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
-                    .getClass());
+            sortCmpFactories[0] = JobGenUtil.getFinalBinaryComparatorFactory(vertexIdClass);
             sort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields, nkmFactory, sortCmpFactories,
                     recordDescriptor);
             ClusterConfig.setLocationConstraint(spec, scanner);
@@ -617,10 +614,11 @@
         /**
          * construct write file operator
          */
+        IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
         IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
-                vertexIdClass.getName(), vertexClass.getName());
+                conf, vertexIdClass.getName(), vertexClass.getName());
         VertexFileWriteOperatorDescriptor writer = new VertexFileWriteOperatorDescriptor(spec, confFactory,
-                inputRdFactory);
+                inputRdFactory, preHookFactory);
         ClusterConfig.setLocationConstraint(spec, writer);
 
         /**
@@ -659,7 +657,7 @@
         /**
          * source aggregate
          */
-        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
                 MsgList.class.getName());
 
         /**
@@ -672,7 +670,7 @@
          * construct the materializing write operator
          */
         MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
-                false);
+                false, jobId, lastSuccessfulIteration + 1);
         ClusterConfig.setLocationConstraint(spec, materializeRead);
 
         String checkpointPath = BspUtils.getMessageCheckpointPath(conf, lastSuccessfulIteration);
@@ -683,7 +681,7 @@
         tmpJob.setOutputValueClass(MsgList.class);
 
         IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
-                vertexIdClass.getName(), MsgList.class.getName());
+                conf, vertexIdClass.getName(), MsgList.class.getName());
         HDFSFileWriteOperatorDescriptor hdfsWriter = new HDFSFileWriteOperatorDescriptor(spec, tmpJob, inputRdFactory);
         ClusterConfig.setLocationConstraint(spec, hdfsWriter);
 
@@ -723,7 +721,7 @@
         } catch (Exception e) {
             throw new HyracksDataException(e);
         }
-        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
                 vertexIdClass.getName(), MsgList.class.getName());
         String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
         HDFSReadOperatorDescriptor scanner = new HDFSReadOperatorDescriptor(spec, recordDescriptor, tmpJob, splits,
@@ -734,8 +732,7 @@
         int[] keyFields = new int[] { 0 };
         INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
         IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
-        sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(lastCheckpointedIteration,
-                WritableComparator.get(vertexIdClass).getClass());
+        sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(lastCheckpointedIteration, vertexIdClass);
         ExternalSortOperatorDescriptor sort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, recordDescriptor, Algorithm.QUICK_SORT);
         ClusterConfig.setLocationConstraint(spec, sort);
@@ -744,7 +741,7 @@
          * construct the materializing write operator
          */
         MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec,
-                recordDescriptor);
+                recordDescriptor, jobId, lastCheckpointedIteration);
         ClusterConfig.setLocationConstraint(spec, materialize);
 
         /** construct runtime hook */
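
Throughout this file the hard-wired RawBinaryComparatorFactory.INSTANCE is replaced by JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass), centralizing the comparator choice on the iteration number and the vertex id class. JobGenUtil's body is not shown in this patch; one plausible shape, purely hypothetical (it may equally well always return the raw comparator):

    // Hypothetical dispatch: compare deserialized keys on the first pass,
    // rely on raw binary order once data is stored in key order.
    @SuppressWarnings({ "rawtypes", "unchecked" })
    public static IBinaryComparatorFactory getIBinaryComparatorFactory(int iteration, Class keyClass) {
        return iteration <= 0
                ? new WritableComparingBinaryComparatorFactory(WritableComparator.get(keyClass).getClass())
                : RawBinaryComparatorFactory.INSTANCE;
    }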
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 103c1b6..9fefbea 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -63,7 +63,6 @@
 import edu.uci.ics.pregelix.core.data.TypeTraits;
 import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
 import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
-import edu.uci.ics.pregelix.core.runtime.touchpoint.RawBinaryComparatorFactory;
 import edu.uci.ics.pregelix.core.util.DataflowUtils;
 import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
 import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
@@ -126,29 +125,30 @@
         /**
          * construct btree search and function call update operator
          */
-        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
                 vertexIdClass.getName(), vertexClass.getName());
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-        comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
+        comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
 
         ITypeTraits[] typeTraits = new ITypeTraits[2];
         typeTraits[0] = new TypeTraits(false);
         typeTraits[1] = new TypeTraits(false);
 
-        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
-        RecordDescriptor rdPartialAggregate = DataflowUtils
-                .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
+        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+                VLongWritable.class.getName());
+        RecordDescriptor rdPartialAggregate = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+                partialAggregateValueClass.getName());
         IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
         IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
         IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
-                vertexIdClass.getName(), vertexClass.getName());
-        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                conf, vertexIdClass.getName(), vertexClass.getName());
+        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
                 MsgList.class.getName());
-        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
                 vertexIdClass.getName(), messageValueClass.getName());
-        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
                 vertexClass.getName());
-        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
+        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(conf, vertexIdClass.getName());
 
         TreeSearchFunctionUpdateOperatorDescriptor scanner = new TreeSearchFunctionUpdateOperatorDescriptor(spec,
                 recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
@@ -167,8 +167,8 @@
         /**
          * final aggregate write operator
          */
-        IRecordDescriptorFactory aggRdFactory = DataflowUtils
-                .getWritableRecordDescriptorFactoryFromWritableClasses(partialAggregateValueClass.getName());
+        IRecordDescriptorFactory aggRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                conf, partialAggregateValueClass.getName());
         FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
                 configurationFactory, aggRdFactory, jobId);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
@@ -218,7 +218,8 @@
         /**
          * construct the materializing write operator
          */
-        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
+                jobId, iteration);
         ClusterConfig.setLocationConstraint(spec, materialize);
 
         /**
@@ -311,15 +312,15 @@
          * source aggregate
          */
         int[] keyFields = new int[] { 0 };
-        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
                 vertexIdClass.getName(), messageValueClass.getName());
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-        comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
-        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+        comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
+        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
                 MsgList.class.getName());
-        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
                 vertexClass.getName());
-        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
+        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(conf, vertexIdClass.getName());
 
         /**
          * construct empty tuple operator
@@ -339,7 +340,7 @@
          * construct the materializing write operator
          */
         MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
-                true);
+                true, jobId, iteration);
         ClusterConfig.setLocationConstraint(spec, materializeRead);
 
         /**
@@ -360,13 +361,14 @@
          * construct index-join-function-update operator
          */
         IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
-        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
-        RecordDescriptor rdPartialAggregate = DataflowUtils
-                .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
+        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+                VLongWritable.class.getName());
+        RecordDescriptor rdPartialAggregate = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+                partialAggregateValueClass.getName());
         IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
         IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
         IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
-                vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
+                conf, vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
 
         IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
                 spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
@@ -422,7 +424,8 @@
         /**
          * construct the materializing write operator
          */
-        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
+                jobId, iteration);
         ClusterConfig.setLocationConstraint(spec, materialize);
 
         /** construct runtime hook */
@@ -444,8 +447,8 @@
         /**
          * final aggregate write operator
          */
-        IRecordDescriptorFactory aggRdFactory = DataflowUtils
-                .getWritableRecordDescriptorFactoryFromWritableClasses(partialAggregateValueClass.getName());
+        IRecordDescriptorFactory aggRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                conf, partialAggregateValueClass.getName());
         FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
                 configurationFactory, aggRdFactory, jobId);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
@@ -584,7 +587,7 @@
         } catch (Exception e) {
             throw new HyracksDataException(e);
         }
-        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
                 vertexIdClass.getName(), MsgList.class.getName());
         String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
         HDFSReadOperatorDescriptor scanner = new HDFSReadOperatorDescriptor(spec, recordDescriptor, tmpJob, splits,
@@ -595,8 +598,7 @@
         int[] keyFields = new int[] { 0 };
         INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
         IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
-        sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(lastSuccessfulIteration,
-                WritableComparator.get(vertexIdClass).getClass());
+        sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(lastSuccessfulIteration, vertexIdClass);
         ExternalSortOperatorDescriptor sort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, recordDescriptor, Algorithm.QUICK_SORT);
         ClusterConfig.setLocationConstraint(spec, sort);
@@ -662,10 +664,10 @@
         /**
          * construct btree search operator
          */
-        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
                 vertexIdClass.getName(), msgListClass.getName());
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-        comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
+        comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(0, vertexIdClass);
 
         ITypeTraits[] typeTraits = new ITypeTraits[2];
         typeTraits[0] = new TypeTraits(false);
@@ -681,7 +683,7 @@
          * construct write file operator
          */
         IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
-                vertexIdClass.getName(), MsgList.class.getName());
+                conf, vertexIdClass.getName(), MsgList.class.getName());
         HDFSFileWriteOperatorDescriptor writer = new HDFSFileWriteOperatorDescriptor(spec, job, inputRdFactory);
         ClusterConfig.setLocationConstraint(spec, writer);
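
The recurring change across these jobgen files threads (jobId, iteration) into the Materializing{Write,Read}OperatorDescriptor constructors, so materialized inter-superstep state is addressed per job and per iteration instead of through a single global slot; that is what allows recovery to re-read a specific superstep's output (see the lastSuccessfulIteration + 1 read in JobGen above). A sketch of the kind of keying this enables, with the layout assumed rather than taken from this patch:

    // Hypothetical naming: one materialized run per (job, iteration).
    static String materializedStateName(String jobId, int iteration) {
        return jobId + "-iter-" + iteration + ".bin";
    }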
 
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index 308f422..ee576b1 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -17,7 +17,6 @@
 import org.apache.hadoop.io.VLongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
 
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -43,7 +42,6 @@
 import edu.uci.ics.pregelix.core.data.TypeTraits;
 import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
 import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
-import edu.uci.ics.pregelix.core.runtime.touchpoint.RawBinaryComparatorFactory;
 import edu.uci.ics.pregelix.core.util.DataflowUtils;
 import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
 import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
@@ -98,28 +96,29 @@
         /**
          * construct btree search function update operator
          */
-        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
                 vertexIdClass.getName(), vertexClass.getName());
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-        comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
+        comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
         IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
 
         ITypeTraits[] typeTraits = new ITypeTraits[2];
         typeTraits[0] = new TypeTraits(false);
         typeTraits[1] = new TypeTraits(false);
 
-        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
-        RecordDescriptor rdPartialAggregate = DataflowUtils
-                .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
+        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+                VLongWritable.class.getName());
+        RecordDescriptor rdPartialAggregate = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+                partialAggregateValueClass.getName());
         IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
         IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
         IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
-                vertexIdClass.getName(), vertexClass.getName());
-        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                conf, vertexIdClass.getName(), vertexClass.getName());
+        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
                 vertexIdClass.getName(), messageValueClass.getName());
-        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
                 vertexClass.getName());
-        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
+        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(conf, vertexIdClass.getName());
 
         TreeSearchFunctionUpdateOperatorDescriptor scanner = new TreeSearchFunctionUpdateOperatorDescriptor(spec,
                 recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
@@ -134,8 +133,7 @@
         int[] keyFields = new int[] { 0 };
         INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
         IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
-        sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
-                .getClass());
+        sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
         ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
         ClusterConfig.setLocationConstraint(spec, localSort);
@@ -152,7 +150,7 @@
         /**
          * construct global group-by operator
          */
-        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
                 MsgList.class.getName());
         IClusteredAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(
                 conf, true, true);
@@ -163,7 +161,8 @@
         /**
          * construct the materializing write operator
          */
-        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
+                jobId, iteration);
         ClusterConfig.setLocationConstraint(spec, materialize);
 
         RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
@@ -185,8 +184,8 @@
         /**
          * final aggregate write operator
          */
-        IRecordDescriptorFactory aggRdFactory = DataflowUtils
-                .getWritableRecordDescriptorFactoryFromWritableClasses(partialAggregateValueClass.getName());
+        IRecordDescriptorFactory aggRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                conf, partialAggregateValueClass.getName());
         FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
                 configurationFactory, aggRdFactory, jobId);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
@@ -269,15 +268,15 @@
          * source aggregate
          */
         int[] keyFields = new int[] { 0 };
-        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
                 vertexIdClass.getName(), messageValueClass.getName());
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-        comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
-        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+        comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
+        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
                 MsgList.class.getName());
-        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
                 vertexClass.getName());
-        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
+        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(conf, vertexIdClass.getName());
 
         /**
          * construct empty tuple operator
@@ -297,7 +296,7 @@
          * construct the materializing write operator
          */
         MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
-                true);
+                true, jobId, iteration);
         ClusterConfig.setLocationConstraint(spec, materializeRead);
 
         /**
@@ -311,13 +310,14 @@
         nullWriterFactories[0] = VertexIdNullWriterFactory.INSTANCE;
         nullWriterFactories[1] = MsgListNullWriterFactory.INSTANCE;
 
-        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
-        RecordDescriptor rdPartialAggregate = DataflowUtils
-                .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
+        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+                VLongWritable.class.getName());
+        RecordDescriptor rdPartialAggregate = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+                partialAggregateValueClass.getName());
         IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
         IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
         IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
-                vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
+                conf, vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
 
         IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
                 spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
@@ -332,8 +332,7 @@
          */
         INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
         IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
-        sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
-                .getClass());
+        sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
         ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
         ClusterConfig.setLocationConstraint(spec, localSort);
@@ -359,7 +358,8 @@
         /**
          * construct the materializing write operator
          */
-        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
+                jobId, iteration);
         ClusterConfig.setLocationConstraint(spec, materialize);
 
         /** construct runtime hook */
@@ -381,8 +381,8 @@
         /**
          * final aggregate write operator
          */
-        IRecordDescriptorFactory aggRdFactory = DataflowUtils
-                .getWritableRecordDescriptorFactoryFromWritableClasses(partialAggregateValueClass.getName());
+        IRecordDescriptorFactory aggRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                conf, partialAggregateValueClass.getName());
         FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
                 configurationFactory, aggRdFactory, jobId);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index 718a271..32271c8 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -42,7 +42,6 @@
 import edu.uci.ics.pregelix.core.data.TypeTraits;
 import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
 import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
-import edu.uci.ics.pregelix.core.runtime.touchpoint.RawBinaryComparatorFactory;
 import edu.uci.ics.pregelix.core.util.DataflowUtils;
 import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
 import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
@@ -97,10 +96,10 @@
         /**
          * construct btree search operator
          */
-        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
                 vertexIdClass.getName(), vertexClass.getName());
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-        comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
+        comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
         IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
 
         ITypeTraits[] typeTraits = new ITypeTraits[2];
@@ -110,18 +109,19 @@
         /**
          * construct compute operator
          */
-        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
-        RecordDescriptor rdPartialAggregate = DataflowUtils
-                .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
+        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+                VLongWritable.class.getName());
+        RecordDescriptor rdPartialAggregate = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+                partialAggregateValueClass.getName());
         IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
         IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
         IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
-                vertexIdClass.getName(), vertexClass.getName());
-        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                conf, vertexIdClass.getName(), vertexClass.getName());
+        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
                 vertexIdClass.getName(), messageValueClass.getName());
-        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
                 vertexClass.getName());
-        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
+        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(conf, vertexIdClass.getName());
 
         TreeSearchFunctionUpdateOperatorDescriptor scanner = new TreeSearchFunctionUpdateOperatorDescriptor(spec,
                 recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
@@ -145,7 +145,7 @@
         /**
          * construct global group-by operator
          */
-        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
                 MsgList.class.getName());
         IClusteredAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(
                 conf, true, false);
@@ -156,7 +156,8 @@
         /**
          * construct the materializing write operator
          */
-        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
+                jobId, iteration);
         ClusterConfig.setLocationConstraint(spec, materialize);
 
         RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
@@ -177,8 +178,8 @@
         /**
          * final aggregate write operator
          */
-        IRecordDescriptorFactory aggRdFactory = DataflowUtils
-                .getWritableRecordDescriptorFactoryFromWritableClasses(partialAggregateValueClass.getName());
+        IRecordDescriptorFactory aggRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                conf, partialAggregateValueClass.getName());
         FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
                 configurationFactory, aggRdFactory, jobId);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
@@ -258,15 +259,15 @@
          * source aggregate
          */
         int[] keyFields = new int[] { 0 };
-        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
                 vertexIdClass.getName(), messageValueClass.getName());
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-        comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
-        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+        comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
+        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
                 MsgList.class.getName());
-        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
                 vertexClass.getName());
-        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
+        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(conf, vertexIdClass.getName());
 
         /**
          * construct empty tuple operator
@@ -286,7 +287,7 @@
          * construct the materializing write operator
          */
         MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
-                true);
+                true, jobId, iteration);
         ClusterConfig.setLocationConstraint(spec, materializeRead);
 
         /**
@@ -300,13 +301,14 @@
         nullWriterFactories[0] = VertexIdNullWriterFactory.INSTANCE;
         nullWriterFactories[1] = MsgListNullWriterFactory.INSTANCE;
 
-        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
-        RecordDescriptor rdPartialAggregate = DataflowUtils
-                .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
+        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+                VLongWritable.class.getName());
+        RecordDescriptor rdPartialAggregate = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+                partialAggregateValueClass.getName());
         IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
         IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
         IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
-                vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
+                conf, vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
 
         IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
                 spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
@@ -339,7 +341,8 @@
         /**
          * construct the materializing write operator
          */
-        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
+                jobId, iteration);
         ClusterConfig.setLocationConstraint(spec, materialize);
 
         /** construct runtime hook */
@@ -361,8 +364,8 @@
         /**
          * final aggregate write operator
          */
-        IRecordDescriptorFactory aggRdFactory = DataflowUtils
-                .getWritableRecordDescriptorFactoryFromWritableClasses(partialAggregateValueClass.getName());
+        IRecordDescriptorFactory aggRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                conf, partialAggregateValueClass.getName());
         FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
                 configurationFactory, aggRdFactory, jobId);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index 75635c9..3aa36cd 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -42,7 +42,6 @@
 import edu.uci.ics.pregelix.core.data.TypeTraits;
 import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
 import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
-import edu.uci.ics.pregelix.core.runtime.touchpoint.RawBinaryComparatorFactory;
 import edu.uci.ics.pregelix.core.util.DataflowUtils;
 import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
 import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
@@ -97,28 +96,29 @@
         /**
          * construct btree search function update operator
          */
-        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
                 vertexIdClass.getName(), vertexClass.getName());
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-        comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
+        comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
         IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
 
         ITypeTraits[] typeTraits = new ITypeTraits[2];
         typeTraits[0] = new TypeTraits(false);
         typeTraits[1] = new TypeTraits(false);
 
-        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
-        RecordDescriptor rdPartialAggregate = DataflowUtils
-                .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
+        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+                VLongWritable.class.getName());
+        RecordDescriptor rdPartialAggregate = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+                partialAggregateValueClass.getName());
         IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
         IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
         IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
-                vertexIdClass.getName(), vertexClass.getName());
-        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                conf, vertexIdClass.getName(), vertexClass.getName());
+        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
                 vertexIdClass.getName(), messageValueClass.getName());
-        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
                 vertexClass.getName());
-        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
+        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(conf, vertexIdClass.getName());
 
         TreeSearchFunctionUpdateOperatorDescriptor scanner = new TreeSearchFunctionUpdateOperatorDescriptor(spec,
                 recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
@@ -158,7 +158,7 @@
         /**
          * construct global group-by operator
          */
-        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
                 MsgList.class.getName());
         IClusteredAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(
                 conf, true, true);
@@ -169,7 +169,8 @@
         /**
          * construct the materializing write operator
          */
-        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
+                jobId, iteration);
         ClusterConfig.setLocationConstraint(spec, materialize);
 
         RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
@@ -191,8 +192,8 @@
         /**
          * final aggregate write operator
          */
-        IRecordDescriptorFactory aggRdFactory = DataflowUtils
-                .getWritableRecordDescriptorFactoryFromWritableClasses(partialAggregateValueClass.getName());
+        IRecordDescriptorFactory aggRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                conf, partialAggregateValueClass.getName());
         FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
                 configurationFactory, aggRdFactory, jobId);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
@@ -272,15 +273,15 @@
          * source aggregate
          */
         int[] keyFields = new int[] { 0 };
-        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
                 vertexIdClass.getName(), messageValueClass.getName());
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-        comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
-        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+        comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
+        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
                 MsgList.class.getName());
-        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
                 vertexClass.getName());
-        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
+        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(conf, vertexIdClass.getName());
 
         /**
          * construct empty tuple operator
@@ -300,7 +301,7 @@
          * construct the materializing write operator
          */
         MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
-                true);
+                true, jobId, iteration);
         ClusterConfig.setLocationConstraint(spec, materializeRead);
 
         /**
@@ -314,13 +315,14 @@
         nullWriterFactories[0] = VertexIdNullWriterFactory.INSTANCE;
         nullWriterFactories[1] = MsgListNullWriterFactory.INSTANCE;
 
-        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
-        RecordDescriptor rdPartialAggregate = DataflowUtils
-                .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
+        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+                VLongWritable.class.getName());
+        RecordDescriptor rdPartialAggregate = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+                partialAggregateValueClass.getName());
         IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
         IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
         IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
-                vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
+                conf, vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
 
         IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
                 spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
@@ -369,7 +371,8 @@
         /**
          * construct the materializing write operator
          */
-        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
+                jobId, iteration);
         ClusterConfig.setLocationConstraint(spec, materialize);
 
         /** construct runtime hook */
@@ -391,8 +394,8 @@
         /**
          * final aggregate write operator
          */
-        IRecordDescriptorFactory aggRdFactory = DataflowUtils
-                .getWritableRecordDescriptorFactoryFromWritableClasses(partialAggregateValueClass.getName());
+        IRecordDescriptorFactory aggRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                conf, partialAggregateValueClass.getName());
         FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
                 configurationFactory, aggRdFactory, jobId);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenUtil.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenUtil.java
index 97cea99..9f2a66d 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenUtil.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenUtil.java
@@ -16,6 +16,7 @@
 package edu.uci.ics.pregelix.core.jobgen;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparator;
 
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
@@ -76,8 +77,8 @@
      * @return
      */
     @SuppressWarnings("unchecked")
-    public static IBinaryComparatorFactory getFinalBinaryComparatorFactory(Class keyClass) {
-        return new WritableComparingBinaryComparatorFactory(keyClass);
+    public static IBinaryComparatorFactory getFinalBinaryComparatorFactory(Class vertexIdClass) {
+        return new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass).getClass());
     }
 
     /**
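Note on the JobGenUtil change above: getFinalBinaryComparatorFactory now resolves the WritableComparator registered for the vertex id class and hands the comparator's class to the factory, instead of the key class itself. A minimal sketch of the lookup, assuming VLongWritable as the vertex id type:

    import org.apache.hadoop.io.VLongWritable;
    import org.apache.hadoop.io.WritableComparator;

    public class ComparatorLookup {
        public static void main(String[] args) {
            // resolves the comparator registered for VLongWritable (or a generic
            // deserializing comparator if none has been registered)
            WritableComparator cmp = WritableComparator.get(VLongWritable.class);
            // this is the class handed to WritableComparingBinaryComparatorFactory
            System.out.println(cmp.getClass().getName());
        }
    }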
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/WritableRecordDescriptorFactory.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/WritableRecordDescriptorFactory.java
index a67c259..68e3ba7 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/WritableRecordDescriptorFactory.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/WritableRecordDescriptorFactory.java
@@ -14,25 +14,32 @@
  */
 package edu.uci.ics.pregelix.core.runtime.touchpoint;
 
+import org.apache.hadoop.conf.Configuration;
+
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
 import edu.uci.ics.pregelix.core.util.DataflowUtils;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
 
 public class WritableRecordDescriptorFactory implements IRecordDescriptorFactory {
     private static final long serialVersionUID = 1L;
     private String[] fieldClasses;
+    private IConfigurationFactory confFactory;
 
-    public WritableRecordDescriptorFactory(String... fieldClasses) {
+    public WritableRecordDescriptorFactory(Configuration conf, String... fieldClasses) {
         this.fieldClasses = fieldClasses;
+        this.confFactory = new ConfigurationFactory(conf);
     }
 
     @Override
     public RecordDescriptor createRecordDescriptor(IHyracksTaskContext ctx) throws HyracksDataException {
         try {
-            return DataflowUtils.getRecordDescriptorFromWritableClasses(ctx, fieldClasses);
+            Configuration conf = confFactory.createConfiguration(ctx);
+            return DataflowUtils.getRecordDescriptorFromWritableClasses(ctx, conf, fieldClasses);
         } catch (HyracksException e) {
             throw new HyracksDataException(e);
         }
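The factory above stores an IConfigurationFactory rather than the Configuration itself because Hadoop's Configuration is not java.io.Serializable, while Hyracks operator descriptors and their factories are serialized and shipped to node controllers. A minimal sketch of the underlying pattern (the wrapper name is hypothetical; ConfigurationFactory in pregelix-core plays this role):

    import java.io.*;
    import org.apache.hadoop.conf.Configuration;

    // hypothetical wrapper: serialize the conf through its Writable methods
    public class SerializableConfHolder implements Serializable {
        private static final long serialVersionUID = 1L;
        private final byte[] data;

        public SerializableConfHolder(Configuration conf) throws IOException {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            conf.write(new DataOutputStream(bos)); // Configuration implements Writable
            data = bos.toByteArray();
        }

        public Configuration rebuild() throws IOException {
            Configuration conf = new Configuration(false);
            conf.readFields(new DataInputStream(new ByteArrayInputStream(data)));
            return conf;
        }
    }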
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java
index 3e01109..3a2241b 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java
@@ -38,14 +38,14 @@
     }
 
     @SuppressWarnings("unchecked")
-    public static RecordDescriptor getRecordDescriptorFromKeyValueClasses(String className1, String className2)
-            throws HyracksException {
+    public static RecordDescriptor getRecordDescriptorFromKeyValueClasses(Configuration conf, String className1,
+            String className2) throws HyracksException {
         RecordDescriptor recordDescriptor = null;
         try {
             ClassLoader loader = DataflowUtils.class.getClassLoader();
             recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
                     (Class<? extends Writable>) loader.loadClass(className1),
-                    (Class<? extends Writable>) loader.loadClass(className2));
+                    (Class<? extends Writable>) loader.loadClass(className2), conf);
         } catch (ClassNotFoundException cnfe) {
             throw new HyracksException(cnfe);
         }
@@ -53,15 +53,16 @@
     }
 
     @SuppressWarnings({ "unchecked", "rawtypes" })
-    public static RecordDescriptor getRecordDescriptorFromWritableClasses(String... classNames) throws HyracksException {
+    public static RecordDescriptor getRecordDescriptorFromWritableClasses(Configuration conf, String... classNames)
+            throws HyracksException {
         RecordDescriptor recordDescriptor = null;
         ISerializerDeserializer[] serdes = new ISerializerDeserializer[classNames.length];
         ClassLoader loader = DataflowUtils.class.getClassLoader();
         try {
             int i = 0;
             for (String className : classNames)
-                serdes[i++] = DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) loader
-                        .loadClass(className));
+                serdes[i++] = DatatypeHelper.createSerializerDeserializer(
+                        (Class<? extends Writable>) loader.loadClass(className), conf);
         } catch (ClassNotFoundException cnfe) {
             throw new HyracksException(cnfe);
         }
@@ -69,9 +70,9 @@
         return recordDescriptor;
     }
 
-    public static IRecordDescriptorFactory getWritableRecordDescriptorFactoryFromWritableClasses(String... classNames)
-            throws HyracksException {
-        IRecordDescriptorFactory rdFactory = new WritableRecordDescriptorFactory(classNames);
+    public static IRecordDescriptorFactory getWritableRecordDescriptorFactoryFromWritableClasses(Configuration conf,
+            String... classNames) throws HyracksException {
+        IRecordDescriptorFactory rdFactory = new WritableRecordDescriptorFactory(conf, classNames);
         return rdFactory;
     }
 
@@ -85,13 +86,13 @@
     }
 
     @SuppressWarnings("unchecked")
-    public static RecordDescriptor getRecordDescriptorFromKeyValueClasses(IHyracksTaskContext ctx, String className1,
-            String className2) throws HyracksException {
+    public static RecordDescriptor getRecordDescriptorFromKeyValueClasses(IHyracksTaskContext ctx, Configuration conf,
+            String className1, String className2) throws HyracksException {
         RecordDescriptor recordDescriptor = null;
         try {
             recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) ctx
                     .getJobletContext().loadClass(className1), (Class<? extends Writable>) ctx.getJobletContext()
-                    .loadClass(className2));
+                    .loadClass(className2), conf);
         } catch (Exception cnfe) {
             throw new HyracksException(cnfe);
         }
@@ -99,15 +100,17 @@
     }
 
     @SuppressWarnings({ "unchecked", "rawtypes" })
-    public static RecordDescriptor getRecordDescriptorFromWritableClasses(IHyracksTaskContext ctx, String... classNames)
-            throws HyracksException {
+    public static RecordDescriptor getRecordDescriptorFromWritableClasses(IHyracksTaskContext ctx, Configuration conf,
+            String... classNames) throws HyracksException {
         RecordDescriptor recordDescriptor = null;
         ISerializerDeserializer[] serdes = new ISerializerDeserializer[classNames.length];
         try {
             int i = 0;
-            for (String className : classNames)
-                serdes[i++] = DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) ctx
-                        .getJobletContext().loadClass(className));
+            for (String className : classNames) {
+                Class<? extends Writable> c = (Class<? extends Writable>) ctx.getJobletContext().loadClass(className);
+                serdes[i++] = DatatypeHelper.createSerializerDeserializer(c, conf);
+            }
         } catch (Exception cnfe) {
             throw new HyracksException(cnfe);
         }
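With the signature changes above, every record-descriptor helper in DataflowUtils takes the job's Configuration, so serializer/deserializer construction can be configuration-aware. A usage snippet with stock Hadoop writables (variable names are illustrative; exception handling omitted):

    Configuration conf = new Configuration();
    // may throw HyracksException if a class name cannot be resolved
    RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
            VLongWritable.class.getName(), DoubleWritable.class.getName());
    RecordDescriptor rdAggregate = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
            DoubleWritable.class.getName());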
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
index aabd4ba..70de9ed 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
@@ -62,7 +62,7 @@
         ccConfig.jobHistorySize = 1;
         ccConfig.profileDumpPeriod = -1;
         ccConfig.heartbeatPeriod = 50;
-        ccConfig.maxHeartbeatLapsePeriods = 15;
+        ccConfig.maxHeartbeatLapsePeriods = 10;
 
         // cluster controller
         cc = new ClusterControllerService(ccConfig);
diff --git a/pregelix/pregelix-dataflow-std-base/pom.xml b/pregelix/pregelix-dataflow-std-base/pom.xml
index d4c0ee6..77d75bf 100644
--- a/pregelix/pregelix-dataflow-std-base/pom.xml
+++ b/pregelix/pregelix-dataflow-std-base/pom.xml
@@ -1,28 +1,24 @@
-<!--
- ! Copyright 2009-2013 by The Regents of the University of California
- ! Licensed under the Apache License, Version 2.0 (the "License");
- ! you may not use this file except in compliance with the License.
- ! you may obtain a copy of the License from
- ! 
- !     http://www.apache.org/licenses/LICENSE-2.0
- ! 
- ! Unless required by applicable law or agreed to in writing, software
- ! distributed under the License is distributed on an "AS IS" BASIS,
- ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ! See the License for the specific language governing permissions and
- ! limitations under the License.
- !-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<!--
+ ! Copyright 2009-2013 by The Regents of the University of California
+ ! Licensed under the Apache License, Version 2.0 (the "License");
+ ! you may not use this file except in compliance with the License.
+ ! you may obtain a copy of the License from
+ ! 
+ !     http://www.apache.org/licenses/LICENSE-2.0
+ ! 
+ ! Unless required by applicable law or agreed to in writing, software
+ ! distributed under the License is distributed on an "AS IS" BASIS,
+ ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ! See the License for the specific language governing permissions and
+ ! limitations under the License.
+ !-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 	<modelVersion>4.0.0</modelVersion>
 	<artifactId>pregelix-dataflow-std-base</artifactId>
 	<packaging>jar</packaging>
 	<name>pregelix-dataflow-std-base</name>
 
 	<parent>
-    		<groupId>edu.uci.ics.hyracks</groupId>
-    		<artifactId>pregelix</artifactId>
-    		<version>0.2.10-SNAPSHOT</version>
-  	</parent>
+		<groupId>edu.uci.ics.hyracks</groupId>
+		<artifactId>pregelix</artifactId>
+		<version>0.2.10-SNAPSHOT</version>
+	</parent>
 
 
 	<properties>
@@ -58,7 +54,7 @@
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-clean-plugin</artifactId>
-                <version>2.4.1</version>
+				<version>2.4.1</version>
 				<configuration>
 					<filesets>
 						<fileset>
@@ -94,6 +90,20 @@
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-hdfs-core</artifactId>
+			<version>0.2.10-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-storage-am-common</artifactId>
+			<version>0.2.10-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-api</artifactId>
 			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
diff --git a/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/ISerializerDeserializerFactory.java b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/ISerializerDeserializerFactory.java
index 1fdd0b6..894cba5 100644
--- a/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/ISerializerDeserializerFactory.java
+++ b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/ISerializerDeserializerFactory.java
@@ -16,10 +16,12 @@
 
 import java.io.Serializable;
 
+import org.apache.hadoop.conf.Configuration;
+
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 
 public interface ISerializerDeserializerFactory<T> extends Serializable {
 
-	public ISerializerDeserializer<T> getSerializerDeserializer();
+    public ISerializerDeserializer<T> getSerializerDeserializer(Configuration conf);
 
 }
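A minimal sketch of an implementation against the new conf-aware contract, mirroring the DatatypeHelper call used in DataflowUtils above (the class name and generic bound are assumptions; the DatatypeHelper package is inferred from its use in pregelix-core):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Writable;

    import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
    import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
    import edu.uci.ics.pregelix.dataflow.std.base.ISerializerDeserializerFactory;

    // hypothetical factory: binds one Writable class name per factory instance
    public class WritableSerDeFactory<T extends Writable> implements ISerializerDeserializerFactory<T> {
        private static final long serialVersionUID = 1L;
        private final String className;

        public WritableSerDeFactory(String className) {
            this.className = className;
        }

        @SuppressWarnings("unchecked")
        @Override
        public ISerializerDeserializer<T> getSerializerDeserializer(Configuration conf) {
            try {
                Class<? extends Writable> clazz = (Class<? extends Writable>) Class.forName(className);
                return (ISerializerDeserializer<T>) DatatypeHelper.createSerializerDeserializer(clazz, conf);
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e);
            }
        }
    }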
diff --git a/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IUpdateFunction.java b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IUpdateFunction.java
index b8ba7bd..081b3bc 100644
--- a/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IUpdateFunction.java
+++ b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IUpdateFunction.java
@@ -18,17 +18,18 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
 
 public interface IUpdateFunction extends IFunction {
 
-	/**
-	 * update the tuple pointed by tupleRef called after process,
-	 * one-input-tuple-at-a-time
-	 * 
-	 * @param tupleRef
-	 * @throws HyracksDataException
-	 */
-	public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb)
-			throws HyracksDataException;
+    /**
+     * Update the tuple pointed to by tupleRef; called after process,
+     * one input tuple at a time.
+     * 
+     * @param tupleRef
+     *            the tuple to be updated
+     * @param cloneUpdateTb
+     *            the builder that buffers the copied update
+     * @param cursor
+     *            the index cursor, so that it can be recovered after buffered updates are flushed
+     * @throws HyracksDataException
+     */
+    public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb, IIndexCursor cursor)
+            throws HyracksDataException;
 
 }
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
index b22e468..b646661 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
@@ -34,6 +34,7 @@
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
@@ -43,6 +44,7 @@
 import edu.uci.ics.pregelix.dataflow.util.CopyUpdateUtil;
 import edu.uci.ics.pregelix.dataflow.util.FunctionProxy;
 import edu.uci.ics.pregelix.dataflow.util.SearchKeyTupleReference;
+import edu.uci.ics.pregelix.dataflow.util.StorageType;
 import edu.uci.ics.pregelix.dataflow.util.UpdateBuffer;
 
 public class IndexNestedLoopJoinFunctionUpdateOperatorNodePushable extends AbstractUnaryInputOperatorNodePushable {
@@ -68,6 +70,7 @@
     private ArrayTupleBuilder cloneUpdateTb;
     private final UpdateBuffer updateBuffer;
     private final SearchKeyTupleReference tempTupleReference = new SearchKeyTupleReference();
+    private final StorageType storageType;
 
     public IndexNestedLoopJoinFunctionUpdateOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
             IHyracksTaskContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward,
@@ -77,6 +80,11 @@
             throws HyracksDataException {
         treeIndexOpHelper = (IndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
                 opDesc, ctx, partition);
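+        // distinguish plain tree indexes from LSM indexes so the copy-update
+        // path in CopyUpdateUtil can treat the two storage types differently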
+        if (treeIndexOpHelper instanceof TreeIndexDataflowHelper) {
+            storageType = StorageType.TreeIndex;
+        } else {
+            storageType = StorageType.LSMIndex;
+        }
         this.lowKeyInclusive = lowKeyInclusive;
         this.highKeyInclusive = highKeyInclusive;
         this.recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
@@ -170,13 +178,13 @@
             /**
              * call the update function
              */
-            functionProxy.functionCall(leftAccessor, tIndex, indexEntryTuple, cloneUpdateTb);
+            functionProxy.functionCall(leftAccessor, tIndex, indexEntryTuple, cloneUpdateTb, cursor);
 
             /**
              * doing copy update
              */
             CopyUpdateUtil.copyUpdate(tempTupleReference, indexEntryTuple, updateBuffer, cloneUpdateTb, indexAccessor,
-                    cursor, rangePred);
+                    cursor, rangePred, false, storageType);
         }
     }
 
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
index 0ecfd03..2557a07 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
@@ -36,6 +36,7 @@
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
@@ -45,6 +46,7 @@
 import edu.uci.ics.pregelix.dataflow.util.CopyUpdateUtil;
 import edu.uci.ics.pregelix.dataflow.util.FunctionProxy;
 import edu.uci.ics.pregelix.dataflow.util.SearchKeyTupleReference;
+import edu.uci.ics.pregelix.dataflow.util.StorageType;
 import edu.uci.ics.pregelix.dataflow.util.UpdateBuffer;
 
 public class IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable extends
@@ -79,6 +81,7 @@
     private ArrayTupleBuilder cloneUpdateTb;
     private final UpdateBuffer updateBuffer;
     private final SearchKeyTupleReference tempTupleReference = new SearchKeyTupleReference();
+    private final StorageType storageType;
 
     public IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
             IHyracksTaskContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward,
@@ -88,6 +91,11 @@
         inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
         treeIndexOpHelper = (IndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
                 opDesc, ctx, partition);
+        if (treeIndexOpHelper instanceof TreeIndexDataflowHelper) {
+            storageType = StorageType.TreeIndex;
+        } else {
+            storageType = StorageType.LSMIndex;
+        }
         this.recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
 
         if (lowKeyFields != null && lowKeyFields.length > 0) {
@@ -287,13 +295,13 @@
         /**
          * function call
          */
-        functionProxy.functionCall(leftAccessor, tIndex, indexEntryTuple, cloneUpdateTb);
+        functionProxy.functionCall(leftAccessor, tIndex, indexEntryTuple, cloneUpdateTb, cursor);
 
         /**
          * doing clone update
          */
         CopyUpdateUtil.copyUpdate(tempTupleReference, indexEntryTuple, updateBuffer, cloneUpdateTb, indexAccessor,
-                cursor, rangePred);
+                cursor, rangePred, true, storageType);
     }
 
     /** write result for outer case */
@@ -301,11 +309,11 @@
         /**
          * function call
          */
-        functionProxy.functionCall(nullTupleBuilder, frameTuple, cloneUpdateTb);
+        functionProxy.functionCall(nullTupleBuilder, frameTuple, cloneUpdateTb, cursor);
 
         //doing clone update
         CopyUpdateUtil.copyUpdate(tempTupleReference, frameTuple, updateBuffer, cloneUpdateTb, indexAccessor, cursor,
-                rangePred);
+                rangePred, true, storageType);
     }
 
     @Override
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
index a9c787f..1d9fd70 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
@@ -34,6 +34,7 @@
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
@@ -43,6 +44,7 @@
 import edu.uci.ics.pregelix.dataflow.util.CopyUpdateUtil;
 import edu.uci.ics.pregelix.dataflow.util.FunctionProxy;
 import edu.uci.ics.pregelix.dataflow.util.SearchKeyTupleReference;
+import edu.uci.ics.pregelix.dataflow.util.StorageType;
 import edu.uci.ics.pregelix.dataflow.util.UpdateBuffer;
 
 public class IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable extends AbstractUnaryInputOperatorNodePushable {
@@ -71,6 +73,7 @@
     private ArrayTupleBuilder cloneUpdateTb;
     private UpdateBuffer updateBuffer;
     private final SearchKeyTupleReference tempTupleReference = new SearchKeyTupleReference();
+    private final StorageType storageType;
 
     public IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
             IHyracksTaskContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward,
@@ -79,6 +82,11 @@
             IRecordDescriptorFactory inputRdFactory, int outputArity) throws HyracksDataException {
         treeIndexOpHelper = (IndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
                 opDesc, ctx, partition);
+        if (treeIndexOpHelper instanceof TreeIndexDataflowHelper) {
+            storageType = StorageType.TreeIndex;
+        } else {
+            storageType = StorageType.LSMIndex;
+        }
         this.isForward = isForward;
         this.recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
 
@@ -238,21 +246,21 @@
 
     /** write the right result */
     private void writeRightResults(ITupleReference frameTuple) throws Exception {
-        functionProxy.functionCall(frameTuple, cloneUpdateTb);
+        functionProxy.functionCall(frameTuple, cloneUpdateTb, cursor);
 
         //doing clone update
         CopyUpdateUtil.copyUpdate(tempTupleReference, frameTuple, updateBuffer, cloneUpdateTb, indexAccessor, cursor,
-                rangePred);
+                rangePred, true, storageType);
     }
 
     /** write the left result */
     private void writeLeftResults(IFrameTupleAccessor leftAccessor, int tIndex, ITupleReference frameTuple)
             throws Exception {
-        functionProxy.functionCall(leftAccessor, tIndex, frameTuple, cloneUpdateTb);
+        functionProxy.functionCall(leftAccessor, tIndex, frameTuple, cloneUpdateTb, cursor);
 
         //doing clone update
         CopyUpdateUtil.copyUpdate(tempTupleReference, frameTuple, updateBuffer, cloneUpdateTb, indexAccessor, cursor,
-                rangePred);
+                rangePred, true, storageType);
     }
 
     @Override
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java
index de87909..1003431 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java
@@ -35,6 +35,7 @@
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
@@ -44,6 +45,7 @@
 import edu.uci.ics.pregelix.dataflow.util.CopyUpdateUtil;
 import edu.uci.ics.pregelix.dataflow.util.FunctionProxy;
 import edu.uci.ics.pregelix.dataflow.util.SearchKeyTupleReference;
+import edu.uci.ics.pregelix.dataflow.util.StorageType;
 import edu.uci.ics.pregelix.dataflow.util.UpdateBuffer;
 
 public class TreeSearchFunctionUpdateOperatorNodePushable extends AbstractUnaryInputOperatorNodePushable {
@@ -77,6 +79,7 @@
     private ArrayTupleBuilder cloneUpdateTb;
     private final UpdateBuffer updateBuffer;
     private final SearchKeyTupleReference tempTupleReference = new SearchKeyTupleReference();
+    private final StorageType storageType;
 
     public TreeSearchFunctionUpdateOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
             IHyracksTaskContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward,
@@ -86,6 +89,11 @@
             throws HyracksDataException {
         treeIndexHelper = (IndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
                 opDesc, ctx, partition);
+        if (treeIndexHelper instanceof TreeIndexDataflowHelper) {
+            storageType = StorageType.TreeIndex;
+        } else {
+            storageType = StorageType.LSMIndex;
+        }
         this.isForward = isForward;
         this.lowKeyInclusive = lowKeyInclusive;
         this.highKeyInclusive = highKeyInclusive;
@@ -171,11 +179,11 @@
         while (cursor.hasNext()) {
             cursor.next();
             ITupleReference tuple = cursor.getTuple();
-            functionProxy.functionCall(tuple, cloneUpdateTb);
+            functionProxy.functionCall(tuple, cloneUpdateTb, cursor);
 
             //doing clone update
             CopyUpdateUtil.copyUpdate(tempTupleReference, tuple, updateBuffer, cloneUpdateTb, indexAccessor, cursor,
-                    rangePred);
+                    rangePred, true, storageType);
         }
     }
 
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java
index 392f728..0ff3f04 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java
@@ -27,19 +27,9 @@
 
     public static void copyUpdate(SearchKeyTupleReference tempTupleReference, ITupleReference frameTuple,
             UpdateBuffer updateBuffer, ArrayTupleBuilder cloneUpdateTb, IIndexAccessor indexAccessor,
-            IIndexCursor cursor, RangePredicate rangePred) throws HyracksDataException, IndexException {
+            IIndexCursor cursor, RangePredicate rangePred, boolean scan, StorageType type) throws HyracksDataException,
+            IndexException {
         if (cloneUpdateTb.getSize() > 0) {
-            int[] fieldEndOffsets = cloneUpdateTb.getFieldEndOffsets();
-            int srcStart = fieldEndOffsets[0];
-            int srcLen = fieldEndOffsets[1] - fieldEndOffsets[0]; // the updated vertex size
-            int frSize = frameTuple.getFieldLength(1); // the vertex binary size in the leaf page
-            if (srcLen <= frSize) {
-                //doing in-place update if the vertex size is not larger than the original size, save the "real update" overhead
-                System.arraycopy(cloneUpdateTb.getByteArray(), srcStart, frameTuple.getFieldData(1),
-                        frameTuple.getFieldStart(1), srcLen);
-                cloneUpdateTb.reset();
-                return;
-            }
             if (!updateBuffer.appendTuple(cloneUpdateTb)) {
                 tempTupleReference.reset(frameTuple.getFieldData(0), frameTuple.getFieldStart(0),
                         frameTuple.getFieldLength(0));
@@ -51,11 +41,18 @@
                 if (!updateBuffer.appendTuple(cloneUpdateTb)) {
                     throw new HyracksDataException("cannot append tuple builder!");
                 }
-                //search again and recover the cursor
+                //search again and recover the cursor to the exact point where it was before being closed
                 cursor.reset();
-                rangePred.setLowKey(tempTupleReference, false);
-                rangePred.setHighKey(null, true);
+                rangePred.setLowKey(tempTupleReference, true);
+                if (scan) {
+                    rangePred.setHighKey(null, true);
+                } else {
+                    rangePred.setHighKey(tempTupleReference, true);
+                }
                 indexAccessor.search(cursor, rangePred);
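+                //step onto the first match so the cursor again points at the updated tuple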
+                if (cursor.hasNext()) {
+                    cursor.next();
+                }
             }
             cloneUpdateTb.reset();
         }
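The recovery logic above, isolated as a sketch for readability (every call appears in the hunk; the helper method itself is hypothetical). For a scan, the search is re-opened from the saved key to the end; for a point lookup, the high key is pinned to the saved key as well:

    private static void recoverCursor(IIndexAccessor indexAccessor, IIndexCursor cursor, RangePredicate rangePred,
            SearchKeyTupleReference savedKey, boolean scan) throws HyracksDataException, IndexException {
        cursor.reset();
        rangePred.setLowKey(savedKey, true); // inclusive: land back on the saved tuple
        if (scan) {
            rangePred.setHighKey(null, true); // keep scanning to the end of the index
        } else {
            rangePred.setHighKey(savedKey, true); // point recovery: pin to the saved key
        }
        indexAccessor.search(cursor, rangePred);
        if (cursor.hasNext()) {
            cursor.next(); // position on the recovered tuple
        }
    }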
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
index a1e5b86..5579a77 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
@@ -22,6 +22,7 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
 import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunction;
@@ -56,10 +57,10 @@
      * @throws HyracksDataException
      */
     public void functionOpen() throws HyracksDataException {
+        ctxCL = Thread.currentThread().getContextClassLoader();
+        Thread.currentThread().setContextClassLoader(ctx.getJobletContext().getClassLoader());
         inputRd = inputRdFactory.createRecordDescriptor(ctx);
         tupleDe = new TupleDeserializer(inputRd);
-        ctxCL = Thread.currentThread().getContextClassLoader();
-        Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
         for (IFrameWriter writer : writers) {
             writer.open();
         }
@@ -80,10 +81,10 @@
      * @throws HyracksDataException
      */
     public void functionCall(IFrameTupleAccessor leftAccessor, int leftTupleIndex, ITupleReference right,
-            ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
+            ArrayTupleBuilder cloneUpdateTb, IIndexCursor cursor) throws HyracksDataException {
         Object[] tuple = tupleDe.deserializeRecord(leftAccessor, leftTupleIndex, right);
         function.process(tuple);
-        function.update(right, cloneUpdateTb);
+        function.update(right, cloneUpdateTb, cursor);
     }
 
     /**
@@ -92,10 +93,11 @@
      * @param updateRef
      * @throws HyracksDataException
      */
-    public void functionCall(ITupleReference updateRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
+    public void functionCall(ITupleReference updateRef, ArrayTupleBuilder cloneUpdateTb, IIndexCursor cursor)
+            throws HyracksDataException {
         Object[] tuple = tupleDe.deserializeRecord(updateRef);
         function.process(tuple);
-        function.update(updateRef, cloneUpdateTb);
+        function.update(updateRef, cloneUpdateTb, cursor);
     }
 
     /**
@@ -107,11 +109,11 @@
      *            update pointer
      * @throws HyracksDataException
      */
-    public void functionCall(ArrayTupleBuilder tb, ITupleReference inPlaceUpdateRef, ArrayTupleBuilder cloneUpdateTb)
-            throws HyracksDataException {
+    public void functionCall(ArrayTupleBuilder tb, ITupleReference inPlaceUpdateRef, ArrayTupleBuilder cloneUpdateTb,
+            IIndexCursor cursor) throws HyracksDataException {
         Object[] tuple = tupleDe.deserializeRecord(tb, inPlaceUpdateRef);
         function.process(tuple);
-        function.update(inPlaceUpdateRef, cloneUpdateTb);
+        function.update(inPlaceUpdateRef, cloneUpdateTb, cursor);
     }
 
     /**
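The reordering in functionOpen above matters: the thread's context classloader must point at the job's classloader before createRecordDescriptor deserializes any user classes, otherwise user Writable types resolve against the wrong loader. The general pattern, as a sketch:

    ClassLoader prev = Thread.currentThread().getContextClassLoader();
    Thread.currentThread().setContextClassLoader(ctx.getJobletContext().getClassLoader());
    try {
        // resolve user Writable classes, build record descriptors, open writers ...
    } finally {
        // the saved loader (ctxCL above) is presumably restored when the function tears down
        Thread.currentThread().setContextClassLoader(prev);
    }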
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/StorageType.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/StorageType.java
new file mode 100644
index 0000000..fb2d1eb
--- /dev/null
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/StorageType.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.dataflow.util;
+
+public enum StorageType {
+    TreeIndex,
+    LSMIndex
+}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingReadOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingReadOperatorDescriptor.java
index b44b643..a5d2ab7 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingReadOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingReadOperatorDescriptor.java
@@ -31,11 +31,15 @@
 public class MaterializingReadOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
     private static final long serialVersionUID = 1L;
     private final boolean removeIterationState;
+    private final String jobId;
+    private final int iteration;
 
     public MaterializingReadOperatorDescriptor(JobSpecification spec, RecordDescriptor recordDescriptor,
-            boolean removeIterationState) {
+            boolean removeIterationState, String jobId, int iteration) {
         super(spec, 1, 1);
         this.removeIterationState = removeIterationState;
+        this.jobId = jobId;
+        this.iteration = iteration - 1;
         recordDescriptors[0] = recordDescriptor;
     }
 
@@ -55,8 +59,8 @@
             @Override
             public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
                 if (!complete) {
-                    MaterializerTaskState state = (MaterializerTaskState) IterationUtils.getIterationState(ctx,
-                            partition);
+                    MaterializerTaskState state = (MaterializerTaskState) IterationUtils.getIterationState(ctx, jobId,
+                            partition, iteration);
                     RunFileReader in = state.getRunFileWriter().createReader();
                     writer.open();
                     try {
@@ -85,7 +89,7 @@
                  * remove last iteration's state
                  */
                 if (removeIterationState) {
-                    IterationUtils.removeIterationState(ctx, partition);
+                    IterationUtils.removeIterationState(ctx, jobId, partition, iteration);
                 }
                 writer.close();
                 complete = true;
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingWriteOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingWriteOperatorDescriptor.java
index 00dcbd1..921dc40 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingWriteOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingWriteOperatorDescriptor.java
@@ -38,9 +38,14 @@
 public class MaterializingWriteOperatorDescriptor extends AbstractOperatorDescriptor {
     private static final long serialVersionUID = 1L;
     private final static int MATERIALIZER_ACTIVITY_ID = 0;
+    private final String jobId;
+    private final int iteration;
 
-    public MaterializingWriteOperatorDescriptor(JobSpecification spec, RecordDescriptor recordDescriptor) {
+    public MaterializingWriteOperatorDescriptor(JobSpecification spec, RecordDescriptor recordDescriptor, String jobId,
+            int iteration) {
         super(spec, 1, 1);
+        this.jobId = jobId;
+        this.iteration = iteration;
         recordDescriptors[0] = recordDescriptor;
     }
 
@@ -69,13 +74,12 @@
                 @Override
                 public void open() throws HyracksDataException {
                     /** remove last iteration's state */
-                    IterationUtils.removeIterationState(ctx, partition);
+                    IterationUtils.removeIterationState(ctx, jobId, partition, iteration);
                     state = new MaterializerTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
                             partition));
                     INCApplicationContext appContext = ctx.getJobletContext().getApplicationContext();
                     RuntimeContext context = (RuntimeContext) appContext.getApplicationObject();
-                    FileReference file = context.createManagedWorkspaceFile(MaterializingWriteOperatorDescriptor.class
-                            .getSimpleName());
+                    FileReference file = context.createManagedWorkspaceFile(jobId);
                     state.setRunFileWriter(new RunFileWriter(file, ctx.getIOManager()));
                     state.getRunFileWriter().open();
                     writer.open();
@@ -92,7 +96,7 @@
                     /**
                      * set iteration state
                      */
-                    IterationUtils.setIterationState(ctx, partition, state);
+                    IterationUtils.setIterationState(ctx, jobId, partition, iteration, state);
                     writer.close();
                 }
 
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
index b34879e..e16ba48 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
@@ -16,6 +16,8 @@
 
 import java.io.DataOutput;
 import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -122,7 +124,7 @@
             @SuppressWarnings("unchecked")
             private void loadVertices(final IHyracksTaskContext ctx, Configuration conf, int splitId)
                     throws IOException, ClassNotFoundException, InterruptedException, InstantiationException,
-                    IllegalAccessException {
+                    IllegalAccessException, NoSuchFieldException, InvocationTargetException {
                 ByteBuffer frame = ctx.allocateFrame();
                 FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
                 appender.reset(frame, true);
@@ -141,7 +143,11 @@
                 /**
                  * set context
                  */
-                Vertex.setContext(mapperContext);
+                ClassLoader cl = ctx.getJobletContext().getClassLoader();
+                Class<?> vClass = (Class<?>) cl.loadClass("edu.uci.ics.pregelix.api.graph.Vertex");
+                Field contextField = vClass.getDeclaredField("context");
+                contextField.setAccessible(true);
+                contextField.set(null, mapperContext);
 
                 /**
                  * empty vertex value
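The direct `Vertex.setContext(mapperContext)` call is replaced by reflection so the assignment hits the `Vertex` class loaded by the job's classloader, not the copy the operator was linked against. The same pattern, extracted into a standalone helper (the helper itself is illustrative; the class and field names follow the diff):

```java
import java.lang.reflect.Field;

public final class StaticFieldInjector {

    private StaticFieldInjector() {
    }

    /**
     * Sets a static field on a class resolved through the given classloader.
     * With dynamically deployed jars, that class can differ from the copy this
     * code was linked against, so a plain assignment would update the wrong one.
     */
    public static void setStatic(ClassLoader cl, String className, String fieldName, Object value)
            throws ClassNotFoundException, NoSuchFieldException, IllegalAccessException {
        Class<?> clazz = cl.loadClass(className);
        Field field = clazz.getDeclaredField(fieldName);
        field.setAccessible(true);
        field.set(null, value); // null receiver: the field is static
    }
}
```

With that helper, the hunk above would collapse to `StaticFieldInjector.setStatic(cl, "edu.uci.ics.pregelix.api.graph.Vertex", "context", mapperContext);`.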
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileWriteOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileWriteOperatorDescriptor.java
index f3ec40e..2087ea2 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileWriteOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileWriteOperatorDescriptor.java
@@ -44,17 +44,20 @@
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
 
 public class VertexFileWriteOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
     private static final long serialVersionUID = 1L;
     private final IConfigurationFactory confFactory;
     private final IRecordDescriptorFactory inputRdFactory;
+    private final IRuntimeHookFactory preHookFactory;
 
     public VertexFileWriteOperatorDescriptor(JobSpecification spec, IConfigurationFactory confFactory,
-            IRecordDescriptorFactory inputRdFactory) {
+            IRecordDescriptorFactory inputRdFactory, IRuntimeHookFactory preHookFactory) {
         super(spec, 1, 0);
         this.confFactory = confFactory;
         this.inputRdFactory = inputRdFactory;
+        this.preHookFactory = preHookFactory;
     }
 
     @SuppressWarnings("rawtypes")
@@ -85,6 +88,8 @@
                 context = ctxFactory.createContext(conf, partition);
                 context.getConfiguration().setClassLoader(ctx.getJobletContext().getClassLoader());
                 try {
+                    if (preHookFactory != null)
+                        preHookFactory.createRuntimeHook().configure(ctx);
                     vertexWriter = outputFormat.createVertexWriter(context);
                 } catch (InterruptedException e) {
                     throw new HyracksDataException(e);
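The pre-hook gives callers a slot for per-partition setup before the vertex writer is created. A no-op sketch, assuming `IRuntimeHook` lives beside its factory and exposes the single `configure(IHyracksTaskContext)` method that the call site above implies:

```java
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHook;
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;

public class NoOpRuntimeHookFactory implements IRuntimeHookFactory {
    private static final long serialVersionUID = 1L;

    @Override
    public IRuntimeHook createRuntimeHook() {
        return new IRuntimeHook() {
            @Override
            public void configure(IHyracksTaskContext ctx) throws HyracksDataException {
                // runs once per partition, before outputFormat.createVertexWriter(context)
            }
        };
    }
}
```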
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/PJobContext.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/PJobContext.java
new file mode 100644
index 0000000..ceb085c
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/PJobContext.java
@@ -0,0 +1,126 @@
+package edu.uci.ics.pregelix.dataflow.context;
+
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+
+public class PJobContext {
+    private static final Logger LOGGER = Logger.getLogger(PJobContext.class.getName());
+
+    private final Map<Long, List<FileReference>> iterationToFiles = new ConcurrentHashMap<Long, List<FileReference>>();
+    private final Map<TaskIterationID, IStateObject> appStateMap = new ConcurrentHashMap<TaskIterationID, IStateObject>();
+    private Long jobIdToSuperStep;
+    private Boolean jobIdToMove;
+
+    public void close() throws HyracksDataException {
+        for (Entry<Long, List<FileReference>> entry : iterationToFiles.entrySet())
+            for (FileReference fileRef : entry.getValue())
+                fileRef.delete();
+
+        iterationToFiles.clear();
+        appStateMap.clear();
+    }
+
+    public void clearState() throws HyracksDataException {
+        // identical cleanup to close(): delete this job's files and drop its task state
+        close();
+    }
+
+    public Map<TaskIterationID, IStateObject> getAppStateStore() {
+        return appStateMap;
+    }
+
+    public static RuntimeContext get(IHyracksTaskContext ctx) {
+        return (RuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
+    }
+
+    public void setVertexProperties(long numVertices, long numEdges, long currentIteration, ClassLoader cl) {
+        if (jobIdToMove == null || jobIdToMove == true) {
+            if (jobIdToSuperStep == null) {
+                if (currentIteration <= 0) {
+                    jobIdToSuperStep = 0L;
+                } else {
+                    jobIdToSuperStep = currentIteration;
+                }
+            }
+
+            long superStep = jobIdToSuperStep;
+            List<FileReference> files = iterationToFiles.remove(superStep - 1);
+            if (files != null) {
+                for (FileReference fileRef : files)
+                    fileRef.delete();
+            }
+
+            setProperties(numVertices, numEdges, currentIteration, superStep, false, cl);
+        }
+        System.gc();
+    }
+
+    public void recoverVertexProperties(long numVertices, long numEdges, long currentIteration, ClassLoader cl) {
+        if (jobIdToSuperStep == null) {
+            if (currentIteration <= 0) {
+                jobIdToSuperStep = 0L;
+            } else {
+                jobIdToSuperStep = currentIteration;
+            }
+        }
+
+        long superStep = jobIdToSuperStep;
+        List<FileReference> files = iterationToFiles.remove(superStep - 1);
+        if (files != null) {
+            for (FileReference fileRef : files)
+                fileRef.delete();
+        }
+
+        setProperties(numVertices, numEdges, currentIteration, superStep, true, cl);
+    }
+
+    public void endSuperStep() {
+        jobIdToMove = true;
+        LOGGER.info("end iteration " + Vertex.getSuperstep());
+    }
+
+    public Map<Long, List<FileReference>> getIterationToFiles() {
+        return iterationToFiles;
+    }
+
+    private void setProperties(long numVertices, long numEdges, long currentIteration, long superStep, boolean toMove,
+            ClassLoader cl) {
+        try {
+            Class<?> vClass = (Class<?>) cl.loadClass("edu.uci.ics.pregelix.api.graph.Vertex");
+            Method superStepMethod = vClass.getMethod("setSuperstep", Long.TYPE);
+            Method numVerticesMethod = vClass.getMethod("setNumVertices", Long.TYPE);
+            Method numEdgesMethod = vClass.getMethod("setNumEdges", Long.TYPE);
+
+            if (currentIteration > 0) {
+                //Vertex.setSuperstep(currentIteration);
+                superStepMethod.invoke(null, currentIteration);
+            } else {
+                //Vertex.setSuperstep(++superStep);
+                superStepMethod.invoke(null, ++superStep);
+            }
+            //Vertex.setNumVertices(numVertices);
+            numVerticesMethod.invoke(null, numVertices);
+            //Vertex.setNumEdges(numEdges);
+            numEdgesMethod.invoke(null, numEdges);
+            jobIdToSuperStep = superStep;
+            jobIdToMove = toMove;
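+            // note: this logs through the statically-linked Vertex class, which can
+            // differ from the copy just updated via cl when jars are dynamically deployed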
+            LOGGER.info("start iteration " + Vertex.getSuperstep());
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+}
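PJobContext carries everything that used to sit in job-keyed maps on RuntimeContext: the iteration-to-file registry, the per-task state store, and the superstep cursor. A minimal lifecycle sketch, assuming pregelix-api is on the classpath; the vertex and edge counts are illustrative:

```java
import edu.uci.ics.pregelix.dataflow.context.PJobContext;

public class PJobContextLifecycleSketch {
    public static void main(String[] args) throws Exception {
        PJobContext job = new PJobContext();
        ClassLoader cl = PJobContextLifecycleSketch.class.getClassLoader();
        long numVertices = 20, numEdges = 40; // illustrative counts

        for (int iteration = 1; iteration <= 3; iteration++) {
            // entering an iteration: sets superstep/counts on the classloader's
            // Vertex class and deletes the files of the previous superstep
            job.setVertexProperties(numVertices, numEdges, iteration, cl);
            // ... the iteration's dataflow runs here ...
            job.endSuperStep(); // permit the next call to advance the superstep
        }
        job.clearState(); // delete remaining files, drop per-task state
    }
}
```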
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
index f3f7513..854e3dc 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
@@ -20,7 +20,6 @@
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadFactory;
-import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.application.INCApplicationContext;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -48,7 +47,6 @@
 import edu.uci.ics.pregelix.api.graph.Vertex;
 
 public class RuntimeContext implements IWorkspaceFileFactory {
-    private static final Logger LOGGER = Logger.getLogger(RuntimeContext.class.getName());
 
     private final IIndexLifecycleManager lcManager;
     private final ILocalResourceRepository localResourceRepository;
@@ -57,10 +55,7 @@
     private final List<IVirtualBufferCache> vbcs;
     private final IFileMapManager fileMapManager;
     private final IOManager ioManager;
-    private final Map<Long, List<FileReference>> iterationToFiles = new ConcurrentHashMap<Long, List<FileReference>>();
-    private final Map<StateKey, IStateObject> appStateMap = new ConcurrentHashMap<StateKey, IStateObject>();
-    private final Map<String, Long> jobIdToSuperStep = new ConcurrentHashMap<String, Long>();
-    private final Map<String, Boolean> jobIdToMove = new ConcurrentHashMap<String, Boolean>();
+    private final Map<String, PJobContext> activeJobs = new ConcurrentHashMap<String, PJobContext>();
 
     private final ThreadFactory threadFactory = new ThreadFactory() {
         public Thread newThread(Runnable r) {
@@ -91,28 +86,12 @@
         resourceIdFactory = new ResourceIdFactory(0);
     }
 
-    public void close() throws HyracksDataException {
-        for (Entry<Long, List<FileReference>> entry : iterationToFiles.entrySet())
-            for (FileReference fileRef : entry.getValue())
-                fileRef.delete();
-
-        iterationToFiles.clear();
+    public synchronized void close() throws HyracksDataException {
         bufferCache.close();
-        appStateMap.clear();
-
-        System.gc();
-    }
-
-    public void clearState(String jobId) throws HyracksDataException {
-        for (Entry<Long, List<FileReference>> entry : iterationToFiles.entrySet())
-            for (FileReference fileRef : entry.getValue())
-                fileRef.delete();
-
-        iterationToFiles.clear();
-        appStateMap.clear();
-        jobIdToMove.remove(jobId);
-        jobIdToSuperStep.remove(jobId);
-        System.gc();
+        for (Entry<String, PJobContext> entry : activeJobs.entrySet()) {
+            entry.getValue().close();
+        }
+        activeJobs.clear();
     }
 
     public ILocalResourceRepository getLocalResourceRepository() {
@@ -139,87 +118,55 @@
         return fileMapManager;
     }
 
-    public Map<StateKey, IStateObject> getAppStateStore() {
-        return appStateMap;
+    public synchronized Map<TaskIterationID, IStateObject> getAppStateStore(String jobId) {
+        PJobContext activeJob = getActiveJob(jobId);
+        return activeJob.getAppStateStore();
     }
 
     public static RuntimeContext get(IHyracksTaskContext ctx) {
         return (RuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
     }
 
-    public synchronized void setVertexProperties(String jobId, long numVertices, long numEdges, long currentIteration) {
-        Boolean toMove = jobIdToMove.get(jobId);
-        if (toMove == null || toMove == true) {
-            if (jobIdToSuperStep.get(jobId) == null) {
-                if (currentIteration <= 0) {
-                    jobIdToSuperStep.put(jobId, 0L);
-                } else {
-                    jobIdToSuperStep.put(jobId, currentIteration);
-                }
-            }
-
-            long superStep = jobIdToSuperStep.get(jobId);
-            List<FileReference> files = iterationToFiles.remove(superStep - 1);
-            if (files != null) {
-                for (FileReference fileRef : files)
-                    fileRef.delete();
-            }
-
-            if (currentIteration > 0) {
-                Vertex.setSuperstep(currentIteration);
-            } else {
-                Vertex.setSuperstep(++superStep);
-            }
-            Vertex.setNumVertices(numVertices);
-            Vertex.setNumEdges(numEdges);
-            jobIdToSuperStep.put(jobId, superStep);
-            jobIdToMove.put(jobId, false);
-            LOGGER.info("start iteration " + Vertex.getSuperstep());
-        }
-        System.gc();
+    public synchronized void setVertexProperties(String jobId, long numVertices, long numEdges, long currentIteration,
+            ClassLoader cl) {
+        PJobContext activeJob = getActiveJob(jobId);
+        activeJob.setVertexProperties(numVertices, numEdges, currentIteration, cl);
     }
 
     public synchronized void recoverVertexProperties(String jobId, long numVertices, long numEdges,
-            long currentIteration) {
-        if (jobIdToSuperStep.get(jobId) == null) {
-            if (currentIteration <= 0) {
-                jobIdToSuperStep.put(jobId, 0L);
-            } else {
-                jobIdToSuperStep.put(jobId, currentIteration);
-            }
-        }
-
-        long superStep = jobIdToSuperStep.get(jobId);
-        List<FileReference> files = iterationToFiles.remove(superStep - 1);
-        if (files != null) {
-            for (FileReference fileRef : files)
-                fileRef.delete();
-        }
-
-        if (currentIteration > 0) {
-            Vertex.setSuperstep(currentIteration);
-        } else {
-            Vertex.setSuperstep(++superStep);
-        }
-        Vertex.setNumVertices(numVertices);
-        Vertex.setNumEdges(numEdges);
-        jobIdToSuperStep.put(jobId, superStep);
-        jobIdToMove.put(jobId, true);
-        LOGGER.info("recovered iteration " + Vertex.getSuperstep());
+            long currentIteration, ClassLoader cl) {
+        PJobContext activeJob = getActiveJob(jobId);
+        activeJob.recoverVertexProperties(numVertices, numEdges, currentIteration, cl);
     }
 
-    public synchronized void endSuperStep(String pregelixJobId) {
-        jobIdToMove.put(pregelixJobId, true);
-        LOGGER.info("end iteration " + Vertex.getSuperstep());
+    public synchronized void endSuperStep(String jobId) {
+        PJobContext activeJob = getActiveJob(jobId);
+        activeJob.endSuperStep();
+    }
+
+    public synchronized void clearState(String jobId) throws HyracksDataException {
+        PJobContext activeJob = getActiveJob(jobId);
+        activeJob.clearState();
+        activeJobs.remove(jobId);
+    }
+
+    private PJobContext getActiveJob(String jobId) {
+        PJobContext activeJob = activeJobs.get(jobId);
+        if (activeJob == null) {
+            activeJob = new PJobContext();
+            activeJobs.put(jobId, activeJob);
+        }
+        return activeJob;
     }
 
     @Override
-    public FileReference createManagedWorkspaceFile(String prefix) throws HyracksDataException {
-        final FileReference fRef = ioManager.createWorkspaceFile(prefix);
-        List<FileReference> files = iterationToFiles.get(Vertex.getSuperstep());
+    public FileReference createManagedWorkspaceFile(String jobId) throws HyracksDataException {
+        final FileReference fRef = ioManager.createWorkspaceFile(jobId);
+        PJobContext activeJob = getActiveJob(jobId);
+        List<FileReference> files = activeJob.getIterationToFiles().get(Vertex.getSuperstep());
         if (files == null) {
             files = new ArrayList<FileReference>();
-            iterationToFiles.put(Vertex.getSuperstep(), files);
+            activeJob.getIterationToFiles().put(Vertex.getSuperstep(), files);
         }
         files.add(fRef);
         return fRef;
@@ -229,4 +176,5 @@
     public FileReference createUnmanagedWorkspaceFile(String prefix) throws HyracksDataException {
         return ioManager.createWorkspaceFile(prefix);
     }
+
 }
\ No newline at end of file
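`getActiveJob` is check-then-act; most callers above are synchronized, but `createManagedWorkspaceFile` is not, so two racing first calls could in principle create two PJobContexts for one job. A lock-free get-or-create along these lines would close that window (illustrative, not part of the change):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import edu.uci.ics.pregelix.dataflow.context.PJobContext;

public class ActiveJobRegistrySketch {
    private final ConcurrentMap<String, PJobContext> activeJobs =
            new ConcurrentHashMap<String, PJobContext>();

    /** Lock-free get-or-create: racing first calls still end up sharing one context. */
    public PJobContext getActiveJob(String jobId) {
        PJobContext job = activeJobs.get(jobId);
        if (job == null) {
            PJobContext fresh = new PJobContext();
            PJobContext raced = activeJobs.putIfAbsent(jobId, fresh);
            job = (raced != null) ? raced : fresh;
        }
        return job;
    }
}
```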
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/StateKey.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/TaskID.java
similarity index 64%
rename from pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/StateKey.java
rename to pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/TaskID.java
index cbd90b9..f219ed2 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/StateKey.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/TaskID.java
@@ -12,34 +12,44 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package edu.uci.ics.pregelix.dataflow.context;
 
-import edu.uci.ics.hyracks.api.job.JobId;
+public class TaskID {
 
-public class StateKey {
-    private final JobId jobId;
-    private final int partition;
+    private String jobId;
+    private int partition;
 
-    public StateKey(JobId jobId, int partition) {
+    public TaskID(String jobId, int partition) {
         this.jobId = jobId;
         this.partition = partition;
     }
 
+    public String getJobId() {
+        return jobId;
+    }
+
+    public int getPartition() {
+        return partition;
+    }
+
     @Override
     public int hashCode() {
-        return jobId.hashCode() * partition;
+        return jobId.hashCode() + partition;
     }
 
     @Override
     public boolean equals(Object o) {
-        if (!(o instanceof StateKey))
+        if (!(o instanceof TaskID)) {
             return false;
-        StateKey key = (StateKey) o;
-        return key.jobId.equals(jobId) && key.partition == partition;
+        }
+        TaskID tid = (TaskID) o;
+        return jobId.equals(tid.getJobId()) && partition == tid.getPartition();
     }
 
     @Override
     public String toString() {
-        return jobId.toString() + ":" + partition;
+        return "job:" + jobId + " partition:" + partition;
     }
+
 }
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/TaskIterationID.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/TaskIterationID.java
new file mode 100644
index 0000000..53c6a0c
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/TaskIterationID.java
@@ -0,0 +1,51 @@
+package edu.uci.ics.pregelix.dataflow.context;
+
+public class TaskIterationID {
+
+    private TaskID tid;
+    private int iteration;
+
+    public TaskIterationID(TaskID tid, int iteration) {
+        this.tid = tid;
+        this.iteration = iteration;
+    }
+
+    public TaskIterationID(String jobId, int partition, int iteration) {
+        this.tid = new TaskID(jobId, partition);
+        this.iteration = iteration;
+    }
+
+    public TaskID getTaskID() {
+        return tid;
+    }
+
+    public int getIteration() {
+        return iteration;
+    }
+
+    public TaskIterationID getNextTaskIterationID() {
+        return new TaskIterationID(tid, iteration + 1);
+    }
+
+    public TaskIterationID getPreviousTaskIterationID() {
+        return new TaskIterationID(tid, iteration - 1);
+    }
+
+    @Override
+    public int hashCode() {
+        return tid.hashCode() + iteration;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof TaskIterationID)) {
+            return false;
+        }
+        TaskIterationID tiid = (TaskIterationID) o;
+        return tid.equals(tiid.getTaskID()) && iteration == tiid.getIteration();
+    }
+
+    @Override
+    public String toString() {
+        return tid.toString() + " iteration:" + iteration;
+    }
+}
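Both key classes implement value semantics, so a fresh key with the same (jobId, partition, iteration) retrieves what an equal key stored. A small check of that contract (the job id is an illustrative placeholder):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import edu.uci.ics.pregelix.dataflow.context.TaskIterationID;

public class TaskIterationKeySketch {
    public static void main(String[] args) {
        Map<TaskIterationID, String> store = new ConcurrentHashMap<TaskIterationID, String>();
        store.put(new TaskIterationID("job-1", 0, 3), "state of iteration 3");

        // equals/hashCode are value-based, so a fresh, equal key finds the entry
        System.out.println(store.get(new TaskIterationID("job-1", 0, 3)));

        // iteration 4 can address its predecessor's state directly
        TaskIterationID current = new TaskIterationID("job-1", 0, 4);
        System.out.println(store.get(current.getPreviousTaskIterationID()));
    }
}
```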
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
index 02097bf..8052b7c 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
@@ -28,73 +28,66 @@
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.api.util.JobStateUtils;
 import edu.uci.ics.pregelix.dataflow.context.RuntimeContext;
-import edu.uci.ics.pregelix.dataflow.context.StateKey;
+import edu.uci.ics.pregelix.dataflow.context.TaskIterationID;
 
 public class IterationUtils {
     public static final String TMP_DIR = "/tmp/";
 
-    public static void setIterationState(IHyracksTaskContext ctx, int partition, IStateObject state) {
+    public static void setIterationState(IHyracksTaskContext ctx, String pregelixJobId, int partition, int iteration,
+            IStateObject state) {
         INCApplicationContext appContext = ctx.getJobletContext().getApplicationContext();
         RuntimeContext context = (RuntimeContext) appContext.getApplicationObject();
-        Map<StateKey, IStateObject> map = context.getAppStateStore();
-        map.put(new StateKey(ctx.getJobletContext().getJobId(), partition), state);
+        Map<TaskIterationID, IStateObject> map = context.getAppStateStore(pregelixJobId);
+        map.put(new TaskIterationID(pregelixJobId, partition, iteration), state);
     }
 
-    public static IStateObject getIterationState(IHyracksTaskContext ctx, int partition) {
-        JobId currentId = ctx.getJobletContext().getJobId();
-        JobId lastId = new JobId(currentId.getId() - 1);
+    public static IStateObject getIterationState(IHyracksTaskContext ctx, String pregelixJobId, int partition,
+            int iteration) {
         INCApplicationContext appContext = ctx.getJobletContext().getApplicationContext();
         RuntimeContext context = (RuntimeContext) appContext.getApplicationObject();
-        Map<StateKey, IStateObject> map = context.getAppStateStore();
-        IStateObject state = map.get(new StateKey(lastId, partition));
-        while (state == null) {
-            /** in case the last job is a checkpointing job */
-            lastId = new JobId(lastId.getId() - 1);
-            state = map.get(new StateKey(lastId, partition));
-        }
+        Map<TaskIterationID, IStateObject> map = context.getAppStateStore(pregelixJobId);
+        IStateObject state = map.get(new TaskIterationID(pregelixJobId, partition, iteration));
         return state;
     }
 
-    public static void removeIterationState(IHyracksTaskContext ctx, int partition) {
-        JobId currentId = ctx.getJobletContext().getJobId();
-        JobId lastId = new JobId(currentId.getId() - 1);
+    public static void removeIterationState(IHyracksTaskContext ctx, String pregelixJobId, int partition, int iteration) {
         INCApplicationContext appContext = ctx.getJobletContext().getApplicationContext();
         RuntimeContext context = (RuntimeContext) appContext.getApplicationObject();
-        Map<StateKey, IStateObject> map = context.getAppStateStore();
-        map.remove(new StateKey(lastId, partition));
+        Map<TaskIterationID, IStateObject> map = context.getAppStateStore(pregelixJobId);
+        map.remove(new TaskIterationID(pregelixJobId, partition, iteration));
     }
 
-    public static void endSuperStep(String giraphJobId, IHyracksTaskContext ctx) {
+    public static void endSuperStep(String pregelixJobId, IHyracksTaskContext ctx) {
         INCApplicationContext appContext = ctx.getJobletContext().getApplicationContext();
         RuntimeContext context = (RuntimeContext) appContext.getApplicationObject();
-        context.endSuperStep(giraphJobId);
+        context.endSuperStep(pregelixJobId);
     }
 
-    public static void setProperties(String jobId, IHyracksTaskContext ctx, Configuration conf, long currentIteration) {
-        INCApplicationContext appContext = ctx.getJobletContext().getApplicationContext();
-        RuntimeContext context = (RuntimeContext) appContext.getApplicationObject();
-        context.setVertexProperties(jobId, conf.getLong(PregelixJob.NUM_VERTICE, -1),
-                conf.getLong(PregelixJob.NUM_EDGES, -1), currentIteration);
-    }
-
-    public static void recoverProperties(String jobId, IHyracksTaskContext ctx, Configuration conf,
+    public static void setProperties(String pregelixJobId, IHyracksTaskContext ctx, Configuration conf,
             long currentIteration) {
         INCApplicationContext appContext = ctx.getJobletContext().getApplicationContext();
         RuntimeContext context = (RuntimeContext) appContext.getApplicationObject();
-        context.recoverVertexProperties(jobId, conf.getLong(PregelixJob.NUM_VERTICE, -1),
-                conf.getLong(PregelixJob.NUM_EDGES, -1), currentIteration);
+        context.setVertexProperties(pregelixJobId, conf.getLong(PregelixJob.NUM_VERTICE, -1),
+                conf.getLong(PregelixJob.NUM_EDGES, -1), currentIteration, ctx.getJobletContext().getClassLoader());
     }
 
-    public static void writeTerminationState(Configuration conf, String jobId, boolean terminate)
+    public static void recoverProperties(String pregelixJobId, IHyracksTaskContext ctx, Configuration conf,
+            long currentIteration) {
+        INCApplicationContext appContext = ctx.getJobletContext().getApplicationContext();
+        RuntimeContext context = (RuntimeContext) appContext.getApplicationObject();
+        context.recoverVertexProperties(pregelixJobId, conf.getLong(PregelixJob.NUM_VERTICE, -1),
+                conf.getLong(PregelixJob.NUM_EDGES, -1), currentIteration, ctx.getJobletContext().getClassLoader());
+    }
+
+    public static void writeTerminationState(Configuration conf, String pregelixJobId, boolean terminate)
             throws HyracksDataException {
         try {
             FileSystem dfs = FileSystem.get(conf);
-            String pathStr = IterationUtils.TMP_DIR + jobId;
+            String pathStr = IterationUtils.TMP_DIR + pregelixJobId;
             Path path = new Path(pathStr);
             FSDataOutputStream output = dfs.create(path, true);
             output.writeBoolean(terminate);
@@ -105,11 +98,11 @@
         }
     }
 
-    public static void writeGlobalAggregateValue(Configuration conf, String jobId, Writable agg)
+    public static void writeGlobalAggregateValue(Configuration conf, String pregelixJobId, Writable agg)
             throws HyracksDataException {
         try {
             FileSystem dfs = FileSystem.get(conf);
-            String pathStr = IterationUtils.TMP_DIR + jobId + "agg";
+            String pathStr = IterationUtils.TMP_DIR + pregelixJobId + "agg";
             Path path = new Path(pathStr);
             FSDataOutputStream output = dfs.create(path, true);
             agg.write(output);
@@ -120,10 +113,10 @@
         }
     }
 
-    public static boolean readTerminationState(Configuration conf, String jobId) throws HyracksDataException {
+    public static boolean readTerminationState(Configuration conf, String pregelixJobId) throws HyracksDataException {
         try {
             FileSystem dfs = FileSystem.get(conf);
-            String pathStr = IterationUtils.TMP_DIR + jobId;
+            String pathStr = IterationUtils.TMP_DIR + pregelixJobId;
             Path path = new Path(pathStr);
             FSDataInputStream input = dfs.open(path);
             boolean terminate = input.readBoolean();
@@ -134,8 +127,8 @@
         }
     }
 
-    public static void writeForceTerminationState(Configuration conf, String jobId) throws HyracksDataException {
-        JobStateUtils.writeForceTerminationState(conf, jobId);
+    public static void writeForceTerminationState(Configuration conf, String pregelixJobId) throws HyracksDataException {
+        JobStateUtils.writeForceTerminationState(conf, pregelixJobId);
     }
 
     public static boolean readForceTerminationState(Configuration conf, String jobId) throws HyracksDataException {
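The old lookup walked backwards through Hyracks JobIds until it found the previous job's state; the new API addresses state directly by (pregelixJobId, partition, iteration). A sketch of the resulting handoff between two consecutive iterations, using only the methods shown above:

```java
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
import edu.uci.ics.pregelix.dataflow.util.IterationUtils;

public class IterationHandoffSketch {
    /** Producer: the end of iteration i publishes state under (jobId, partition, i). */
    public static void publish(IHyracksTaskContext ctx, String pregelixJobId, int partition,
            int iteration, IStateObject state) {
        IterationUtils.setIterationState(ctx, pregelixJobId, partition, iteration, state);
    }

    /** Consumer: iteration i + 1 reads iteration i's state and then discards it. */
    public static IStateObject consume(IHyracksTaskContext ctx, String pregelixJobId,
            int partition, int previousIteration) {
        IStateObject state = IterationUtils.getIterationState(ctx, pregelixJobId, partition,
                previousIteration);
        IterationUtils.removeIterationState(ctx, pregelixJobId, partition, previousIteration);
        return state;
    }
}
```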
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
index a866c1c..2508a1e 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
@@ -218,6 +218,7 @@
         job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
         job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
         job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+        job.setFixedVertexValueSize(true);
         Client.run(args, job);
     }
 
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryInnerJoinTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryInnerJoinTest.java
index ff1e29f..421f2f5 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryInnerJoinTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryInnerJoinTest.java
@@ -56,6 +56,7 @@
             FileOutputFormat.setOutputPath(job, new Path(OUTPUTPAH));
             job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
             job.setCheckpointHook(ConservativeCheckpointHook.class);
+            job.setFixedVertexValueSize(true);
 
             testCluster.setUp();
             Driver driver = new Driver(PageRankVertex.class);
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java
index 3fdaf15..b3ad112 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java
@@ -55,6 +55,7 @@
             FileOutputFormat.setOutputPath(job, new Path(OUTPUTPAH));
             job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
             job.setCheckpointHook(ConservativeCheckpointHook.class);
+            job.setFixedVertexValueSize(true);
 
             testCluster.setUp();
             Driver driver = new Driver(PageRankVertex.class);
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryWithoutCheckpointTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryWithoutCheckpointTest.java
index e006ccd..9a2ef2c 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryWithoutCheckpointTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryWithoutCheckpointTest.java
@@ -53,6 +53,7 @@
             FileInputFormat.setInputPaths(job, INPUTPATH);
             FileOutputFormat.setOutputPath(job, new Path(OUTPUTPAH));
             job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+            job.setFixedVertexValueSize(true);
 
             testCluster.setUp();
             Driver driver = new Driver(PageRankVertex.class);
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/MultiJobConnectedComponentsTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/MultiJobConnectedComponentsTest.java
new file mode 100644
index 0000000..65b9845
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/MultiJobConnectedComponentsTest.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example;
+
+import java.io.File;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Test;
+
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook;
+import edu.uci.ics.pregelix.api.util.DefaultVertexPartitioner;
+import edu.uci.ics.pregelix.core.driver.Driver;
+import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
+import edu.uci.ics.pregelix.example.ConnectedComponentsVertex.SimpleConnectedComponentsVertexOutputFormat;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.inputformat.TextConnectedComponentsInputFormat;
+import edu.uci.ics.pregelix.example.util.TestCluster;
+import edu.uci.ics.pregelix.example.util.TestUtils;
+
+/**
+ * This test case tests a multi-user workload using ConnectedComponents.
+ * 
+ * @author yingyib
+ */
+public class MultiJobConnectedComponentsTest {
+    private static String INPUTPATH = "data/webmapcomplex";
+    private static String OUTPUTPAH = "actual/result";
+    private static String OUTPUTPAH2 = "actual/result2";
+    private static String EXPECTEDPATH = "src/test/resources/expected/ConnectedComponentsRealComplex";
+
+    @Test
+    public void test() throws Exception {
+        TestCluster testCluster = new TestCluster();
+        try {
+            PregelixJob job = new PregelixJob(ConnectedComponentsVertex.class.getName());
+            job.setVertexClass(ConnectedComponentsVertex.class);
+            job.setVertexInputFormatClass(TextConnectedComponentsInputFormat.class);
+            job.setVertexOutputFormatClass(SimpleConnectedComponentsVertexOutputFormat.class);
+            job.setMessageCombinerClass(ConnectedComponentsVertex.SimpleMinCombiner.class);
+            job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+            job.setVertexPartitionerClass(DefaultVertexPartitioner.class);
+            job.setDynamicVertexValueSize(true);
+            FileInputFormat.setInputPaths(job, INPUTPATH);
+            FileOutputFormat.setOutputPath(job, new Path(OUTPUTPAH));
+            job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
+            job.setCheckpointHook(ConservativeCheckpointHook.class);
+
+            testCluster.setUp();
+            Thread thread = new Thread(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        Driver driver = new Driver(PageRankVertex.class);
+                        PregelixJob job2 = new PregelixJob(ConnectedComponentsVertex.class.getName());
+                        job2.setVertexClass(ConnectedComponentsVertex.class);
+                        job2.setVertexInputFormatClass(TextConnectedComponentsInputFormat.class);
+                        job2.setVertexOutputFormatClass(SimpleConnectedComponentsVertexOutputFormat.class);
+                        job2.setMessageCombinerClass(ConnectedComponentsVertex.SimpleMinCombiner.class);
+                        job2.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+                        job2.setVertexPartitionerClass(DefaultVertexPartitioner.class);
+                        job2.setDynamicVertexValueSize(true);
+                        FileInputFormat.setInputPaths(job2, INPUTPATH);
+                        FileOutputFormat.setOutputPath(job2, new Path(OUTPUTPAH2));
+                        job2.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
+                        job2.setCheckpointHook(ConservativeCheckpointHook.class);
+                        driver.runJob(job2, "127.0.0.1", PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT);
+                        TestUtils.compareWithResultDir(new File(EXPECTEDPATH), new File(OUTPUTPAH2));
+                    } catch (Exception e) {
+                        throw new IllegalStateException(e);
+                    }
+                }
+            });
+            thread.start();
+            Driver driver = new Driver(PageRankVertex.class);
+            driver.runJob(job, "127.0.0.1", PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT);
+            TestUtils.compareWithResultDir(new File(EXPECTEDPATH), new File(OUTPUTPAH));
+            thread.join();
+        } catch (Exception e) {
+            PregelixHyracksIntegrationUtil.deinit();
+            testCluster.cleanupHDFS();
+            throw e;
+        }
+    }
+}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/MultiJobPageRankTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/MultiJobPageRankTest.java
new file mode 100644
index 0000000..cfd1b27
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/MultiJobPageRankTest.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example;
+
+import java.io.File;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Test;
+
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook;
+import edu.uci.ics.pregelix.core.driver.Driver;
+import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
+import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexOutputFormat;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.util.TestCluster;
+import edu.uci.ics.pregelix.example.util.TestUtils;
+
+/**
+ * This test case tests a multi-user workload using PageRank.
+ * 
+ * @author yingyib
+ */
+public class MultiJobPageRankTest {
+    private static String INPUTPATH = "data/webmap";
+    private static String OUTPUTPAH = "actual/result";
+    private static String OUTPUTPAH2 = "actual/result2";
+    private static String EXPECTEDPATH = "src/test/resources/expected/PageRankReal";
+
+    @Test
+    public void test() throws Exception {
+        TestCluster testCluster = new TestCluster();
+        try {
+            PregelixJob job = new PregelixJob(PageRankVertex.class.getName());
+            job.setVertexClass(PageRankVertex.class);
+            job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+            job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
+            job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
+            job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+            FileInputFormat.setInputPaths(job, INPUTPATH);
+            FileOutputFormat.setOutputPath(job, new Path(OUTPUTPAH));
+            job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+            job.setCheckpointHook(ConservativeCheckpointHook.class);
+            job.setFixedVertexValueSize(true);
+
+            testCluster.setUp();
+            Thread thread = new Thread(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        Driver driver = new Driver(PageRankVertex.class);
+                        PregelixJob job2 = new PregelixJob(PageRankVertex.class.getName());
+                        job2.setVertexClass(PageRankVertex.class);
+                        job2.setVertexInputFormatClass(TextPageRankInputFormat.class);
+                        job2.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
+                        job2.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
+                        job2.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+                        FileInputFormat.setInputPaths(job2, INPUTPATH);
+                        FileOutputFormat.setOutputPath(job2, new Path(OUTPUTPAH2));
+                        job2.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+                        job2.setCheckpointHook(ConservativeCheckpointHook.class);
+                        job2.setFixedVertexValueSize(true);
+                        driver.runJob(job2, "127.0.0.1", PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT);
+                        TestUtils.compareWithResultDir(new File(EXPECTEDPATH), new File(OUTPUTPAH2));
+                    } catch (Exception e) {
+                        throw new IllegalStateException(e);
+                    }
+                }
+            });
+            thread.start();
+            Driver driver = new Driver(PageRankVertex.class);
+            driver.runJob(job, "127.0.0.1", PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT);
+            TestUtils.compareWithResultDir(new File(EXPECTEDPATH), new File(OUTPUTPAH));
+            thread.join();
+        } catch (Exception e) {
+            PregelixHyracksIntegrationUtil.deinit();
+            testCluster.cleanupHDFS();
+            throw e;
+        }
+    }
+}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertex.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertex.java
index 9629b5b..1e6359d 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertex.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertex.java
@@ -15,6 +15,7 @@
 package edu.uci.ics.pregelix.example;
 
 import java.util.Iterator;
+import java.util.Random;
 
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.Text;
@@ -26,29 +27,80 @@
  * @author yingyib
  */
 public class UpdateVertex extends Vertex<VLongWritable, Text, FloatWritable, VLongWritable> {
+    private final int MAX_VALUE_SIZE = 32768 / 2;
+    private VLongWritable msg = new VLongWritable();
+    private Text tempValue = new Text();
+    private Random rand = new Random();
 
     @Override
     public void compute(Iterator<VLongWritable> msgIterator) throws Exception {
-        if (getSuperstep() < 14) {
-            Text value = getVertexValue();
-            String newValue = value.toString() + value.toString();
-            value.set(newValue);
-            activate();
-        } else if (getSuperstep() >= 14 && getSuperstep() < 28) {
-            Text value = getVertexValue();
-            String valueStr = value.toString();
-            char[] valueChars = valueStr.toCharArray();
-            if (valueChars.length <= 1) {
-                throw new IllegalStateException("illegal value length: " + valueChars.length);
-            }
-            char[] newValueChars = new char[valueChars.length / 2];
-            for (int i = 0; i < newValueChars.length; i++) {
-                newValueChars[i] = valueChars[i];
-            }
-            value.set(new String(newValueChars));
-            activate();
+        if (getSuperstep() == 1) {
+            updateAndSendMsg();
+        } else if (getSuperstep() == 2) { // verify the values updated in superstep 1
+            verifyVertexSize(msgIterator);
+            updateAndSendMsg();
         } else {
             voteToHalt();
         }
     }
+
+    private void verifyVertexSize(Iterator<VLongWritable> msgIterator) {
+        if (!msgIterator.hasNext()) {
+            throw new IllegalStateException("no message for vertex " + " " + getVertexId() + " " + getVertexValue());
+        }
+        /**
+         * verify the size
+         */
+        int valueSize = getVertexValue().toString().toCharArray().length;
+        long expectedValueSize = msgIterator.next().get();
+        if (valueSize != expectedValueSize) {
+            if (valueSize == -expectedValueSize) {
+                //verify fixed size update
+                char[] valueCharArray = getVertexValue().toString().toCharArray();
+                for (int i = 0; i < valueCharArray.length; i++) {
+                    if (valueCharArray[i] != 'b') {
+                        throw new IllegalStateException("vertex id: " + getVertexId()
+                                + " has a un-propagated update in the last iteration");
+                    }
+                }
+            } else {
+                throw new IllegalStateException("vertex id: " + getVertexId() + " vertex value size:" + valueSize
+                        + ", expected value size:" + expectedValueSize);
+            }
+        }
+        if (msgIterator.hasNext()) {
+            throw new IllegalStateException("more than one message for vertex " + " " + getVertexId() + " "
+                    + getVertexValue());
+        }
+    }
+
+    private void updateAndSendMsg() {
+        int newValueSize = rand.nextInt(MAX_VALUE_SIZE);
+        char[] charArray = new char[newValueSize];
+        for (int i = 0; i < charArray.length; i++) {
+            charArray[i] = 'a';
+        }
+        /**
+         * set a self-message
+         */
+        msg.set(newValueSize);
+        boolean fixedSize = getVertexId().get() < 2000;
+        if (fixedSize) {
+            int oldSize = getVertexValue().toString().toCharArray().length;
+            charArray = new char[oldSize];
+            for (int i = 0; i < oldSize; i++) {
+                charArray[i] = 'b';
+            }
+            msg.set(-oldSize);
+        }
+
+        /**
+         * set the vertex value
+         */
+        tempValue.set(new String(charArray));
+        setVertexValue(tempValue);
+
+        sendMsg(getVertexId(), msg);
+        activate();
+    }
 }
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertexInputFormat.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertexInputFormat.java
index e28c9e2..076337a 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertexInputFormat.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertexInputFormat.java
@@ -15,45 +15,60 @@
 package edu.uci.ics.pregelix.example;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 
 import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexInputFormat;
 import edu.uci.ics.pregelix.api.io.VertexReader;
-import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat;
-import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat.TextVertexReader;
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.example.io.DoubleWritable;
 import edu.uci.ics.pregelix.example.io.VLongWritable;
 
-public class UpdateVertexInputFormat extends TextVertexInputFormat<VLongWritable, Text, FloatWritable, DoubleWritable> {
+public class UpdateVertexInputFormat extends VertexInputFormat<VLongWritable, Text, FloatWritable, DoubleWritable> {
 
     @Override
     public VertexReader<VLongWritable, Text, FloatWritable, DoubleWritable> createVertexReader(InputSplit split,
             TaskAttemptContext context) throws IOException {
-        return new UpdateVertexReader(textInputFormat.createRecordReader(split, context), context);
+        return new UpdateVertexReader(context);
+    }
+
+    @Override
+    public List<InputSplit> getSplits(JobContext context, int numWorkers) throws IOException, InterruptedException {
+        InputSplit split = new FileSplit(new Path("testdata"), 0, 0, new String[0]);
+        return Collections.singletonList(split);
     }
 }
 
 @SuppressWarnings("rawtypes")
-class UpdateVertexReader extends TextVertexReader<VLongWritable, Text, FloatWritable, DoubleWritable> {
+class UpdateVertexReader implements VertexReader<VLongWritable, Text, FloatWritable, DoubleWritable> {
 
-    private final static String separator = " ";
+    private final static int MAX_ID = 65536;
+    private final static int MIN_ID = -65536;
     private Vertex vertex;
     private VLongWritable vertexId = new VLongWritable();
+    private int currentId = MIN_ID;
+    private TaskAttemptContext context;
 
-    public UpdateVertexReader(RecordReader<LongWritable, Text> lineRecordReader, TaskAttemptContext context) {
-        super(lineRecordReader);
+    public UpdateVertexReader(TaskAttemptContext context) {
+        this.context = context;
+    }
+
+    private TaskAttemptContext getContext() {
+        return context;
     }
 
     @Override
     public boolean nextVertex() throws IOException, InterruptedException {
-        return getRecordReader().nextKeyValue();
+        return currentId < MAX_ID;
     }
 
     @SuppressWarnings("unchecked")
@@ -66,22 +81,32 @@
         vertex.getEdges().clear();
 
         vertex.reset();
-        Text line = getRecordReader().getCurrentValue();
-        String[] fields = line.toString().split(separator);
 
-        if (fields.length > 0) {
-            /**
-             * set the src vertex id
-             */
-            long src = Long.parseLong(fields[0]);
-            vertexId.set(src);
-            vertex.setVertexId(vertexId);
+        /**
+         * set the src vertex id
+         */
+        vertexId.set(currentId++);
+        vertex.setVertexId(vertexId);
 
-            /**
-             * set the vertex value
-             */
-            vertex.setVertexValue(new Text("aaa"));
-        }
+        /**
+         * set the vertex value
+         */
+        vertex.setVertexValue(new Text("aaa"));
         return vertex;
     }
+
+    @Override
+    public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
+
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+        return 0;
+    }
 }
\ No newline at end of file
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertexOutputFormat.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertexOutputFormat.java
index 5c16302..493a33e 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertexOutputFormat.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertexOutputFormat.java
@@ -47,9 +47,6 @@
         public void writeVertex(Vertex<VLongWritable, Text, FloatWritable, ?> vertex) throws IOException,
                 InterruptedException {
             int len = vertex.getVertexValue().toString().toCharArray().length;
-            if (len != 1) {
-                throw new IllegalStateException("invalid value length: " + len);
-            }
             getRecordWriter().write(new Text(vertex.getVertexId().toString()), new Text(Integer.toString(len)));
         }
     }
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertexTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertexTest.java
index fe5f2d1..6adc5bb 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertexTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertexTest.java
@@ -14,14 +14,13 @@
  */
 package edu.uci.ics.pregelix.example;
 
-import junit.framework.Assert;
-
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.junit.Test;
 
 import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.core.base.IDriver.Plan;
 import edu.uci.ics.pregelix.core.driver.Driver;
 import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
 import edu.uci.ics.pregelix.example.util.TestCluster;
@@ -33,7 +32,7 @@
  */
 public class UpdateVertexTest {
 
-    private static String INPUT_PATH = "data/update";
+    private static String INPUT_PATH = "/data/webmap/";
     private static String OUTPUT_PATH = "actual/resultcomplex";
 
     @Test
@@ -52,9 +51,13 @@
 
             Driver driver = new Driver(UpdateVertex.class);
             testCluster.setUp();
-            driver.runJob(job, "127.0.0.1", PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT);
+
+            Plan[] plans = new Plan[] { Plan.INNER_JOIN, Plan.OUTER_JOIN };
+            for (Plan plan : plans) {
+                driver.runJob(job, plan, "127.0.0.1", PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT, false);
+            }
         } catch (Exception e) {
-            Assert.assertTrue(e.toString().contains("This job is going to fail"));
+            throw new IllegalStateException(e);
         } finally {
             testCluster.tearDown();
         }
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
index f6857fe..2dfd071 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
@@ -74,6 +74,7 @@
         job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
         job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
         job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+        job.setFixedVertexValueSize(true);
         FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
         job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
@@ -89,6 +90,7 @@
         job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
         job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
         job.setVertexPartitionerClass(DefaultVertexPartitioner.class);
+        job.setFixedVertexValueSize(true);
         FileInputFormat.setInputPaths(job, HDFS_INPUTPATH2);
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH2));
         job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
@@ -116,6 +118,7 @@
         job.setVertexInputFormatClass(TextPageRankInputFormat.class);
         job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
         job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+        job.setFixedVertexValueSize(true);
         FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
         job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
@@ -190,6 +193,7 @@
         job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
         job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
         job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+        job.setFixedVertexValueSize(true);
         FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
         job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
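The setFixedVertexValueSize(true) calls above line up with the pregelix.incStateLength=false property that appears in the regenerated job XML files below: a fixed-size vertex value means the serialized state never grows, which is what makes the in-place B-tree update path safe. A minimal sketch of what the setter presumably does, assuming PregelixJob keys the flag under pregelix.incStateLength (inferred from the XML, not shown in this diff):

    // Hypothetical sketch: a fixed value size means the state length never increases.
    public void setFixedVertexValueSize(boolean fixedSize) {
        getConfiguration().setBoolean("pregelix.incStateLength", !fixedSize);
    }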
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml
index 65e0b30..d29b2da 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml
@@ -127,6 +127,7 @@
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimulatedPageRankVertexInputFormat</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>pregelix.incStateLength</name><value>false</value></property>
 <property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
 <property><name>topology.script.number.args</name><value>100</value></property>
 <property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
index b50b02a..69f8154 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
@@ -128,6 +128,7 @@
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>pregelix.incStateLength</name><value>false</value></property>
 <property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
 <property><name>topology.script.number.args</name><value>100</value></property>
 <property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
index 217fbba..04fc686 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
@@ -1,145 +1,146 @@
 <?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
-<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
-<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
-<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
-<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
-<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
-<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
-<property><name>mapred.input.dir</name><value>file:/webmapcomplex</value></property>
-<property><name>mapred.submit.replication</name><value>10</value></property>
-<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
-<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
-<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
-<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
-<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
-<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
-<property><name>keep.failed.task.files</name><value>false</value></property>
-<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
-<property><name>io.bytes.per.checksum</name><value>512</value></property>
-<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
-<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
-<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
-<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
-<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
-<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
-<property><name>fs.checkpoint.period</name><value>3600</value></property>
-<property><name>mapred.child.tmp</name><value>./tmp</value></property>
-<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
-<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
-<property><name>hadoop.logfile.count</name><value>10</value></property>
-<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
-<property><name>mapred.output.dir</name><value>/resultcomplex</value></property>
-<property><name>io.map.index.skip</name><value>0</value></property>
-<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
-<property><name>mapred.output.compress</name><value>false</value></property>
-<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
-<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
-<property><name>fs.checkpoint.size</name><value>67108864</value></property>
-<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
-<property><name>mapred.job.name</name><value>PageRank</value></property>
-<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
-<property><name>local.cache.size</name><value>10737418240</value></property>
 <property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
-<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
-<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
-<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
-<property><name>mapred.task.timeout</name><value>600000</value></property>
-<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
-<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
-<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
-<property><name>ipc.client.kill.max</name><value>10</value></property>
-<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
-<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
-<property><name>io.sort.record.percent</name><value>0.05</value></property>
-<property><name>hadoop.security.authorization</name><value>false</value></property>
-<property><name>mapred.max.tracker.failures</name><value>4</value></property>
-<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
-<property><name>pregelix.numVertices</name><value>23</value></property>
-<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
-<property><name>mapred.map.tasks</name><value>2</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
-<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
-<property><name>fs.default.name</name><value>file:///</value></property>
-<property><name>tasktracker.http.threads</name><value>40</value></property>
-<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
-<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
-<property><name>mapred.reduce.tasks</name><value>1</value></property>
-<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
-<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex</value></property>
-<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
-<property><name>io.file.buffer.size</name><value>4096</value></property>
-<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
-<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
-<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
-<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
-<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
-<property><name>hadoop.native.lib</name><value>true</value></property>
-<property><name>fs.s3.block.size</name><value>67108864</value></property>
-<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
-<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
-<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
-<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
-<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
-<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
-<property><name>mapred.queue.names</name><value>default</value></property>
-<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
-<property><name>pregelix.checkpointHook</name><value>edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook</value></property>
-<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
-<property><name>mapred.job.tracker</name><value>local</value></property>
-<property><name>io.skip.checksum.errors</name><value>false</value></property>
-<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
-<property><name>fs.s3.maxRetries</name><value>4</value></property>
-<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
-<property><name>fs.trash.interval</name><value>0</value></property>
-<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
-<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
-<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
-<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
-<property><name>io.sort.mb</name><value>100</value></property>
-<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
-<property><name>io.sort.factor</name><value>10</value></property>
-<property><name>mapred.task.profile</name><value>false</value></property>
-<property><name>job.end.retry.interval</name><value>30000</value></property>
-<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
-<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
-<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
-<property><name>webinterface.private.actions</name><value>false</value></property>
-<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
-<property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimpleSumCombiner</value></property>
-<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
-<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
-<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
-<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>mapred.compress.map.output</name><value>false</value></property>
-<property><name>io.sort.spill.percent</name><value>0.80</value></property>
-<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
-<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
-<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
-<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
-<property><name>job.end.retry.attempts</name><value>0</value></property>
-<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
-<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimplePageRankVertexOutputFormat</value></property>
-<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
-<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
-<property><name>pregelix.partitionerClass</name><value>edu.uci.ics.pregelix.api.util.DefaultVertexPartitioner</value></property>
-<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
-<property><name>hadoop.logfile.size</name><value>10000000</value></property>
-<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
-<property><name>mapred.job.queue.name</name><value>default</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
-<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
-<property><name>topology.script.number.args</name><value>100</value></property>
-<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
-<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
 <property><name>mapred.task.cache.levels</name><value>2</value></property>
-<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
-<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
-<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
+<property><name>hadoop.native.lib</name><value>true</value></property>
+<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
+<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
+<property><name>io.skip.checksum.errors</name><value>false</value></property>
+<property><name>fs.default.name</name><value>file:///</value></property>
+<property><name>mapred.child.tmp</name><value>./tmp</value></property>
+<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
+<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
+<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
+<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
+<property><name>io.sort.factor</name><value>10</value></property>
+<property><name>pregelix.checkpointHook</name><value>edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook</value></property>
+<property><name>mapred.task.timeout</name><value>600000</value></property>
+<property><name>mapred.max.tracker.failures</name><value>4</value></property>
+<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
+<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
+<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
+<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
+<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
+<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
+<property><name>tasktracker.http.threads</name><value>40</value></property>
+<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
+<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
+<property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimpleSumCombiner</value></property>
+<property><name>mapred.output.compress</name><value>false</value></property>
+<property><name>io.bytes.per.checksum</name><value>512</value></property>
+<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
+<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
+<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
+<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
+<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
+<property><name>job.end.retry.attempts</name><value>0</value></property>
+<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
+<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
+<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
+<property><name>topology.script.number.args</name><value>100</value></property>
+<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
+<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
+<property><name>pregelix.partitionerClass</name><value>edu.uci.ics.pregelix.api.util.DefaultVertexPartitioner</value></property>
+<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
+<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
+<property><name>pregelix.numVertices</name><value>23</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>hadoop.security.authorization</name><value>false</value></property>
+<property><name>local.cache.size</name><value>10737418240</value></property>
 <property><name>mapred.min.split.size</name><value>0</value></property>
+<property><name>mapred.map.tasks</name><value>2</value></property>
+<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
+<property><name>mapred.job.queue.name</name><value>default</value></property>
+<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
+<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
+<property><name>job.end.retry.interval</name><value>30000</value></property>
+<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
+<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
+<property><name>mapred.reduce.tasks</name><value>1</value></property>
+<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
+<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
+<property><name>webinterface.private.actions</name><value>false</value></property>
+<property><name>io.sort.spill.percent</name><value>0.80</value></property>
+<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
+<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>mapred.job.name</name><value>PageRank</value></property>
+<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
 <property><name>mapred.map.max.attempts</name><value>4</value></property>
-<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>pregelix.incStateLength</name><value>false</value></property>
+<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
+<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
+<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
+<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
+<property><name>keep.failed.task.files</name><value>false</value></property>
 <property><name>ipc.client.tcpnodelay</name><value>false</value></property>
+<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
+<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>io.map.index.skip</name><value>0</value></property>
+<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
+<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
+<property><name>fs.checkpoint.period</name><value>3600</value></property>
+<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
+<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>fs.s3.maxRetries</name><value>4</value></property>
+<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
+<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
+<property><name>fs.trash.interval</name><value>0</value></property>
+<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
+<property><name>mapred.submit.replication</name><value>10</value></property>
+<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
+<property><name>mapred.job.tracker</name><value>local</value></property>
+<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
+<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
+<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
+<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
+<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
+<property><name>io.sort.record.percent</name><value>0.05</value></property>
+<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
+<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
+<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
+<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
+<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
+<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
+<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
+<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
+<property><name>fs.s3.block.size</name><value>67108864</value></property>
+<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
 <property><name>mapred.acls.enabled</name><value>false</value></property>
+<property><name>mapred.queue.names</name><value>default</value></property>
+<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
+<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex</value></property>
+<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
+<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
+<property><name>mapred.output.dir</name><value>/resultcomplex</value></property>
+<property><name>io.sort.mb</name><value>100</value></property>
+<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
+<property><name>mapred.compress.map.output</name><value>false</value></property>
+<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
+<property><name>ipc.client.kill.max</name><value>10</value></property>
+<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
+<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
+<property><name>mapred.input.dir</name><value>file:/webmapcomplex</value></property>
+<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
+<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
+<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimplePageRankVertexOutputFormat</value></property>
+<property><name>mapred.task.profile</name><value>false</value></property>
+<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
+<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>fs.checkpoint.size</name><value>67108864</value></property>
 </configuration>
\ No newline at end of file
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml
index 636b055..8969fd7 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml
@@ -127,6 +127,7 @@
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>pregelix.incStateLength</name><value>false</value></property>
 <property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
 <property><name>topology.script.number.args</name><value>100</value></property>
 <property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
diff --git a/pregelix/pregelix-runtime/pom.xml b/pregelix/pregelix-runtime/pom.xml
index 6564eb0..245c0f5 100644
--- a/pregelix/pregelix-runtime/pom.xml
+++ b/pregelix/pregelix-runtime/pom.xml
@@ -141,6 +141,13 @@
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-storage-common</artifactId>
+			<version>0.2.10-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-storage-am-btree</artifactId>
 			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index f3a0bb4..a39df3c 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -31,6 +31,9 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPageInternal;
 import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
 import edu.uci.ics.pregelix.api.graph.MsgList;
 import edu.uci.ics.pregelix.api.graph.Vertex;
@@ -110,7 +113,8 @@
             public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
                     throws HyracksDataException {
                 this.conf = confFactory.createConfiguration(ctx);
-                this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf);
+                // an LSM index does not support in-place updates, so always use the dynamic-state path
+                this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf) || BspUtils.useLSM(conf);
                 this.aggregator = BspUtils.createGlobalAggregator(conf);
                 this.aggregator.init();
 
@@ -197,6 +201,10 @@
                     }
                     vertex.finishCompute();
                 } catch (Exception e) {
+                    // Debug aid: a classloader mismatch across deployments breaks compute()
+                    ClassLoader cl1 = vertex.getClass().getClassLoader();
+                    ClassLoader cl2 = msgContentList.isEmpty() ? null : msgContentList.get(0).getClass().getClassLoader();
+                    System.out.println("cl1 " + cl1 + " cl2 " + cl2);
                     throw new HyracksDataException(e);
                 }
 
@@ -262,7 +270,8 @@
             }
 
             @Override
-            public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
+            public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb, IIndexCursor cursor)
+                    throws HyracksDataException {
                 try {
                     if (vertex != null && vertex.hasUpdate()) {
                         if (!dynamicStateLength) {
@@ -271,12 +280,16 @@
                             int offset = tupleRef.getFieldStart(1);
                             bbos.setByteArray(data, offset);
                             vertex.write(output);
+
+                            BTreeRangeSearchCursor btreeCursor = (BTreeRangeSearchCursor) cursor;
+                            ICachedPageInternal page = (ICachedPageInternal) btreeCursor.getPage();
+                            // IMPORTANT: mark the page dirty so the in-place write gets flushed
+                            page.markDirty();
                         } else {
                             // write the vertex id
                             DataOutput tbOutput = cloneUpdateTb.getDataOutput();
                             vertex.getVertexId().write(tbOutput);
                             cloneUpdateTb.addFieldEndOffset();
-
                             // write the vertex value
                             vertex.write(tbOutput);
                             cloneUpdateTb.addFieldEndOffset();
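The fixed-size branch above is the reason update() now receives the cursor: the vertex value is rewritten directly over its old bytes inside the cached B-tree page, and the cursor is the only handle back to that page so it can be flagged dirty. A condensed sketch of the path, reusing the diff's identifiers (data and output come from elided lines and are assumed to be the tuple's backing array and a DataOutput over bbos):

    // In-place update of a fixed-size vertex value (sketch of the branch above).
    byte[] data = tupleRef.getFieldData(1);   // backing bytes of the value field
    int offset = tupleRef.getFieldStart(1);
    bbos.setByteArray(data, offset);          // aim the byte stream at the page bytes
    vertex.write(output);                     // overwrite the old value in place
    ICachedPageInternal page = (ICachedPageInternal) ((BTreeRangeSearchCursor) cursor).getPage();
    page.markDirty();                         // the buffer cache never saw this write

Without markDirty the buffer cache could evict the page without writing it back, silently dropping the update.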
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/NoOpUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/NoOpUpdateFunctionFactory.java
index 88577c2..8947c01 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/NoOpUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/NoOpUpdateFunctionFactory.java
@@ -20,6 +20,7 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
 import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunction;
 import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
 
@@ -57,7 +58,8 @@
             }
 
             @Override
-            public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
+            public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb, IIndexCursor cursor)
+                    throws HyracksDataException {
 
             }
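Both factories in this diff now override a three-argument update, so the IUpdateFunction contract in pregelix-dataflow-std must have gained the cursor parameter in the same change. Its implied shape, reconstructed from the overrides (the interface source is not part of this diff, and any supertypes are elided):

    public interface IUpdateFunction {
        // Update the tuple the cursor currently points to: either patch it in
        // place (fixed-size values) or stage a replacement in cloneUpdateTb.
        void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb, IIndexCursor cursor)
                throws HyracksDataException;
    }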
 
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
index ca8ec01..05b4e87 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
@@ -31,6 +31,9 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPageInternal;
 import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
 import edu.uci.ics.pregelix.api.graph.MsgList;
 import edu.uci.ics.pregelix.api.graph.Vertex;
@@ -113,7 +116,8 @@
             public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
                     throws HyracksDataException {
                 this.conf = confFactory.createConfiguration(ctx);
-                this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf);
+                // an LSM index does not support in-place updates, so always use the dynamic-state path
+                this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf) || BspUtils.useLSM(conf);
                 this.aggregator = BspUtils.createGlobalAggregator(conf);
                 this.aggregator.init();
 
@@ -255,7 +259,8 @@
             }
 
             @Override
-            public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
+            public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb, IIndexCursor cursor)
+                    throws HyracksDataException {
                 try {
                     if (vertex != null && vertex.hasUpdate()) {
                         if (!dynamicStateLength) {
@@ -264,6 +269,11 @@
                             int offset = tupleRef.getFieldStart(1);
                             bbos.setByteArray(data, offset);
                             vertex.write(output);
+
+                            BTreeRangeSearchCursor btreeCursor = (BTreeRangeSearchCursor) cursor;
+                            ICachedPageInternal page = (ICachedPageInternal) btreeCursor.getPage();
+                            // IMPORTANT: mark the page dirty so the in-place write gets flushed
+                            page.markDirty();
                         } else {
                             // write the vertex id
                             DataOutput tbOutput = cloneUpdateTb.getDataOutput();
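StartComputeUpdateFunctionFactory mirrors ComputeUpdateFunctionFactory above: BspUtils.useLSM(conf) forces the dynamic-state path whenever the job runs on an LSM index, which cannot patch pages in place. The helper itself is not shown in this diff; a plausible sketch, assuming it reads a boolean job flag (the property key here is illustrative):

    // Hypothetical sketch of BspUtils.useLSM; the real property key may differ.
    public static boolean useLSM(Configuration conf) {
        return conf.getBoolean("pregelix.useLSM", false);
    }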
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/DatatypeHelper.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/DatatypeHelper.java
index b121b5b..e99fcb3 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/DatatypeHelper.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/DatatypeHelper.java
@@ -17,32 +17,30 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
 
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.util.ArrayListWritable;
 
-@SuppressWarnings("deprecation")
 public class DatatypeHelper {
     private static final class WritableSerializerDeserializer<T extends Writable> implements ISerializerDeserializer<T> {
         private static final long serialVersionUID = 1L;
 
-        private Class<T> clazz;
+        private final Class<T> clazz;
+        private transient Configuration conf;
         private T object;
 
-        private WritableSerializerDeserializer(Class<T> clazz) {
+        private WritableSerializerDeserializer(Class<T> clazz, Configuration conf) {
             this.clazz = clazz;
+            this.conf = conf;
         }
 
-        @SuppressWarnings("unchecked")
+        @SuppressWarnings({ "unchecked", "rawtypes" })
         private T createInstance() throws HyracksDataException {
             // TODO remove "if", create a new WritableInstanceOperations class
             // that deals with Writables that don't have public constructors
@@ -50,7 +48,11 @@
                 return (T) NullWritable.get();
             }
             try {
-                return clazz.newInstance();
+                T t = clazz.newInstance();
+                if (t instanceof ArrayListWritable) {
+                    ((ArrayListWritable) t).setConf(conf);
+                }
+                return t;
             } catch (InstantiationException e) {
                 throw new HyracksDataException(e);
             } catch (IllegalAccessException e) {
@@ -85,42 +87,16 @@
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
     public static ISerializerDeserializer<? extends Writable> createSerializerDeserializer(
-            Class<? extends Writable> fClass) {
-        return new WritableSerializerDeserializer(fClass);
+            Class<? extends Writable> fClass, Configuration conf) {
+        return new WritableSerializerDeserializer(fClass, conf);
     }
 
     public static RecordDescriptor createKeyValueRecordDescriptor(Class<? extends Writable> keyClass,
-            Class<? extends Writable> valueClass) {
+            Class<? extends Writable> valueClass, Configuration conf) {
         @SuppressWarnings("rawtypes")
         ISerializerDeserializer[] fields = new ISerializerDeserializer[2];
-        fields[0] = createSerializerDeserializer(keyClass);
-        fields[1] = createSerializerDeserializer(valueClass);
+        fields[0] = createSerializerDeserializer(keyClass, conf);
+        fields[1] = createSerializerDeserializer(valueClass, conf);
         return new RecordDescriptor(fields);
     }
-
-    public static RecordDescriptor createOneFieldRecordDescriptor(Class<? extends Writable> fieldClass) {
-        @SuppressWarnings("rawtypes")
-        ISerializerDeserializer[] fields = new ISerializerDeserializer[1];
-        fields[0] = createSerializerDeserializer(fieldClass);
-        return new RecordDescriptor(fields);
-    }
-
-    public static JobConf map2JobConf(Map<String, String> jobConfMap) {
-        JobConf jobConf;
-        synchronized (Configuration.class) {
-            jobConf = new JobConf();
-            for (Entry<String, String> entry : jobConfMap.entrySet()) {
-                jobConf.set(entry.getKey(), entry.getValue());
-            }
-        }
-        return jobConf;
-    }
-
-    public static Map<String, String> jobConf2Map(JobConf jobConf) {
-        Map<String, String> jobConfMap = new HashMap<String, String>();
-        for (Entry<String, String> entry : jobConf) {
-            jobConfMap.put(entry.getKey(), entry.getValue());
-        }
-        return jobConfMap;
-    }
 }
\ No newline at end of file
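The DatatypeHelper change threads the job Configuration into every WritableSerializerDeserializer so that freshly deserialized ArrayListWritable instances (MsgList, for example) get setConf called before first use; the JobConf map helpers move out of this class entirely. Call sites now pass the configuration explicitly, along these lines (the key and value classes here are illustrative):

    // Sketch: building a key/value RecordDescriptor under the new signatures.
    Configuration conf = job.getConfiguration();
    RecordDescriptor rd = DatatypeHelper.createKeyValueRecordDescriptor(
            VLongWritable.class, Text.class, conf);

Note that conf is transient in the serde, so it is only populated where the serde is constructed; a copy that crosses the wire deserializes with a null configuration.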
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
index 5e8ac1e..3151df2 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
@@ -14,6 +14,8 @@
  */
 package edu.uci.ics.pregelix.runtime.touchpoint;
 
+import java.lang.reflect.Field;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -21,8 +23,6 @@
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.hdfs.ContextFactory;
-import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHook;
 import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
@@ -48,8 +48,12 @@
                 try {
                     TaskAttemptContext mapperContext = ctxFactory.createContext(conf, new TaskAttemptID());
                     mapperContext.getConfiguration().setClassLoader(ctx.getJobletContext().getClassLoader());
-                    Vertex.setContext(mapperContext);
-                    BspUtils.setDefaultConfiguration(conf);
+
+                    ClassLoader cl = ctx.getJobletContext().getClassLoader();
+                    Class<?> vClass = (Class<?>) cl.loadClass("edu.uci.ics.pregelix.api.graph.Vertex");
+                    Field contextField = vClass.getDeclaredField("context");
+                    contextField.setAccessible(true);
+                    contextField.set(null, mapperContext);
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
                 }
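The rewritten hook is a deliberate cross-classloader move: calling Vertex.setContext directly would bind to the Vertex class on the runtime's own classpath, while user job code reads the static context field on the copy of Vertex loaded by the joblet's deployment classloader, so the hook reflects through that classloader instead. The same pattern in isolation (a generic sketch; the names are illustrative):

    // Set a static field on the class as seen by a specific classloader.
    static void setStaticField(ClassLoader cl, String className, String fieldName, Object value)
            throws Exception {
        Field f = cl.loadClass(className).getDeclaredField(fieldName);
        f.setAccessible(true);
        f.set(null, value); // null receiver: the field is static
    }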
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexIdPartitionComputerFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexIdPartitionComputerFactory.java
index 6fa6434..c9b67fb 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexIdPartitionComputerFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexIdPartitionComputerFactory.java
@@ -16,6 +16,7 @@
 
 import java.io.DataInputStream;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
@@ -24,30 +25,39 @@
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.ISerializerDeserializerFactory;
 
 public class VertexIdPartitionComputerFactory<K extends Writable, V extends Writable> implements
         ITuplePartitionComputerFactory {
     private static final long serialVersionUID = 1L;
     private final ISerializerDeserializerFactory<K> keyIOFactory;
+    private final IConfigurationFactory confFactory;
 
-    public VertexIdPartitionComputerFactory(ISerializerDeserializerFactory<K> keyIOFactory) {
+    public VertexIdPartitionComputerFactory(ISerializerDeserializerFactory<K> keyIOFactory,
+            IConfigurationFactory confFactory) {
         this.keyIOFactory = keyIOFactory;
+        this.confFactory = confFactory;
     }
 
     public ITuplePartitionComputer createPartitioner() {
-        return new ITuplePartitionComputer() {
-            private final ByteBufferInputStream bbis = new ByteBufferInputStream();
-            private final DataInputStream dis = new DataInputStream(bbis);
-            private final ISerializerDeserializer<K> keyIO = keyIOFactory.getSerializerDeserializer();
+        try {
+            final Configuration conf = confFactory.createConfiguration();
+            return new ITuplePartitionComputer() {
+                private final ByteBufferInputStream bbis = new ByteBufferInputStream();
+                private final DataInputStream dis = new DataInputStream(bbis);
+                private final ISerializerDeserializer<K> keyIO = keyIOFactory.getSerializerDeserializer(conf);
 
-            public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
-                int keyStart = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
-                        + accessor.getFieldStartOffset(tIndex, 0);
-                bbis.setByteBuffer(accessor.getBuffer(), keyStart);
-                K key = keyIO.deserialize(dis);
-                return Math.abs(key.hashCode() % nParts);
-            }
-        };
+                public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
+                    int keyStart = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+                            + accessor.getFieldStartOffset(tIndex, 0);
+                    bbis.setByteBuffer(accessor.getBuffer(), keyStart);
+                    K key = keyIO.deserialize(dis);
+                    return Math.abs(key.hashCode() % nParts);
+                }
+            };
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
     }
 }
\ No newline at end of file
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/WritableSerializerDeserializerFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/WritableSerializerDeserializerFactory.java
index 435d081..c11ac5b 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/WritableSerializerDeserializerFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/WritableSerializerDeserializerFactory.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.pregelix.runtime.touchpoint;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
@@ -29,7 +30,7 @@
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
     @Override
-    public ISerializerDeserializer getSerializerDeserializer() {
-        return DatatypeHelper.createSerializerDeserializer(clazz);
+    public ISerializerDeserializer getSerializerDeserializer(Configuration conf) {
+        return DatatypeHelper.createSerializerDeserializer(clazz, conf);
     }
 }
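Finally, the factory contract itself now takes the configuration, matching the conf-aware DatatypeHelper entry point above and the call in VertexIdPartitionComputerFactory. The implied interface, reconstructed from these overrides (Serializable is assumed because the factories travel inside job specifications):

    public interface ISerializerDeserializerFactory<T> extends Serializable {
        ISerializerDeserializer<T> getSerializerDeserializer(Configuration conf);
    }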