add the support for customized partitioner
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexPartitioner.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexPartitioner.java
new file mode 100644
index 0000000..f51ad88
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexPartitioner.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.graph;
+
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Users can extend this class to implement the desired vertex partitioning behavior.
+ * 
+ * @author yingyib
+ */
+@SuppressWarnings("rawtypes")
+public abstract class VertexPartitioner<I extends WritableComparable> {
+
+    /**
+     * @param vertexId
+     *            The input vertex id.
+     * @param nPartitions
+     *            The total number of partitions.
+     * @return The partition id.
+     */
+    public abstract int getPartitionId(I vertexId, int nPartitions);
+
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index 81472aa..31ad348 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -23,6 +23,7 @@
 import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
 import edu.uci.ics.pregelix.api.graph.MessageCombiner;
 import edu.uci.ics.pregelix.api.graph.NormalizedKeyComputer;
+import edu.uci.ics.pregelix.api.graph.VertexPartitioner;
 import edu.uci.ics.pregelix.api.graph.Vertex;
 import edu.uci.ics.pregelix.api.io.VertexInputFormat;
 import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
@@ -59,6 +60,8 @@
     public static final String FINAL_AGGREGATE_VALUE_CLASS = "pregelix.finalAggregateValueClass";
     /** The normalized key computer class */
     public static final String NMK_COMPUTER_CLASS = "pregelix.nmkComputerClass";
+    /** The partitioner class */
+    public static final String PARTITIONER_CLASS = "pregelix.partitionerClass";
     /** num of vertices */
     public static final String NUM_VERTICE = "pregelix.numVertices";
     /** num of edges */
@@ -178,4 +181,13 @@
     final public void setNoramlizedKeyComputerClass(Class<?> nkcClass) {
         getConfiguration().setClass(NMK_COMPUTER_CLASS, nkcClass, NormalizedKeyComputer.class);
     }
+
+    /**
+     * Set the vertex partitioner class
+     * 
+     * @param partitionerClass
+     */
+    final public void setVertexPartitionerClass(Class<?> partitionerClass) {
+        getConfiguration().setClass(PARTITIONER_CLASS, partitionerClass, VertexPartitioner.class);
+    }
 }
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index 6bac923..759c850 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -25,6 +25,7 @@
 import edu.uci.ics.pregelix.api.graph.MsgList;
 import edu.uci.ics.pregelix.api.graph.NormalizedKeyComputer;
 import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.graph.VertexPartitioner;
 import edu.uci.ics.pregelix.api.io.VertexInputFormat;
 import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -156,7 +157,7 @@
     }
 
     /**
-     * Create a global aggregator class
+     * Create a global aggregator object
      * 
      * @param conf
      *            Configuration to check
@@ -391,9 +392,9 @@
         try {
             return aggregateValueClass.newInstance();
         } catch (InstantiationException e) {
-            throw new IllegalArgumentException("createMessageValue: Failed to instantiate", e);
+            throw new IllegalArgumentException("createPartialAggregateValue: Failed to instantiate", e);
         } catch (IllegalAccessException e) {
-            throw new IllegalArgumentException("createMessageValue: Illegally accessed", e);
+            throw new IllegalArgumentException("createPartialAggregateValue: Illegally accessed", e);
         }
     }
 
@@ -415,9 +416,9 @@
             }
             return instance;
         } catch (InstantiationException e) {
-            throw new IllegalArgumentException("createMessageValue: Failed to instantiate", e);
+            throw new IllegalArgumentException("createPartialCombineValue: Failed to instantiate", e);
         } catch (IllegalAccessException e) {
-            throw new IllegalArgumentException("createMessageValue: Illegally accessed", e);
+            throw new IllegalArgumentException("createPartialCombineValue: Illegally accessed", e);
         }
     }
 
@@ -433,13 +434,46 @@
         try {
             return aggregateValueClass.newInstance();
         } catch (InstantiationException e) {
-            throw new IllegalArgumentException("createMessageValue: Failed to instantiate", e);
+            throw new IllegalArgumentException("createAggregateValue: Failed to instantiate", e);
         } catch (IllegalAccessException e) {
-            throw new IllegalArgumentException("createMessageValue: Illegally accessed", e);
+            throw new IllegalArgumentException("createAggregateValue: Illegally accessed", e);
         }
     }
 
     /**
+     * Create a user aggregate value
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return Instantiated user aggregate value
+     */
+    @SuppressWarnings("rawtypes")
+    public static VertexPartitioner createVertexPartitioner(Configuration conf) {
+        Class<? extends VertexPartitioner> vertexPartitionerClass = getVertexPartitionerClass(conf);
+        try {
+            return vertexPartitionerClass.newInstance();
+        } catch (InstantiationException e) {
+            throw new IllegalArgumentException("createVertexPartitioner: Failed to instantiate", e);
+        } catch (IllegalAccessException e) {
+            throw new IllegalArgumentException("createVertexPartitioner: Illegally accessed", e);
+        }
+    }
+
+    /**
+     * Get the user's subclassed vertex partitioner class.
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return The user defined vertex partitioner class
+     */
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    public static <V extends VertexPartitioner> Class<V> getVertexPartitionerClass(Configuration conf) {
+        if (conf == null)
+            conf = defaultConf;
+        return (Class<V>) conf.getClass(PregelixJob.PARTITIONER_CLASS, null, VertexPartitioner.class);
+    }
+
+    /**
      * Get the job configuration parameter whether the vertex states will increase dynamically
      * 
      * @param conf
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultVertexPartitioner.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultVertexPartitioner.java
new file mode 100644
index 0000000..263ec65
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultVertexPartitioner.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.util;
+
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.pregelix.api.graph.VertexPartitioner;
+
+/**
+ * The deafult vertex partitioner which use the hashcode of the vertex id to determine the partition
+ * of the vertex.
+ * 
+ * @author yingyib
+ */
+@SuppressWarnings("rawtypes")
+public class DefaultVertexPartitioner<I extends WritableComparable> extends VertexPartitioner<I> {
+
+    @Override
+    public int getPartitionId(I vertexId, int nPartitions) {
+        return vertexId.hashCode() % nPartitions;
+    }
+
+}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/config/ConfigurationFactory.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/config/ConfigurationFactory.java
index c0bbafd..1600ab5 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/config/ConfigurationFactory.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/config/ConfigurationFactory.java
@@ -45,4 +45,15 @@
             throw new HyracksDataException(e);
         }
     }
