cross merge fullstack_release_candidate into trunk

git-svn-id: https://hyracks.googlecode.com/svn/trunk@3208 123451ca-8445-de46-9d55-352943316053
diff --git a/fullstack/pregelix/pregelix-dataflow-std/pom.xml b/fullstack/pregelix/pregelix-dataflow-std/pom.xml
index cc9a184..c5a0913 100644
--- a/fullstack/pregelix/pregelix-dataflow-std/pom.xml
+++ b/fullstack/pregelix/pregelix-dataflow-std/pom.xml
@@ -1,14 +1,15 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 	<modelVersion>4.0.0</modelVersion>
 	<artifactId>pregelix-dataflow-std</artifactId>
 	<packaging>jar</packaging>
 	<name>pregelix-dataflow-std</name>
 
 	<parent>
-    		<groupId>edu.uci.ics.hyracks</groupId>
-    		<artifactId>pregelix</artifactId>
-    		<version>0.2.3-SNAPSHOT</version>
-  	</parent>
+		<groupId>edu.uci.ics.hyracks</groupId>
+		<artifactId>pregelix</artifactId>
+		<version>0.2.3-SNAPSHOT</version>
+	</parent>
 
 
 	<properties>
@@ -22,8 +23,9 @@
 				<artifactId>maven-compiler-plugin</artifactId>
 				<version>2.0.2</version>
 				<configuration>
-					<source>1.6</source>
-					<target>1.6</target>
+					<source>1.7</source>
+					<target>1.7</target>
+					<fork>true</fork>
 				</configuration>
 			</plugin>
 			<plugin>
@@ -42,6 +44,7 @@
 			</plugin>
 			<plugin>
 				<artifactId>maven-clean-plugin</artifactId>
+				<version>2.5</version>
 				<configuration>
 					<filesets>
 						<fileset>
@@ -100,11 +103,13 @@
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-data-std</artifactId>
 			<version>0.2.3-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
 		</dependency>
 		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-core</artifactId>
-			<version>0.20.2</version>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-hdfs-core</artifactId>
+			<version>0.2.3-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java
index fb84aa0..3938613 100644
--- a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java
@@ -43,6 +43,7 @@
 import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
 import edu.uci.ics.pregelix.dataflow.util.FunctionProxy;
+import edu.uci.ics.pregelix.dataflow.util.UpdateBuffer;
 
 public class BTreeSearchFunctionUpdateOperatorNodePushable extends AbstractUnaryInputOperatorNodePushable {
     protected TreeIndexDataflowHelper treeIndexHelper;
@@ -70,6 +71,8 @@
 
     private final IFrameWriter[] writers;
     private final FunctionProxy functionProxy;
+    private ArrayTupleBuilder cloneUpdateTb;
+    private final UpdateBuffer updateBuffer;
 
     public BTreeSearchFunctionUpdateOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
             IHyracksTaskContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward,
@@ -94,6 +97,7 @@
         this.writers = new IFrameWriter[outputArity];
         this.functionProxy = new FunctionProxy(ctx, functionFactory, preHookFactory, postHookFactory, inputRdFactory,
                 writers);
+        this.updateBuffer = new UpdateBuffer(ctx, 2);
     }
 
     @Override
@@ -122,6 +126,9 @@
             appender = new FrameTupleAppender(treeIndexHelper.getHyracksTaskContext().getFrameSize());
             appender.reset(writeBuffer, true);
             indexAccessor = btree.createAccessor();
+
+            cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
+            updateBuffer.setFieldCount(btree.getFieldCount());
         } catch (Exception e) {
             treeIndexHelper.deinit();
             throw new HyracksDataException(e);
@@ -136,7 +143,24 @@
         while (cursor.hasNext()) {
             cursor.next();
             ITupleReference tuple = cursor.getTuple();
-            functionProxy.functionCall(tuple);
+            functionProxy.functionCall(tuple, cloneUpdateTb);
+
+            //doing clone update
+            if (cloneUpdateTb.getSize() > 0) {
+                if (!updateBuffer.appendTuple(cloneUpdateTb)) {
+                    //release the cursor/latch
+                    cursor.close();
+                    //batch update
+                    updateBuffer.updateBTree(indexAccessor);
+
+                    //search again
+                    cursor.reset();
+                    rangePred.setLowKey(tuple, true);
+                    rangePred.setHighKey(highKey, highKeyInclusive);
+                    indexAccessor.search(cursor, rangePred);
+                }
+            }
+            cloneUpdateTb.reset();
         }
     }
 
