fix file write race condition
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSerializerDeserializerContainer.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSerializerDeserializerContainer.java
index 0ce2346..57c9133 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSerializerDeserializerContainer.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSerializerDeserializerContainer.java
@@ -25,7 +25,7 @@
      * @param deploymentId
      * @return
      */
-    public IJobSerializerDeserializer getJobSerializerDeerializer(DeploymentId deploymentId);
+    public IJobSerializerDeserializer getJobSerializerDeserializer(DeploymentId deploymentId);
 
     /**
      * Add a deployment with the job serializer deserializer
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializerContainer.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializerContainer.java
index 35a1e8b..f2d8fff 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializerContainer.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializerContainer.java
@@ -15,18 +15,18 @@
 
 package edu.uci.ics.hyracks.api.job;
 
-import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import edu.uci.ics.hyracks.api.deployment.DeploymentId;
 
 public class JobSerializerDeserializerContainer implements IJobSerializerDeserializerContainer {
 
     private IJobSerializerDeserializer defaultJobSerDe = new JobSerializerDeserializer();
-    private Map<DeploymentId, IJobSerializerDeserializer> jobSerializerDeserializerMap = new HashMap<DeploymentId, IJobSerializerDeserializer>();
+    private Map<DeploymentId, IJobSerializerDeserializer> jobSerializerDeserializerMap = new ConcurrentHashMap<DeploymentId, IJobSerializerDeserializer>();
 
     @Override
-    public synchronized IJobSerializerDeserializer getJobSerializerDeerializer(DeploymentId deploymentId) {
+    public synchronized IJobSerializerDeserializer getJobSerializerDeserializer(DeploymentId deploymentId) {
         if (deploymentId == null) {
             return defaultJobSerDe;
         }
@@ -44,4 +44,9 @@
         jobSerializerDeserializerMap.remove(deploymentId);
     }
 
+    @Override
+    public String toString() {
+        return jobSerializerDeserializerMap.toString();
+    }
+
 }
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java
index 3a35c25..9be3818 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java
@@ -31,7 +31,6 @@
  * This is the IJobSerializerDeserializer implementation for jobs with dynamic deployed jars.
  * 
  * @author yingyib
- *
  */
 public class ClassLoaderJobSerializerDeserializer implements IJobSerializerDeserializer {
 
@@ -99,4 +98,9 @@
     public ClassLoader getClassLoader() throws HyracksException {
         return classLoader;
     }
+
+    @Override
+    public String toString() {
+        return classLoader.toString();
+    }
 }
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
index 0677e2e..0724a01 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
@@ -89,7 +89,7 @@
      */
     public static void deploy(DeploymentId deploymentId, List<URL> urls, IJobSerializerDeserializerContainer container,
             ServerContext ctx, boolean isNC) throws HyracksException {
-        IJobSerializerDeserializer jobSerDe = container.getJobSerializerDeerializer(deploymentId);
+        IJobSerializerDeserializer jobSerDe = container.getJobSerializerDeserializer(deploymentId);
         if (jobSerDe == null) {
             jobSerDe = new ClassLoaderJobSerializerDeserializer();
             container.addJobSerializerDeserializer(deploymentId, jobSerDe);
@@ -116,7 +116,7 @@
         try {
             IJobSerializerDeserializerContainer jobSerDeContainer = appCtx.getJobSerializerDeserializerContainer();
             IJobSerializerDeserializer jobSerDe = deploymentId == null ? null : jobSerDeContainer
-                    .getJobSerializerDeerializer(deploymentId);
+                    .getJobSerializerDeserializer(deploymentId);
             Object obj = jobSerDe == null ? JavaSerializationUtils.deserialize(bytes) : jobSerDe.deserialize(bytes);
             return obj;
         } catch (Exception e) {
@@ -138,7 +138,7 @@
         try {
             IJobSerializerDeserializerContainer jobSerDeContainer = appCtx.getJobSerializerDeserializerContainer();
             IJobSerializerDeserializer jobSerDe = deploymentId == null ? null : jobSerDeContainer
-                    .getJobSerializerDeerializer(deploymentId);
+                    .getJobSerializerDeserializer(deploymentId);
             Class<?> cl = jobSerDe == null ? JavaSerializationUtils.loadClass(className) : jobSerDe
                     .loadClass(className);
             return cl;
@@ -160,7 +160,7 @@
         try {
             IJobSerializerDeserializerContainer jobSerDeContainer = appCtx.getJobSerializerDeserializerContainer();
             IJobSerializerDeserializer jobSerDe = deploymentId == null ? null : jobSerDeContainer
-                    .getJobSerializerDeerializer(deploymentId);
+                    .getJobSerializerDeserializer(deploymentId);
             ClassLoader cl = jobSerDe == null ? DeploymentUtils.class.getClassLoader() : jobSerDe.getClassLoader();
             return cl;
         } catch (Exception e) {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 56b6654..586c1a6 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -327,4 +327,8 @@
             throw new RuntimeException(e);
         }
     }
+
+    public DeploymentId getDeploymentId() {
+        return deploymentId;
+    }
 }
\ No newline at end of file
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
index 26cb8d0..28d424a 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -258,8 +258,13 @@
     @Override
     final public void readFields(DataInput in) throws IOException {
         reset();
-        if (vertexId == null)
+        if (vertexId == null) {
+            if (getContext().getConfiguration().getClassLoader() != this.getClass().getClassLoader()) {
+                throw new IllegalStateException("mismatched classloader: "
+                        + getContext().getConfiguration().getClassLoader() + " and " + this.getClass().getClassLoader());
+            }
             vertexId = BspUtils.<I> createVertexIndex(getContext().getConfiguration());
+        }
         vertexId.readFields(in);
         delegate.setVertexId(vertexId);
         boolean hasVertexValue = in.readBoolean();
@@ -576,15 +581,6 @@
         return context;
     }
 
-    /**
-     * Pregelix internal use only
-     * 
-     * @param context
-     */
-    public static final void setContext(TaskAttemptContext context) {
-        Vertex.context = context;
-    }
-
     @Override
     public int hashCode() {
         return vertexId.hashCode();
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index 6493127..e79d4d2 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -37,11 +37,6 @@
  * them.
  */
 public class BspUtils {
-    private static Configuration defaultConf = null;
-
-    public static void setDefaultConfiguration(Configuration conf) {
-        defaultConf = conf;
-    }
 
     /**
      * Get the user's subclassed {@link VertexInputFormat}.
@@ -209,8 +204,6 @@
      */
     @SuppressWarnings("unchecked")
     public static <I extends Writable> Class<I> getVertexIndexClass(Configuration conf) {
-        if (conf == null)
-            conf = defaultConf;
         return (Class<I>) conf.getClass(PregelixJob.VERTEX_INDEX_CLASS, WritableComparable.class);
     }
 
@@ -302,8 +295,6 @@
      */
     @SuppressWarnings("unchecked")
     public static <M extends WritableSizable> Class<M> getMessageValueClass(Configuration conf) {
-        if (conf == null)
-            conf = defaultConf;
         return (Class<M>) conf.getClass(PregelixJob.MESSAGE_VALUE_CLASS, Writable.class);
     }
 
@@ -316,8 +307,6 @@
      */
     @SuppressWarnings("unchecked")
     public static <M extends Writable> Class<M> getPartialAggregateValueClass(Configuration conf) {
-        if (conf == null)
-            conf = defaultConf;
         return (Class<M>) conf.getClass(PregelixJob.PARTIAL_AGGREGATE_VALUE_CLASS, Writable.class);
     }
 
@@ -330,8 +319,6 @@
      */
     @SuppressWarnings("unchecked")
     public static <M extends Writable> Class<M> getPartialCombineValueClass(Configuration conf) {
-        if (conf == null)
-            conf = defaultConf;
         return (Class<M>) conf.getClass(PregelixJob.PARTIAL_COMBINE_VALUE_CLASS, Writable.class);
     }
 
@@ -344,8 +331,6 @@
      */
     @SuppressWarnings("unchecked")
     public static Class<? extends NormalizedKeyComputer> getNormalizedKeyComputerClass(Configuration conf) {
-        if (conf == null)
-            conf = defaultConf;
         return (Class<? extends NormalizedKeyComputer>) conf.getClass(PregelixJob.NMK_COMPUTER_CLASS,
                 NormalizedKeyComputer.class);
     }
@@ -359,8 +344,6 @@
      */
     @SuppressWarnings("unchecked")
     public static <M extends Writable> Class<M> getFinalAggregateValueClass(Configuration conf) {
-        if (conf == null)
-            conf = defaultConf;
         return (Class<M>) conf.getClass(PregelixJob.FINAL_AGGREGATE_VALUE_CLASS, Writable.class);
     }
 
@@ -488,8 +471,6 @@
      */
     @SuppressWarnings({ "unchecked", "rawtypes" })
     public static <V extends VertexPartitioner> Class<V> getVertexPartitionerClass(Configuration conf) {
-        if (conf == null)
-            conf = defaultConf;
         return (Class<V>) conf.getClass(PregelixJob.PARTITIONER_CLASS, null, VertexPartitioner.class);
     }
 
@@ -502,8 +483,6 @@
      */
     @SuppressWarnings("unchecked")
     public static <V extends ICheckpointHook> Class<V> getCheckpointHookClass(Configuration conf) {
-        if (conf == null)
-            conf = defaultConf;
         return (Class<V>) conf.getClass(PregelixJob.CKP_CLASS, DefaultCheckpointHook.class, ICheckpointHook.class);
     }
 
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index 15981e7..8d9eeac 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -254,7 +254,7 @@
         } catch (Exception e) {
             throw new HyracksDataException(e);
         }
-        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
                 vertexIdClass.getName(), vertexClass.getName());
         IConfigurationFactory confFactory = new ConfigurationFactory(conf);
         String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
@@ -283,7 +283,7 @@
         IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
         IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
         IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
-                vertexIdClass.getName(), vertexClass.getName());
+                conf, vertexIdClass.getName(), vertexClass.getName());
         VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, inputRdFactory,
                 resultFileSplitProvider, preHookFactory, null);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { "nc1" });