+    
+    @Override
+    public Configuration createConfiguration() throws HyracksDataException{
+        try {
+            Configuration conf = new Configuration();
+            SerDeUtils.deserialize(conf, data);
+            return conf;
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
 }
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index fcdf654..30e617d 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -62,6 +62,7 @@
 import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
 import edu.uci.ics.pregelix.api.graph.MessageCombiner;
 import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.graph.VertexPartitioner;
 import edu.uci.ics.pregelix.api.io.VertexInputFormat;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 import edu.uci.ics.pregelix.api.util.BspUtils;
@@ -82,6 +83,7 @@
 import edu.uci.ics.pregelix.runtime.bootstrap.StorageManagerInterface;
 import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
 import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.VertexPartitionComputerFactory;
 import edu.uci.ics.pregelix.runtime.touchpoint.WritableSerializerDeserializerFactory;
 
 public abstract class JobGen implements IJobGen {
@@ -237,8 +239,7 @@
         /**
          * connect operator descriptors
          */
-        ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
-                new WritableSerializerDeserializerFactory(vertexIdClass));
+        ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
         spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0, sorter, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, btreeBulkLoad, 0);
         spec.setFrameSize(frameSize);
@@ -313,8 +314,7 @@
         /**
          * connect operator descriptors
          */
-        ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
-                new WritableSerializerDeserializerFactory(vertexIdClass));
+        ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
         spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, sorter, 0);
         spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, hashPartitionComputerFactory, sortFields,
                 comparatorFactories), sorter, 0, writer, 0);
@@ -381,8 +381,7 @@
          */
         int[] sortFields = new int[1];
         sortFields[0] = 0;
-        ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
-                new WritableSerializerDeserializerFactory(vertexIdClass));
+        ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
         spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
         spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, hashPartitionComputerFactory, sortFields,
                 comparatorFactories), scanner, 0, writer, 0);
@@ -467,6 +466,18 @@
         return spec;
     }
 
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    protected ITuplePartitionComputerFactory getVertexPartitionComputerFactory() {
+        IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+        Class<? extends VertexPartitioner> partitionerClazz = BspUtils.getVertexPartitionerClass(conf);
+        if (partitionerClazz != null) {
+            return new VertexPartitionComputerFactory(confFactory);
+        } else {
+            return new VertexIdPartitionComputerFactory(new WritableSerializerDeserializerFactory(
+                    BspUtils.getVertexIndexClass(conf)));
+        }
+    }
+
     /** generate non-first iteration job */
     protected abstract JobSpecification generateNonFirstIteration(int iteration) throws HyracksException;
 
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index d9a24bc..877c6fa 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -72,8 +72,6 @@
 import edu.uci.ics.pregelix.runtime.touchpoint.PostSuperStepRuntimeHookFactory;
 import edu.uci.ics.pregelix.runtime.touchpoint.PreSuperStepRuntimeHookFactory;
 import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