@@ -168,6 +192,8 @@
         try {
             try {
                 cursor.close();
+                //batch update
+                updateBuffer.updateBTree(indexAccessor);
             } catch (Exception e) {
                 throw new HyracksDataException(e);
             }
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FileWriteOperatorDescriptor.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FileWriteOperatorDescriptor.java
deleted file mode 100644
index 356f06c..0000000
--- a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FileWriteOperatorDescriptor.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.pregelix.dataflow.std;
-
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
-import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameDeserializer;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
-import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
-import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.util.StringSerializationUtils;
-import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
-import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
-
-public class FileWriteOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
-    private static final long serialVersionUID = 1L;
-    private final FileSplit[] splits;
-    private final IRuntimeHookFactory preHookFactory;
-    private final IRuntimeHookFactory postHookFactory;
-    private final IRecordDescriptorFactory inputRdFactory;
-
-    public FileWriteOperatorDescriptor(JobSpecification spec, IRecordDescriptorFactory inputRdFactory,
-            IFileSplitProvider fileSplitProvider, IRuntimeHookFactory preHookFactory,
-            IRuntimeHookFactory postHookFactory) {
-        super(spec, 1, 0);
-        this.splits = fileSplitProvider.getFileSplits();
-        this.preHookFactory = preHookFactory;
-        this.postHookFactory = postHookFactory;
-        this.inputRdFactory = inputRdFactory;
-    }
-
-    @Override
-    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
-            final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
-            throws HyracksDataException {
-        IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
-            private RecordDescriptor rd0;
-            private FrameDeserializer frameDeserializer;
-            private PrintWriter outputWriter;
-            private final static String separator = "|";
-
-            @Override
-            public void open() throws HyracksDataException {
-                rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
-                        : inputRdFactory.createRecordDescriptor();
-                frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
-                try {
-                    outputWriter = new PrintWriter(new OutputStreamWriter(new FileOutputStream(splits[partition]
-                            .getLocalFile().getFile())));
-                    if (preHookFactory != null)
-                        preHookFactory.createRuntimeHook().configure(ctx);
-                } catch (IOException e) {
-                    throw new HyracksDataException(e);
-                }
-            }
-
-            @Override
-            public void nextFrame(ByteBuffer frame) throws HyracksDataException {
-                frameDeserializer.reset(frame);
-                while (!frameDeserializer.done()) {
-                    Object[] tuple = frameDeserializer.deserializeRecord();
-                    for (int i = 0; i < tuple.length - 1; i++) {
-                        outputWriter.print(StringSerializationUtils.toString(tuple[i]));
-                        outputWriter.print(separator);
-                    }
-                    outputWriter.print(StringSerializationUtils.toString(tuple[tuple.length - 1]));
-                    outputWriter.println();
-                }
-            }
-
-            @Override
-            public void fail() throws HyracksDataException {
-
-            }
-
-            @Override
-            public void close() throws HyracksDataException {
-                if (postHookFactory != null)
-                    postHookFactory.createRuntimeHook().configure(ctx);
-                outputWriter.close();
-            }
-
-        };
-        return op;
-    }
-}
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java
index ee3ac82..4cbd6c4 100644
--- a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java
@@ -61,22 +61,21 @@
             private FrameDeserializer frameDeserializer;
             private final IFrameWriter[] writers = new IFrameWriter[outputArity];
             private final IFunction function = functionFactory.createFunction();
+            private ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
 
             @Override
-            public void close() throws HyracksDataException {
-                if (postHookFactory != null)
-                    postHookFactory.createRuntimeHook().configure(ctx);
-                function.close();
+            public void open() throws HyracksDataException {
+                rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
+                        : inputRdFactory.createRecordDescriptor();
+                frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
+                ctxCL = Thread.currentThread().getContextClassLoader();
+                Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
                 for (IFrameWriter writer : writers) {
-                    writer.close();
+                    writer.open();
                 }
-            }
-
-            @Override
-            public void fail() throws HyracksDataException {
-                for (IFrameWriter writer : writers) {
-                    writer.fail();
-                }
+                if (preHookFactory != null)
+                    preHookFactory.createRuntimeHook().configure(ctx);
+                function.open(ctx, rd0, writers);
             }
 
             @Override
@@ -89,17 +88,21 @@
             }
 
             @Override
