Several major changes in hyracks:
-- reduced CC/NC communication for reporting partition requests and availability; partition requests/availability are now reported only for send-side materialized (non-pipelining) policies, to support task re-attempts.
-- changed buffer cache to dynamically allocate memory based on needs instead of pre-allocating
-- changed each network channel to lazily allocate memory based on needs, and changed materialized connectors to lazily allocate files based on needs
-- changed several major CCNCCFunctions to use non-java serde
-- added a sort-based group-by operator which pushes group-by aggregations into an external sort
-- made external sort a stable sort

Changes 1, 3, and 4 reduce the job overhead.
Change 2 reduces unnecessary NC resource consumption, such as memory and files.
Changes 5 and 6 are improvements to runtime operators.

One change in algebricks:
-- implemented a rule to push group-by aggregation into sort, i.e., using the sort-based gby operator

Several important changes in pregelix:
-- removed static states in vertex
-- check the halt bit directly, without deserialization
-- optimized the sort algorithm by packing an additional 2-byte normalized key into the tPointers array

Change-Id: Id696f9a9f1647b4a025b8b33d20b3a89127c60d6
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/35
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <westmann@gmail.com>
diff --git a/pregelix-runtime/pom.xml b/pregelix-runtime/pom.xml
index 56a52b2..4268444 100644
--- a/pregelix-runtime/pom.xml
+++ b/pregelix-runtime/pom.xml
@@ -1,18 +1,14 @@
-<!--
- ! Copyright 2009-2013 by The Regents of the University of California
- ! Licensed under the Apache License, Version 2.0 (the "License");
- ! you may not use this file except in compliance with the License.
- ! you may obtain a copy of the License from
- ! 
- !     http://www.apache.org/licenses/LICENSE-2.0
- ! 
- ! Unless required by applicable law or agreed to in writing, software
- ! distributed under the License is distributed on an "AS IS" BASIS,
- ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ! See the License for the specific language governing permissions and
- ! limitations under the License.
- !-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<!-- ! Copyright 2009-2013 by The Regents of the University of California 
+	! Licensed under the Apache License, Version 2.0 (the "License"); ! you may 
+	not use this file except in compliance with the License. ! you may obtain 
+	a copy of the License from ! ! http://www.apache.org/licenses/LICENSE-2.0 
+	! ! Unless required by applicable law or agreed to in writing, software ! 
+	distributed under the License is distributed on an "AS IS" BASIS, ! WITHOUT 
+	WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ! See the 
+	License for the specific language governing permissions and ! limitations 
+	under the License. ! -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 	<modelVersion>4.0.0</modelVersion>
 	<artifactId>pregelix-runtime</artifactId>
 	<packaging>jar</packaging>
@@ -111,6 +107,8 @@
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-data-std</artifactId>
 			<version>0.2.12-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
diff --git a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/agg/AccumulatingAggregatorFactory.java
similarity index 88%
rename from pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
rename to pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/agg/AccumulatingAggregatorFactory.java
index d243c8a..12fb642 100644
--- a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
+++ b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/agg/AccumulatingAggregatorFactory.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.pregelix.runtime.simpleagg;
+package edu.uci.ics.pregelix.runtime.agg;
 
 import java.nio.ByteBuffer;
 
@@ -29,10 +29,10 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-import edu.uci.ics.pregelix.dataflow.group.IClusteredAggregatorDescriptorFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IAggregateFunction;
 import edu.uci.ics.pregelix.dataflow.std.base.IAggregateFunctionFactory;
+import edu.uci.ics.pregelix.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.group.IClusteredAggregatorDescriptorFactory;
 
 public class AccumulatingAggregatorFactory implements IClusteredAggregatorDescriptorFactory {
 
@@ -73,8 +73,8 @@
             }
 
             @Override