@@ -325,7 +325,7 @@
          * construct btree search operator
          */
         IConfigurationFactory confFactory = new ConfigurationFactory(conf);
-        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
                 vertexIdClass.getName(), vertexClass.getName());
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
         comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(0, vertexIdClass);;
@@ -347,7 +347,7 @@
         IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
         IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
         IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
-                vertexIdClass.getName(), vertexClass.getName());
+                conf, vertexIdClass.getName(), vertexClass.getName());
         VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, inputRdFactory,
                 resultFileSplitProvider, preHookFactory, null);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { "nc1" });
@@ -478,7 +478,7 @@
             return new VertexPartitionComputerFactory(confFactory);
         } else {
             return new VertexIdPartitionComputerFactory(new WritableSerializerDeserializerFactory(
-                    BspUtils.getVertexIndexClass(conf)));
+                    BspUtils.getVertexIndexClass(conf)), confFactory);
         }
     }
 
@@ -514,7 +514,7 @@
         } catch (Exception e) {
             throw new HyracksDataException(e);
         }
-        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
                 vertexIdClass.getName(), vertexClass.getName());
         IConfigurationFactory confFactory = new ConfigurationFactory(conf);
         String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
@@ -585,7 +585,7 @@
          * construct btree search operator
          */
         IConfigurationFactory confFactory = new ConfigurationFactory(conf);