-            public void open() throws HyracksDataException {
-                rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
-                        : inputRdFactory.createRecordDescriptor();
-                frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
-                Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+            public void close() throws HyracksDataException {
+                if (postHookFactory != null)
+                    postHookFactory.createRuntimeHook().configure(ctx);
+                function.close();
                 for (IFrameWriter writer : writers) {
-                    writer.open();
+                    writer.close();
                 }
-                if (preHookFactory != null)
-                    preHookFactory.createRuntimeHook().configure(ctx);
-                function.open(ctx, rd0, writers);
+                Thread.currentThread().setContextClassLoader(ctxCL);
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                for (IFrameWriter writer : writers) {
+                    writer.fail();
+                }
             }
 
             @Override
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
index 75a8087..37029f3 100644
--- a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
@@ -14,7 +14,6 @@
  */
 package edu.uci.ics.pregelix.dataflow.std;
 
-import java.io.DataOutput;
 import java.nio.ByteBuffer;
 
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
@@ -44,6 +43,7 @@
 import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
 import edu.uci.ics.pregelix.dataflow.util.FunctionProxy;
+import edu.uci.ics.pregelix.dataflow.util.UpdateBuffer;
 
 public class IndexNestedLoopJoinFunctionUpdateOperatorNodePushable extends AbstractUnaryInputOperatorNodePushable {
     private TreeIndexDataflowHelper treeIndexOpHelper;
@@ -51,9 +51,6 @@
 
     private ByteBuffer writeBuffer;
     private FrameTupleAppender appender;
-    private ArrayTupleBuilder tb;
-    private DataOutput dos;
-
     private BTree btree;
     private PermutingFrameTupleReference lowKey;
     private PermutingFrameTupleReference highKey;
@@ -67,17 +64,16 @@
     protected ITreeIndexAccessor indexAccessor;
 
     private RecordDescriptor recDesc;
-    private final RecordDescriptor inputRecDesc;
-
     private final IFrameWriter[] writers;
     private final FunctionProxy functionProxy;
+    private ArrayTupleBuilder cloneUpdateTb;
+    private final UpdateBuffer updateBuffer;
 
     public IndexNestedLoopJoinFunctionUpdateOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
             IHyracksTaskContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward,
             int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive,
             IUpdateFunctionFactory functionFactory, IRuntimeHookFactory preHookFactory,
             IRuntimeHookFactory postHookFactory, IRecordDescriptorFactory inputRdFactory, int outputArity) {
-        inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
         treeIndexOpHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
                 opDesc, ctx, partition);
         this.lowKeyInclusive = lowKeyInclusive;
@@ -95,6 +91,7 @@
         this.writers = new IFrameWriter[outputArity];
         this.functionProxy = new FunctionProxy(ctx, functionFactory, preHookFactory, postHookFactory, inputRdFactory,
                 writers);
+        this.updateBuffer = new UpdateBuffer(ctx, 2);
     }
 
     protected void setCursor() {
@@ -144,12 +141,12 @@
             rangePred = new RangePredicate(null, null, lowKeyInclusive, highKeyInclusive, lowKeySearchCmp,
                     highKeySearchCmp);
             writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
-            tb = new ArrayTupleBuilder(inputRecDesc.getFields().length + btree.getFieldCount());
-            dos = tb.getDataOutput();
             appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
             appender.reset(writeBuffer, true);
 
             indexAccessor = btree.createAccessor();
+            cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
+            updateBuffer.setFieldCount(btree.getFieldCount());
         } catch (Exception e) {
             treeIndexOpHelper.deinit();
             throw new HyracksDataException(e);
@@ -158,27 +155,29 @@
 
     private void writeSearchResults(IFrameTupleAccessor leftAccessor, int tIndex) throws Exception {
         while (cursor.hasNext()) {
-            tb.reset();
             cursor.next();
-
             ITupleReference tupleRef = cursor.getTuple();
-            for (int i = 0; i < inputRecDesc.getFields().length; i++) {
-                int tupleStart = leftAccessor.getTupleStartOffset(tIndex);
-                int fieldStart = leftAccessor.getFieldStartOffset(tIndex, i);
-                int offset = leftAccessor.getFieldSlotsLength() + tupleStart + fieldStart;
-                int len = leftAccessor.getFieldEndOffset(tIndex, i) - fieldStart;
-                dos.write(leftAccessor.getBuffer().array(), offset, len);
-                tb.addFieldEndOffset();
-            }
-            for (int i = 0; i < tupleRef.getFieldCount(); i++) {
-                dos.write(tupleRef.getFieldData(i), tupleRef.getFieldStart(i), tupleRef.getFieldLength(i));
-                tb.addFieldEndOffset();
-            }
 
             /**
              * call the update function
              */
-            functionProxy.functionCall(tb, tupleRef);
+            functionProxy.functionCall(leftAccessor, tIndex, tupleRef, cloneUpdateTb);
+
+            if (cloneUpdateTb.getSize() > 0) {
+                if (!updateBuffer.appendTuple(cloneUpdateTb)) {
+                    //release the cursor/latch
+                    cursor.close();
+                    //batch update
+                    updateBuffer.updateBTree(indexAccessor);
+
+                    //search again
+                    cursor.reset();
+                    rangePred.setLowKey(tupleRef, true);
+                    rangePred.setHighKey(highKey, highKeyInclusive);
+                    indexAccessor.search(cursor, rangePred);
+                }
+            }
+            cloneUpdateTb.reset();
         }
     }
 
@@ -210,6 +209,8 @@
         try {
             try {
                 cursor.close();
+                //batch update
+                updateBuffer.updateBTree(indexAccessor);
             } catch (Exception e) {
                 throw new HyracksDataException(e);
             }
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
index c31ebd4..f7b3d62 100644
--- a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
@@ -45,6 +45,7 @@
 import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
 import edu.uci.ics.pregelix.dataflow.util.FunctionProxy;
+import edu.uci.ics.pregelix.dataflow.util.UpdateBuffer;
 
 public class IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable extends
         AbstractUnaryInputOperatorNodePushable {
@@ -53,7 +54,7 @@
 
     private ByteBuffer writeBuffer;
     private FrameTupleAppender appender;
-    private ArrayTupleBuilder tb;
+    private ArrayTupleBuilder nullTupleBuilder;
     private DataOutput dos;
 
     private BTree btree;
@@ -76,6 +77,8 @@
 
     private final IFrameWriter[] writers;
     private final FunctionProxy functionProxy;
+    private ArrayTupleBuilder cloneUpdateTb;
+    private UpdateBuffer updateBuffer;
 
     public IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
             IHyracksTaskContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward,
@@ -100,6 +103,7 @@
         this.writers = new IFrameWriter[outputArity];
         this.functionProxy = new FunctionProxy(ctx, functionFactory, preHookFactory, postHookFactory, inputRdFactory,
                 writers);
+        this.updateBuffer = new UpdateBuffer(ctx, 2);
     }
 
     protected void setCursor() {
@@ -144,8 +148,15 @@
             rangePred = new RangePredicate(null, null, true, true, lowKeySearchCmp, highKeySearchCmp);
 
             writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
-            tb = new ArrayTupleBuilder(inputRecDesc.getFields().length + btree.getFieldCount());
-            dos = tb.getDataOutput();
+
+            nullTupleBuilder = new ArrayTupleBuilder(inputRecDesc.getFields().length);
+            dos = nullTupleBuilder.getDataOutput();
+            nullTupleBuilder.reset();
+            for (int i = 0; i < inputRecDesc.getFields().length; i++) {
+                nullWriter[i].writeNull(dos);
+                nullTupleBuilder.addFieldEndOffset();
+            }
+
             appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
             appender.reset(writeBuffer, true);
 
@@ -164,32 +175,38 @@
                 match = false;
             }
 
+            cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
+            updateBuffer.setFieldCount(btree.getFieldCount());
         } catch (Exception e) {
             treeIndexOpHelper.deinit();
             throw new HyracksDataException(e);
         }
     }
 
