Let Pregelix support dynamic vertex value sizes
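
Vertex values may now grow or shrink across supersteps. When a job sets
setDynamicVertexValueSize(true), the update functions no longer overwrite
the vertex value in place inside the B-tree page; instead they copy the
updated (vertex id, vertex value) pair into an ArrayTupleBuilder, and the
operators collect those copies in an UpdateBuffer that is flushed into
the B-tree in batch once the search cursor (and its latches) has been
released.

A minimal usage sketch, mirroring the new PageRankRealDynamic test job
below (vertex/input/output classes are those of the example project):

    PregelixJob job = new PregelixJob("PageRank");
    job.setVertexClass(PageRankVertex.class);
    job.setVertexInputFormatClass(TextPageRankInputFormat.class);
    job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
    job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
    //opt in to copy-on-update handling of variable-size vertex values
    job.setDynamicVertexValueSize(true);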

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@2677 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index d62c525..bba2229 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -151,7 +151,7 @@
      * 
      * @param jobId
      */
-    final public void setIncStateLengthDynamically(boolean incStateLengthDynamically) {
+    final public void setDynamicVertexValueSize(boolean incStateLengthDynamically) {
         getConfiguration().setBoolean(INCREASE_STATE_LENGTH, incStateLengthDynamically);
     }
 }
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index 74bccb9..6066dfb 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -418,7 +418,7 @@
      *            the job configuration
      * @return the boolean setting of the parameter, by default it is false
      */
-    public static boolean getIncStateLengthDynamically(Configuration conf) {
+    public static boolean getDynamicVertexValueSize(Configuration conf) {
         return conf.getBoolean(PregelixJob.INCREASE_STATE_LENGTH, false);
     }
 }
diff --git a/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IUpdateFunction.java b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IUpdateFunction.java
index 62f92dd..a0d365f 100644
--- a/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IUpdateFunction.java
+++ b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IUpdateFunction.java
@@ -16,17 +16,19 @@
 package edu.uci.ics.pregelix.dataflow.std.base;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 
 public interface IUpdateFunction extends IFunction {
 
-    /**
-     * update the tuple pointed by tupleRef called after process,
-     * one-input-tuple-at-a-time
-     * 
-     * @param tupleRef
-     * @throws HyracksDataException
-     */
-    public void update(ITupleReference tupleRef) throws HyracksDataException;
+    /**
+     * Update the tuple pointed to by tupleRef; called after process(),
+     * one input tuple at a time.
+     * 
+     * @param tupleRef
+     *            the tuple to be updated in place
+     * @param cloneUpdateTb
+     *            the builder that receives a cloned (vertex id, vertex value)
+     *            update when the value cannot be written back in place
+     * @throws HyracksDataException
+     */
+    public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb)
+            throws HyracksDataException;
 
 }
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java
index fb84aa0..9389ab6 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java
@@ -43,6 +43,7 @@
 import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
 import edu.uci.ics.pregelix.dataflow.util.FunctionProxy;
+import edu.uci.ics.pregelix.dataflow.util.UpdateBuffer;
 
 public class BTreeSearchFunctionUpdateOperatorNodePushable extends AbstractUnaryInputOperatorNodePushable {
     protected TreeIndexDataflowHelper treeIndexHelper;
@@ -70,6 +71,8 @@
 
     private final IFrameWriter[] writers;
     private final FunctionProxy functionProxy;
+    private ArrayTupleBuilder cloneUpdateTb;
+    private final UpdateBuffer updateBuffer;
 
     public BTreeSearchFunctionUpdateOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
             IHyracksTaskContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward,
@@ -94,6 +97,7 @@
         this.writers = new IFrameWriter[outputArity];
         this.functionProxy = new FunctionProxy(ctx, functionFactory, preHookFactory, postHookFactory, inputRdFactory,
                 writers);
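+        //each buffered tuple has two fields: the vertex id and the vertex value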
+        this.updateBuffer = new UpdateBuffer(ctx, 2);
     }
 
     @Override
@@ -122,6 +126,8 @@
             appender = new FrameTupleAppender(treeIndexHelper.getHyracksTaskContext().getFrameSize());
             appender.reset(writeBuffer, true);
             indexAccessor = btree.createAccessor();
+
+            cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
         } catch (Exception e) {
             treeIndexHelper.deinit();
             throw new HyracksDataException(e);
@@ -136,7 +142,24 @@
         while (cursor.hasNext()) {
             cursor.next();
             ITupleReference tuple = cursor.getTuple();
-            functionProxy.functionCall(tuple);
+            functionProxy.functionCall(tuple, cloneUpdateTb);
+
+            //handle the cloned update, if any
+            if (cloneUpdateTb.getSize() > 0) {
+                if (!updateBuffer.appendTuple(cloneUpdateTb)) {
+                    //release the cursor/latch
+                    cursor.close();
+                    //batch update
+                    updateBuffer.updateBTree(indexAccessor);
+
+                    //search again
+                    cursor.reset();
+                    rangePred.setLowKey(tuple, true);
+                    rangePred.setHighKey(highKey, highKeyInclusive);
+                    indexAccessor.search(cursor, rangePred);
+                }
+            }
+            cloneUpdateTb.reset();
         }
     }
 
@@ -168,6 +191,8 @@
         try {
             try {
                 cursor.close();
+                //batch update
+                updateBuffer.updateBTree(indexAccessor);
             } catch (Exception e) {
                 throw new HyracksDataException(e);
             }
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
index 7237537..69bb6a2 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
@@ -23,6 +23,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -42,6 +43,7 @@
 import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
 import edu.uci.ics.pregelix.dataflow.util.FunctionProxy;
+import edu.uci.ics.pregelix.dataflow.util.UpdateBuffer;
 
 public class IndexNestedLoopJoinFunctionUpdateOperatorNodePushable extends AbstractUnaryInputOperatorNodePushable {
     private TreeIndexDataflowHelper treeIndexOpHelper;
@@ -64,6 +66,8 @@
     private RecordDescriptor recDesc;
     private final IFrameWriter[] writers;
     private final FunctionProxy functionProxy;
+    private ArrayTupleBuilder cloneUpdateTb;
+    private final UpdateBuffer updateBuffer;
 
     public IndexNestedLoopJoinFunctionUpdateOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
             IHyracksTaskContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward,
@@ -87,6 +91,7 @@
         this.writers = new IFrameWriter[outputArity];
         this.functionProxy = new FunctionProxy(ctx, functionFactory, preHookFactory, postHookFactory, inputRdFactory,
                 writers);
+        this.updateBuffer = new UpdateBuffer(ctx, 2);
     }
 
     protected void setCursor() {
@@ -140,6 +145,7 @@
             appender.reset(writeBuffer, true);
 
             indexAccessor = btree.createAccessor();
+            cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
         } catch (Exception e) {
             treeIndexOpHelper.deinit();
             throw new HyracksDataException(e);
@@ -154,7 +160,23 @@
             /**
              * call the update function
              */
-            functionProxy.functionCall(leftAccessor, tIndex, tupleRef);
+            functionProxy.functionCall(leftAccessor, tIndex, tupleRef, cloneUpdateTb);
+
+            if (cloneUpdateTb.getSize() > 0) {
+                if (!updateBuffer.appendTuple(cloneUpdateTb)) {
+                    //release the cursor/latch
+                    cursor.close();
+                    //batch update
+                    updateBuffer.updateBTree(indexAccessor);
+
+                    //search again
+                    cursor.reset();
+                    rangePred.setLowKey(tupleRef, true);
+                    rangePred.setHighKey(highKey, highKeyInclusive);
+                    indexAccessor.search(cursor, rangePred);
+                }
+            }
+            cloneUpdateTb.reset();
         }
     }
 
@@ -186,6 +208,8 @@
         try {
             try {
                 cursor.close();
+                //batch update
+                updateBuffer.updateBTree(indexAccessor);
             } catch (Exception e) {
                 throw new HyracksDataException(e);
             }
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
index af53abe..fbab036 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
@@ -45,6 +45,7 @@
 import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
 import edu.uci.ics.pregelix.dataflow.util.FunctionProxy;
+import edu.uci.ics.pregelix.dataflow.util.UpdateBuffer;
 
 public class IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable extends
         AbstractUnaryInputOperatorNodePushable {
@@ -76,6 +77,8 @@
 
     private final IFrameWriter[] writers;
     private final FunctionProxy functionProxy;
+    private ArrayTupleBuilder cloneUpdateTb;
+    private UpdateBuffer updateBuffer;
 
     public IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
             IHyracksTaskContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward,
@@ -100,6 +103,7 @@
         this.writers = new IFrameWriter[outputArity];
         this.functionProxy = new FunctionProxy(ctx, functionFactory, preHookFactory, postHookFactory, inputRdFactory,
                 writers);
+        this.updateBuffer = new UpdateBuffer(ctx, 2);
     }
 
     protected void setCursor() {
@@ -144,15 +148,15 @@
             rangePred = new RangePredicate(null, null, true, true, lowKeySearchCmp, highKeySearchCmp);
 
             writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
-            
-            nullTupleBuilder = new ArrayTupleBuilder(inputRecDesc.getFields().length);            
+
+            nullTupleBuilder = new ArrayTupleBuilder(inputRecDesc.getFields().length);
             dos = nullTupleBuilder.getDataOutput();
             nullTupleBuilder.reset();
             for (int i = 0; i < inputRecDesc.getFields().length; i++) {
                 nullWriter[i].writeNull(dos);
                 nullTupleBuilder.addFieldEndOffset();
             }
-            
+
             appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
             appender.reset(writeBuffer, true);
 
@@ -171,18 +175,38 @@
                 match = false;
             }
 
+            cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
+
         } catch (Exception e) {
             treeIndexOpHelper.deinit();
             throw new HyracksDataException(e);
         }
     }
 
+    //for the join match cases
     private void writeResults(IFrameTupleAccessor leftAccessor, int tIndex, ITupleReference frameTuple)
             throws Exception {
         /**
          * function call
          */
-        functionProxy.functionCall(leftAccessor, tIndex, frameTuple);
+        functionProxy.functionCall(leftAccessor, tIndex, frameTuple, cloneUpdateTb);
+
+        //handle the cloned update, if any
+        if (cloneUpdateTb.getSize() > 0) {
+            if (!updateBuffer.appendTuple(cloneUpdateTb)) {
+                //release the cursor/latch
+                cursor.close();
+                //batch update
+                updateBuffer.updateBTree(indexAccessor);
+
+                //search again and recover the cursor
+                cursor.reset();
+                rangePred.setLowKey(frameTuple, true);
+                rangePred.setHighKey(null, true);
+                indexAccessor.search(cursor, rangePred);
+            }
+            cloneUpdateTb.reset();
+        }
     }
 
     @Override
@@ -236,6 +260,8 @@
             }
             try {
                 cursor.close();
+                //batch update
+                updateBuffer.updateBTree(indexAccessor);
             } catch (Exception e) {
                 throw new HyracksDataException(e);
             }
@@ -267,7 +293,24 @@
         /**
          * function call
          */
-        functionProxy.functionCall(nullTupleBuilder, frameTuple);
+        functionProxy.functionCall(nullTupleBuilder, frameTuple, cloneUpdateTb);
+
+        //handle the cloned update, if any
+        if (cloneUpdateTb.getSize() > 0) {
+            if (!updateBuffer.appendTuple(cloneUpdateTb)) {
+                //release the cursor/latch
+                cursor.close();
+                //batch update
+                updateBuffer.updateBTree(indexAccessor);
+
+                //search again and recover the cursor
+                cursor.reset();
+                rangePred.setLowKey(frameTuple, true);
+                rangePred.setHighKey(null, true);
+                indexAccessor.search(cursor, rangePred);
+            }
+            cloneUpdateTb.reset();
+        }
     }
 
     @Override
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
index 1aa044d..de083df 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
@@ -23,6 +23,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -42,6 +43,7 @@
 import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
 import edu.uci.ics.pregelix.dataflow.util.FunctionProxy;
+import edu.uci.ics.pregelix.dataflow.util.UpdateBuffer;
 
 public class IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable extends AbstractUnaryInputOperatorNodePushable {
     private TreeIndexDataflowHelper treeIndexOpHelper;
@@ -67,6 +69,8 @@
 
     private final IFrameWriter[] writers;
     private final FunctionProxy functionProxy;
+    private ArrayTupleBuilder cloneUpdateTb;
+    private UpdateBuffer updateBuffer;
 
     public IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
             IHyracksTaskContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward,
@@ -90,6 +94,7 @@
         this.writers = new IFrameWriter[outputArity];
         this.functionProxy = new FunctionProxy(ctx, functionFactory, preHookFactory, postHookFactory, inputRdFactory,
                 writers);
+        this.updateBuffer = new UpdateBuffer(ctx, 2);
     }
 
     protected void setCursor() {
@@ -133,6 +138,7 @@
                 currentTopTuple = cursor.getTuple();
                 match = false;
             }
+            cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
 
         } catch (Exception e) {
             treeIndexOpHelper.deinit();
@@ -198,6 +204,9 @@
             }
             try {
                 cursor.close();
+
+                //batch update
+                updateBuffer.updateBTree(indexAccessor);
             } catch (Exception e) {
                 throw new HyracksDataException(e);
             }
@@ -222,13 +231,47 @@
 
     /** write the right result */
     private void writeRightResults(ITupleReference frameTuple) throws Exception {
-        functionProxy.functionCall(frameTuple);
+        functionProxy.functionCall(frameTuple, cloneUpdateTb);
+
+        //handle the cloned update, if any
+        if (cloneUpdateTb.getSize() > 0) {
+            if (!updateBuffer.appendTuple(cloneUpdateTb)) {
+                //release the cursor/latch
+                cursor.close();
+                //batch update
+                updateBuffer.updateBTree(indexAccessor);
+
+                //search again
+                cursor.reset();
+                rangePred.setLowKey(frameTuple, true);
+                rangePred.setHighKey(null, true);
+                indexAccessor.search(cursor, rangePred);
+            }
+            cloneUpdateTb.reset();
+        }
     }
 
     /** write the left result */
     private void writeLeftResults(IFrameTupleAccessor leftAccessor, int tIndex, ITupleReference frameTuple)
             throws Exception {
-        functionProxy.functionCall(leftAccessor, tIndex, frameTuple);
+        functionProxy.functionCall(leftAccessor, tIndex, frameTuple, cloneUpdateTb);
+
+        //handle the cloned update, if any
+        if (cloneUpdateTb.getSize() > 0) {
+            if (!updateBuffer.appendTuple(cloneUpdateTb)) {
+                //release the cursor/latch
+                cursor.close();
+                //batch update
+                updateBuffer.updateBTree(indexAccessor);
+
+                //search again
+                cursor.reset();
+                rangePred.setLowKey(frameTuple, true);
+                rangePred.setHighKey(null, true);
+                indexAccessor.search(cursor, rangePred);
+            }
+            cloneUpdateTb.reset();
+        }
     }
 
     @Override
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
index bb69ff8..82ac18e 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
@@ -77,11 +77,11 @@
      *            update pointer
      * @throws HyracksDataException
      */
-    public void functionCall(IFrameTupleAccessor leftAccessor, int leftTupleIndex, ITupleReference right)
-            throws HyracksDataException {
+    public void functionCall(IFrameTupleAccessor leftAccessor, int leftTupleIndex, ITupleReference right,
+            ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
         Object[] tuple = tupleDe.deserializeRecord(leftAccessor, leftTupleIndex, right);
         function.process(tuple);
-        function.update(right);
+        function.update(right, cloneUpdateTb);
     }
 
     /**
@@ -90,10 +90,10 @@
      * @param updateRef
      * @throws HyracksDataException
      */
-    public void functionCall(ITupleReference updateRef) throws HyracksDataException {
+    public void functionCall(ITupleReference updateRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
         Object[] tuple = tupleDe.deserializeRecord(updateRef);
         function.process(tuple);
-        function.update(updateRef);
+        function.update(updateRef, cloneUpdateTb);
     }
 
     /**
@@ -101,14 +101,15 @@
      * 
      * @param tb
      *            input data
-     * @param updateRef
+     * @param inPlaceUpdateRef
      *            update pointer
      * @throws HyracksDataException
      */
-    public void functionCall(ArrayTupleBuilder tb, ITupleReference updateRef) throws HyracksDataException {
-        Object[] tuple = tupleDe.deserializeRecord(tb, updateRef);
+    public void functionCall(ArrayTupleBuilder tb, ITupleReference inPlaceUpdateRef, ArrayTupleBuilder cloneUpdateTb)
+            throws HyracksDataException {
+        Object[] tuple = tupleDe.deserializeRecord(tb, inPlaceUpdateRef);
         function.process(tuple);
-        function.update(updateRef);
+        function.update(inPlaceUpdateRef, cloneUpdateTb);
     }
 
     /**
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java
new file mode 100644
index 0000000..85503aa
--- /dev/null
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package edu.uci.ics.pregelix.dataflow.util;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+
+/**
+ * A buffer that holds cloned updates.
+ * Updates to the B-tree are batched during index searches and joins so that
+ * cursors do not have to be closed and re-opened for every single update.
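+ * 
+ * Typical use (see the *FunctionUpdateOperatorNodePushable classes): append
+ * cloned updates until appendTuple(...) returns false, then close the search
+ * cursor, flush with updateBTree(...), and resume the search.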
+ */
+public class UpdateBuffer {
+
+    private int currentInUse = 0;
+    private final int pageLimit;
+    private final List<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
+    private final FrameTupleAppender appender;
+    private final IHyracksTaskContext ctx;
+    private final FrameTupleReference tuple = new FrameTupleReference();
+    private final IFrameTupleAccessor fta;
+
+    public UpdateBuffer(int numPages, IHyracksTaskContext ctx, int fieldCount) {
+        this.appender = new FrameTupleAppender(ctx.getFrameSize());
+        ByteBuffer buffer = ctx.allocateFrame();
+        this.buffers.add(buffer);
+        this.appender.reset(buffer, true);
+        this.pageLimit = numPages;
+        this.ctx = ctx;
+        this.fta = new UpdateBufferTupleAccessor(ctx.getFrameSize(), fieldCount);
+    }
+
+    public UpdateBuffer(IHyracksTaskContext ctx, int fieldCount) {
+        //by default, the update buffer has 1000 pages
+        this(1000, ctx, fieldCount);
+    }
+
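+    /**
+     * Append one update tuple to the buffer.
+     * 
+     * @return true if the tuple was buffered; false if all pages of the
+     *         buffer are full, in which case the caller should flush the
+     *         buffer with updateBTree(...) before continuing.
+     */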
+    public boolean appendTuple(ArrayTupleBuilder tb) throws HyracksDataException {
+        if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+            if (currentInUse + 1 < pageLimit) {
+                // move to the new buffer
+                currentInUse++;
+                allocate(currentInUse);
+                ByteBuffer buffer = buffers.get(currentInUse);
+                appender.reset(buffer, true);
+
+                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                    throw new HyracksDataException("tuple cannot be appended to a new frame!");
+                }
+                return true;
+            } else {
+                return false;
+            }
+        } else {
+            return true;
+        }
+    }
+
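+    /**
+     * Apply all buffered tuples to the B-tree as in-place updates, then reset
+     * the buffer. Callers close the search cursor (releasing its latches)
+     * before invoking this method.
+     */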
+    public void updateBTree(ITreeIndexAccessor bta) throws HyracksDataException, IndexException {
+        // batch update
+        for (int i = 0; i <= currentInUse; i++) {
+            ByteBuffer buffer = buffers.get(i);
+            fta.reset(buffer);
+            for (int j = 0; j < fta.getTupleCount(); j++) {
+                tuple.reset(fta, j);
+                bta.update(tuple);
+            }
+        }
+
+        //cleanup the buffer
+        currentInUse = 0;
+        ByteBuffer buffer = buffers.get(0);
+        appender.reset(buffer, true);
+    }
+
+    private void allocate(int index) {
+        if (index >= buffers.size()) {
+            buffers.add(ctx.allocateFrame());
+        }
+    }
+}
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBufferTupleAccessor.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBufferTupleAccessor.java
new file mode 100644
index 0000000..39f1361
--- /dev/null
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBufferTupleAccessor.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.util;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+
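+/**
+ * Accessor for frames filled by the UpdateBuffer: the tuple count and the
+ * per-tuple end offsets are stored at the rear of the frame, and each tuple
+ * starts with one 4-byte end offset per field. The field count is fixed at
+ * construction time rather than derived from a RecordDescriptor.
+ */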
+public final class UpdateBufferTupleAccessor implements IFrameTupleAccessor {
+    private final int frameSize;
+    private final int fieldCount;
+    private ByteBuffer buffer;
+
+    public UpdateBufferTupleAccessor(int frameSize, int fieldCount) {
+        this.frameSize = frameSize;
+        this.fieldCount = fieldCount;
+    }
+
+    @Override
+    public void reset(ByteBuffer buffer) {
+        this.buffer = buffer;
+    }
+
+    @Override
+    public ByteBuffer getBuffer() {
+        return buffer;
+    }
+
+    @Override
+    public int getTupleCount() {
+        return buffer.getInt(FrameHelper.getTupleCountOffset(frameSize));
+    }
+
+    @Override
+    public int getTupleStartOffset(int tupleIndex) {
+        return tupleIndex == 0 ? 0 : buffer.getInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * tupleIndex);
+    }
+
+    @Override
+    public int getTupleEndOffset(int tupleIndex) {
+        return buffer.getInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleIndex + 1));
+    }
+
+    @Override
+    public int getFieldStartOffset(int tupleIndex, int fIdx) {
+        return fIdx == 0 ? 0 : buffer.getInt(getTupleStartOffset(tupleIndex) + (fIdx - 1) * 4);
+    }
+
+    @Override
+    public int getFieldEndOffset(int tupleIndex, int fIdx) {
+        return buffer.getInt(getTupleStartOffset(tupleIndex) + fIdx * 4);
+    }
+
+    @Override
+    public int getFieldLength(int tupleIndex, int fIdx) {
+        return getFieldEndOffset(tupleIndex, fIdx) - getFieldStartOffset(tupleIndex, fIdx);
+    }
+
+    @Override
+    public int getFieldSlotsLength() {
+        return getFieldCount() * 4;
+    }
+
+    @Override
+    public int getFieldCount() {
+        return fieldCount;
+    }
+}
\ No newline at end of file
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
index c0b4a10..2c580c4 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
@@ -157,13 +157,6 @@
         job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
     }
 
-    private static void genPageRank() throws IOException {
-        generatePageRankJob("PageRank", outputBase + "PageRank.xml");
-        generatePageRankJobReal("PageRank", outputBase + "PageRankReal.xml");
-        generatePageRankJobRealComplex("PageRank", outputBase + "PageRankRealComplex.xml");
-        generatePageRankJobRealNoCombiner("PageRank", outputBase + "PageRankRealNoCombiner.xml");
-    }
-
     private static void generateShortestPathJob(String jobName, String outputPath) throws IOException {
         PregelixJob job = new PregelixJob(jobName);
         job.setVertexClass(ShortestPathsVertex.class);
@@ -177,6 +170,27 @@
         job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
     }
 
+    private static void generatePageRankJobRealDynamic(String jobName, String outputPath) throws IOException {
+        PregelixJob job = new PregelixJob(jobName);
+        job.setVertexClass(PageRankVertex.class);
+        job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+        job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
+        job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
+        job.setDynamicVertexValueSize(true);
+        FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
+        FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
+        job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+        job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+    }
+
+    private static void genPageRank() throws IOException {
+        generatePageRankJob("PageRank", outputBase + "PageRank.xml");
+        generatePageRankJobReal("PageRank", outputBase + "PageRankReal.xml");
+        generatePageRankJobRealDynamic("PageRank", outputBase + "PageRankRealDynamic.xml");
+        generatePageRankJobRealComplex("PageRank", outputBase + "PageRankRealComplex.xml");
+        generatePageRankJobRealNoCombiner("PageRank", outputBase + "PageRankRealNoCombiner.xml");
+    }
+
     private static void genShortestPath() throws IOException {
         generateShortestPathJob("ShortestPaths", outputBase + "ShortestPaths.xml");
         generateShortestPathJobReal("ShortestPaths", outputBase + "ShortestPathsReal.xml");
diff --git a/pregelix/pregelix-example/src/test/resources/expected/PageRankRealDynamic.result b/pregelix/pregelix-example/src/test/resources/expected/PageRankRealDynamic.result
new file mode 100644
index 0000000..ab05d38
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/PageRankRealDynamic.result
@@ -0,0 +1,20 @@
+0|Vertex(id=0,value=0.008290140026154316, edges=(1,))
+1|Vertex(id=1,value=0.1535152819247165, edges=(1,2,))
+2|Vertex(id=2,value=0.14646839195826475, edges=(1,2,3,))
+3|Vertex(id=3,value=0.08125113985998214, edges=(1,2,3,4,))
+4|Vertex(id=4,value=0.03976979906329426, edges=(1,2,3,4,5,))
+5|Vertex(id=5,value=0.0225041581462058, edges=(1,2,3,4,5,6,))
+6|Vertex(id=6,value=0.015736276824953852, edges=(1,2,3,4,5,6,7,))
+7|Vertex(id=7,value=0.012542224114863661, edges=(1,2,3,4,5,6,7,8,))
+8|Vertex(id=8,value=0.010628239626209894, edges=(1,2,3,4,5,6,7,8,9,))
+9|Vertex(id=9,value=0.009294348455354817, edges=(1,2,3,4,5,6,7,8,9,10,))
+10|Vertex(id=10,value=0.008290140026154316, edges=(11,))
+11|Vertex(id=11,value=0.15351528192471647, edges=(11,12,))
+12|Vertex(id=12,value=0.14646839195826472, edges=(11,12,13,))
+13|Vertex(id=13,value=0.08125113985998214, edges=(11,12,13,14,))
+14|Vertex(id=14,value=0.03976979906329426, edges=(11,12,13,14,15,))
+15|Vertex(id=15,value=0.0225041581462058, edges=(11,12,13,14,15,16,))
+16|Vertex(id=16,value=0.015736276824953852, edges=(11,12,13,14,15,16,17,))
+17|Vertex(id=17,value=0.012542224114863661, edges=(11,12,13,14,15,16,17,18,))
+18|Vertex(id=18,value=0.010628239626209894, edges=(11,12,13,14,15,16,17,18,19,))
+19|Vertex(id=19,value=0.009294348455354817, edges=(0,11,12,13,14,15,16,17,18,19,))
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealDynamic.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealDynamic.xml
new file mode 100644
index 0000000..c1a04ae
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealDynamic.xml
@@ -0,0 +1,143 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
+<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
+<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
+<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
+<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
+<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
+<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
+<property><name>mapred.input.dir</name><value>file:/webmap</value></property>
+<property><name>mapred.submit.replication</name><value>10</value></property>
+<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
+<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
+<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
+<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
+<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
+<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
+<property><name>keep.failed.task.files</name><value>false</value></property>
+<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
+<property><name>io.bytes.per.checksum</name><value>512</value></property>
+<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
+<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
+<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
+<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
+<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
+<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
+<property><name>fs.checkpoint.period</name><value>3600</value></property>
+<property><name>mapred.child.tmp</name><value>./tmp</value></property>
+<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
+<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
+<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
+<property><name>mapred.output.dir</name><value>/result</value></property>
+<property><name>io.map.index.skip</name><value>0</value></property>
+<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
+<property><name>mapred.output.compress</name><value>false</value></property>
+<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
+<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
+<property><name>fs.checkpoint.size</name><value>67108864</value></property>
+<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
+<property><name>mapred.job.name</name><value>PageRank</value></property>
+<property><name>local.cache.size</name><value>10737418240</value></property>
+<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
+<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
+<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
+<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
+<property><name>mapred.task.timeout</name><value>600000</value></property>
+<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
+<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
+<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
+<property><name>ipc.client.kill.max</name><value>10</value></property>
+<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
+<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
+<property><name>io.sort.record.percent</name><value>0.05</value></property>
+<property><name>hadoop.security.authorization</name><value>false</value></property>
+<property><name>mapred.max.tracker.failures</name><value>4</value></property>
+<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>pregelix.numVertices</name><value>20</value></property>
+<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
+<property><name>mapred.map.tasks</name><value>2</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
+<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
+<property><name>fs.default.name</name><value>file:///</value></property>
+<property><name>tasktracker.http.threads</name><value>40</value></property>
+<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
+<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
+<property><name>mapred.reduce.tasks</name><value>1</value></property>
+<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex</value></property>
+<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
+<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
+<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
+<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
+<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
+<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
+<property><name>hadoop.native.lib</name><value>true</value></property>
+<property><name>fs.s3.block.size</name><value>67108864</value></property>
+<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
+<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
+<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
+<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
+<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
+<property><name>mapred.queue.names</name><value>default</value></property>
+<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
+<property><name>mapred.job.tracker</name><value>local</value></property>
+<property><name>io.skip.checksum.errors</name><value>false</value></property>
+<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
+<property><name>fs.s3.maxRetries</name><value>4</value></property>
+<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
+<property><name>fs.trash.interval</name><value>0</value></property>
+<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
+<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
+<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
+<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
+<property><name>io.sort.mb</name><value>100</value></property>
+<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
+<property><name>io.sort.factor</name><value>10</value></property>
+<property><name>mapred.task.profile</name><value>false</value></property>
+<property><name>job.end.retry.interval</name><value>30000</value></property>
+<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
+<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
+<property><name>webinterface.private.actions</name><value>false</value></property>
+<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
+<property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimpleSumCombiner</value></property>
+<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
+<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
+<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
+<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.compress.map.output</name><value>false</value></property>
+<property><name>io.sort.spill.percent</name><value>0.80</value></property>
+<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
+<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
+<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
+<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
+<property><name>job.end.retry.attempts</name><value>0</value></property>
+<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimplePageRankVertexOutputFormat</value></property>
+<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
+<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
+<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
+<property><name>mapred.job.queue.name</name><value>default</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>pregelix.incStateLength</name><value>true</value></property>
+<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
+<property><name>topology.script.number.args</name><value>100</value></property>
+<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
+<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
+<property><name>mapred.task.cache.levels</name><value>2</value></property>
+<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
+<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
+<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>mapred.min.split.size</name><value>0</value></property>
+<property><name>mapred.map.max.attempts</name><value>4</value></property>
+<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
+<property><name>mapred.acls.enabled</name><value>false</value></property>
+</configuration>
\ No newline at end of file
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index 105d3e2..5d8eaf6 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -21,6 +21,7 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -90,11 +91,13 @@
             private final List<IFrameWriter> writers = new ArrayList<IFrameWriter>();
             private final List<FrameTupleAppender> appenders = new ArrayList<FrameTupleAppender>();
             private final List<ArrayTupleBuilder> tbs = new ArrayList<ArrayTupleBuilder>();
+            private Configuration conf;
 
             @Override
             public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
                     throws HyracksDataException {
-                this.aggregator = BspUtils.createGlobalAggregator(confFactory.createConfiguration());
+                this.conf = confFactory.createConfiguration();
+                this.aggregator = BspUtils.createGlobalAggregator(conf);
                 this.aggregator.init();
 
                 this.writerMsg = writers[0];
@@ -177,7 +180,8 @@
             private void writeOutGlobalAggregate() throws HyracksDataException {
                 try {
                     /**
-                     * get partial aggregate result and flush to the final aggregator
+                     * get partial aggregate result and flush to the final
+                     * aggregator
                      */
                     Writable agg = aggregator.finishPartial();
                     agg.write(tbGlobalAggregate.getDataOutput());
@@ -203,15 +207,27 @@
             }
 
             @Override
-            public void update(ITupleReference tupleRef) throws HyracksDataException {
+            public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
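+                // when the vertex value size is dynamic, writing the value back in place
+                // could overflow its slot in the B-tree page; instead, copy the updated
+                // vertex into cloneUpdateTb so the operator can apply it in a batched
+                // B-tree update after the cursor is released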
                 try {
                     if (vertex != null && vertex.hasUpdate()) {
-                        int fieldCount = tupleRef.getFieldCount();
-                        for (int i = 1; i < fieldCount; i++) {
-                            byte[] data = tupleRef.getFieldData(i);
-                            int offset = tupleRef.getFieldStart(i);
-                            bbos.setByteArray(data, offset);
-                            vertex.write(output);
+                        if (!BspUtils.getDynamicVertexValueSize(conf)) {
+                            //in-place update
+                            int fieldCount = tupleRef.getFieldCount();
+                            for (int i = 1; i < fieldCount; i++) {
+                                byte[] data = tupleRef.getFieldData(i);
+                                int offset = tupleRef.getFieldStart(i);
+                                bbos.setByteArray(data, offset);
+                                vertex.write(output);
+                            }
+                        } else {
+                            //write the vertex id
+                            DataOutput tbOutput = cloneUpdateTb.getDataOutput();
+                            vertex.getVertexId().write(tbOutput);
+                            cloneUpdateTb.addFieldEndOffset();
+
+                            //write the vertex value
+                            vertex.write(tbOutput);
+                            cloneUpdateTb.addFieldEndOffset();
                         }
                     }
                 } catch (IOException e) {
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
index f72b059..eb08b51 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
@@ -21,6 +21,7 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -93,11 +94,13 @@
             private final List<IFrameWriter> writers = new ArrayList<IFrameWriter>();
             private final List<FrameTupleAppender> appenders = new ArrayList<FrameTupleAppender>();
             private final List<ArrayTupleBuilder> tbs = new ArrayList<ArrayTupleBuilder>();
+            private Configuration conf;
 
             @Override
             public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
                     throws HyracksDataException {
-                this.aggregator = BspUtils.createGlobalAggregator(confFactory.createConfiguration());
+                this.conf = confFactory.createConfiguration();
+                this.aggregator = BspUtils.createGlobalAggregator(conf);
                 this.aggregator.init();
 
                 this.writerMsg = writers[0];
@@ -173,8 +176,8 @@
                 if (!terminate) {
                     writeOutTerminationState();
                 }
-                
-                /**write out global aggregate value*/
+
+                /** write out global aggregate value */
                 writeOutGlobalAggregate();
             }
 
@@ -207,15 +210,27 @@
             }
 
             @Override
-            public void update(ITupleReference tupleRef) throws HyracksDataException {
+            public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
                 try {
                     if (vertex != null && vertex.hasUpdate()) {
-                        int fieldCount = tupleRef.getFieldCount();
-                        for (int i = 1; i < fieldCount; i++) {
-                            byte[] data = tupleRef.getFieldData(i);
-                            int offset = tupleRef.getFieldStart(i);
-                            bbos.setByteArray(data, offset);
-                            vertex.write(output);
+                        if (!BspUtils.getDynamicVertexValueSize(conf)) {
+                            //in-place update
+                            int fieldCount = tupleRef.getFieldCount();
+                            for (int i = 1; i < fieldCount; i++) {
+                                byte[] data = tupleRef.getFieldData(i);
+                                int offset = tupleRef.getFieldStart(i);
+                                bbos.setByteArray(data, offset);
+                                vertex.write(output);
+                            }
+                        } else {
+                            //write the vertex id
+                            DataOutput tbOutput = cloneUpdateTb.getDataOutput();
+                            vertex.getVertexId().write(tbOutput);
+                            cloneUpdateTb.addFieldEndOffset();
+
+                            //write the vertex value
+                            vertex.write(tbOutput);
+                            cloneUpdateTb.addFieldEndOffset();
                         }
                     }
                 } catch (IOException e) {
@@ -224,5 +239,4 @@
             }
         };
     }
-
 }