-        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
                 vertexIdClass.getName(), vertexClass.getName());
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
         comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(0, vertexIdClass);;
@@ -614,10 +614,11 @@
         /**
          * construct write file operator
          */
+        IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
         IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
-                vertexIdClass.getName(), vertexClass.getName());
+                conf, vertexIdClass.getName(), vertexClass.getName());
         VertexFileWriteOperatorDescriptor writer = new VertexFileWriteOperatorDescriptor(spec, confFactory,
-                inputRdFactory);
+                inputRdFactory, preHookFactory);
         ClusterConfig.setLocationConstraint(spec, writer);
 
         /**
@@ -656,7 +657,7 @@
         /**
          * source aggregate
          */
-        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
                 MsgList.class.getName());
 
         /**
@@ -680,7 +681,7 @@
         tmpJob.setOutputValueClass(MsgList.class);
 
         IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
-                vertexIdClass.getName(), MsgList.class.getName());
+                conf, vertexIdClass.getName(), MsgList.class.getName());
         HDFSFileWriteOperatorDescriptor hdfsWriter = new HDFSFileWriteOperatorDescriptor(spec, tmpJob, inputRdFactory);
         ClusterConfig.setLocationConstraint(spec, hdfsWriter);
 
@@ -720,7 +721,7 @@
         } catch (Exception e) {
             throw new HyracksDataException(e);
         }
-        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
                 vertexIdClass.getName(), MsgList.class.getName());
         String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
         HDFSReadOperatorDescriptor scanner = new HDFSReadOperatorDescriptor(spec, recordDescriptor, tmpJob, splits,
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 8c61e6a..9fefbea 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -125,7 +125,7 @@
         /**
          * construct btree search and function call update operator
          */
-        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
                 vertexIdClass.getName(), vertexClass.getName());
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
         comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
@@ -134,20 +134,21 @@
         typeTraits[0] = new TypeTraits(false);
         typeTraits[1] = new TypeTraits(false);
 
-        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
-        RecordDescriptor rdPartialAggregate = DataflowUtils
-                .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
+        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+                VLongWritable.class.getName());
+        RecordDescriptor rdPartialAggregate = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+                partialAggregateValueClass.getName());
         IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
         IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
         IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
-                vertexIdClass.getName(), vertexClass.getName());
-        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                conf, vertexIdClass.getName(), vertexClass.getName());
+        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
                 MsgList.class.getName());
-        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
                 vertexIdClass.getName(), messageValueClass.getName());
-        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
                 vertexClass.getName());
-        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
+        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(conf, vertexIdClass.getName());
 
         TreeSearchFunctionUpdateOperatorDescriptor scanner = new TreeSearchFunctionUpdateOperatorDescriptor(spec,
                 recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
@@ -166,8 +167,8 @@
         /**
          * final aggregate write operator
          */
-        IRecordDescriptorFactory aggRdFactory = DataflowUtils
-                .getWritableRecordDescriptorFactoryFromWritableClasses(partialAggregateValueClass.getName());
+        IRecordDescriptorFactory aggRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                conf, partialAggregateValueClass.getName());
         FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
                 configurationFactory, aggRdFactory, jobId);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
@@ -311,15 +312,15 @@
          * source aggregate
          */
         int[] keyFields = new int[] { 0 };
-        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
                 vertexIdClass.getName(), messageValueClass.getName());
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
         comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);;
-        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
                 MsgList.class.getName());
-        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
                 vertexClass.getName());
-        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
+        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(conf, vertexIdClass.getName());
 
         /**
          * construct empty tuple operator
@@ -360,13 +361,14 @@
          * construct index-join-function-update operator
          */
         IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
-        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
-        RecordDescriptor rdPartialAggregate = DataflowUtils
-                .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
+        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+                VLongWritable.class.getName());
+        RecordDescriptor rdPartialAggregate = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+                partialAggregateValueClass.getName());
         IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
         IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
         IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
-                vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
+                conf, vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
 
         IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
                 spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
@@ -445,8 +447,8 @@
         /**
          * final aggregate write operator
          */
-        IRecordDescriptorFactory aggRdFactory = DataflowUtils
-                .getWritableRecordDescriptorFactoryFromWritableClasses(partialAggregateValueClass.getName());
+        IRecordDescriptorFactory aggRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                conf, partialAggregateValueClass.getName());
         FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
                 configurationFactory, aggRdFactory, jobId);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
@@ -585,7 +587,7 @@
         } catch (Exception e) {
             throw new HyracksDataException(e);
         }
-        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
                 vertexIdClass.getName(), MsgList.class.getName());
         String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
         HDFSReadOperatorDescriptor scanner = new HDFSReadOperatorDescriptor(spec, recordDescriptor, tmpJob, splits,
@@ -662,7 +664,7 @@
         /**
          * construct btree search operator
          */
-        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
                 vertexIdClass.getName(), msgListClass.getName());
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
         comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(0, vertexIdClass);;
@@ -681,7 +683,7 @@
          * construct write file operator
          */
         IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