+    //for the join match cases
     private void writeResults(IFrameTupleAccessor leftAccessor, int tIndex, ITupleReference frameTuple)
             throws Exception {
-        tb.reset();
-        for (int i = 0; i < inputRecDesc.getFields().length; i++) {
-            int tupleStart = leftAccessor.getTupleStartOffset(tIndex);
-            int fieldStart = leftAccessor.getFieldStartOffset(tIndex, i);
-            int offset = leftAccessor.getFieldSlotsLength() + tupleStart + fieldStart;
-            int len = leftAccessor.getFieldEndOffset(tIndex, i) - fieldStart;
-            dos.write(leftAccessor.getBuffer().array(), offset, len);
-            tb.addFieldEndOffset();
-        }
-        for (int i = 0; i < frameTuple.getFieldCount(); i++) {
-            dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
-            tb.addFieldEndOffset();
-        }
-
         /**
          * function call
          */
-        functionProxy.functionCall(tb, frameTuple);
+        functionProxy.functionCall(leftAccessor, tIndex, frameTuple, cloneUpdateTb);
+
+        //doing clone update
+        if (cloneUpdateTb.getSize() > 0) {
+            if (!updateBuffer.appendTuple(cloneUpdateTb)) {
+                //release the cursor/latch
+                cursor.close();
+                //batch update
+                updateBuffer.updateBTree(indexAccessor);
+
+                //search again and recover the cursor
+                cursor.reset();
+                rangePred.setLowKey(frameTuple, true);
+                rangePred.setHighKey(null, true);
+                indexAccessor.search(cursor, rangePred);
+            }
+            cloneUpdateTb.reset();
+        }
     }
 
     @Override
@@ -243,6 +260,8 @@
             }
             try {
                 cursor.close();
+                //batch update
+                updateBuffer.updateBTree(indexAccessor);
             } catch (Exception e) {
                 throw new HyracksDataException(e);
             }
@@ -271,20 +290,27 @@
 
     /** write result for outer case */
     private void writeResults(ITupleReference frameTuple) throws Exception {
-        tb.reset();
-        for (int i = 0; i < inputRecDesc.getFields().length; i++) {
-            nullWriter[i].writeNull(dos);
-            tb.addFieldEndOffset();
-        }
-        for (int i = 0; i < frameTuple.getFieldCount(); i++) {
-            dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
-            tb.addFieldEndOffset();
-        }
-
         /**
          * function call
          */
-        functionProxy.functionCall(tb, frameTuple);
+        functionProxy.functionCall(nullTupleBuilder, frameTuple, cloneUpdateTb);
+
+        //doing clone update
+        if (cloneUpdateTb.getSize() > 0) {
+            if (!updateBuffer.appendTuple(cloneUpdateTb)) {
+                //release the cursor/latch
+                cursor.close();
+                //batch update
+                updateBuffer.updateBTree(indexAccessor);
+
+                //search again and recover the cursor
+                cursor.reset();
+                rangePred.setLowKey(frameTuple, true);
+                rangePred.setHighKey(null, true);
+                indexAccessor.search(cursor, rangePred);
+            }
+            cloneUpdateTb.reset();
+        }
     }
 
     @Override
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
index 0a966b5..6af60a8 100644
--- a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
@@ -14,7 +14,6 @@
  */
 package edu.uci.ics.pregelix.dataflow.std;
 