-import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
-import edu.uci.ics.pregelix.runtime.touchpoint.WritableSerializerDeserializerFactory;
 
 public class JobGenInnerJoin extends JobGen {
 
@@ -245,8 +243,7 @@
         EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
         ClusterConfig.setLocationConstraint(spec, emptySink4);
 
-        ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
-                new WritableSerializerDeserializerFactory(vertexIdClass));
+        ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
         ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
 
         /** connect all operators **/
@@ -470,8 +467,7 @@
         ClusterConfig.setLocationConstraint(spec, emptySink4);
 
         ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
-        ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
-                new WritableSerializerDeserializerFactory(vertexIdClass));
+        ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
         /** connect all operators **/
         spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, materializeRead, 0);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index 5faf122..7af1f79 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -72,8 +72,6 @@
 import edu.uci.ics.pregelix.runtime.touchpoint.PreSuperStepRuntimeHookFactory;
 import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
 import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdNullWriterFactory;
-import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
-import edu.uci.ics.pregelix.runtime.touchpoint.WritableSerializerDeserializerFactory;
 
 public class JobGenOuterJoin extends JobGen {
 
@@ -227,8 +225,7 @@
         EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
         ClusterConfig.setLocationConstraint(spec, emptySink4);
 
-        ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
-                new WritableSerializerDeserializerFactory(vertexIdClass));
+        ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
         /** connect all operators **/
         spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, scanner, 0);
@@ -418,8 +415,8 @@
         int[] fieldPermutationDelete = new int[] { 0 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
-                null, NoOpOperationCallbackFactory.INSTANCE);
+                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
+                new BTreeDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
@@ -431,8 +428,7 @@
         ClusterConfig.setLocationConstraint(spec, emptySink4);
 
         ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
-        ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
-                new WritableSerializerDeserializerFactory(vertexIdClass));
+        ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
 
         /** connect all operators **/
         spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index fee738e..641774c 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -71,8 +71,6 @@
 import edu.uci.ics.pregelix.runtime.touchpoint.PreSuperStepRuntimeHookFactory;
 import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
 import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdNullWriterFactory;
-import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
-import edu.uci.ics.pregelix.runtime.touchpoint.WritableSerializerDeserializerFactory;
 
 public class JobGenOuterJoinSingleSort extends JobGen {
 
@@ -221,8 +219,7 @@
         ClusterConfig.setLocationConstraint(spec, emptySink4);
 
         ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
-        ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
-                new WritableSerializerDeserializerFactory(vertexIdClass));
+        ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
         /** connect all operators **/
         spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, scanner, 0);
@@ -408,8 +405,7 @@
         ClusterConfig.setLocationConstraint(spec, emptySink4);
 
         ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
-        ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
-                new WritableSerializerDeserializerFactory(vertexIdClass));
+        ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
 
         /** connect all operators **/
         spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index eef6b7e..cec5c63 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -71,8 +71,6 @@
 import edu.uci.ics.pregelix.runtime.touchpoint.PreSuperStepRuntimeHookFactory;
 import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
 import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdNullWriterFactory;
-import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
-import edu.uci.ics.pregelix.runtime.touchpoint.WritableSerializerDeserializerFactory;
 
 public class JobGenOuterJoinSort extends JobGen {
 
@@ -234,8 +232,7 @@
         EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
         ClusterConfig.setLocationConstraint(spec, emptySink4);
 
-        ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
-                new WritableSerializerDeserializerFactory(vertexIdClass));
+        ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
         /** connect all operators **/
         spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, scanner, 0);
@@ -441,8 +438,7 @@
         ClusterConfig.setLocationConstraint(spec, emptySink4);
 
         ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
-        ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
-                new WritableSerializerDeserializerFactory(vertexIdClass));
+        ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
 
         /** connect all operators **/
         spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/base/IConfigurationFactory.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/base/IConfigurationFactory.java
index dc9698b..0f41568 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/base/IConfigurationFactory.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/base/IConfigurationFactory.java
@@ -25,4 +25,6 @@
 
     public Configuration createConfiguration(IHyracksTaskContext ctx) throws HyracksDataException;
 
+    public Configuration createConfiguration() throws HyracksDataException;
+
 }
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
index 69c5612..a3a00f8 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.DefaultVertexPartitioner;
 import edu.uci.ics.pregelix.example.ConnectedComponentsVertex;
 import edu.uci.ics.pregelix.example.ConnectedComponentsVertex.SimpleConnectedComponentsVertexOutputFormat;
 import edu.uci.ics.pregelix.example.GraphMutationVertex;