-                vertexIdClass.getName(), MsgList.class.getName());
+                conf, vertexIdClass.getName(), MsgList.class.getName());
         HDFSFileWriteOperatorDescriptor writer = new HDFSFileWriteOperatorDescriptor(spec, job, inputRdFactory);
         ClusterConfig.setLocationConstraint(spec, writer);
 
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index 8446379..ee576b1 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -96,7 +96,7 @@
         /**
          * construct btree search function update operator
          */
-        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
                 vertexIdClass.getName(), vertexClass.getName());
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
         comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);;
@@ -106,18 +106,19 @@
         typeTraits[0] = new TypeTraits(false);
         typeTraits[1] = new TypeTraits(false);
 
-        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
-        RecordDescriptor rdPartialAggregate = DataflowUtils
-                .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
+        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+                VLongWritable.class.getName());
+        RecordDescriptor rdPartialAggregate = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+                partialAggregateValueClass.getName());
         IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
         IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
         IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
-                vertexIdClass.getName(), vertexClass.getName());
-        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                conf, vertexIdClass.getName(), vertexClass.getName());
+        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
                 vertexIdClass.getName(), messageValueClass.getName());
-        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
                 vertexClass.getName());
-        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
+        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(conf, vertexIdClass.getName());
 
         TreeSearchFunctionUpdateOperatorDescriptor scanner = new TreeSearchFunctionUpdateOperatorDescriptor(spec,
                 recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
@@ -149,7 +150,7 @@
         /**
          * construct global group-by operator
          */
-        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
                 MsgList.class.getName());
         IClusteredAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(
                 conf, true, true);
@@ -183,8 +184,8 @@
         /**
          * final aggregate write operator
          */
-        IRecordDescriptorFactory aggRdFactory = DataflowUtils
-                .getWritableRecordDescriptorFactoryFromWritableClasses(partialAggregateValueClass.getName());
+        IRecordDescriptorFactory aggRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                conf, partialAggregateValueClass.getName());
         FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
                 configurationFactory, aggRdFactory, jobId);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
@@ -267,15 +268,15 @@
          * source aggregate
          */
         int[] keyFields = new int[] { 0 };
-        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
                 vertexIdClass.getName(), messageValueClass.getName());
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
         comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
-        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
                 MsgList.class.getName());
-        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
                 vertexClass.getName());
-        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
+        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(conf, vertexIdClass.getName());
 
         /**
          * construct empty tuple operator
@@ -309,13 +310,14 @@
         nullWriterFactories[0] = VertexIdNullWriterFactory.INSTANCE;
         nullWriterFactories[1] = MsgListNullWriterFactory.INSTANCE;
 
-        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
-        RecordDescriptor rdPartialAggregate = DataflowUtils
-                .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
+        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+                VLongWritable.class.getName());
+        RecordDescriptor rdPartialAggregate = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+                partialAggregateValueClass.getName());
         IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
         IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
         IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
-                vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
+                conf, vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
 
         IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
                 spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
@@ -379,8 +381,8 @@
         /**
          * final aggregate write operator
          */
-        IRecordDescriptorFactory aggRdFactory = DataflowUtils
-                .getWritableRecordDescriptorFactoryFromWritableClasses(partialAggregateValueClass.getName());
+        IRecordDescriptorFactory aggRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                conf, partialAggregateValueClass.getName());
         FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
                 configurationFactory, aggRdFactory, jobId);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index 0259c5c..32271c8 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -96,7 +96,7 @@
         /**
          * construct btree search operator
          */
-        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
                 vertexIdClass.getName(), vertexClass.getName());
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
         comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
@@ -109,18 +109,19 @@
         /**
          * construct compute operator
          */
-        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
-        RecordDescriptor rdPartialAggregate = DataflowUtils
-                .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
+        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+                VLongWritable.class.getName());
+        RecordDescriptor rdPartialAggregate = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+                partialAggregateValueClass.getName());
         IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
         IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
         IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
-                vertexIdClass.getName(), vertexClass.getName());
-        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                conf, vertexIdClass.getName(), vertexClass.getName());
+        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
                 vertexIdClass.getName(), messageValueClass.getName());
-        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
                 vertexClass.getName());
-        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
+        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(conf, vertexIdClass.getName());
 
         TreeSearchFunctionUpdateOperatorDescriptor scanner = new TreeSearchFunctionUpdateOperatorDescriptor(spec,
                 recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
@@ -144,7 +145,7 @@
         /**
          * construct global group-by operator
          */
-        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
                 MsgList.class.getName());
         IClusteredAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(
                 conf, true, false);
@@ -177,8 +178,8 @@
         /**
          * final aggregate write operator
          */
-        IRecordDescriptorFactory aggRdFactory = DataflowUtils
-                .getWritableRecordDescriptorFactoryFromWritableClasses(partialAggregateValueClass.getName());
+        IRecordDescriptorFactory aggRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                conf, partialAggregateValueClass.getName());
         FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
                 configurationFactory, aggRdFactory, jobId);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
@@ -258,15 +259,15 @@
          * source aggregate
          */
         int[] keyFields = new int[] { 0 };
-        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
                 vertexIdClass.getName(), messageValueClass.getName());
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
         comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
-        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
                 MsgList.class.getName());
-        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
                 vertexClass.getName());