-import java.io.DataOutput;
 import java.nio.ByteBuffer;
 
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
@@ -44,6 +43,7 @@
 import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
 import edu.uci.ics.pregelix.dataflow.util.FunctionProxy;
+import edu.uci.ics.pregelix.dataflow.util.UpdateBuffer;
 
 public class IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable extends AbstractUnaryInputOperatorNodePushable {
     private TreeIndexDataflowHelper treeIndexOpHelper;
@@ -51,8 +51,6 @@
 
     private ByteBuffer writeBuffer;
     private FrameTupleAppender appender;
-    private ArrayTupleBuilder tb;
-    private DataOutput dos;
 
     private BTree btree;
     private boolean isForward;
@@ -63,8 +61,6 @@
     protected ITreeIndexAccessor indexAccessor;
 
     private RecordDescriptor recDesc;
-    private final RecordDescriptor inputRecDesc;
-
     private PermutingFrameTupleReference lowKey;
     private PermutingFrameTupleReference highKey;
 
@@ -73,13 +69,14 @@
 
     private final IFrameWriter[] writers;
     private final FunctionProxy functionProxy;
+    private ArrayTupleBuilder cloneUpdateTb;
+    private UpdateBuffer updateBuffer;
 
     public IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
             IHyracksTaskContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward,
             int[] lowKeyFields, int[] highKeyFields, IUpdateFunctionFactory functionFactory,
             IRuntimeHookFactory preHookFactory, IRuntimeHookFactory postHookFactory,
             IRecordDescriptorFactory inputRdFactory, int outputArity) {
-        inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
         treeIndexOpHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
                 opDesc, ctx, partition);
         this.isForward = isForward;
@@ -97,6 +94,7 @@
         this.writers = new IFrameWriter[outputArity];
         this.functionProxy = new FunctionProxy(ctx, functionFactory, preHookFactory, postHookFactory, inputRdFactory,
                 writers);
+        this.updateBuffer = new UpdateBuffer(ctx, 2);
     }
 
     protected void setCursor() {
@@ -123,8 +121,6 @@
             lowKeySearchCmp = new MultiComparator(lowKeySearchComparators);
 
             writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
-            tb = new ArrayTupleBuilder(btree.getFieldCount());
-            dos = tb.getDataOutput();
             appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
             appender.reset(writeBuffer, true);
 
@@ -142,7 +138,8 @@
                 currentTopTuple = cursor.getTuple();
                 match = false;
             }
-
+            cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
+            updateBuffer.setFieldCount(btree.getFieldCount());
         } catch (Exception e) {
             treeIndexOpHelper.deinit();
             throw new HyracksDataException(e);
@@ -207,6 +204,9 @@
             }
             try {
                 cursor.close();
+
+                //batch update
+                updateBuffer.updateBTree(indexAccessor);
             } catch (Exception e) {
                 throw new HyracksDataException(e);
             }
@@ -231,29 +231,57 @@
 
     /** write the right result */
     private void writeRightResults(ITupleReference frameTuple) throws Exception {
-        tb.reset();
-        for (int i = 0; i < frameTuple.getFieldCount(); i++) {
-            dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
-            tb.addFieldEndOffset();
-        }
+        functionProxy.functionCall(frameTuple, cloneUpdateTb);
 
-        functionProxy.functionCall(tb, frameTuple);
+        //doing clone update
+        bufferCloneUpdate(frameTuple);
     }
 
     /** write the left result */
     private void writeLeftResults(IFrameTupleAccessor leftAccessor, int tIndex, ITupleReference frameTuple)
             throws Exception {
-        tb.reset();
-        for (int i = 0; i < inputRecDesc.getFields().length; i++) {
-            int tupleStart = leftAccessor.getTupleStartOffset(tIndex);
-            int fieldStart = leftAccessor.getFieldStartOffset(tIndex, i);
-            int offset = leftAccessor.getFieldSlotsLength() + tupleStart + fieldStart;
-            int len = leftAccessor.getFieldEndOffset(tIndex, i) - fieldStart;
-            dos.write(leftAccessor.getBuffer().array(), offset, len);
-            tb.addFieldEndOffset();
-        }
+        functionProxy.functionCall(leftAccessor, tIndex, frameTuple, cloneUpdateTb);
 
-        functionProxy.functionCall(tb, frameTuple);
+        //doing clone update
+        bufferCloneUpdate(frameTuple);
     }