-            public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
-                    AggregateState state) throws HyracksDataException {
+            public void init(IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+                    throws HyracksDataException {
                 setGroupKeySize(accessor, tIndex);
                 initAggregateFunctions(state, true);
                 int stateSize = estimateStep(accessor, tIndex, state);
@@ -88,8 +88,8 @@
             }
 
             @Override
-            public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
-                    int stateTupleIndex, AggregateState state) throws HyracksDataException {
+            public void aggregate(IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+                    throws HyracksDataException {
                 int stateSize = estimateStep(accessor, tIndex, state);
                 if (stateSize > frameSize) {
                     emitResultTuple(accessor, tIndex, state);
@@ -99,20 +99,31 @@
             }
 
             @Override
-            public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
-                    AggregateState state) throws HyracksDataException {
+            public boolean outputFinalResult(IFrameTupleAccessor accessor, int tIndex, AggregateState state,
+                    FrameTupleAppender appender) throws HyracksDataException {
                 Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
                 ArrayBackedValueStorage[] aggOutput = aggState.getLeft();
                 IAggregateFunction[] agg = aggState.getRight();
                 for (int i = 0; i < agg.length; i++) {
                     try {
                         agg[i].finishAll();
-                        tupleBuilder.addField(aggOutput[i].getByteArray(), aggOutput[i].getStartOffset(),
-                                aggOutput[i].getLength());
                     } catch (Exception e) {
                         throw new HyracksDataException(e);
                     }
                 }
+                //write group Keys
+                for (int i = 0; i < groupFields.length; i++) {
+                    if (!appender.appendField(accessor, tIndex, groupFields[i])) {
+                        return false;
+                    }
+                }
+                //write aggregate fields
+                for (int i = 0; i < agg.length; i++) {
+                    if (!appender.appendField(aggOutput[i].getByteArray(), aggOutput[i].getStartOffset(),
+                            aggOutput[i].getLength())) {
+                        return false;
+                    }
+                }
                 return true;
             }
 
@@ -122,8 +133,8 @@
             }
 
             @Override
-            public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
-                    AggregateState state) throws HyracksDataException {
+            public boolean outputPartialResult(IFrameTupleAccessor accessor, int tIndex, AggregateState state,
+                    FrameTupleAppender appender) throws HyracksDataException {
                 throw new IllegalStateException("this method should not be called");
             }
 
diff --git a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/agg/AggregationFunction.java
similarity index 81%
rename from pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
rename to pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/agg/AggregationFunction.java
index 5bc30a2..0070c91 100644
--- a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
+++ b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/agg/AggregationFunction.java
@@ -13,7 +13,7 @@
  * limitations under the License.
  */
 
-package edu.uci.ics.pregelix.runtime.simpleagg;
+package edu.uci.ics.pregelix.runtime.agg;
 
 import java.io.DataInput;
 import java.io.DataInputStream;
@@ -55,6 +55,7 @@
     private Writable combinedResult;
     private MsgList msgList = new MsgList();
     private boolean keyRead = false;
+    private boolean skipKey = false;
 
     public AggregationFunction(IHyracksTaskContext ctx, IConfigurationFactory confFactory, DataOutput tmpOutput,
             IFrameWriter groupByOutputWriter, boolean isFinalStage, boolean partialAggAsInput)
@@ -68,6 +69,7 @@
         combiner = BspUtils.createMessageCombiner(conf);
         key = BspUtils.createVertexIndex(conf);
         value = !partialAggAsInput ? BspUtils.createMessageValue(conf) : BspUtils.createPartialCombineValue(conf);
+        skipKey = BspUtils.getSkipCombinerKey(conf);
     }
 
     @Override
@@ -84,8 +86,12 @@
 
     @Override
     public void step(IFrameTupleReference tuple) throws HyracksDataException {
-        if (!partialAggAsInput) {
-            combiner.stepPartial(key, (WritableSizable) value);
+        if (!isFinalStage) {
+            if (!partialAggAsInput) {
+                combiner.stepPartial(key, (WritableSizable) value);
+            } else {
+                combiner.stepPartial2(key, value);
+            }
         } else {
             combiner.stepFinal(key, value);
         }
@@ -95,12 +101,16 @@
     public void finish() throws HyracksDataException {
         try {
             if (!isFinalStage) {
-                combinedResult = combiner.finishPartial();
+                if (!partialAggAsInput) {
+                    combinedResult = combiner.finishPartial();
+                } else {
+                    combinedResult = combiner.finishPartial2();
+                }
             } else {
                 combinedResult = combiner.finishFinal();
             }
             combinedResult.write(output);
-        } catch (IOException e) {
+        } catch (Exception e) {
             throw new HyracksDataException(e);
         }
     }
@@ -109,7 +119,11 @@
     public void finishAll() throws HyracksDataException {
         try {
             if (!isFinalStage) {
-                combinedResult = combiner.finishPartial();
+                if (!partialAggAsInput) {
+                    combinedResult = combiner.finishPartial();
+                } else {
+                    combinedResult = combiner.finishPartial2();
+                }
             } else {
                 combinedResult = combiner.finishFinalAll();
             }
@@ -134,13 +148,20 @@
         valueInputStream.setByteBuffer(buffer, valueStart);
 
         try {
-            if (!keyRead) {
+            //read key if necessary
+            if (!keyRead && !skipKey) {
                 key.readFields(keyInput);
                 keyRead = true;
             }
+            //read value
             value.readFields(valueInput);
-            if (!partialAggAsInput) {
-                return combiner.estimateAccumulatedStateByteSizePartial(key, (WritableSizable) value);
+
+            if (!isFinalStage) {
+                if (!partialAggAsInput) {
+                    return combiner.estimateAccumulatedStateByteSizePartial(key, (WritableSizable) value);
+                } else {
+                    return combiner.estimateAccumulatedStateByteSizePartial2(key, value);
+                }
             } else {
                 return combiner.estimateAccumulatedStateByteSizeFinal(key, value);
             }
@@ -148,5 +169,4 @@
             throw new HyracksDataException(e);
         }
     }
-
 }
diff --git a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/agg/AggregationFunctionFactory.java
similarity index 97%
rename from pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java
rename to pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/agg/AggregationFunctionFactory.java
index 54eccf5..a0deb46 100644
--- a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java
+++ b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/agg/AggregationFunctionFactory.java
@@ -13,7 +13,7 @@
  * limitations under the License.
  */
 
-package edu.uci.ics.pregelix.runtime.simpleagg;
+package edu.uci.ics.pregelix.runtime.agg;
 
 import java.io.DataOutput;
 
diff --git a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/agg/SerializableAggregateFunction.java b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/agg/SerializableAggregateFunction.java
new file mode 100644
index 0000000..3906676
--- /dev/null
+++ b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/agg/SerializableAggregateFunction.java
@@ -0,0 +1,230 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.agg;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import edu.uci.ics.pregelix.api.graph.MessageCombiner;
+import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.ISerializableAggregateFunction;
+import edu.uci.ics.pregelix.dataflow.std.util.ResetableByteArrayOutputStream;
+
+@SuppressWarnings("rawtypes")
+public class SerializableAggregateFunction implements ISerializableAggregateFunction {
+    private final Configuration conf;
+    private final boolean partialAggAsInput;
+    private MessageCombiner combiner;
+    private ByteBufferInputStream keyInputStream = new ByteBufferInputStream();
+    private ByteBufferInputStream valueInputStream = new ByteBufferInputStream();
+    private ByteBufferInputStream stateInputStream = new ByteBufferInputStream();
+    private DataInput keyInput = new DataInputStream(keyInputStream);
+    private DataInput valueInput = new DataInputStream(valueInputStream);
+    private DataInput stateInput = new DataInputStream(stateInputStream);
+    private ResetableByteArrayOutputStream stateBos = new ResetableByteArrayOutputStream();
+    private DataOutput stateOutput = new DataOutputStream(stateBos);
+    private WritableComparable key;
+    private Writable value;
+    private Writable combinedResult;
+    private Writable finalResult;
+    private MsgList msgList = new MsgList();
+
+    public SerializableAggregateFunction(IHyracksTaskContext ctx, IConfigurationFactory confFactory,
+            boolean partialAggAsInput) throws HyracksDataException {
+        this.conf = confFactory.createConfiguration(ctx);
+        this.partialAggAsInput = partialAggAsInput;
+        msgList.setConf(this.conf);
+
+        combiner = BspUtils.createMessageCombiner(conf);
+        key = BspUtils.createVertexIndex(conf);
+        value = !partialAggAsInput ? BspUtils.createMessageValue(conf) : BspUtils.createPartialCombineValue(conf);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void init(IFrameTupleReference tuple, ArrayTupleBuilder state) throws HyracksDataException {
+        try {
+            /**
+             * bind key and value
+             */
+            bindKeyValue(tuple);
+            key.readFields(keyInput);
+            value.readFields(valueInput);
+
+            combiner.init(msgList);
+
+            /**
+             * call the step function of the aggregator
+             */
+            if (!partialAggAsInput) {
+                combiner.stepPartial(key, (WritableSizable) value);
+            } else {
+                combiner.stepFinal(key, (WritableSizable) value);
+            }
+
+            /**
+             * output state to the array tuple builder
+             */
+            combinedResult = combiner.finishPartial();
+            combinedResult.write(state.getDataOutput());
+            state.addFieldEndOffset();
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void step(IFrameTupleReference tuple, IFrameTupleReference state) throws HyracksDataException {
+        try {
+            /**
+             * bind key and value
+             */
+            bindKeyValue(tuple);
+            key.readFields(keyInput);
+            value.readFields(valueInput);
+
+            /**
+             * bind state
+             */
+            bindState(state);
+            combinedResult.readFields(stateInput);
+
+            /**
+             * set the partial state
+             */
+            combiner.setPartialCombineState(combinedResult);
+
+            /**
+             * call the step function of the aggregator
+             */
+            if (!partialAggAsInput) {
+                combiner.stepPartial(key, (WritableSizable) value);
+            } else {
+                combiner.stepFinal(key, (WritableSizable) value);
+            }
+
+            /**
+             * write out partial state
+             */
+            combinedResult = combiner.finishPartial();
+            combinedResult.write(stateOutput);
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    public void finishPartial(IFrameTupleReference state, ArrayTupleBuilder output) throws HyracksDataException {
+        try {
+            /**
+             * bind state
+             */
+            bindState(state);
+            combinedResult.readFields(stateInput);
+
+            /**
+             * set the partial state
+             */
+            combiner.setPartialCombineState(combinedResult);
+            combinedResult = combiner.finishPartial();
+            combinedResult.write(output.getDataOutput());
+            output.addFieldEndOffset();
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    public void finishFinal(IFrameTupleReference state, ArrayTupleBuilder output) throws HyracksDataException {
+        try {
+            /**
+             * bind key and value
+             */
+            bindKeyValue(state);
+            key.readFields(keyInput);
+
+            /**
+             * bind state
+             */
+            bindState(state);
+            combinedResult.readFields(stateInput);
+
+            /**
+             * set the partial state
+             */
+            if (!partialAggAsInput) {
+                combiner.setPartialCombineState(combinedResult);
+                combinedResult = combiner.finishPartial();
+                combinedResult.write(output.getDataOutput());
+            } else {
+                combiner.setPartialCombineState(combinedResult);
+                finalResult = combiner.finishFinal();
+                finalResult.write(output.getDataOutput());
+            }
+            output.addFieldEndOffset();
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /**
+     * @param state
+     */
+    private void bindState(IFrameTupleReference state) {
+        FrameTupleReference ftr = (FrameTupleReference) state;
+        IFrameTupleAccessor fta = ftr.getFrameTupleAccessor();
+        ByteBuffer buffer = fta.getBuffer();
+        int tIndex = ftr.getTupleIndex();
+        int combinedStateStart = fta.getFieldSlotsLength() + fta.getTupleStartOffset(tIndex)
+                + fta.getFieldStartOffset(tIndex, 1);
+        stateInputStream.setByteBuffer(buffer, combinedStateStart);
+        stateBos.setByteArray(buffer.array(), combinedStateStart);
+    }
+
+    /**
+     * @param tuple
+     */
+    private void bindKeyValue(IFrameTupleReference tuple) {
+        FrameTupleReference ftr = (FrameTupleReference) tuple;
+        IFrameTupleAccessor fta = ftr.getFrameTupleAccessor();
+        ByteBuffer buffer = fta.getBuffer();
+        int tIndex = ftr.getTupleIndex();
+        int keyStart = fta.getFieldSlotsLength() + fta.getTupleStartOffset(tIndex) + fta.getFieldStartOffset(tIndex, 0);
+        int valueStart = fta.getFieldSlotsLength() + fta.getTupleStartOffset(tIndex)
+                + fta.getFieldStartOffset(tIndex, 1);
+        keyInputStream.setByteBuffer(buffer, keyStart);
+        valueInputStream.setByteBuffer(buffer, valueStart);
+    }
+
+}
diff --git a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/agg/SerializableAggregationFunctionFactory.java b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/agg/SerializableAggregationFunctionFactory.java
new file mode 100644
index 0000000..c6e41b9
--- /dev/null
+++ b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/agg/SerializableAggregationFunctionFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.runtime.agg;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.ISerializableAggregateFunction;
+import edu.uci.ics.pregelix.dataflow.std.base.ISerializableAggregateFunctionFactory;
+
+public class SerializableAggregationFunctionFactory implements ISerializableAggregateFunctionFactory {
+    private static final long serialVersionUID = 1L;
+    private final IConfigurationFactory confFactory;
+    private final boolean partialAggAsInput;
+
+    public SerializableAggregationFunctionFactory(IConfigurationFactory confFactory, boolean partialAggAsInput) {
+        this.confFactory = confFactory;
+        this.partialAggAsInput = partialAggAsInput;
+    }
+
+    @Override
+    public ISerializableAggregateFunction createAggregateFunction(IHyracksTaskContext ctx, IFrameWriter writer)
+            throws HyracksException {
+        return new SerializableAggregateFunction(ctx, confFactory, partialAggAsInput);
+    }
+}
diff --git a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/agg/SerializableAggregatorDescriptorFactory.java b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/agg/SerializableAggregatorDescriptorFactory.java
new file mode 100644
index 0000000..11b7b63
--- /dev/null
+++ b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/agg/SerializableAggregatorDescriptorFactory.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.agg;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.ISerializableAggregateFunction;
+import edu.uci.ics.pregelix.dataflow.std.base.ISerializableAggregateFunctionFactory;
+
+public class SerializableAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
+    private static final long serialVersionUID = 1L;
+    private ISerializableAggregateFunctionFactory aggFuncFactory;
+
+    public SerializableAggregatorDescriptorFactory(ISerializableAggregateFunctionFactory aggFuncFactory) {
+        this.aggFuncFactory = aggFuncFactory;
+    }
+
+    @Override
+    public IAggregatorDescriptor createAggregator(final IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults, IFrameWriter writer)
+            throws HyracksDataException {
+        try {
+            final FrameTupleReference tupleRef = new FrameTupleReference();
+            final FrameTupleReference stateRef = new FrameTupleReference();
+            final ISerializableAggregateFunction aggFunc = aggFuncFactory.createAggregateFunction(ctx, writer);
+
+            /**
+             * The serializable version aggregator itself is stateless
+             */
+            return new IAggregatorDescriptor() {
+
+                @Override
+                public AggregateState createAggregateStates() {
+                    return new AggregateState();
+                }
+
+                @Override
+                public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                        AggregateState state) throws HyracksDataException {
+                    tupleRef.reset(accessor, tIndex);
+                    aggFunc.init(tupleRef, tupleBuilder);
+                }
+
+                @Override
+                public void reset() {
+
+                }
+
+                @Override
+                public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+                        int stateTupleIndex, AggregateState state) throws HyracksDataException {
+                    tupleRef.reset(accessor, tIndex);
+                    stateRef.reset(stateAccessor, stateTupleIndex);
+                    aggFunc.step(tupleRef, stateRef);
+                }
+
+                @Override
+                public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor,
+                        int tIndex, AggregateState state) throws HyracksDataException {
+                    stateRef.reset(accessor, tIndex);
+                    aggFunc.finishPartial(stateRef, tupleBuilder);
+                    return true;
+                }
+
+                @Override
+                public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor,
+                        int tIndex, AggregateState state) throws HyracksDataException {
+                    stateRef.reset(accessor, tIndex);
+                    aggFunc.finishFinal(stateRef, tupleBuilder);
+                    return true;
+                }
+
+                @Override
+                public void close() {
+
+                }
+
+            };
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+}
diff --git a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index 3e4a811..bd05687 100644
--- a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -42,7 +42,7 @@
 import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunction;
 import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
-import edu.uci.ics.pregelix.dataflow.util.ResetableByteArrayOutputStream;
+import edu.uci.ics.pregelix.dataflow.std.util.ResetableByteArrayOutputStream;
 
 @SuppressWarnings({ "rawtypes", "unchecked" })
 public class ComputeUpdateFunctionFactory implements IUpdateFunctionFactory {
@@ -107,6 +107,7 @@
             private final List<ArrayTupleBuilder> tbs = new ArrayList<ArrayTupleBuilder>();
             private Configuration conf;
             private boolean dynamicStateLength;
+            private boolean userConfigured;
 
             @Override
             public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
@@ -115,6 +116,7 @@
                 //LSM index does not have in-place update
                 this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf) || BspUtils.useLSM(conf);
                 this.aggregators = BspUtils.createGlobalAggregators(conf);
+                this.userConfigured = false;
                 for (int i = 0; i < aggregators.size(); i++) {
                     this.aggregators.get(i).init();
                 }
@@ -123,7 +125,7 @@
 
                 this.writerMsg = writers[0];
                 this.bufferMsg = ctx.allocateFrame();
-                this.appenderMsg = new FrameTupleAppender(ctx.getFrameSize());
+                this.appenderMsg = new FrameTupleAppender(ctx.getFrameSize(), 2);
                 this.appenderMsg.reset(bufferMsg, true);
                 this.writers.add(writerMsg);
                 this.appenders.add(appenderMsg);
@@ -155,7 +157,7 @@
                 if (writers.length > 5) {
                     this.writerAlive = writers[5];
                     this.bufferAlive = ctx.allocateFrame();
-                    this.appenderAlive = new FrameTupleAppender(ctx.getFrameSize());
+                    this.appenderAlive = new FrameTupleAppender(ctx.getFrameSize(), 2);
                     this.appenderAlive.reset(bufferAlive, true);
                     this.pushAlive = true;
                     this.writers.add(writerAlive);
@@ -195,6 +197,10 @@
                 }
 
                 try {
+                    if (!userConfigured) {
+                        vertex.configure(conf);
+                        userConfigured = true;
+                    }
                     if (msgContentList.segmentStart()) {
                         vertex.open();
                     }
@@ -239,6 +245,11 @@
 
                 /** write out global aggregate value */
                 writeOutGlobalAggregate();
+
+                /** end of a superstep, for vertices to release resources */
+                if (userConfigured) {
+                    vertex.endSuperstep(conf);
+                }
             }
 
             private void writeOutGlobalAggregate() throws HyracksDataException {
@@ -255,7 +266,7 @@
                     if (!appenderGlobalAggregate.append(tbGlobalAggregate.getFieldEndOffsets(),
                             tbGlobalAggregate.getByteArray(), 0, tbGlobalAggregate.getSize())) {
                         // aggregate state exceed the page size, write to HDFS
-                        FrameTupleUtils.flushTupleToHDFS(tbGlobalAggregate, conf, Vertex.getSuperstep());
+                        FrameTupleUtils.flushTupleToHDFS(tbGlobalAggregate, conf, vertex.getSuperstep());
                         appenderGlobalAggregate.reset(bufferGlobalAggregate, true);
                     }
                     FrameTupleUtils.flushTuplesFinal(appenderGlobalAggregate, writerGlobalAggregate);
diff --git a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
index 9ddcce5..774c180 100644
--- a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
+++ b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
@@ -42,7 +42,7 @@
 import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunction;
 import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
-import edu.uci.ics.pregelix.dataflow.util.ResetableByteArrayOutputStream;
+import edu.uci.ics.pregelix.dataflow.std.util.ResetableByteArrayOutputStream;
 
 @SuppressWarnings({ "rawtypes", "unchecked" })
 public class StartComputeUpdateFunctionFactory implements IUpdateFunctionFactory {
@@ -110,13 +110,15 @@
             private final List<ArrayTupleBuilder> tbs = new ArrayList<ArrayTupleBuilder>();
             private Configuration conf;
             private boolean dynamicStateLength;
+            private boolean userConfigured;
 
             @Override
             public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
                     throws HyracksDataException {
                 this.conf = confFactory.createConfiguration(ctx);
                 //LSM index does not have in-place update
-                this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf) || BspUtils.useLSM(conf);;
+                this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf) || BspUtils.useLSM(conf);
+                this.userConfigured = false;
                 this.aggregators = BspUtils.createGlobalAggregators(conf);
                 for (int i = 0; i < aggregators.size(); i++) {
                     this.aggregators.get(i).init();
@@ -126,7 +128,7 @@
 
                 this.writerMsg = writers[0];
                 this.bufferMsg = ctx.allocateFrame();
-                this.appenderMsg = new FrameTupleAppender(ctx.getFrameSize());
+                this.appenderMsg = new FrameTupleAppender(ctx.getFrameSize(), 2);
                 this.appenderMsg.reset(bufferMsg, true);
                 this.writers.add(writerMsg);
                 this.appenders.add(appenderMsg);
@@ -158,7 +160,7 @@
                 if (writers.length > 5) {
                     this.writerAlive = writers[5];
                     this.bufferAlive = ctx.allocateFrame();
-                    this.appenderAlive = new FrameTupleAppender(ctx.getFrameSize());
+                    this.appenderAlive = new FrameTupleAppender(ctx.getFrameSize(), 2);
                     this.appenderAlive.reset(bufferAlive, true);
                     this.pushAlive = true;
                     this.writers.add(writerAlive);
@@ -192,6 +194,10 @@
                 }
 
                 try {
+                    if (!userConfigured) {
+                        vertex.configure(conf);
+                        userConfigured = true;
+                    }
                     vertex.open();
                     vertex.compute(msgIterator);
                     vertex.close();
@@ -228,6 +234,11 @@
 
                 /** write out global aggregate value */
                 writeOutGlobalAggregate();
+
+                /** end of a superstep, for vertices to release resources */
+                if (userConfigured) {
+                    vertex.endSuperstep(conf);
+                }
             }
 
             private void writeOutGlobalAggregate() throws HyracksDataException {
@@ -244,7 +255,7 @@
                     if (!appenderGlobalAggregate.append(tbGlobalAggregate.getFieldEndOffsets(),
                             tbGlobalAggregate.getByteArray(), 0, tbGlobalAggregate.getSize())) {
                         // aggregate state exceed the page size, write to HDFS
-                        FrameTupleUtils.flushTupleToHDFS(tbGlobalAggregate, conf, Vertex.getSuperstep());
+                        FrameTupleUtils.flushTupleToHDFS(tbGlobalAggregate, conf, vertex.getSuperstep());
                         appenderGlobalAggregate.reset(bufferGlobalAggregate, true);
                     }
                     FrameTupleUtils.flushTuplesFinal(appenderGlobalAggregate, writerGlobalAggregate);
diff --git a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/DatatypeHelper.java b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/DatatypeHelper.java
index e99fcb3..b7a896d 100644
--- a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/DatatypeHelper.java
+++ b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/DatatypeHelper.java
@@ -22,10 +22,14 @@
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.graph.Vertex;
 import edu.uci.ics.pregelix.api.util.ArrayListWritable;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
 
 public class DatatypeHelper {
     private static final class WritableSerializerDeserializer<T extends Writable> implements ISerializerDeserializer<T> {
@@ -33,11 +37,13 @@
 
         private final Class<T> clazz;
         private transient Configuration conf;
+        private transient IHyracksTaskContext ctx; // runtime-only; kept out of Java serialization, like conf
         private T object;
 
-        private WritableSerializerDeserializer(Class<T> clazz, Configuration conf) {
+        private WritableSerializerDeserializer(Class<T> clazz, Configuration conf, IHyracksTaskContext ctx) {
             this.clazz = clazz;
             this.conf = conf;
+            this.ctx = ctx; // may be null; stored as given, with no validation here
         }
 
         @SuppressWarnings({ "unchecked", "rawtypes" })
@@ -49,6 +55,12 @@
             }
             try {
                 T t = clazz.newInstance();
+                if (t instanceof Vertex) {
+                    Vertex vertex = (Vertex) t;
+                    if (vertex.getVertexContext() == null && ctx != null) {
+                        vertex.setVertexContext(IterationUtils.getVertexContext(BspUtils.getJobId(conf), ctx));
+                    }
+                }
                 if (t instanceof ArrayListWritable) {
                     ((ArrayListWritable) t).setConf(conf);
                 }
@@ -87,16 +99,16 @@
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
     public static ISerializerDeserializer<? extends Writable> createSerializerDeserializer(
-            Class<? extends Writable> fClass, Configuration conf) {
-        return new WritableSerializerDeserializer(fClass, conf);
+            Class<? extends Writable> fClass, Configuration conf, IHyracksTaskContext ctx) {
+        return new WritableSerializerDeserializer(fClass, conf, ctx); // ctx is forwarded unchanged (null allowed)
     }
 
     public static RecordDescriptor createKeyValueRecordDescriptor(Class<? extends Writable> keyClass,
             Class<? extends Writable> valueClass, Configuration conf) {
         @SuppressWarnings("rawtypes")
         ISerializerDeserializer[] fields = new ISerializerDeserializer[2];
-        fields[0] = createSerializerDeserializer(keyClass, conf);
-        fields[1] = createSerializerDeserializer(valueClass, conf);
+        fields[0] = createSerializerDeserializer(keyClass, conf, null); // key serde; no task context in scope, so null
+        fields[1] = createSerializerDeserializer(valueClass, conf, null); // value serde, likewise built with a null ctx
         return new RecordDescriptor(fields);
     }
 }
\ No newline at end of file
diff --git a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
index 3151df2..3489578 100644
--- a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
+++ b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
@@ -14,8 +14,6 @@
  */
 package edu.uci.ics.pregelix.runtime.touchpoint;
 
-import java.lang.reflect.Field;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -23,9 +21,11 @@
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.hdfs.ContextFactory;
+import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHook;
 import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
 
 public class RuntimeHookFactory implements IRuntimeHookFactory {
 
@@ -48,12 +48,9 @@
                 try {
                     TaskAttemptContext mapperContext = ctxFactory.createContext(conf, new TaskAttemptID());
                     mapperContext.getConfiguration().setClassLoader(ctx.getJobletContext().getClassLoader());
-
-                    ClassLoader cl = ctx.getJobletContext().getClassLoader();
-                    Class<?> vClass = (Class<?>) cl.loadClass("edu.uci.ics.pregelix.api.graph.Vertex");
-                    Field contextField = vClass.getDeclaredField("context");
-                    contextField.setAccessible(true);
-                    contextField.set(null, mapperContext);
+                    // Hand the task-attempt context to IterationUtils, keyed by job id, rather than
+                    // writing it into a static field of Vertex via reflection.
+                    IterationUtils.setJobContext(BspUtils.getJobId(conf), ctx, mapperContext);
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
                 }
diff --git a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexIdPartitionComputerFactory.java b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexIdPartitionComputerFactory.java
index c9b67fb..4c934d3 100644
--- a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexIdPartitionComputerFactory.java
+++ b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexIdPartitionComputerFactory.java
@@ -14,50 +14,45 @@
  */
 package edu.uci.ics.pregelix.runtime.touchpoint;
 
-import java.io.DataInputStream;
-
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
 import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.ISerializerDeserializerFactory;
 
 public class VertexIdPartitionComputerFactory<K extends Writable, V extends Writable> implements
         ITuplePartitionComputerFactory {
     private static final long serialVersionUID = 1L;
-    private final ISerializerDeserializerFactory<K> keyIOFactory;
-    private final IConfigurationFactory confFactory;
 
     public VertexIdPartitionComputerFactory(ISerializerDeserializerFactory<K> keyIOFactory,
             IConfigurationFactory confFactory) {
-        this.keyIOFactory = keyIOFactory;
-        this.confFactory = confFactory;
     }
 
     public ITuplePartitionComputer createPartitioner() {
         try {
-            final Configuration conf = confFactory.createConfiguration();
             return new ITuplePartitionComputer() {
-                private final ByteBufferInputStream bbis = new ByteBufferInputStream();
-                private final DataInputStream dis = new DataInputStream(bbis);
-                private final ISerializerDeserializer<K> keyIO = keyIOFactory.getSerializerDeserializer(conf);
 
                 public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
                     int keyStart = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
                             + accessor.getFieldStartOffset(tIndex, 0);
-                    bbis.setByteBuffer(accessor.getBuffer(), keyStart);
-                    K key = keyIO.deserialize(dis);
-                    return Math.abs(key.hashCode() % nParts);
+                    int keyLength = accessor.getFieldLength(tIndex, 0);
+                    return Math.abs(hash(accessor.getBuffer().array(), keyStart, keyLength) % nParts);
+                }
+
+                private int hash(byte[] bytes, int offset, int length) {
+                    int h = 1; // seed; every key byte is folded in as h = 31 * h + b
+                    for (int pos = offset, limit = offset + length; pos < limit; pos++) {
+                        h = 31 * h + bytes[pos];
+                    }
+                    return h;
                 }
             };
         } catch (Exception e) {
             throw new IllegalStateException(e);
         }
     }
+
 }
\ No newline at end of file
diff --git a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/WritableSerializerDeserializerFactory.java b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/WritableSerializerDeserializerFactory.java
index c11ac5b..8b89877 100644
--- a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/WritableSerializerDeserializerFactory.java
+++ b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/WritableSerializerDeserializerFactory.java
@@ -31,6 +31,6 @@
     @SuppressWarnings({ "rawtypes", "unchecked" })
     @Override
     public ISerializerDeserializer getSerializerDeserializer(Configuration conf) {
-        return DatatypeHelper.createSerializerDeserializer(clazz, conf);
+        return DatatypeHelper.createSerializerDeserializer(clazz, conf, null); // no task context at this call site
     }
 }