-        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
+        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(conf, vertexIdClass.getName());
 
         /**
          * construct empty tuple operator
@@ -300,13 +301,14 @@
         nullWriterFactories[0] = VertexIdNullWriterFactory.INSTANCE;
         nullWriterFactories[1] = MsgListNullWriterFactory.INSTANCE;
 
-        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
-        RecordDescriptor rdPartialAggregate = DataflowUtils
-                .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
+        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+                VLongWritable.class.getName());
+        RecordDescriptor rdPartialAggregate = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+                partialAggregateValueClass.getName());
         IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
         IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
         IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
-                vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
+                conf, vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
 
         IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
                 spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
@@ -362,8 +364,8 @@
         /**
          * final aggregate write operator
          */
-        IRecordDescriptorFactory aggRdFactory = DataflowUtils
-                .getWritableRecordDescriptorFactoryFromWritableClasses(partialAggregateValueClass.getName());
+        IRecordDescriptorFactory aggRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                conf, partialAggregateValueClass.getName());
         FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
                 configurationFactory, aggRdFactory, jobId);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index 0a38d72..3aa36cd 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -96,7 +96,7 @@
         /**
          * construct btree search function update operator
          */
-        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
                 vertexIdClass.getName(), vertexClass.getName());
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
         comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
@@ -106,18 +106,19 @@
         typeTraits[0] = new TypeTraits(false);
         typeTraits[1] = new TypeTraits(false);
 
-        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
-        RecordDescriptor rdPartialAggregate = DataflowUtils
-                .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
+        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+                VLongWritable.class.getName());
+        RecordDescriptor rdPartialAggregate = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+                partialAggregateValueClass.getName());
         IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
         IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
         IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
-                vertexIdClass.getName(), vertexClass.getName());
-        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                conf, vertexIdClass.getName(), vertexClass.getName());
+        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
                 vertexIdClass.getName(), messageValueClass.getName());
-        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
                 vertexClass.getName());
-        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
+        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(conf, vertexIdClass.getName());
 
         TreeSearchFunctionUpdateOperatorDescriptor scanner = new TreeSearchFunctionUpdateOperatorDescriptor(spec,
                 recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
@@ -157,7 +158,7 @@
         /**
          * construct global group-by operator
          */
-        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
                 MsgList.class.getName());
         IClusteredAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(
                 conf, true, true);
@@ -191,8 +192,8 @@
         /**
          * final aggregate write operator
          */
-        IRecordDescriptorFactory aggRdFactory = DataflowUtils
-                .getWritableRecordDescriptorFactoryFromWritableClasses(partialAggregateValueClass.getName());
+        IRecordDescriptorFactory aggRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                conf, partialAggregateValueClass.getName());
         FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
                 configurationFactory, aggRdFactory, jobId);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
@@ -272,15 +273,15 @@
          * source aggregate
          */
         int[] keyFields = new int[] { 0 };
-        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
                 vertexIdClass.getName(), messageValueClass.getName());
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
         comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
-        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
                 MsgList.class.getName());
-        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
                 vertexClass.getName());
-        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
+        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(conf, vertexIdClass.getName());
 
         /**
          * construct empty tuple operator
@@ -314,13 +315,14 @@
         nullWriterFactories[0] = VertexIdNullWriterFactory.INSTANCE;
         nullWriterFactories[1] = MsgListNullWriterFactory.INSTANCE;
 
-        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
-        RecordDescriptor rdPartialAggregate = DataflowUtils
-                .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
+        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+                VLongWritable.class.getName());
+        RecordDescriptor rdPartialAggregate = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+                partialAggregateValueClass.getName());
         IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
         IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
         IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
-                vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
+                conf, vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
 
         IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
                 spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
@@ -392,8 +394,8 @@
         /**
          * final aggregate write operator
          */
-        IRecordDescriptorFactory aggRdFactory = DataflowUtils
-                .getWritableRecordDescriptorFactoryFromWritableClasses(partialAggregateValueClass.getName());
+        IRecordDescriptorFactory aggRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                conf, partialAggregateValueClass.getName());
         FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
                 configurationFactory, aggRdFactory, jobId);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/WritableRecordDescriptorFactory.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/WritableRecordDescriptorFactory.java
index a67c259..68e3ba7 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/WritableRecordDescriptorFactory.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/WritableRecordDescriptorFactory.java
@@ -14,25 +14,32 @@
  */
 package edu.uci.ics.pregelix.core.runtime.touchpoint;
 