@@ -80,6 +81,7 @@
         job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
         job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
         job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+        job.setVertexPartitionerClass(DefaultVertexPartitioner.class);
         FileInputFormat.setInputPaths(job, HDFS_INPUTPATH2);
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH2));
         job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
@@ -132,6 +134,7 @@
         job.setVertexOutputFormatClass(SimpleConnectedComponentsVertexOutputFormat.class);
         job.setMessageCombinerClass(ConnectedComponentsVertex.SimpleMinCombiner.class);
         job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+        job.setVertexPartitionerClass(DefaultVertexPartitioner.class);
         FileInputFormat.setInputPaths(job, HDFS_INPUTPATH2);
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH2));
         job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
@@ -234,7 +237,7 @@
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH3));
         job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
     }
-    
+
     private static void generateMaximalCliqueJob2(String jobName, String outputPath) throws IOException {
         PregelixJob job = new PregelixJob(jobName);
         job.setVertexClass(MaximalCliqueVertex.class);
@@ -247,7 +250,7 @@
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH3));
         job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
     }
-    
+
     private static void generateMaximalCliqueJob3(String jobName, String outputPath) throws IOException {
         PregelixJob job = new PregelixJob(jobName);
         job.setVertexClass(MaximalCliqueVertex.class);
@@ -256,6 +259,7 @@
         job.setVertexInputFormatClass(TextMaximalCliqueInputFormat.class);
         job.setVertexOutputFormatClass(MaximalCliqueVertexOutputFormat.class);
         job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+        job.setVertexPartitionerClass(DefaultVertexPartitioner.class);
         FileInputFormat.setInputPaths(job, HDFS_INPUTPATH5);
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH3));
         job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
index 46444b3..f510b7f 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
@@ -137,6 +137,7 @@
 <property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.ConnectedComponentsVertex$SimpleConnectedComponentsVertexOutputFormat</value></property>
 <property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
 <property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>pregelix.partitionerClass</name><value>edu.uci.ics.pregelix.api.util.DefaultVertexPartitioner</value></property>
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextConnectedComponentsInputFormat</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique3.xml b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique3.xml
index ee13335..43c6dd1 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique3.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique3.xml
@@ -135,6 +135,7 @@
 <property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.maximalclique.MaximalCliqueVertex$MaximalCliqueVertexOutputFormat</value></property>
 <property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
 <property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>pregelix.partitionerClass</name><value>edu.uci.ics.pregelix.api.util.DefaultVertexPartitioner</value></property>
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.maximalclique.TextMaximalCliqueInputFormat</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
index 314ca55..3f74cfb 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
@@ -137,6 +137,7 @@
 <property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimplePageRankVertexOutputFormat</value></property>
 <property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
 <property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>pregelix.partitionerClass</name><value>edu.uci.ics.pregelix.api.util.DefaultVertexPartitioner</value></property>
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexPartitionComputerFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexPartitionComputerFactory.java
new file mode 100644
index 0000000..acccabb
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexPartitionComputerFactory.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.touchpoint;
+
+import java.io.DataInputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import edu.uci.ics.pregelix.api.graph.VertexPartitioner;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+
+/**
+ * The vertex-based partition computer factory.
+ * It is used to support customized graph partitioning function.
+ * 
+ * @author yingyib
+ */
+public class VertexPartitionComputerFactory implements ITuplePartitionComputerFactory {
+
+    private static final long serialVersionUID = 1L;
+    private final IConfigurationFactory confFactory;
+
+    public VertexPartitionComputerFactory(IConfigurationFactory confFactory) {
+        this.confFactory = confFactory;
+    }
+
+    @SuppressWarnings("rawtypes")
+    public ITuplePartitionComputer createPartitioner() {
+        try {
+            final Configuration conf = confFactory.createConfiguration();
+            return new ITuplePartitionComputer() {
+                private final ByteBufferInputStream bbis = new ByteBufferInputStream();
+                private final DataInputStream dis = new DataInputStream(bbis);
+                private final VertexPartitioner partitioner = BspUtils.createVertexPartitioner(conf);
+                private final WritableComparable vertexId = BspUtils.createVertexIndex(conf);
+
+                @SuppressWarnings("unchecked")
+                public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
+                    try {
+                        int keyStart = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+                                + accessor.getFieldStartOffset(tIndex, 0);
+                        bbis.setByteBuffer(accessor.getBuffer(), keyStart);
+                        vertexId.readFields(dis);
+                        return Math.abs(partitioner.getPartitionId(vertexId, nParts) % nParts);
+                    } catch (Exception e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+            };
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}