+
+    /**
+     * Queue the clone update that the last function call left in cloneUpdateTb, if any.
+     * When the update buffer is full, the cursor is closed, the buffered updates are applied
+     * to the B-tree, the rejected tuple is appended to the emptied buffer, and the search is
+     * restarted with frameTuple as the inclusive low key.
+     *
+     * @param frameTuple
+     *            the index tuple that was just processed
+     * @throws Exception
+     *             if the batch update or the new search fails, or if the tuple does not fit into
+     *             the emptied buffer
+     */
+    private void bufferCloneUpdate(ITupleReference frameTuple) throws Exception {
+        if (cloneUpdateTb.getSize() <= 0) {
+            //nothing was written into cloneUpdateTb
+            return;
+        }
+        if (!updateBuffer.appendTuple(cloneUpdateTb)) {
+            //release the cursor/latch
+            cursor.close();
+            //batch update
+            updateBuffer.updateBTree(indexAccessor);
+            //the rejected tuple was not buffered; append it now that the buffer is empty,
+            //otherwise the reset below would silently drop this update
+            if (!updateBuffer.appendTuple(cloneUpdateTb)) {
+                throw new HyracksDataException("update tuple cannot be appended to an empty update buffer!");
+            }
+
+            //search again
+            //NOTE(review): frameTuple is read after cursor.close(); confirm its bytes are still valid
+            cursor.reset();
+            rangePred.setLowKey(frameTuple, true);
+            rangePred.setHighKey(null, true);
+            indexAccessor.search(cursor, rangePred);
+        }
+        cloneUpdateTb.reset();
+    }
 
     @Override
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
index 4b0f4a5..99bca1a 100644
--- a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
@@ -15,6 +15,7 @@
 
 package edu.uci.ics.pregelix.dataflow.util;
 
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -36,6 +37,7 @@
     private final IFrameWriter[] writers;
     private TupleDeserializer tupleDe;
     private RecordDescriptor inputRd;