+import org.apache.hadoop.conf.Configuration;
+
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
 import edu.uci.ics.pregelix.core.util.DataflowUtils;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
 
 public class WritableRecordDescriptorFactory implements IRecordDescriptorFactory {
     private static final long serialVersionUID = 1L;
     private String[] fieldClasses;
+    private IConfigurationFactory confFactory;
 
-    public WritableRecordDescriptorFactory(String... fieldClasses) {
+    public WritableRecordDescriptorFactory(Configuration conf, String... fieldClasses) {
         this.fieldClasses = fieldClasses;
+        this.confFactory = new ConfigurationFactory(conf);
     }
 
     @Override
     public RecordDescriptor createRecordDescriptor(IHyracksTaskContext ctx) throws HyracksDataException {
         try {
-            return DataflowUtils.getRecordDescriptorFromWritableClasses(ctx, fieldClasses);
+            Configuration conf = confFactory.createConfiguration(ctx);
+            return DataflowUtils.getRecordDescriptorFromWritableClasses(ctx, conf, fieldClasses);
         } catch (HyracksException e) {
             throw new HyracksDataException(e);
         }
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java
index 3e01109..3a2241b 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java
@@ -38,14 +38,14 @@
     }
 
     @SuppressWarnings("unchecked")
-    public static RecordDescriptor getRecordDescriptorFromKeyValueClasses(String className1, String className2)
-            throws HyracksException {
+    public static RecordDescriptor getRecordDescriptorFromKeyValueClasses(Configuration conf, String className1,
+            String className2) throws HyracksException {
         RecordDescriptor recordDescriptor = null;
         try {
             ClassLoader loader = DataflowUtils.class.getClassLoader();
             recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
                     (Class<? extends Writable>) loader.loadClass(className1),
-                    (Class<? extends Writable>) loader.loadClass(className2));
+                    (Class<? extends Writable>) loader.loadClass(className2), conf);
         } catch (ClassNotFoundException cnfe) {
             throw new HyracksException(cnfe);
         }
@@ -53,15 +53,16 @@
     }
 
     @SuppressWarnings({ "unchecked", "rawtypes" })
-    public static RecordDescriptor getRecordDescriptorFromWritableClasses(String... classNames) throws HyracksException {
+    public static RecordDescriptor getRecordDescriptorFromWritableClasses(Configuration conf, String... classNames)
+            throws HyracksException {
         RecordDescriptor recordDescriptor = null;
         ISerializerDeserializer[] serdes = new ISerializerDeserializer[classNames.length];
         ClassLoader loader = DataflowUtils.class.getClassLoader();
         try {
             int i = 0;
             for (String className : classNames)
-                serdes[i++] = DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) loader
-                        .loadClass(className));
+                serdes[i++] = DatatypeHelper.createSerializerDeserializer(
+                        (Class<? extends Writable>) loader.loadClass(className), conf);
         } catch (ClassNotFoundException cnfe) {
             throw new HyracksException(cnfe);
         }
@@ -69,9 +70,9 @@
         return recordDescriptor;
     }
 
-    public static IRecordDescriptorFactory getWritableRecordDescriptorFactoryFromWritableClasses(String... classNames)
-            throws HyracksException {
-        IRecordDescriptorFactory rdFactory = new WritableRecordDescriptorFactory(classNames);
+    public static IRecordDescriptorFactory getWritableRecordDescriptorFactoryFromWritableClasses(Configuration conf,
+            String... classNames) throws HyracksException {
+        IRecordDescriptorFactory rdFactory = new WritableRecordDescriptorFactory(conf, classNames);
         return rdFactory;
     }
 
@@ -85,13 +86,13 @@
     }
 
     @SuppressWarnings("unchecked")
-    public static RecordDescriptor getRecordDescriptorFromKeyValueClasses(IHyracksTaskContext ctx, String className1,
-            String className2) throws HyracksException {
+    public static RecordDescriptor getRecordDescriptorFromKeyValueClasses(IHyracksTaskContext ctx, Configuration conf,
+            String className1, String className2) throws HyracksException {
         RecordDescriptor recordDescriptor = null;
         try {
             recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) ctx
                     .getJobletContext().loadClass(className1), (Class<? extends Writable>) ctx.getJobletContext()
-                    .loadClass(className2));
+                    .loadClass(className2), conf);
         } catch (Exception cnfe) {
             throw new HyracksException(cnfe);
         }
@@ -99,15 +100,17 @@
     }
 
     @SuppressWarnings({ "unchecked", "rawtypes" })
-    public static RecordDescriptor getRecordDescriptorFromWritableClasses(IHyracksTaskContext ctx, String... classNames)
-            throws HyracksException {
+    public static RecordDescriptor getRecordDescriptorFromWritableClasses(IHyracksTaskContext ctx, Configuration conf,
+            String... classNames) throws HyracksException {
         RecordDescriptor recordDescriptor = null;
         ISerializerDeserializer[] serdes = new ISerializerDeserializer[classNames.length];
         try {
             int i = 0;
-            for (String className : classNames)
-                serdes[i++] = DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) ctx
-                        .getJobletContext().loadClass(className));
+            for (String className : classNames) {
+                Class<? extends Writable> c = (Class<? extends Writable>) ctx.getJobletContext().loadClass(className);
+                serdes[i++] = DatatypeHelper.createSerializerDeserializer(c, conf);
+                //System.out.println("thread " + Thread.currentThread().getId() + " after creating serde " + c.getClassLoader());
+            }
         } catch (Exception cnfe) {
             throw new HyracksException(cnfe);
         }
diff --git a/pregelix/pregelix-dataflow-std-base/pom.xml b/pregelix/pregelix-dataflow-std-base/pom.xml
index 2b435a1..77d75bf 100644
--- a/pregelix/pregelix-dataflow-std-base/pom.xml
+++ b/pregelix/pregelix-dataflow-std-base/pom.xml
@@ -90,6 +90,13 @@
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-hdfs-core</artifactId>
+			<version>0.2.10-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-storage-am-common</artifactId>
 			<version>0.2.10-SNAPSHOT</version>
 			<type>jar</type>
diff --git a/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/ISerializerDeserializerFactory.java b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/ISerializerDeserializerFactory.java
index 1fdd0b6..894cba5 100644
--- a/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/ISerializerDeserializerFactory.java
+++ b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/ISerializerDeserializerFactory.java
@@ -16,10 +16,12 @@
 
 import java.io.Serializable;
 