+    private ClassLoader ctxCL;
 
     public FunctionProxy(IHyracksTaskContext ctx, IUpdateFunctionFactory functionFactory,
             IRuntimeHookFactory preHookFactory, IRuntimeHookFactory postHookFactory,
@@ -56,6 +58,7 @@
     public void functionOpen() throws HyracksDataException {
         inputRd = inputRdFactory.createRecordDescriptor();
         tupleDe = new TupleDeserializer(inputRd);
+        ctxCL = Thread.currentThread().getContextClassLoader();
         Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
         for (IFrameWriter writer : writers) {
             writer.open();
@@ -68,16 +71,21 @@
     /**
      * Call the function
      * 
-     * @param tb
-     *            input data
-     * @param updateRef
-     *            update pointer
+     * @param leftAccessor
+     *            input page accessor
+     * @param leftTupleIndex
+     *            the tuple index in the page
+     * @param right
+     *            the right tuple; deserialized after the left fields and passed to the update call
+     * @param cloneUpdateTb
+     *            tuple builder passed to the update call
      * @throws HyracksDataException
      */
-    public void functionCall(ArrayTupleBuilder tb, ITupleReference updateRef) throws HyracksDataException {
-        Object[] tuple = tupleDe.deserializeRecord(tb);
+    public void functionCall(IFrameTupleAccessor leftAccessor, int leftTupleIndex, ITupleReference right,
+            ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
+        Object[] tuple = tupleDe.deserializeRecord(leftAccessor, leftTupleIndex, right);
         function.process(tuple);
-        function.update(updateRef);
+        function.update(right, cloneUpdateTb);
     }
 
     /**
@@ -86,10 +94,30 @@
      * @param updateRef
+     * @param cloneUpdateTb
+     *            tuple builder passed to the update call
      * @throws HyracksDataException
      */
-    public void functionCall(ITupleReference updateRef) throws HyracksDataException {
+    public void functionCall(ITupleReference updateRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
         Object[] tuple = tupleDe.deserializeRecord(updateRef);
         function.process(tuple);
-        function.update(updateRef);
+        function.update(updateRef, cloneUpdateTb);
+    }
+
+    /**
+     * Call the function
+     * 
+     * @param tb
+     *            input data
+     * @param inPlaceUpdateRef
+     *            update pointer
+     * @param cloneUpdateTb
+     *            tuple builder passed to the update call
+     * @throws HyracksDataException
+     */
+    public void functionCall(ArrayTupleBuilder tb, ITupleReference inPlaceUpdateRef, ArrayTupleBuilder cloneUpdateTb)
+            throws HyracksDataException {
+        Object[] tuple = tupleDe.deserializeRecord(tb, inPlaceUpdateRef);
+        function.process(tuple);
+        function.update(inPlaceUpdateRef, cloneUpdateTb);
     }
 
     /**
@@ -104,5 +132,6 @@
         for (IFrameWriter writer : writers) {
             writer.close();
         }
+        Thread.currentThread().setContextClassLoader(ctxCL);
     }
 }
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java
index 5ae1d81..4fe83db 100644
--- a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java
@@ -20,6 +20,7 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -42,7 +43,7 @@
     }
 
     public Object[] deserializeRecord(ITupleReference tupleRef) throws HyracksDataException {
-        for (int i = 0; i < record.length; ++i) {
+        for (int i = 0; i < tupleRef.getFieldCount(); ++i) {
             byte[] data = tupleRef.getFieldData(i);
             int offset = tupleRef.getFieldStart(i);
             bbis.setByteArray(data, offset);
@@ -65,11 +66,74 @@
         return record;
     }
 
-    public Object[] deserializeRecord(ArrayTupleBuilder tb) throws HyracksDataException {
+    /**
+     * Deserialize a record whose leading fields come from one tuple of a frame and whose
+     * remaining fields come from a second tuple.
+     *
+     * @param left
+     *            accessor of the frame that holds the left tuple
+     * @param tIndex
+     *            index of the left tuple in that frame
+     * @param right
+     *            tuple that supplies the record fields following the left fields
+     * @return the record array of this deserializer, overwritten by this call
+     * @throws HyracksDataException
+     *             if reading a field fails
+     */
+    public Object[] deserializeRecord(IFrameTupleAccessor left, int tIndex, ITupleReference right)
+            throws HyracksDataException {
+        byte[] data = left.getBuffer().array();
+        int tStart = left.getTupleStartOffset(tIndex) + left.getFieldSlotsLength();
+        int leftFieldCount = left.getFieldCount();
+        for (int i = 0; i < leftFieldCount; ++i) {
+            // position the input at field i of the left tuple
+            bbis.setByteArray(data, tStart + left.getFieldStartOffset(tIndex, i));
+            record[i] = readField(i);
+        }
+        for (int i = leftFieldCount; i < record.length; ++i) {
+            // the remaining record fields are read from the right tuple, starting at its field 0
+            int rightField = i - leftFieldCount;
+            bbis.setByteArray(right.getFieldData(rightField), right.getFieldStart(rightField));
+            record[i] = readField(i);
+        }
+        return record;
+    }
+
+    /**
+     * Deserialize record field i from the current position of the input and, when frame
+     * debugging is enabled, verify the magic number that follows the field.
+     *
+     * @param i
+     *            index of the field in the record descriptor
+     * @return the deserialized field value
+     * @throws HyracksDataException
+     *             if reading the field fails, or if the magic number is unreadable or wrong
+     */
+    private Object readField(int i) throws HyracksDataException {
+        Object instance = recordDescriptor.getFields()[i].deserialize(di);
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest(i + " " + instance);
+        }
+        if (FrameConstants.DEBUG_FRAME_IO) {
+            int magic;
+            try {
+                magic = di.readInt();
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+            // checked outside the try so that the mismatch can never be consumed by the catch above
+            if (magic != FrameConstants.FRAME_FIELD_MAGIC) {
+                throw new HyracksDataException("Field magic mismatch");
+            }
+        }
+        return instance;
+    }
+
+    public Object[] deserializeRecord(ArrayTupleBuilder tb, ITupleReference right) throws HyracksDataException {
         byte[] data = tb.getByteArray();
         int[] offset = tb.getFieldEndOffsets();
         int start = 0;
-        for (int i = 0; i < record.length; ++i) {
+        for (int i = 0; i < offset.length; ++i) {
             /**
              * reset the input
              */
@@ -94,6 +158,12 @@
                 }
             }
         }
+        for (int i = offset.length; i < record.length; ++i) {
+            // record fields at index offset.length and above are read from the right tuple
+            int rightField = i - offset.length;
+            bbis.setByteArray(right.getFieldData(rightField), right.getFieldStart(rightField));
+            record[i] = readField(i);
+        }
         return record;
     }
 }
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java
new file mode 100644
index 0000000..9a30647
--- /dev/null
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java
@@ -0,0 +1,146 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package edu.uci.ics.pregelix.dataflow.util;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+
+/**
+ * A page-bounded buffer of tuples that are waiting to be applied to a B-tree.
+ * Updates produced during an index search or join are collected here and applied
+ * in one batch, so that cursors do not have to be opened and closed for every update.
+ */
+public class UpdateBuffer {
+
+    /** page limit used by the constructor that does not take one */
+    private static final int DEFAULT_PAGE_LIMIT = 1000;
+
+    private int currentInUse = 0;
+    private final int pageLimit;
+    private final List<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
+    private final FrameTupleAppender appender;
+    private final IHyracksTaskContext ctx;
+    private final FrameTupleReference tuple = new FrameTupleReference();
+    private final int frameSize;
+    private IFrameTupleAccessor fta;
+
+    /**
+     * Create a buffer whose first frame is allocated immediately.
+     *
+     * @param numPages
+     *            the number of frames this buffer may fill before appendTuple returns false
+     * @param ctx
+     *            the task context that frames are allocated from
+     * @param fieldCount
+     *            the number of fields that buffered tuples are read back with
+     */
+    public UpdateBuffer(int numPages, IHyracksTaskContext ctx, int fieldCount) {
+        this.ctx = ctx;
+        this.frameSize = ctx.getFrameSize();
+        this.pageLimit = numPages;
+        this.appender = new FrameTupleAppender(frameSize);
+        ByteBuffer buffer = ctx.allocateFrame();
+        this.buffers.add(buffer);
+        this.appender.reset(buffer, true);
+        this.fta = new UpdateBufferTupleAccessor(frameSize, fieldCount);
+    }
+
+    /**
+     * Create a buffer with the default page limit.
+     */
+    public UpdateBuffer(IHyracksTaskContext ctx, int fieldCount) {
+        this(DEFAULT_PAGE_LIMIT, ctx, fieldCount);
+    }
+
+    /**
+     * Change the number of fields that buffered tuples are read back with.
+     */
+    public void setFieldCount(int fieldCount) {
+        if (fta.getFieldCount() != fieldCount) {
+            this.fta = new UpdateBufferTupleAccessor(frameSize, fieldCount);
+        }
+    }
+
+    /**
+     * Append a tuple to the buffer.
+     *
+     * @param tb
+     *            the tuple to buffer
+     * @return true if the tuple was stored; false if the page limit is reached, in which case
+     *         the tuple was NOT stored and must be appended again after updateBTree
+     * @throws HyracksDataException
+     *             if the tuple does not fit into a fresh frame
+     */
+    public boolean appendTuple(ArrayTupleBuilder tb) throws HyracksDataException {
+        if (appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+            return true;
+        }
+        if (currentInUse + 1 >= pageLimit) {
+            return false;
+        }
+        // move to the new buffer
+        currentInUse++;
+        allocate(currentInUse);
+        appender.reset(buffers.get(currentInUse), true);
+        if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+            throw new HyracksDataException("tuple cannot be appended to a new frame!");
+        }
+        return true;
+    }
+
+    /**
+     * Apply every buffered tuple to the index, in the order of appending, and empty the buffer.
+     * The buffer is emptied even when an update fails, so that a later call does not apply
+     * the same tuples again.
+     *
+     * @param bta
+     *            the accessor of the index to update
+     */
+    public void updateBTree(ITreeIndexAccessor bta) throws HyracksDataException, IndexException {
+        try {
+            // batch update
+            for (int i = 0; i <= currentInUse; i++) {
+                fta.reset(buffers.get(i));
+                int tupleCount = fta.getTupleCount();
+                for (int j = 0; j < tupleCount; j++) {
+                    tuple.reset(fta, j);
+                    bta.update(tuple);
+                }
+            }
+        } finally {
+            //cleanup the buffer
+            currentInUse = 0;
+            appender.reset(buffers.get(0), true);
+        }
+    }
+
+    /** make sure that a frame exists at the given index of the frame list */
+    private void allocate(int index) {
+        if (index >= buffers.size()) {
+            buffers.add(ctx.allocateFrame());
+        }
+    }
+}
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBufferTupleAccessor.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBufferTupleAccessor.java
new file mode 100644
index 0000000..39f1361
--- /dev/null
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBufferTupleAccessor.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.util;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+
+/**
+ * Frame tuple accessor used by {@link UpdateBuffer} to read back buffered tuples.
+ * The number of fields per tuple is fixed when the accessor is created.
+ */
+public final class UpdateBufferTupleAccessor implements IFrameTupleAccessor {
+    private final int frameSize;
+    private final int fieldCount;
+    private ByteBuffer buffer;
+
+    public UpdateBufferTupleAccessor(int frameSize, int fieldCount) {
+        this.frameSize = frameSize;
+        this.fieldCount = fieldCount;
+    }
+
+    @Override
+    public void reset(ByteBuffer buffer) {
+        this.buffer = buffer;
+    }
+
+    @Override
+    public ByteBuffer getBuffer() {
+        return buffer;
+    }
+
+    @Override
+    public int getTupleCount() {
+        return buffer.getInt(tupleCountOffset());
+    }
+
+    @Override
+    public int getTupleStartOffset(int tupleIndex) {
+        if (tupleIndex == 0) {
+            return 0;
+        }
+        // a tuple starts where its predecessor ends
+        return getTupleEndOffset(tupleIndex - 1);
+    }
+
+    @Override
+    public int getTupleEndOffset(int tupleIndex) {
+        int slot = tupleCountOffset() - 4 * (tupleIndex + 1);
+        return buffer.getInt(slot);
+    }
+
+    @Override
+    public int getFieldStartOffset(int tupleIndex, int fIdx) {
+        if (fIdx == 0) {
+            return 0;
+        }
+        // a field starts where its predecessor ends
+        return getFieldEndOffset(tupleIndex, fIdx - 1);
+    }
+
+    @Override
+    public int getFieldEndOffset(int tupleIndex, int fIdx) {
+        int slot = getTupleStartOffset(tupleIndex) + fIdx * 4;
+        return buffer.getInt(slot);
+    }
+
+    @Override
+    public int getFieldLength(int tupleIndex, int fIdx) {
+        int start = getFieldStartOffset(tupleIndex, fIdx);
+        int end = getFieldEndOffset(tupleIndex, fIdx);
+        return end - start;
+    }
+
+    @Override
+    public int getFieldSlotsLength() {
+        return fieldCount * 4;
+    }
+
+    @Override
+    public int getFieldCount() {
+        return fieldCount;
+    }
+
+    /** position in the frame of the int that holds the tuple count */
+    private int tupleCountOffset() {
+        return FrameHelper.getTupleCountOffset(frameSize);
+    }
+}
\ No newline at end of file