+import org.apache.hadoop.conf.Configuration;
+
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 
 public interface ISerializerDeserializerFactory<T> extends Serializable {
 
-	public ISerializerDeserializer<T> getSerializerDeserializer();
+    public ISerializerDeserializer<T> getSerializerDeserializer(Configuration conf);
 
 }
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
index e5bdb17..5579a77 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
@@ -57,10 +57,10 @@
      * @throws HyracksDataException
      */
     public void functionOpen() throws HyracksDataException {
+        ctxCL = Thread.currentThread().getContextClassLoader();
+        Thread.currentThread().setContextClassLoader(ctx.getJobletContext().getClassLoader());
         inputRd = inputRdFactory.createRecordDescriptor(ctx);
         tupleDe = new TupleDeserializer(inputRd);
-        ctxCL = Thread.currentThread().getContextClassLoader();
-        Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
         for (IFrameWriter writer : writers) {
             writer.open();
         }
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
index b34879e..e16ba48 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
@@ -16,6 +16,8 @@
 
 import java.io.DataOutput;
 import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -122,7 +124,7 @@
             @SuppressWarnings("unchecked")
             private void loadVertices(final IHyracksTaskContext ctx, Configuration conf, int splitId)
                     throws IOException, ClassNotFoundException, InterruptedException, InstantiationException,
-                    IllegalAccessException {
+                    IllegalAccessException, NoSuchFieldException, InvocationTargetException {
                 ByteBuffer frame = ctx.allocateFrame();
                 FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
                 appender.reset(frame, true);
@@ -141,7 +143,11 @@
                 /**
                  * set context
                  */
-                Vertex.setContext(mapperContext);
+                ClassLoader cl = ctx.getJobletContext().getClassLoader();
+                Class<?> vClass = (Class<?>) cl.loadClass("edu.uci.ics.pregelix.api.graph.Vertex");
+                Field contextField = vClass.getDeclaredField("context");
+                contextField.setAccessible(true);
+                contextField.set(null, mapperContext);
 
                 /**
                  * empty vertex value
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileWriteOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileWriteOperatorDescriptor.java
index f3ec40e..2087ea2 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileWriteOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileWriteOperatorDescriptor.java
@@ -44,17 +44,20 @@
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
 
 public class VertexFileWriteOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
     private static final long serialVersionUID = 1L;
     private final IConfigurationFactory confFactory;
     private final IRecordDescriptorFactory inputRdFactory;
+    private final IRuntimeHookFactory preHookFactory;
 
     public VertexFileWriteOperatorDescriptor(JobSpecification spec, IConfigurationFactory confFactory,
-            IRecordDescriptorFactory inputRdFactory) {
+            IRecordDescriptorFactory inputRdFactory, IRuntimeHookFactory preHookFactory) {
         super(spec, 1, 0);
         this.confFactory = confFactory;
         this.inputRdFactory = inputRdFactory;
+        this.preHookFactory = preHookFactory;
     }
 
     @SuppressWarnings("rawtypes")
@@ -85,6 +88,8 @@
                 context = ctxFactory.createContext(conf, partition);
                 context.getConfiguration().setClassLoader(ctx.getJobletContext().getClassLoader());
                 try {
+                    if (preHookFactory != null)
+                        preHookFactory.createRuntimeHook().configure(ctx);
                     vertexWriter = outputFormat.createVertexWriter(context);
                 } catch (InterruptedException e) {
                     throw new HyracksDataException(e);
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index b1d1043..a39df3c 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -201,6 +201,10 @@
                     }
                     vertex.finishCompute();
                 } catch (Exception e) {
+                    ClassLoader cl1 = vertex.getClass().getClassLoader();
+                    ClassLoader cl2 = msgContentList.get(0).getClass().getClassLoader();
+                    System.out.println("cl1 " + cl1);
+                    System.out.println("cl2 " + cl2);
                     throw new HyracksDataException(e);
                 }
 
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/DatatypeHelper.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/DatatypeHelper.java
index b121b5b..e99fcb3 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/DatatypeHelper.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/DatatypeHelper.java
@@ -17,32 +17,30 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
 
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.util.ArrayListWritable;
 
-@SuppressWarnings("deprecation")
 public class DatatypeHelper {
     private static final class WritableSerializerDeserializer<T extends Writable> implements ISerializerDeserializer<T> {
         private static final long serialVersionUID = 1L;
 
-        private Class<T> clazz;
+        private final Class<T> clazz;
+        private transient Configuration conf;
         private T object;
 
-        private WritableSerializerDeserializer(Class<T> clazz) {
+        private WritableSerializerDeserializer(Class<T> clazz, Configuration conf) {
             this.clazz = clazz;
+            this.conf = conf;
         }
 
-        @SuppressWarnings("unchecked")
+        @SuppressWarnings({ "unchecked", "rawtypes" })
         private T createInstance() throws HyracksDataException {
             // TODO remove "if", create a new WritableInstanceOperations class
             // that deals with Writables that don't have public constructors
@@ -50,7 +48,11 @@
                 return (T) NullWritable.get();
             }
             try {
-                return clazz.newInstance();
+                T t = clazz.newInstance();
+                if (t instanceof ArrayListWritable) {
+                    ((ArrayListWritable) t).setConf(conf);
+                }
+                return t;
             } catch (InstantiationException e) {
                 throw new HyracksDataException(e);
             } catch (IllegalAccessException e) {
@@ -85,42 +87,16 @@
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
     public static ISerializerDeserializer<? extends Writable> createSerializerDeserializer(
-            Class<? extends Writable> fClass) {
-        return new WritableSerializerDeserializer(fClass);
+            Class<? extends Writable> fClass, Configuration conf) {
+        return new WritableSerializerDeserializer(fClass, conf);
     }
 
     public static RecordDescriptor createKeyValueRecordDescriptor(Class<? extends Writable> keyClass,
-            Class<? extends Writable> valueClass) {
+            Class<? extends Writable> valueClass, Configuration conf) {
         @SuppressWarnings("rawtypes")
         ISerializerDeserializer[] fields = new ISerializerDeserializer[2];
-        fields[0] = createSerializerDeserializer(keyClass);
-        fields[1] = createSerializerDeserializer(valueClass);
+        fields[0] = createSerializerDeserializer(keyClass, conf);
+        fields[1] = createSerializerDeserializer(valueClass, conf);
         return new RecordDescriptor(fields);
     }
-
-    public static RecordDescriptor createOneFieldRecordDescriptor(Class<? extends Writable> fieldClass) {
-        @SuppressWarnings("rawtypes")
-        ISerializerDeserializer[] fields = new ISerializerDeserializer[1];
-        fields[0] = createSerializerDeserializer(fieldClass);
-        return new RecordDescriptor(fields);
-    }
-
-    public static JobConf map2JobConf(Map<String, String> jobConfMap) {
-        JobConf jobConf;
-        synchronized (Configuration.class) {
-            jobConf = new JobConf();
-            for (Entry<String, String> entry : jobConfMap.entrySet()) {
-                jobConf.set(entry.getKey(), entry.getValue());
-            }
-        }
-        return jobConf;
-    }
-
-    public static Map<String, String> jobConf2Map(JobConf jobConf) {
-        Map<String, String> jobConfMap = new HashMap<String, String>();
-        for (Entry<String, String> entry : jobConf) {
-            jobConfMap.put(entry.getKey(), entry.getValue());
-        }
-        return jobConfMap;
-    }
 }
\ No newline at end of file
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
index 5e8ac1e..3151df2 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
@@ -14,6 +14,8 @@
  */
 package edu.uci.ics.pregelix.runtime.touchpoint;
 
+import java.lang.reflect.Field;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -21,8 +23,6 @@
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.hdfs.ContextFactory;
-import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHook;
 import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
@@ -48,8 +48,12 @@
                 try {
                     TaskAttemptContext mapperContext = ctxFactory.createContext(conf, new TaskAttemptID());
                     mapperContext.getConfiguration().setClassLoader(ctx.getJobletContext().getClassLoader());
-                    Vertex.setContext(mapperContext);
-                    BspUtils.setDefaultConfiguration(conf);
+
+                    ClassLoader cl = ctx.getJobletContext().getClassLoader();
+                    Class<?> vClass = (Class<?>) cl.loadClass("edu.uci.ics.pregelix.api.graph.Vertex");
+                    Field contextField = vClass.getDeclaredField("context");
+                    contextField.setAccessible(true);
+                    contextField.set(null, mapperContext);
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
                 }
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexIdPartitionComputerFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexIdPartitionComputerFactory.java
index 6fa6434..c9b67fb 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexIdPartitionComputerFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexIdPartitionComputerFactory.java
@@ -16,6 +16,7 @@
 
 import java.io.DataInputStream;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
@@ -24,30 +25,39 @@
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.ISerializerDeserializerFactory;
 
 public class VertexIdPartitionComputerFactory<K extends Writable, V extends Writable> implements
         ITuplePartitionComputerFactory {
     private static final long serialVersionUID = 1L;
     private final ISerializerDeserializerFactory<K> keyIOFactory;
+    private final IConfigurationFactory confFactory;
 
-    public VertexIdPartitionComputerFactory(ISerializerDeserializerFactory<K> keyIOFactory) {
+    public VertexIdPartitionComputerFactory(ISerializerDeserializerFactory<K> keyIOFactory,
+            IConfigurationFactory confFactory) {
         this.keyIOFactory = keyIOFactory;
+        this.confFactory = confFactory;
     }
 
     public ITuplePartitionComputer createPartitioner() {
-        return new ITuplePartitionComputer() {
-            private final ByteBufferInputStream bbis = new ByteBufferInputStream();
-            private final DataInputStream dis = new DataInputStream(bbis);
-            private final ISerializerDeserializer<K> keyIO = keyIOFactory.getSerializerDeserializer();
+        try {
+            final Configuration conf = confFactory.createConfiguration();
+            return new ITuplePartitionComputer() {
+                private final ByteBufferInputStream bbis = new ByteBufferInputStream();
+                private final DataInputStream dis = new DataInputStream(bbis);
+                private final ISerializerDeserializer<K> keyIO = keyIOFactory.getSerializerDeserializer(conf);
 
-            public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
-                int keyStart = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
-                        + accessor.getFieldStartOffset(tIndex, 0);
-                bbis.setByteBuffer(accessor.getBuffer(), keyStart);
-                K key = keyIO.deserialize(dis);
-                return Math.abs(key.hashCode() % nParts);
-            }
-        };
+                public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
+                    int keyStart = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+                            + accessor.getFieldStartOffset(tIndex, 0);
+                    bbis.setByteBuffer(accessor.getBuffer(), keyStart);
+                    K key = keyIO.deserialize(dis);
+                    return Math.abs(key.hashCode() % nParts);
+                }
+            };
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
     }
 }
\ No newline at end of file
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/WritableSerializerDeserializerFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/WritableSerializerDeserializerFactory.java
index 435d081..c11ac5b 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/WritableSerializerDeserializerFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/WritableSerializerDeserializerFactory.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.pregelix.runtime.touchpoint;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
@@ -29,7 +30,7 @@
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
     @Override
-    public ISerializerDeserializer getSerializerDeserializer() {
-        return DatatypeHelper.createSerializerDeserializer(clazz);
+    public ISerializerDeserializer getSerializerDeserializer(Configuration conf) {
+        return DatatypeHelper.createSerializerDeserializer(clazz, conf);
     }
 }