Merged fullstack_staging branch into trunk

git-svn-id: https://hyracks.googlecode.com/svn/trunk@2372 123451ca-8445-de46-9d55-352943316053
diff --git a/fullstack/pregelix/pregelix-dataflow-std/pom.xml b/fullstack/pregelix/pregelix-dataflow-std/pom.xml
new file mode 100644
index 0000000..8e8824b
--- /dev/null
+++ b/fullstack/pregelix/pregelix-dataflow-std/pom.xml
@@ -0,0 +1,148 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<artifactId>pregelix-dataflow-std</artifactId>
+	<packaging>jar</packaging>
+	<name>pregelix-dataflow-std</name>
+
+	<parent>
+		<groupId>edu.uci.ics.hyracks</groupId>
+		<artifactId>pregelix</artifactId>
+		<version>0.2.2-SNAPSHOT</version>
+	</parent>
+
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+	</properties>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<version>2.0.2</version>
+				<configuration>
+					<source>1.6</source>
+					<target>1.6</target>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<version>2.7.2</version>
+				<configuration>
+					<forkMode>pertest</forkMode>
+					<argLine>-enableassertions -Xmx512m -Dfile.encoding=UTF-8
+						-Djava.util.logging.config.file=src/test/resources/logging.properties</argLine>
+					<includes>
+						<include>**/*TestSuite.java</include>
+						<include>**/*Test.java</include>
+					</includes>
+				</configuration>
+			</plugin>
+			<plugin>
+				<artifactId>maven-clean-plugin</artifactId>
+				<configuration>
+					<filesets>
+						<fileset>
+							<directory>.</directory>
+							<includes>
+								<include>teststore*</include>
+								<include>edu*</include>
+								<include>actual*</include>
+								<include>build*</include>
+								<include>expect*</include>
+								<include>ClusterController*</include>
+							</includes>
+						</fileset>
+					</filesets>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+
+	<dependencies>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>4.8.1</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>pregelix-dataflow-std-base</artifactId>
+			<version>0.2.2-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-dataflow-std</artifactId>
+			<version>0.2.2-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-api</artifactId>
+			<version>0.2.2-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-dataflow-common</artifactId>
+			<version>0.2.2-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-data-std</artifactId>
+			<version>0.2.2-SNAPSHOT</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-core</artifactId>
+			<version>0.20.2</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-storage-am-common</artifactId>
+			<version>0.2.2-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-storage-am-btree</artifactId>
+			<version>0.2.2-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-control-cc</artifactId>
+			<version>0.2.2-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-control-nc</artifactId>
+			<version>0.2.2-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-ipc</artifactId>
+			<version>0.2.2-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+	</dependencies>
+</project>
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorDescriptor.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorDescriptor.java
new file mode 100644
index 0000000..99e55f1
--- /dev/null
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorDescriptor.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.dataflow.std;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
+
+/**
+ * Descriptor of the B-tree search operator whose per-partition runtime is a
+ * {@link BTreeSearchFunctionUpdateOperatorNodePushable}; it holds the search bounds and the
+ * function and hook factories that the runtime is constructed with.
+ */
+public class BTreeSearchFunctionUpdateOperatorDescriptor extends AbstractTreeIndexOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    protected boolean isForward;
+    /** Fields in the input tuple to be used as low keys. */
+    protected int[] lowKeyFields;
+    /** Fields in the input tuple to be used as high keys. */
+    protected int[] highKeyFields;
+    protected boolean lowKeyInclusive;
+    protected boolean highKeyInclusive;
+
+    private final IUpdateFunctionFactory functionFactory;
+    private final IRuntimeHookFactory preHookFactory;
+    private final IRuntimeHookFactory postHookFactory;
+    private final IRecordDescriptorFactory inputRdFactory;
+
+    private final int outputArity;
+
+    /**
+     * Records the search bounds and the function/hook factories that are later handed to the
+     * runtime, and installs the caller-supplied record descriptors.
+     *
+     * @param rDescs
+     *            record descriptors copied, in order, into the leading entries of {@code recordDescriptors}
+     */
+    public BTreeSearchFunctionUpdateOperatorDescriptor(JobSpecification spec, RecordDescriptor recDesc,
+            IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> indexRegistryProvider,
+            IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
+            IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
+            boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory dataflowHelperFactory,
+            IRecordDescriptorFactory inputRdFactory, int outputArity, IUpdateFunctionFactory functionFactory,
+            IRuntimeHookFactory preHookFactory, IRuntimeHookFactory postHookFactory, RecordDescriptor... rDescs) {
+        super(spec, 1, outputArity, recDesc, storageManager, indexRegistryProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, dataflowHelperFactory, null, false, NoOpOperationCallbackProvider.INSTANCE);
+
+        // Range-search configuration.
+        this.isForward = isForward;
+        this.lowKeyFields = lowKeyFields;
+        this.lowKeyInclusive = lowKeyInclusive;
+        this.highKeyFields = highKeyFields;
+        this.highKeyInclusive = highKeyInclusive;
+
+        // Factories and arity consumed by the runtime.
+        this.inputRdFactory = inputRdFactory;
+        this.functionFactory = functionFactory;
+        this.preHookFactory = preHookFactory;
+        this.postHookFactory = postHookFactory;
+        this.outputArity = outputArity;
+
+        System.arraycopy(rDescs, 0, this.recordDescriptors, 0, rDescs.length);
+    }
+
+    /**
+     * Builds the push runtime for one partition, passing along every setting held by this descriptor.
+     */
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        return new BTreeSearchFunctionUpdateOperatorNodePushable(this, ctx, partition, recordDescProvider, isForward,
+                lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive, functionFactory, preHookFactory,
+                postHookFactory, inputRdFactory, outputArity);
+    }
+}
\ No newline at end of file
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java
new file mode 100644
index 0000000..fb84aa0
--- /dev/null
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java
@@ -0,0 +1,229 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.std;
+
+import java.io.DataOutput;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
+import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
+import edu.uci.ics.hyracks.storage.am.btree.util.BTreeUtils;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
+import edu.uci.ics.pregelix.dataflow.util.FunctionProxy;
+
+/**
+ * Runtime of the B-tree search/update operator: each incoming tuple supplies the low and high keys
+ * of a range search, and every tuple the search cursor returns is passed to a {@link FunctionProxy}.
+ */
+public class BTreeSearchFunctionUpdateOperatorNodePushable extends AbstractUnaryInputOperatorNodePushable {
+    protected TreeIndexDataflowHelper treeIndexHelper;
+    protected FrameTupleAccessor accessor;
+
+    protected ByteBuffer writeBuffer;
+    protected FrameTupleAppender appender;
+    protected ArrayTupleBuilder tb;
+    protected DataOutput dos;
+
+    protected BTree btree;
+    protected boolean isForward;
+    protected PermutingFrameTupleReference lowKey;
+    protected PermutingFrameTupleReference highKey;
+    protected boolean lowKeyInclusive;
+    protected boolean highKeyInclusive;
+    protected RangePredicate rangePred;
+    protected MultiComparator lowKeySearchCmp;
+    protected MultiComparator highKeySearchCmp;
+    protected ITreeIndexCursor cursor;
+    protected ITreeIndexFrame cursorFrame;
+    protected ITreeIndexAccessor indexAccessor;
+
+    protected RecordDescriptor recDesc;
+
+    private final IFrameWriter[] writers;
+    private final FunctionProxy functionProxy;
+
+    /**
+     * Creates the index dataflow helper for this partition, resolves the record descriptor of input 0,
+     * sets up the key permutations and hands the output writer array to the function proxy.
+     */
+    public BTreeSearchFunctionUpdateOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
+            IHyracksTaskContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward,
+            int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive,
+            IUpdateFunctionFactory functionFactory, IRuntimeHookFactory preHookFactory,
+            IRuntimeHookFactory postHookFactory, IRecordDescriptorFactory inputRdFactory, int outputArity) {
+        treeIndexHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
+                opDesc, ctx, partition);
+        this.isForward = isForward;
+        this.lowKeyInclusive = lowKeyInclusive;
+        this.highKeyInclusive = highKeyInclusive;
+        this.recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
+        this.lowKey = keyReference(lowKeyFields);
+        this.highKey = keyReference(highKeyFields);
+
+        this.writers = new IFrameWriter[outputArity];
+        this.functionProxy = new FunctionProxy(ctx, functionFactory, preHookFactory, postHookFactory, inputRdFactory,
+                writers);
+    }
+
+    /**
+     * Returns a tuple reference permuted by {@code keyFields}, or {@code null} when {@code keyFields} is
+     * {@code null} or empty.
+     */
+    private static PermutingFrameTupleReference keyReference(int[] keyFields) {
+        if (keyFields == null || keyFields.length == 0) {
+            return null;
+        }
+        PermutingFrameTupleReference key = new PermutingFrameTupleReference();
+        key.setFieldPermutation(keyFields);
+        return key;
+    }
+
+    /**
+     * Opens the function, then initializes the index helper and the search state (cursor, comparators,
+     * range predicate, output buffer, index accessor). If that initialization fails, the index helper is
+     * de-initialized and the failure is rethrown wrapped in a {@link HyracksDataException}.
+     */
+    @Override
+    public void open() throws HyracksDataException {
+        // The function is opened before any index state is touched.
+        functionProxy.functionOpen();
+        accessor = new FrameTupleAccessor(treeIndexHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+
+        try {
+            treeIndexHelper.init(false);
+            btree = (BTree) treeIndexHelper.getIndex();
+            cursorFrame = btree.getLeafFrameFactory().createFrame();
+            setCursor();
+
+            // Range predicate; its keys are filled in per input tuple by nextFrame().
+            lowKeySearchCmp = BTreeUtils.getSearchMultiComparator(btree.getComparatorFactories(), lowKey);
+            highKeySearchCmp = BTreeUtils.getSearchMultiComparator(btree.getComparatorFactories(), highKey);
+            rangePred = new RangePredicate(null, null, lowKeyInclusive, highKeyInclusive, lowKeySearchCmp,
+                    highKeySearchCmp);
+
+            final IHyracksTaskContext taskCtx = treeIndexHelper.getHyracksTaskContext();
+            writeBuffer = taskCtx.allocateFrame();
+            tb = new ArrayTupleBuilder(btree.getFieldCount());
+            dos = tb.getDataOutput();
+            appender = new FrameTupleAppender(taskCtx.getFrameSize());
+            appender.reset(writeBuffer, true);
+            indexAccessor = btree.createAccessor();
+        } catch (Exception e) {
+            treeIndexHelper.deinit();
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /** Installs the range-search cursor over {@link #cursorFrame}. */
+    protected void setCursor() {
+        cursor = new BTreeRangeSearchCursor((IBTreeLeafFrame) cursorFrame, false);
+    }
+
+    /** Drains the cursor, passing each tuple it yields to the function. */
+    protected void writeSearchResults() throws Exception {
+        while (cursor.hasNext()) {
+            cursor.next();
+            final ITupleReference current = cursor.getTuple();
+            functionProxy.functionCall(current);
+        }
+    }
+
+    /**
+     * For each tuple of the frame: binds the low/high keys to that tuple, resets the cursor, runs the
+     * range search and feeds the results to the function. Any exception is wrapped in a
+     * {@link HyracksDataException}.
+     */
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        accessor.reset(buffer);
+        final int numTuples = accessor.getTupleCount();
+        try {
+            for (int tupleIndex = 0; tupleIndex < numTuples; tupleIndex++) {
+                if (lowKey != null) {
+                    lowKey.reset(accessor, tupleIndex);
+                }
+                if (highKey != null) {
+                    highKey.reset(accessor, tupleIndex);
+                }
+                rangePred.setLowKey(lowKey, lowKeyInclusive);
+                rangePred.setHighKey(highKey, highKeyInclusive);
+                cursor.reset();
+                indexAccessor.search(cursor, rangePred);
+                writeSearchResults();
+            }
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /**
+     * Closes the cursor and then the function; the index helper is de-initialized whether or not
+     * those steps succeed.
+     */
+    @Override
+    public void close() throws HyracksDataException {
+        try {
+            closeCursor();
+            functionProxy.functionClose();
+        } finally {
+            treeIndexHelper.deinit();
+        }
+    }
+
+    /** Closes the search cursor, wrapping any failure in a {@link HyracksDataException}. */
+    private void closeCursor() throws HyracksDataException {
+        try {
+            cursor.close();
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /** Propagates the failure to every output writer. */
+    @Override
+    public void fail() throws HyracksDataException {
+        for (IFrameWriter writer : writers) {
+            writer.fail();
+        }
+    }
+
+    /** Registers the writer for output {@code index}; {@code recordDesc} is not used. */
+    @Override
+    public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+        writers[index] = writer;
+    }
+
+}
\ No newline at end of file
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FileWriteOperatorDescriptor.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FileWriteOperatorDescriptor.java
new file mode 100644
index 0000000..356f06c
--- /dev/null
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FileWriteOperatorDescriptor.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.std;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.util.StringSerializationUtils;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+
+/**
+ * Sink operator that writes each input tuple as one line of text, fields separated by {@code |}, to the
+ * local file of this partition's split. Optional runtime hooks are configured when the operator opens
+ * and closes.
+ */
+public class FileWriteOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+    private final FileSplit[] splits;
+    private final IRuntimeHookFactory preHookFactory;
+    private final IRuntimeHookFactory postHookFactory;
+    private final IRecordDescriptorFactory inputRdFactory;
+
+    /**
+     * @param inputRdFactory
+     *            source of the input record descriptor; when {@code null} the descriptor is obtained from
+     *            the record descriptor provider at runtime
+     * @param fileSplitProvider
+     *            supplies the output file splits, indexed by partition
+     * @param preHookFactory
+     *            hook configured in {@code open()}; may be {@code null}
+     * @param postHookFactory
+     *            hook configured in {@code close()}; may be {@code null}
+     */
+    public FileWriteOperatorDescriptor(JobSpecification spec, IRecordDescriptorFactory inputRdFactory,
+            IFileSplitProvider fileSplitProvider, IRuntimeHookFactory preHookFactory,
+            IRuntimeHookFactory postHookFactory) {
+        super(spec, 1, 0);
+        this.splits = fileSplitProvider.getFileSplits();
+        this.preHookFactory = preHookFactory;
+        this.postHookFactory = postHookFactory;
+        this.inputRdFactory = inputRdFactory;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+            throws HyracksDataException {
+        IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
+            private RecordDescriptor rd0;
+            private FrameDeserializer frameDeserializer;
+            private PrintWriter outputWriter;
+            private final static String separator = "|";
+
+            @Override
+            public void open() throws HyracksDataException {
+                rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
+                        : inputRdFactory.createRecordDescriptor();
+                frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
+                try {
+                    outputWriter = new PrintWriter(new OutputStreamWriter(new FileOutputStream(splits[partition]
+                            .getLocalFile().getFile())));
+                    if (preHookFactory != null) {
+                        preHookFactory.createRuntimeHook().configure(ctx);
+                    }
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer frame) throws HyracksDataException {
+                frameDeserializer.reset(frame);
+                while (!frameDeserializer.done()) {
+                    Object[] tuple = frameDeserializer.deserializeRecord();
+                    // Separator goes between fields only; a tuple without fields yields an empty line.
+                    for (int i = 0; i < tuple.length; i++) {
+                        if (i > 0) {
+                            outputWriter.print(separator);
+                        }
+                        outputWriter.print(StringSerializationUtils.toString(tuple[i]));
+                    }
+                    outputWriter.println();
+                }
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                // Intentionally empty: this sink has no downstream writers to notify.
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                try {
+                    if (postHookFactory != null) {
+                        postHookFactory.createRuntimeHook().configure(ctx);
+                    }
+                } finally {
+                    // Release the file handle even when the post hook throws.
+                    if (outputWriter != null) {
+                        outputWriter.close();
+                    }
+                }
+                // PrintWriter never throws on I/O errors; surface them instead of silently truncating output.
+                if (outputWriter != null && outputWriter.checkError()) {
+                    throw new HyracksDataException("Failed to write output file "
+                            + splits[partition].getLocalFile().getFile());
+                }
+            }
+
+        };
+        return op;
+    }
+}
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java
new file mode 100644
index 0000000..ee3ac82
--- /dev/null
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.std;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
+import edu.uci.ics.pregelix.dataflow.std.base.IFunction;
+import edu.uci.ics.pregelix.dataflow.std.base.IFunctionFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+
+/**
+ * Operator that deserializes every input tuple and passes it to an {@link IFunction}, which is given
+ * the operator's output writers when it is opened. Optional runtime hooks are configured in
+ * {@code open()} and {@code close()}.
+ */
+public class FunctionCallOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+    private final IFunctionFactory functionFactory;
+    private final IRuntimeHookFactory preHookFactory;
+    private final IRuntimeHookFactory postHookFactory;
+    private final IRecordDescriptorFactory inputRdFactory;
+
+    /**
+     * @param inputRdFactory
+     *            source of the input record descriptor; when {@code null} the descriptor is obtained from
+     *            the record descriptor provider at runtime
+     * @param rDescs
+     *            record descriptors copied, in order, into the leading entries of {@code recordDescriptors}
+     */
+    public FunctionCallOperatorDescriptor(JobSpecification spec, IRecordDescriptorFactory inputRdFactory,
+            int outputArity, IFunctionFactory functionFactory, IRuntimeHookFactory preHookFactory,
+            IRuntimeHookFactory postHookFactory, RecordDescriptor... rDescs) {
+        super(spec, 1, outputArity);
+        this.functionFactory = functionFactory;
+        this.preHookFactory = preHookFactory;
+        this.postHookFactory = postHookFactory;
+        this.inputRdFactory = inputRdFactory;
+
+        for (int i = 0; i < rDescs.length; i++) {
+            this.recordDescriptors[i] = rDescs[i];
+        }
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+            throws HyracksDataException {
+        return new AbstractUnaryInputOperatorNodePushable() {
+            private RecordDescriptor rd0;
+            private FrameDeserializer frameDeserializer;
+            private final IFrameWriter[] writers = new IFrameWriter[outputArity];
+            private final IFunction function = functionFactory.createFunction();
+
+            @Override
+            public void close() throws HyracksDataException {
+                try {
+                    if (postHookFactory != null) {
+                        postHookFactory.createRuntimeHook().configure(ctx);
+                    }
+                    function.close();
+                } finally {
+                    // Downstream writers must be closed even when the hook or the function throws.
+                    for (IFrameWriter writer : writers) {
+                        writer.close();
+                    }
+                }
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                for (IFrameWriter writer : writers) {
+                    writer.fail();
+                }
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer frame) throws HyracksDataException {
+                frameDeserializer.reset(frame);
+                while (!frameDeserializer.done()) {
+                    Object[] tuple = frameDeserializer.deserializeRecord();
+                    function.process(tuple);
+                }
+            }
+
+            @Override
+            public void open() throws HyracksDataException {
+                rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
+                        : inputRdFactory.createRecordDescriptor();
+                frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
+                Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+                for (IFrameWriter writer : writers) {
+                    writer.open();
+                }
+                if (preHookFactory != null) {
+                    preHookFactory.createRuntimeHook().configure(ctx);
+                }
+                function.open(ctx, rd0, writers);
+            }
+
+            @Override
+            public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+                writers[index] = writer;
+            }
+        };
+    }
+}
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorDescriptor.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorDescriptor.java
new file mode 100644
index 0000000..60559e8
--- /dev/null
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorDescriptor.java
@@ -0,0 +1,173 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.dataflow.std;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
+
+/**
+ * Operator descriptor for an index nested-loop join over a tree index that
+ * carries an update-function factory and optional pre/post runtime hook
+ * factories along with the search configuration.
+ * <p>
+ * Depending on the flags supplied at construction time,
+ * {@link #createPushRuntime} instantiates the right-outer-join, the set-union
+ * or the plain join variant of the runtime operator.
+ */
+public class IndexNestedLoopJoinFunctionUpdateOperatorDescriptor extends AbstractTreeIndexOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    private final boolean isForward;
+    private final int[] lowKeyFields; // fields in input tuple to be used as low keys
+    private final int[] highKeyFields; // fields in input tuple to be used as high keys
+    private final boolean lowKeyInclusive;
+    private final boolean highKeyInclusive;
+
+    // right outer join
+    private boolean isRightOuter = false;
+    private INullWriterFactory[] nullWriterFactories;
+
+    // set union
+    private boolean isSetUnion = false;
+
+    private final IUpdateFunctionFactory functionFactory;
+    private final IRuntimeHookFactory preHookFactory;
+    private final IRuntimeHookFactory postHookFactory;
+    private final IRecordDescriptorFactory inputRdFactory;
+
+    private final int outputArity;
+
+    /**
+     * Creates a descriptor for the plain join variant (neither right-outer
+     * nor set-union). The other constructors delegate to this one.
+     *
+     * @param isForward
+     *            passed through unchanged to the runtime operator
+     * @param lowKeyFields
+     *            fields in the input tuple to be used as low keys
+     * @param highKeyFields
+     *            fields in the input tuple to be used as high keys
+     * @param lowKeyInclusive
+     *            whether the low key bound is inclusive
+     * @param highKeyInclusive
+     *            whether the high key bound is inclusive
+     * @param outputArity
+     *            number of outputs of the operator
+     * @param rDescs
+     *            output record descriptors; must contain at least one element
+     *            because {@code rDescs[0]} is handed to the superclass, and
+     *            every element is copied into {@code recordDescriptors}
+     */
+    public IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(JobSpecification spec,
+            IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> treeIndexRegistryProvider,
+            IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
+            IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
+            boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory opHelperFactory,
+            IRecordDescriptorFactory inputRdFactory, int outputArity, IUpdateFunctionFactory functionFactory,
+            IRuntimeHookFactory preHookFactory, IRuntimeHookFactory postHookFactory, RecordDescriptor... rDescs) {
+        super(spec, 1, outputArity, rDescs[0], storageManager, treeIndexRegistryProvider, fileSplitProvider,
+                typeTraits, comparatorFactories, opHelperFactory, null, false, NoOpOperationCallbackProvider.INSTANCE);
+        this.isForward = isForward;
+        this.lowKeyFields = lowKeyFields;
+        this.highKeyFields = highKeyFields;
+        this.lowKeyInclusive = lowKeyInclusive;
+        this.highKeyInclusive = highKeyInclusive;
+
+        this.functionFactory = functionFactory;
+        this.preHookFactory = preHookFactory;
+        this.postHookFactory = postHookFactory;
+        this.inputRdFactory = inputRdFactory;
+
+        for (int i = 0; i < rDescs.length; i++) {
+            this.recordDescriptors[i] = rDescs[i];
+        }
+
+        this.outputArity = outputArity;
+    }
+
+    /**
+     * Creates a descriptor that can select the right-outer-join variant.
+     *
+     * @param interiorFrameFactory
+     *            not used by this descriptor
+     * @param leafFrameFactory
+     *            not used by this descriptor
+     * @param isRightOuter
+     *            when {@code true}, {@link #createPushRuntime} builds the
+     *            right-outer-join runtime operator
+     * @param nullWriterFactories
+     *            factories for the null writers handed to the right-outer-join
+     *            runtime operator; read only when {@code isRightOuter} is set
+     */
+    public IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(JobSpecification spec,
+            IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> treeIndexRegistryProvider,
+            IFileSplitProvider fileSplitProvider, ITreeIndexFrameFactory interiorFrameFactory,
+            ITreeIndexFrameFactory leafFrameFactory, ITypeTraits[] typeTraits,
+            IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
+            boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory opHelperFactory,
+            boolean isRightOuter, INullWriterFactory[] nullWriterFactories, IRecordDescriptorFactory inputRdFactory,
+            int outputArity, IUpdateFunctionFactory functionFactory, IRuntimeHookFactory preHookFactory,
+            IRuntimeHookFactory postHookFactory, RecordDescriptor... rDescs) {
+        // Shared initialization lives in the base constructor; only the join-variant state is set here.
+        this(spec, storageManager, treeIndexRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
+                isForward, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive, opHelperFactory,
+                inputRdFactory, outputArity, functionFactory, preHookFactory, postHookFactory, rDescs);
+        this.isRightOuter = isRightOuter;
+        this.nullWriterFactories = nullWriterFactories;
+    }
+
+    /**
+     * Creates a descriptor that can select the set-union variant.
+     *
+     * @param interiorFrameFactory
+     *            not used by this descriptor
+     * @param leafFrameFactory
+     *            not used by this descriptor
+     * @param isSetUnion
+     *            when {@code true}, {@link #createPushRuntime} builds the
+     *            set-union runtime operator
+     */
+    public IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(JobSpecification spec,
+            IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> treeIndexRegistryProvider,
+            IFileSplitProvider fileSplitProvider, ITreeIndexFrameFactory interiorFrameFactory,
+            ITreeIndexFrameFactory leafFrameFactory, ITypeTraits[] typeTraits,
+            IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
+            boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory opHelperFactory,
+            boolean isSetUnion, IRecordDescriptorFactory inputRdFactory, int outputArity,
+            IUpdateFunctionFactory functionFactory, IRuntimeHookFactory preHookFactory,
+            IRuntimeHookFactory postHookFactory, RecordDescriptor... rDescs) {
+        // Shared initialization lives in the base constructor; only the join-variant state is set here.
+        this(spec, storageManager, treeIndexRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
+                isForward, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive, opHelperFactory,
+                inputRdFactory, outputArity, functionFactory, preHookFactory, postHookFactory, rDescs);
+        this.isSetUnion = isSetUnion;
+    }
+
+    /**
+     * Builds the runtime operator for one partition. The right-outer flag is
+     * checked first, then the set-union flag; if neither is set the plain
+     * join operator is returned.
+     */
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        if (isRightOuter) {
+            // One null writer per configured factory, created fresh for this runtime instance.
+            INullWriter[] nullWriters = new INullWriter[nullWriterFactories.length];
+            for (int i = 0; i < nullWriters.length; i++) {
+                nullWriters[i] = nullWriterFactories[i].createNullWriter();
+            }
+            return new IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable(this, ctx, partition,
+                    recordDescProvider, isForward, lowKeyFields, highKeyFields, nullWriters, functionFactory,
+                    preHookFactory, postHookFactory, inputRdFactory, outputArity);
+        } else if (isSetUnion) {
+            return new IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable(this, ctx, partition,
+                    recordDescProvider, isForward, lowKeyFields, highKeyFields, functionFactory, preHookFactory,
+                    postHookFactory, inputRdFactory, outputArity);
+        } else {
+            return new IndexNestedLoopJoinFunctionUpdateOperatorNodePushable(this, ctx, partition, recordDescProvider,
+                    isForward, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive, functionFactory,
+                    preHookFactory, postHookFactory, inputRdFactory, outputArity);
+        }
+    }
+}
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
new file mode 100644
index 0000000..75a8087
--- /dev/null
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
@@ -0,0 +1,237 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.std;
+
+import java.io.DataOutput;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
+import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
+import edu.uci.ics.pregelix.dataflow.util.FunctionProxy;
+
+/**
+ * Runtime operator for an index nested-loop join that feeds its results to an
+ * update function.
+ * <p>
+ * For every tuple of each incoming frame, the low and high search keys are
+ * taken from the tuple, a range search is run against the B-tree, and for each
+ * matching index tuple the input tuple's fields followed by the index tuple's
+ * fields are written into a tuple builder. The builder and the index tuple are
+ * then passed to the {@link FunctionProxy}.
+ */
+public class IndexNestedLoopJoinFunctionUpdateOperatorNodePushable extends AbstractUnaryInputOperatorNodePushable {
+    private TreeIndexDataflowHelper treeIndexOpHelper;
+    private FrameTupleAccessor accessor;
+
+    private ByteBuffer writeBuffer;
+    private FrameTupleAppender appender;
+    private ArrayTupleBuilder tb;
+    private DataOutput dos;
+
+    private BTree btree;
+    private PermutingFrameTupleReference lowKey;
+    private PermutingFrameTupleReference highKey;
+    private boolean lowKeyInclusive;
+    private boolean highKeyInclusive;
+    private RangePredicate rangePred;
+    private MultiComparator lowKeySearchCmp;
+    private MultiComparator highKeySearchCmp;
+    private ITreeIndexCursor cursor;
+    private ITreeIndexFrame cursorFrame;
+    protected ITreeIndexAccessor indexAccessor;
+
+    private RecordDescriptor recDesc;
+    private final RecordDescriptor inputRecDesc;
+
+    private final IFrameWriter[] writers;
+    private final FunctionProxy functionProxy;
+
+    /**
+     * Sets up the index helper, the optional low/high key references and the
+     * function proxy. No index resources are opened until {@link #open()}.
+     *
+     * @param lowKeyFields
+     *            input-tuple fields forming the low search key; when
+     *            {@code null} or empty no low key is set on the predicate
+     * @param highKeyFields
+     *            input-tuple fields forming the high search key; when
+     *            {@code null} or empty no high key is set on the predicate
+     * @param outputArity
+     *            number of output writer slots to allocate
+     */
+    public IndexNestedLoopJoinFunctionUpdateOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
+            IHyracksTaskContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward,
+            int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive,
+            IUpdateFunctionFactory functionFactory, IRuntimeHookFactory preHookFactory,
+            IRuntimeHookFactory postHookFactory, IRecordDescriptorFactory inputRdFactory, int outputArity) {
+        inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
+        treeIndexOpHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
+                opDesc, ctx, partition);
+        this.lowKeyInclusive = lowKeyInclusive;
+        this.highKeyInclusive = highKeyInclusive;
+        this.recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
+        if (lowKeyFields != null && lowKeyFields.length > 0) {
+            lowKey = new PermutingFrameTupleReference();
+            lowKey.setFieldPermutation(lowKeyFields);
+        }
+        if (highKeyFields != null && highKeyFields.length > 0) {
+            highKey = new PermutingFrameTupleReference();
+            highKey.setFieldPermutation(highKeyFields);
+        }
+
+        this.writers = new IFrameWriter[outputArity];
+        this.functionProxy = new FunctionProxy(ctx, functionFactory, preHookFactory, postHookFactory, inputRdFactory,
+                writers);
+    }
+
+    /**
+     * Creates the search cursor over the current leaf frame.
+     */
+    protected void setCursor() {
+        cursor = new BTreeRangeSearchCursor((IBTreeLeafFrame) cursorFrame, true);
+    }
+
+    /**
+     * Opens the update function, then the B-tree, and prepares the cursor,
+     * comparators, range predicate and tuple-building buffers. If anything
+     * after the function open fails, the index helper is de-initialized and
+     * the cause is rethrown wrapped in a {@link HyracksDataException}.
+     */
+    @Override
+    public void open() throws HyracksDataException {
+        /**
+         * open the function
+         */
+        functionProxy.functionOpen();
+        accessor = new FrameTupleAccessor(treeIndexOpHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+
+        try {
+            treeIndexOpHelper.init(false);
+            btree = (BTree) treeIndexOpHelper.getIndex();
+            btree.open(treeIndexOpHelper.getIndexFileId());
+            cursorFrame = btree.getLeafFrameFactory().createFrame();
+            setCursor();
+
+            // TODO: Can we construct the multicmps using helper methods?
+            // Default to comparing on all index key fields; narrow to the key's field count when a key is given.
+            int lowKeySearchFields = btree.getComparatorFactories().length;
+            int highKeySearchFields = btree.getComparatorFactories().length;
+            if (lowKey != null) {
+                lowKeySearchFields = lowKey.getFieldCount();
+            }
+            if (highKey != null) {
+                highKeySearchFields = highKey.getFieldCount();
+            }
+
+            IBinaryComparator[] lowKeySearchComparators = new IBinaryComparator[lowKeySearchFields];
+            for (int i = 0; i < lowKeySearchFields; i++) {
+                lowKeySearchComparators[i] = btree.getComparatorFactories()[i].createBinaryComparator();
+            }
+            lowKeySearchCmp = new MultiComparator(lowKeySearchComparators);
+
+            if (lowKeySearchFields == highKeySearchFields) {
+                // Same number of fields on both sides: one comparator serves both bounds.
+                highKeySearchCmp = lowKeySearchCmp;
+            } else {
+                IBinaryComparator[] highKeySearchComparators = new IBinaryComparator[highKeySearchFields];
+                for (int i = 0; i < highKeySearchFields; i++) {
+                    highKeySearchComparators[i] = btree.getComparatorFactories()[i].createBinaryComparator();
+                }
+                highKeySearchCmp = new MultiComparator(highKeySearchComparators);
+
+            }
+
+            rangePred = new RangePredicate(null, null, lowKeyInclusive, highKeyInclusive, lowKeySearchCmp,
+                    highKeySearchCmp);
+            writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
+            tb = new ArrayTupleBuilder(inputRecDesc.getFields().length + btree.getFieldCount());
+            dos = tb.getDataOutput();
+            appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
+            appender.reset(writeBuffer, true);
+
+            indexAccessor = btree.createAccessor();
+        } catch (Exception e) {
+            treeIndexOpHelper.deinit();
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /**
+     * Drains the cursor. For each index tuple found, writes the fields of
+     * input tuple {@code tIndex} followed by the index tuple's fields into the
+     * tuple builder and invokes the update function with the builder and the
+     * index tuple.
+     *
+     * @param leftAccessor
+     *            accessor positioned on the frame holding the input tuple
+     * @param tIndex
+     *            index of the input tuple within that frame
+     */
+    private void writeSearchResults(IFrameTupleAccessor leftAccessor, int tIndex) throws Exception {
+        // These values depend only on the input tuple, not on the index tuple, so read them once.
+        int inputFieldCount = inputRecDesc.getFields().length;
+        byte[] leftBytes = leftAccessor.getBuffer().array();
+        // Field offsets from the accessor are added to the slot length plus the tuple start.
+        int leftDataStart = leftAccessor.getFieldSlotsLength() + leftAccessor.getTupleStartOffset(tIndex);
+
+        while (cursor.hasNext()) {
+            tb.reset();
+            cursor.next();
+
+            ITupleReference tupleRef = cursor.getTuple();
+            for (int i = 0; i < inputFieldCount; i++) {
+                int fieldStart = leftAccessor.getFieldStartOffset(tIndex, i);
+                int len = leftAccessor.getFieldEndOffset(tIndex, i) - fieldStart;
+                dos.write(leftBytes, leftDataStart + fieldStart, len);
+                tb.addFieldEndOffset();
+            }
+            for (int i = 0; i < tupleRef.getFieldCount(); i++) {
+                dos.write(tupleRef.getFieldData(i), tupleRef.getFieldStart(i), tupleRef.getFieldLength(i));
+                tb.addFieldEndOffset();
+            }
+
+            /**
+             * call the update function
+             */
+            functionProxy.functionCall(tb, tupleRef);
+        }
+    }
+
+    /**
+     * Runs one range search per tuple of {@code buffer} and forwards the
+     * results to the update function. Any failure is rethrown wrapped in a
+     * {@link HyracksDataException}.
+     */
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        accessor.reset(buffer);
+
+        int tupleCount = accessor.getTupleCount();
+        try {
+            for (int i = 0; i < tupleCount; i++) {
+                if (lowKey != null) {
+                    lowKey.reset(accessor, i);
+                }
+                if (highKey != null) {
+                    highKey.reset(accessor, i);
+                }
+                rangePred.setLowKey(lowKey, lowKeyInclusive);
+                rangePred.setHighKey(highKey, highKeyInclusive);
+
+                cursor.reset();
+                indexAccessor.search(cursor, rangePred);
+                writeSearchResults(accessor, i);
+            }
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /**
+     * Closes the cursor, then the update function, then de-initializes the
+     * index helper. Each later step runs even if an earlier one throws, so a
+     * cursor failure no longer leaves the function unclosed. If both the
+     * cursor and the function close fail, the function's exception is the one
+     * that propagates.
+     */
+    @Override
+    public void close() throws HyracksDataException {
+        try {
+            try {
+                // The cursor is only created in open(); it is still null if open() failed early.
+                if (cursor != null) {
+                    cursor.close();
+                }
+            } catch (Exception e) {
+                throw new HyracksDataException(e);
+            } finally {
+                /**
+                 * close the update function
+                 */
+                functionProxy.functionClose();
+            }
+        } finally {
+            treeIndexOpHelper.deinit();
+        }
+    }
+
+    /**
+     * Forwards the failure notification to every output writer.
+     */
+    @Override
+    public void fail() throws HyracksDataException {
+        for (IFrameWriter writer : writers) {
+            writer.fail();
+        }
+    }
+
+    /**
+     * Stores {@code writer} in the slot for output {@code index}. The supplied
+     * record descriptor is not used here.
+     */
+    @Override
+    public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+        writers[index] = writer;
+    }
+
+}
\ No newline at end of file
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorDescriptor.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorDescriptor.java
new file mode 100644
index 0000000..ed177e3
--- /dev/null
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorDescriptor.java
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.std;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+
+/**
+ * Operator descriptor for an index nested-loop join over a tree index.
+ * <p>
+ * Depending on the flags supplied at construction time,
+ * {@link #createPushRuntime} instantiates the right-outer-join, the set-union
+ * or the plain join variant of the runtime operator.
+ */
+public class IndexNestedLoopJoinOperatorDescriptor extends AbstractTreeIndexOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    private final boolean isForward;
+    private final int[] lowKeyFields; // fields in input tuple to be used as low keys
+    private final int[] highKeyFields; // fields in input tuple to be used as high keys
+    private final boolean lowKeyInclusive;
+    private final boolean highKeyInclusive;
+
+    // right outer join
+    private boolean isRightOuter = false;
+    private INullWriterFactory[] nullWriterFactories;
+
+    // set union
+    private boolean isSetUnion = false;
+
+    /**
+     * Creates a descriptor for the plain join variant (neither right-outer
+     * nor set-union). The other constructors delegate to this one.
+     *
+     * @param recDesc
+     *            output record descriptor handed to the superclass
+     * @param isForward
+     *            passed through unchanged to the runtime operator
+     * @param lowKeyFields
+     *            fields in the input tuple to be used as low keys
+     * @param highKeyFields
+     *            fields in the input tuple to be used as high keys
+     * @param lowKeyInclusive
+     *            whether the low key bound is inclusive
+     * @param highKeyInclusive
+     *            whether the high key bound is inclusive
+     */
+    public IndexNestedLoopJoinOperatorDescriptor(JobSpecification spec, RecordDescriptor recDesc,
+            IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> treeIndexRegistryProvider,
+            IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
+            IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
+            boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory opHelperFactory) {
+        super(spec, 1, 1, recDesc, storageManager, treeIndexRegistryProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, opHelperFactory, null, false, NoOpOperationCallbackProvider.INSTANCE);
+        this.isForward = isForward;
+        this.lowKeyFields = lowKeyFields;
+        this.highKeyFields = highKeyFields;
+        this.lowKeyInclusive = lowKeyInclusive;
+        this.highKeyInclusive = highKeyInclusive;
+    }
+
+    /**
+     * Creates a descriptor that can select the right-outer-join variant.
+     *
+     * @param interiorFrameFactory
+     *            not used by this descriptor
+     * @param leafFrameFactory
+     *            not used by this descriptor
+     * @param isRightOuter
+     *            when {@code true}, {@link #createPushRuntime} builds the
+     *            right-outer-join runtime operator
+     * @param nullWriterFactories
+     *            factories for the null writers handed to the right-outer-join
+     *            runtime operator; read only when {@code isRightOuter} is set
+     */
+    public IndexNestedLoopJoinOperatorDescriptor(JobSpecification spec, RecordDescriptor recDesc,
+            IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> treeIndexRegistryProvider,
+            IFileSplitProvider fileSplitProvider, ITreeIndexFrameFactory interiorFrameFactory,
+            ITreeIndexFrameFactory leafFrameFactory, ITypeTraits[] typeTraits,
+            IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
+            boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory opHelperFactory,
+            boolean isRightOuter, INullWriterFactory[] nullWriterFactories) {
+        // Shared initialization lives in the base constructor; only the join-variant state is set here.
+        this(spec, recDesc, storageManager, treeIndexRegistryProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, isForward, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive,
+                opHelperFactory);
+        this.isRightOuter = isRightOuter;
+        this.nullWriterFactories = nullWriterFactories;
+    }
+
+    /**
+     * Creates a descriptor that can select the set-union variant.
+     *
+     * @param interiorFrameFactory
+     *            not used by this descriptor
+     * @param leafFrameFactory
+     *            not used by this descriptor
+     * @param isSetUnion
+     *            when {@code true}, {@link #createPushRuntime} builds the
+     *            set-union runtime operator
+     */
+    public IndexNestedLoopJoinOperatorDescriptor(JobSpecification spec, RecordDescriptor recDesc,
+            IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> treeIndexRegistryProvider,
+            IFileSplitProvider fileSplitProvider, ITreeIndexFrameFactory interiorFrameFactory,
+            ITreeIndexFrameFactory leafFrameFactory, ITypeTraits[] typeTraits,
+            IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
+            boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory opHelperFactory,
+            boolean isSetUnion) {
+        // Shared initialization lives in the base constructor; only the join-variant state is set here.
+        this(spec, recDesc, storageManager, treeIndexRegistryProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, isForward, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive,
+                opHelperFactory);
+        this.isSetUnion = isSetUnion;
+    }
+
+    /**
+     * Builds the runtime operator for one partition. The right-outer flag is
+     * checked first, then the set-union flag; if neither is set the plain
+     * join operator is returned.
+     */
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        if (isRightOuter) {
+            // One null writer per configured factory, created fresh for this runtime instance.
+            INullWriter[] nullWriters = new INullWriter[nullWriterFactories.length];
+            for (int i = 0; i < nullWriters.length; i++) {
+                nullWriters[i] = nullWriterFactories[i].createNullWriter();
+            }
+            return new IndexNestedLoopRightOuterJoinOperatorNodePushable(this, ctx, partition, recordDescProvider,
+                    isForward, lowKeyFields, highKeyFields, nullWriters);
+        } else if (isSetUnion) {
+            return new IndexNestedLoopSetUnionOperatorNodePushable(this, ctx, partition, recordDescProvider, isForward,
+                    lowKeyFields, highKeyFields);
+        } else {
+            return new IndexNestedLoopJoinOperatorNodePushable(this, ctx, partition, recordDescProvider, isForward,
+                    lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive);
+        }
+    }
+}
\ No newline at end of file
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorNodePushable.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorNodePushable.java
new file mode 100644
index 0000000..bd076d3
--- /dev/null
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorNodePushable.java
@@ -0,0 +1,241 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.std;
+
+import java.io.DataOutput;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+
+/**
+ * Push runtime of an index nested-loop join: each input tuple drives one range search on a B-tree, and every
+ * index tuple returned by that search is emitted appended to the fields of the input tuple.
+ */
+public class IndexNestedLoopJoinOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+    private TreeIndexDataflowHelper treeIndexOpHelper;
+    private FrameTupleAccessor accessor;
+
+    private ByteBuffer writeBuffer;
+    private FrameTupleAppender appender;
+    private ArrayTupleBuilder tb;
+    private DataOutput dos;
+
+    private BTree btree;
+    private PermutingFrameTupleReference lowKey;
+    private PermutingFrameTupleReference highKey;
+    private boolean lowKeyInclusive;
+    private boolean highKeyInclusive;
+    private RangePredicate rangePred;
+    private MultiComparator lowKeySearchCmp;
+    private MultiComparator highKeySearchCmp;
+    private IIndexCursor cursor;
+    protected ITreeIndexAccessor indexAccessor;
+
+    private RecordDescriptor recDesc;
+    private final RecordDescriptor inputRecDesc;
+
+    /**
+     * Captures the record descriptor of input 0, creates the index dataflow helper and sets up the key
+     * projections. A null or empty key-field array leaves the corresponding search key unset;
+     * {@code isForward} is accepted but not read by this class.
+     */
+    public IndexNestedLoopJoinOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
+            int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward, int[] lowKeyFields,
+            int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive) {
+        inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
+        treeIndexOpHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
+                opDesc, ctx, partition);
+        this.lowKeyInclusive = lowKeyInclusive;
+        this.highKeyInclusive = highKeyInclusive;
+        this.recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
+        if (lowKeyFields != null && lowKeyFields.length > 0) {
+            lowKey = new PermutingFrameTupleReference();
+            lowKey.setFieldPermutation(lowKeyFields);
+        }
+        if (highKeyFields != null && highKeyFields.length > 0) {
+            highKey = new PermutingFrameTupleReference();
+            highKey.setFieldPermutation(highKeyFields);
+        }
+    }
+
+    /** Obtains the search cursor from the index accessor; protected so a subclass can supply another cursor. */
+    protected void setCursor() {
+        cursor = indexAccessor.createSearchCursor();
+    }
+
+    /**
+     * Initializes the index helper, opens the output writer and builds the comparators, range predicate,
+     * output buffers and search cursor. If any step fails the index helper is de-initialized and the failure
+     * is rethrown wrapped in a {@link HyracksDataException}.
+     */
+    @Override
+    public void open() throws HyracksDataException {
+        accessor = new FrameTupleAccessor(treeIndexOpHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+
+        try {
+            treeIndexOpHelper.init(false);
+            btree = (BTree) treeIndexOpHelper.getIndex();
+            writer.open();
+
+            // Without an explicit key the comparator spans every B-tree key field; with one it spans only as
+            // many leading key fields as the key supplies.
+            int lowKeySearchFields = btree.getComparatorFactories().length;
+            int highKeySearchFields = btree.getComparatorFactories().length;
+            if (lowKey != null)
+                lowKeySearchFields = lowKey.getFieldCount();
+            if (highKey != null)
+                highKeySearchFields = highKey.getFieldCount();
+
+            lowKeySearchCmp = createSearchComparator(lowKeySearchFields);
+            // Equal widths allow both bounds to share one comparator.
+            highKeySearchCmp = lowKeySearchFields == highKeySearchFields ? lowKeySearchCmp
+                    : createSearchComparator(highKeySearchFields);
+
+            rangePred = new RangePredicate(null, null, lowKeyInclusive, highKeyInclusive, lowKeySearchCmp,
+                    highKeySearchCmp);
+            writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
+            tb = new ArrayTupleBuilder(inputRecDesc.getFields().length + btree.getFieldCount());
+            dos = tb.getDataOutput();
+            appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
+            appender.reset(writeBuffer, true);
+            indexAccessor = btree.createAccessor();
+            setCursor();
+        } catch (Exception e) {
+            treeIndexOpHelper.deinit();
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /** Builds a comparator over the first {@code fieldCount} key fields of the B-tree. */
+    private MultiComparator createSearchComparator(int fieldCount) {
+        IBinaryComparator[] comparators = new IBinaryComparator[fieldCount];
+        for (int i = 0; i < fieldCount; i++) {
+            comparators[i] = btree.getComparatorFactories()[i].createBinaryComparator();
+        }
+        return new MultiComparator(comparators);
+    }
+
+    /**
+     * Drains the cursor: for every index tuple it builds an output tuple holding the fields of input tuple
+     * {@code tIndex} followed by the index tuple's fields, and appends it to the output frame, flushing the
+     * frame to the writer whenever it is full.
+     */
+    private void writeSearchResults(IFrameTupleAccessor leftAccessor, int tIndex) throws Exception {
+        while (cursor.hasNext()) {
+            tb.reset();
+            cursor.next();
+
+            ITupleReference frameTuple = cursor.getTuple();
+            for (int i = 0; i < inputRecDesc.getFields().length; i++) {
+                int tupleStart = leftAccessor.getTupleStartOffset(tIndex);
+                int fieldStart = leftAccessor.getFieldStartOffset(tIndex, i);
+                int offset = leftAccessor.getFieldSlotsLength() + tupleStart + fieldStart;
+                int len = leftAccessor.getFieldEndOffset(tIndex, i) - fieldStart;
+                dos.write(leftAccessor.getBuffer().array(), offset, len);
+                tb.addFieldEndOffset();
+            }
+            for (int i = 0; i < frameTuple.getFieldCount(); i++) {
+                dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
+                tb.addFieldEndOffset();
+            }
+
+            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                // Frame is full: ship it and retry on the emptied frame.
+                FrameUtils.flushFrame(writeBuffer, writer);
+                appender.reset(writeBuffer, true);
+                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                    throw new IllegalStateException("joined tuple does not fit into an empty output frame");
+                }
+            }
+        }
+    }
+
+    /**
+     * Runs one index search per tuple of {@code buffer}, with the search keys re-pointed at that tuple, and
+     * writes the joined results.
+     */
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        accessor.reset(buffer);
+
+        int tupleCount = accessor.getTupleCount();
+        try {
+            for (int i = 0; i < tupleCount; i++) {
+                if (lowKey != null)
+                    lowKey.reset(accessor, i);
+                if (highKey != null)
+                    highKey.reset(accessor, i);
+                rangePred.setLowKey(lowKey, lowKeyInclusive);
+                rangePred.setHighKey(highKey, highKeyInclusive);
+
+                cursor.reset();
+                indexAccessor.search(cursor, rangePred);
+                writeSearchResults(accessor, i);
+            }
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /**
+     * Flushes any buffered output and closes the writer. The cursor is closed and the index helper is
+     * de-initialized even when flushing or closing the writer fails, so index resources are not leaked.
+     */
+    @Override
+    public void close() throws HyracksDataException {
+        try {
+            if (appender.getTupleCount() > 0) {
+                FrameUtils.flushFrame(writeBuffer, writer);
+            }
+            writer.close();
+        } finally {
+            try {
+                // cursor is still null if open() failed before creating it
+                if (cursor != null) {
+                    cursor.close();
+                }
+            } catch (Exception e) {
+                throw new HyracksDataException(e);
+            } finally {
+                treeIndexOpHelper.deinit();
+            }
+        }
+    }
+
+    /** Propagates the failure notification to the output writer. */
+    @Override
+    public void fail() throws HyracksDataException {
+        writer.fail();
+    }
+}
\ No newline at end of file
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
new file mode 100644
index 0000000..c31ebd4
--- /dev/null
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
@@ -0,0 +1,314 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.std;
+
+import java.io.DataOutput;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
+import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
+import edu.uci.ics.pregelix.dataflow.util.FunctionProxy;
+
+/**
+ * Push runtime of a right outer join between the input stream and a B-tree range scan opened with null
+ * bounds, whose result tuples are handed to an update function rather than appended to an output frame.
+ * <p>
+ * The input and the index scan are walked in a single merge-style pass (this presumably relies on the input
+ * arriving in index key order -- nothing here checks it). Every index tuple reaches the function: joined with
+ * each input tuple whose low key compares equal, or with null-written input fields if none matched.
+ */
+public class IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable extends
+        AbstractUnaryInputOperatorNodePushable {
+    private TreeIndexDataflowHelper treeIndexOpHelper;
+    private FrameTupleAccessor accessor;
+
+    private ByteBuffer writeBuffer;
+    private FrameTupleAppender appender;
+    private ArrayTupleBuilder tb;
+    private DataOutput dos;
+
+    private BTree btree;
+    private RangePredicate rangePred;
+    private MultiComparator lowKeySearchCmp;
+    private MultiComparator highKeySearchCmp;
+    private ITreeIndexCursor cursor;
+    private ITreeIndexFrame cursorFrame;
+    protected ITreeIndexAccessor indexAccessor;
+
+    private RecordDescriptor recDesc;
+    private final RecordDescriptor inputRecDesc;
+
+    private PermutingFrameTupleReference lowKey;
+    private PermutingFrameTupleReference highKey;
+
+    private INullWriter[] nullWriter;
+    /** Index tuple the scan is currently positioned on; null once the scan is exhausted. */
+    private ITupleReference currentTopTuple;
+    /** Whether {@link #currentTopTuple} has already been joined with at least one input tuple. */
+    private boolean match;
+
+    private final IFrameWriter[] writers;
+    private final FunctionProxy functionProxy;
+
+    /**
+     * Captures the record descriptor of input 0, creates the index dataflow helper, sets up the key
+     * projections and wires the update function to the {@code outputArity} output writers.
+     * {@code isForward} is accepted but not read by this class. {@code nullWriter} needs one entry per input
+     * field, and {@code lowKeyFields} should be non-empty because the merge compares on the low key.
+     */
+    public IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
+            IHyracksTaskContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward,
+            int[] lowKeyFields, int[] highKeyFields, INullWriter[] nullWriter, IUpdateFunctionFactory functionFactory,
+            IRuntimeHookFactory preHookFactory, IRuntimeHookFactory postHookFactory,
+            IRecordDescriptorFactory inputRdFactory, int outputArity) {
+        inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
+        treeIndexOpHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
+                opDesc, ctx, partition);
+        this.recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
+
+        if (lowKeyFields != null && lowKeyFields.length > 0) {
+            lowKey = new PermutingFrameTupleReference();
+            lowKey.setFieldPermutation(lowKeyFields);
+        }
+        if (highKeyFields != null && highKeyFields.length > 0) {
+            highKey = new PermutingFrameTupleReference();
+            highKey.setFieldPermutation(highKeyFields);
+        }
+        this.nullWriter = nullWriter;
+
+        this.writers = new IFrameWriter[outputArity];
+        this.functionProxy = new FunctionProxy(ctx, functionFactory, preHookFactory, postHookFactory, inputRdFactory,
+                writers);
+    }
+
+    /** Creates the range-search cursor over the leaf frame prepared in {@link #open()}. */
+    protected void setCursor() {
+        cursor = new BTreeRangeSearchCursor((IBTreeLeafFrame) cursorFrame, true);
+    }
+
+    /**
+     * Opens the update function, initializes the index helper, starts a B-tree search with null bounds and
+     * positions it on the first index tuple. If index setup fails the helper is de-initialized and the
+     * failure is rethrown wrapped in a {@link HyracksDataException}.
+     */
+    @Override
+    public void open() throws HyracksDataException {
+        functionProxy.functionOpen();
+        accessor = new FrameTupleAccessor(treeIndexOpHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+
+        try {
+            treeIndexOpHelper.init(false);
+            btree = (BTree) treeIndexOpHelper.getIndex();
+            cursorFrame = btree.getLeafFrameFactory().createFrame();
+            setCursor();
+
+            // Both bounds compare on every B-tree key field, so one comparator serves as low and high.
+            IBinaryComparator[] searchComparators = new IBinaryComparator[btree.getComparatorFactories().length];
+            for (int i = 0; i < searchComparators.length; i++) {
+                searchComparators[i] = btree.getComparatorFactories()[i].createBinaryComparator();
+            }
+            lowKeySearchCmp = new MultiComparator(searchComparators);
+            highKeySearchCmp = lowKeySearchCmp;
+
+            rangePred = new RangePredicate(null, null, true, true, lowKeySearchCmp, highKeySearchCmp);
+
+            writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
+            tb = new ArrayTupleBuilder(inputRecDesc.getFields().length + btree.getFieldCount());
+            dos = tb.getDataOutput();
+            appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
+            appender.reset(writeBuffer, true);
+
+            indexAccessor = btree.createAccessor();
+
+            // start the search with neither a low nor a high bound key
+            rangePred.setLowKey(null, true);
+            rangePred.setHighKey(null, true);
+            cursor.reset();
+            indexAccessor.search(cursor, rangePred);
+
+            // position on the first index tuple, if there is one
+            if (cursor.hasNext()) {
+                cursor.next();
+                currentTopTuple = cursor.getTuple();
+                match = false;
+            }
+
+        } catch (Exception e) {
+            treeIndexOpHelper.deinit();
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /**
+     * Builds the joined tuple -- the fields of input tuple {@code tIndex} followed by the fields of
+     * {@code frameTuple} -- and passes it, together with the index tuple, to the update function.
+     */
+    private void writeResults(IFrameTupleAccessor leftAccessor, int tIndex, ITupleReference frameTuple)
+            throws Exception {
+        tb.reset();
+        for (int i = 0; i < inputRecDesc.getFields().length; i++) {
+            int tupleStart = leftAccessor.getTupleStartOffset(tIndex);
+            int fieldStart = leftAccessor.getFieldStartOffset(tIndex, i);
+            int offset = leftAccessor.getFieldSlotsLength() + tupleStart + fieldStart;
+            int len = leftAccessor.getFieldEndOffset(tIndex, i) - fieldStart;
+            dos.write(leftAccessor.getBuffer().array(), offset, len);
+            tb.addFieldEndOffset();
+        }
+        appendIndexFieldsAndCall(frameTuple);
+    }
+
+    /** Appends the fields of {@code frameTuple} to the tuple being built and invokes the update function. */
+    private void appendIndexFieldsAndCall(ITupleReference frameTuple) throws Exception {
+        for (int i = 0; i < frameTuple.getFieldCount(); i++) {
+            dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
+            tb.addFieldEndOffset();
+        }
+        functionProxy.functionCall(tb, frameTuple);
+    }
+
+    /**
+     * Advances the merge over the tuples of {@code buffer}: an input tuple whose low key equals the current
+     * index tuple is joined with it, one that compares below it is skipped, and one above it moves the index
+     * scan forward. Input tuples left once the scan is exhausted are ignored.
+     */
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        accessor.reset(buffer);
+        int tupleCount = accessor.getTupleCount();
+        try {
+            for (int i = 0; i < tupleCount && currentTopTuple != null;) {
+                if (lowKey != null)
+                    lowKey.reset(accessor, i);
+                if (highKey != null)
+                    highKey.reset(accessor, i);
+                // TODO: currently use low key only, check what they mean
+                int cmp = compare(lowKey, currentTopTuple);
+                if (cmp <= 0) {
+                    if (cmp == 0)
+                        outputMatch(i);
+                    i++;
+                } else {
+                    moveTreeCursor();
+                }
+            }
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /** Emits input tuple {@code i} joined with the current index tuple and marks that tuple as matched. */
+    private void outputMatch(int i) throws Exception {
+        writeResults(accessor, i, currentTopTuple);
+        match = true;
+    }
+
+    /**
+     * Emits the current index tuple with null input fields if nothing matched it, then steps the scan to
+     * the next index tuple, or marks the scan exhausted by clearing {@link #currentTopTuple}.
+     */
+    private void moveTreeCursor() throws Exception {
+        if (!match) {
+            writeResults(currentTopTuple);
+        }
+        if (cursor.hasNext()) {
+            cursor.next();
+            currentTopTuple = cursor.getTuple();
+            match = false;
+        } else {
+            currentTopTuple = null;
+        }
+    }
+
+    /**
+     * Drains the rest of the index scan (emitting the tuples nothing matched), closes the cursor even if that
+     * fails, then closes the update function. The index helper is always de-initialized.
+     */
+    @Override
+    public void close() throws HyracksDataException {
+        try {
+            try {
+                while (currentTopTuple != null) {
+                    moveTreeCursor();
+                }
+            } finally {
+                // cursor is still null if open() failed before creating it
+                if (cursor != null) {
+                    cursor.close();
+                }
+            }
+            functionProxy.functionClose();
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        } finally {
+            treeIndexOpHelper.deinit();
+        }
+    }
+
+    /** Propagates the failure notification to every output writer. */
+    @Override
+    public void fail() throws HyracksDataException {
+        for (IFrameWriter writer : writers)
+            writer.fail();
+    }
+
+    /** Compares two tuples with the comparator built over the B-tree key fields. */
+    private int compare(ITupleReference left, ITupleReference right) throws Exception {
+        return lowKeySearchCmp.compare(left, right);
+    }
+
+    /**
+     * Outer case: builds a tuple whose input fields are written by the null writers, followed by the fields
+     * of {@code frameTuple}, and passes it to the update function.
+     */
+    private void writeResults(ITupleReference frameTuple) throws Exception {
+        tb.reset();
+        for (int i = 0; i < inputRecDesc.getFields().length; i++) {
+            nullWriter[i].writeNull(dos);
+            tb.addFieldEndOffset();
+        }
+        appendIndexFieldsAndCall(frameTuple);
+    }
+
+    /** Records the writer for output {@code index}; the record descriptor is not used. */
+    @Override
+    public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+        writers[index] = writer;
+    }
+}
\ No newline at end of file
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java
new file mode 100644
index 0000000..9f1e1ad
--- /dev/null
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java
@@ -0,0 +1,312 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.std;
+
+import java.io.DataOutput;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
+import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+
+/**
+ * Push runtime of a right outer join between the input stream and a B-tree range scan opened with null
+ * bounds, writing the joined tuples to the output writer.
+ * <p>
+ * The input and the index scan are walked in a single merge-style pass (this presumably relies on the input
+ * arriving in index key order -- nothing here checks it). Every index tuple is emitted: joined with each
+ * input tuple whose low key compares equal, or with null-written input fields if none matched.
+ */
+public class IndexNestedLoopRightOuterJoinOperatorNodePushable extends
+        AbstractUnaryInputUnaryOutputOperatorNodePushable {
+    private TreeIndexDataflowHelper treeIndexOpHelper;
+    private FrameTupleAccessor accessor;
+
+    private ByteBuffer writeBuffer;
+    private FrameTupleAppender appender;
+    private ArrayTupleBuilder tb;
+    private DataOutput dos;
+
+    private BTree btree;
+    private boolean isForward;
+    private RangePredicate rangePred;
+    private MultiComparator lowKeySearchCmp;
+    private MultiComparator highKeySearchCmp;
+    private ITreeIndexCursor cursor;
+    private ITreeIndexFrame cursorFrame;
+    protected ITreeIndexAccessor indexAccessor;
+
+    private RecordDescriptor recDesc;
+    private final RecordDescriptor inputRecDesc;
+
+    private PermutingFrameTupleReference lowKey;
+    private PermutingFrameTupleReference highKey;
+
+    private INullWriter[] nullWriter;
+    /** Index tuple the scan is currently positioned on; null once the scan is exhausted. */
+    private ITupleReference currentTopTuple;
+    /** Whether {@link #currentTopTuple} has already been joined with at least one input tuple. */
+    private boolean match;
+
+    /**
+     * Captures the record descriptor of input 0, creates the index dataflow helper and sets up the key
+     * projections. {@code isForward} selects which comparison result lets the input side advance in
+     * {@link #nextFrame(ByteBuffer)}. {@code nullWriter} needs one entry per input field, and
+     * {@code lowKeyFields} should be non-empty because the merge compares on the low key.
+     */
+    public IndexNestedLoopRightOuterJoinOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
+            IHyracksTaskContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward,
+            int[] lowKeyFields, int[] highKeyFields, INullWriter[] nullWriter) {
+        inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
+        treeIndexOpHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
+                opDesc, ctx, partition);
+        this.isForward = isForward;
+        this.recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
+
+        if (lowKeyFields != null && lowKeyFields.length > 0) {
+            lowKey = new PermutingFrameTupleReference();
+            lowKey.setFieldPermutation(lowKeyFields);
+        }
+        if (highKeyFields != null && highKeyFields.length > 0) {
+            highKey = new PermutingFrameTupleReference();
+            highKey.setFieldPermutation(highKeyFields);
+        }
+        this.nullWriter = nullWriter;
+    }
+
+    /** Creates the range-search cursor over the leaf frame prepared in {@link #open()}. */
+    protected void setCursor() {
+        cursor = new BTreeRangeSearchCursor((IBTreeLeafFrame) cursorFrame, false);
+    }
+
+    /**
+     * Initializes the index helper, opens the output writer, starts a B-tree search with null bounds and
+     * positions it on the first index tuple. If any step fails the index helper is de-initialized and the
+     * failure is rethrown wrapped in a {@link HyracksDataException}.
+     */
+    @Override
+    public void open() throws HyracksDataException {
+        accessor = new FrameTupleAccessor(treeIndexOpHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+        try {
+            treeIndexOpHelper.init(false);
+            btree = (BTree) treeIndexOpHelper.getIndex();
+            cursorFrame = btree.getLeafFrameFactory().createFrame();
+            setCursor();
+            writer.open();
+
+            // Both bounds compare on every B-tree key field, so one comparator serves as low and high.
+            IBinaryComparator[] searchComparators = new IBinaryComparator[btree.getComparatorFactories().length];
+            for (int i = 0; i < searchComparators.length; i++) {
+                searchComparators[i] = btree.getComparatorFactories()[i].createBinaryComparator();
+            }
+            lowKeySearchCmp = new MultiComparator(searchComparators);
+            highKeySearchCmp = lowKeySearchCmp;
+
+            rangePred = new RangePredicate(null, null, true, true, lowKeySearchCmp, highKeySearchCmp);
+
+            writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
+            tb = new ArrayTupleBuilder(inputRecDesc.getFields().length + btree.getFieldCount());
+            dos = tb.getDataOutput();
+            appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
+            appender.reset(writeBuffer, true);
+
+            indexAccessor = btree.createAccessor();
+
+            // start the search with neither a low nor a high bound key
+            rangePred.setLowKey(null, true);
+            rangePred.setHighKey(null, true);
+            cursor.reset();
+            indexAccessor.search(cursor, rangePred);
+
+            // position on the first index tuple, if there is one
+            if (cursor.hasNext()) {
+                cursor.next();
+                currentTopTuple = cursor.getTuple();
+                match = false;
+            }
+
+        } catch (Exception e) {
+            treeIndexOpHelper.deinit();
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /**
+     * Builds the joined tuple -- the fields of input tuple {@code tIndex} followed by the fields of
+     * {@code frameTuple} -- and adds it to the output.
+     */
+    private void writeResults(IFrameTupleAccessor leftAccessor, int tIndex, ITupleReference frameTuple)
+            throws Exception {
+        tb.reset();
+        for (int i = 0; i < inputRecDesc.getFields().length; i++) {
+            int tupleStart = leftAccessor.getTupleStartOffset(tIndex);
+            int fieldStart = leftAccessor.getFieldStartOffset(tIndex, i);
+            int offset = leftAccessor.getFieldSlotsLength() + tupleStart + fieldStart;
+            int len = leftAccessor.getFieldEndOffset(tIndex, i) - fieldStart;
+            dos.write(leftAccessor.getBuffer().array(), offset, len);
+            tb.addFieldEndOffset();
+        }
+        appendIndexFieldsAndEmit(frameTuple);
+    }
+
+    /**
+     * Appends the fields of {@code frameTuple} to the tuple being built and adds the finished tuple to the
+     * output frame, flushing the frame to the writer first when it is full.
+     */
+    private void appendIndexFieldsAndEmit(ITupleReference frameTuple) throws Exception {
+        for (int i = 0; i < frameTuple.getFieldCount(); i++) {
+            dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
+            tb.addFieldEndOffset();
+        }
+
+        if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+            // Frame is full: ship it and retry on the emptied frame.
+            FrameUtils.flushFrame(writeBuffer, writer);
+            appender.reset(writeBuffer, true);
+            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                throw new IllegalStateException("joined tuple does not fit into an empty output frame");
+            }
+        }
+    }
+
+    /**
+     * Advances the merge over the tuples of {@code buffer}: an input tuple whose low key equals the current
+     * index tuple is joined with it. With {@code isForward} set, one that compares below it is skipped and
+     * one above it moves the index scan forward; otherwise the two directions swap. Input tuples left once
+     * the scan is exhausted are ignored.
+     */
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        accessor.reset(buffer);
+        int tupleCount = accessor.getTupleCount();
+        try {
+            for (int i = 0; i < tupleCount && currentTopTuple != null;) {
+                if (lowKey != null)
+                    lowKey.reset(accessor, i);
+                if (highKey != null)
+                    highKey.reset(accessor, i);
+                // TODO: currently use low key only, check what they mean
+                int cmp = compare(lowKey, currentTopTuple);
+                if ((cmp <= 0 && isForward) || (cmp >= 0 && !isForward)) {
+                    if (cmp == 0)
+                        outputMatch(i);
+                    i++;
+                } else {
+                    moveTreeCursor();
+                }
+            }
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /** Emits input tuple {@code i} joined with the current index tuple and marks that tuple as matched. */
+    private void outputMatch(int i) throws Exception {
+        writeResults(accessor, i, currentTopTuple);
+        match = true;
+    }
+
+    /**
+     * Emits the current index tuple with null input fields if nothing matched it, then steps the scan to
+     * the next index tuple, or marks the scan exhausted by clearing {@link #currentTopTuple}.
+     */
+    private void moveTreeCursor() throws Exception {
+        if (!match) {
+            writeResults(currentTopTuple);
+        }
+        if (cursor.hasNext()) {
+            cursor.next();
+            currentTopTuple = cursor.getTuple();
+            match = false;
+        } else {
+            currentTopTuple = null;
+        }
+    }
+
+    /**
+     * Drains the rest of the index scan (emitting the tuples nothing matched), flushes buffered output and
+     * closes the writer. The cursor is closed even if one of those steps fails, and the index helper is
+     * always de-initialized.
+     */
+    @Override
+    public void close() throws HyracksDataException {
+        try {
+            try {
+                while (currentTopTuple != null) {
+                    moveTreeCursor();
+                }
+
+                if (appender.getTupleCount() > 0) {
+                    FrameUtils.flushFrame(writeBuffer, writer);
+                }
+                writer.close();
+            } finally {
+                // cursor is still null if open() failed before creating it
+                if (cursor != null) {
+                    cursor.close();
+                }
+            }
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        } finally {
+            treeIndexOpHelper.deinit();
+        }
+    }
+
+    /** Propagates the failure notification to the output writer. */
+    @Override
+    public void fail() throws HyracksDataException {
+        writer.fail();
+    }
+
+    /** Compares two tuples with the comparator built over the B-tree key fields. */
+    private int compare(ITupleReference left, ITupleReference right) throws Exception {
+        return lowKeySearchCmp.compare(left, right);
+    }
+
+    /**
+     * Outer case: builds a tuple whose input fields are written by the null writers, followed by the fields
+     * of {@code frameTuple}, and adds it to the output.
+     */
+    private void writeResults(ITupleReference frameTuple) throws Exception {
+        tb.reset();
+        for (int i = 0; i < inputRecDesc.getFields().length; i++) {
+            nullWriter[i].writeNull(dos);
+            tb.addFieldEndOffset();
+        }
+        appendIndexFieldsAndEmit(frameTuple);
+    }
+}
\ No newline at end of file
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
new file mode 100644
index 0000000..0a966b5
--- /dev/null
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
@@ -0,0 +1,263 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.std;
+
+import java.io.DataOutput;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
+import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
+import edu.uci.ics.pregelix.dataflow.util.FunctionProxy;
+
+/**
+ * Merges the incoming tuples with a BTree scan opened without bounds, key by key, and passes every tuple of either
+ * side to the update function wrapped by {@link FunctionProxy}; an input tuple whose key equals the current index
+ * key is passed together with that index tuple. NOTE(review): assumes the input is ordered like the index scan.
+ */
+public class IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable extends AbstractUnaryInputOperatorNodePushable {
+    private TreeIndexDataflowHelper treeIndexOpHelper;
+    private FrameTupleAccessor accessor;
+
+    private ArrayTupleBuilder tb;
+    private DataOutput dos;
+
+    private BTree btree;
+    private boolean isForward;
+    private RangePredicate rangePred;
+    private MultiComparator lowKeySearchCmp;
+    private ITreeIndexCursor cursor;
+    private ITreeIndexFrame cursorFrame;
+    protected ITreeIndexAccessor indexAccessor;
+
+    private final RecordDescriptor inputRecDesc;
+    private PermutingFrameTupleReference lowKey;
+    private PermutingFrameTupleReference highKey;
+
+    /** Index tuple the scan is positioned on; {@code null} when there is none (empty index or scan exhausted). */
+    private ITupleReference currentTopTuple;
+    private boolean match; // true once currentTopTuple has been paired with an input tuple
+
+    private final IFrameWriter[] writers;
+    private final FunctionProxy functionProxy;
+
+    public IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
+            IHyracksTaskContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward,
+            int[] lowKeyFields, int[] highKeyFields, IUpdateFunctionFactory functionFactory,
+            IRuntimeHookFactory preHookFactory, IRuntimeHookFactory postHookFactory,
+            IRecordDescriptorFactory inputRdFactory, int outputArity) {
+        inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
+        treeIndexOpHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
+                opDesc, ctx, partition);
+        this.isForward = isForward;
+        // a null or empty field list leaves the corresponding key reference unset (null)
+        if (lowKeyFields != null && lowKeyFields.length > 0) {
+            lowKey = new PermutingFrameTupleReference();
+            lowKey.setFieldPermutation(lowKeyFields);
+        }
+        if (highKeyFields != null && highKeyFields.length > 0) {
+            highKey = new PermutingFrameTupleReference();
+            highKey.setFieldPermutation(highKeyFields);
+        }
+        // the proxy receives this very array; its slots are filled later through setOutputFrameWriter
+        this.writers = new IFrameWriter[outputArity];
+        this.functionProxy = new FunctionProxy(ctx, functionFactory, preHookFactory, postHookFactory, inputRdFactory,
+                writers);
+    }
+
+    protected void setCursor() {
+        cursor = new BTreeRangeSearchCursor((IBTreeLeafFrame) cursorFrame, true);
+    }
+
+    /** Opens the function proxy and the index, builds the key comparator and starts the index scan. */
+    @Override
+    public void open() throws HyracksDataException {
+        functionProxy.functionOpen();
+        accessor = new FrameTupleAccessor(treeIndexOpHelper.getHyracksTaskContext().getFrameSize(), inputRecDesc);
+        try {
+            treeIndexOpHelper.init(false);
+            btree = (BTree) treeIndexOpHelper.getIndex();
+            cursorFrame = btree.getLeafFrameFactory().createFrame();
+            setCursor();
+            // one binary comparator per index comparator factory, combined into the key comparator
+            int lowKeySearchFields = btree.getComparatorFactories().length;
+            IBinaryComparator[] lowKeySearchComparators = new IBinaryComparator[lowKeySearchFields];
+            for (int i = 0; i < lowKeySearchFields; i++) {
+                lowKeySearchComparators[i] = btree.getComparatorFactories()[i].createBinaryComparator();
+            }
+            lowKeySearchCmp = new MultiComparator(lowKeySearchComparators);
+            // tuples are staged in tb and handed to the function proxy, so no output frame is allocated here
+            tb = new ArrayTupleBuilder(btree.getFieldCount());
+            dos = tb.getDataOutput();
+            indexAccessor = btree.createAccessor();
+            // search with null low and high keys, both inclusive
+            rangePred = new RangePredicate(null, null, true, true, null, null);
+            rangePred.setLowKey(null, true);
+            rangePred.setHighKey(null, true);
+            cursor.reset();
+            indexAccessor.search(cursor, rangePred);
+            // position on the first index tuple, if there is one
+            if (cursor.hasNext()) {
+                cursor.next();
+                currentTopTuple = cursor.getTuple();
+                match = false;
+            }
+        } catch (Exception e) {
+            treeIndexOpHelper.deinit();
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /** Advances the merge for every tuple of {@code buffer}; any failure is rethrown as a HyracksDataException. */
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        accessor.reset(buffer);
+        int tupleCount = accessor.getTupleCount();
+        try {
+            for (int i = 0; i < tupleCount;) { // i moves on only after input tuple i has been passed on
+                if (lowKey != null) {
+                    lowKey.reset(accessor, i);
+                }
+                if (highKey != null) {
+                    highKey.reset(accessor, i);
+                }
+                // TODO: currently use low key only, check what they mean
+                if (currentTopTuple != null) {
+                    int cmp = compare(lowKey, currentTopTuple);
+                    if (cmp == 0) {
+                        outputMatch(i);
+                        i++;
+                    } else if ((cmp > 0 && isForward) || (cmp < 0 && !isForward)) {
+                        moveTreeCursor(); // index side lags behind: step it and compare tuple i again
+                    } else {
+                        writeLeftResults(accessor, i, null); // index side is ahead of this key
+                        i++;
+                    }
+                } else {
+                    writeLeftResults(accessor, i, null); // index scan exhausted
+                    i++;
+                }
+            }
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /** Passes input tuple {@code i} to the function together with the current index tuple and records the match. */
+    private void outputMatch(int i) throws Exception {
+        writeLeftResults(accessor, i, currentTopTuple);
+        match = true;
+    }
+
+    /** Passes the current index tuple on its own unless it was matched, then steps the scan to the next tuple. */
+    private void moveTreeCursor() throws Exception {
+        if (!match) {
+            writeRightResults(currentTopTuple);
+        }
+        if (cursor.hasNext()) {
+            cursor.next();
+            currentTopTuple = cursor.getTuple();
+            match = false;
+        } else {
+            currentTopTuple = null;
+        }
+    }
+
+    /** Drains the rest of the index scan, then closes the cursor, the function proxy and the index helper. */
+    @Override
+    public void close() throws HyracksDataException {
+        try {
+            try {
+                while (currentTopTuple != null) {
+                    moveTreeCursor();
+                }
+            } finally {
+                if (cursor != null) { // still null when open() failed before setCursor()
+                    cursor.close();
+                }
+            }
+            functionProxy.functionClose();
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        } finally {
+            treeIndexOpHelper.deinit();
+        }
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        for (IFrameWriter writer : writers) {
+            writer.fail();
+        }
+    }
+
+    /** Compares two tuples with the comparator that open() builds from the index's comparator factories. */
+    private int compare(ITupleReference left, ITupleReference right) throws Exception {
+        return lowKeySearchCmp.compare(left, right);
+    }
+
+    /** Copies every field of the index tuple into {@code tb} and calls the function with it and the tuple itself. */
+    private void writeRightResults(ITupleReference frameTuple) throws Exception {
+        tb.reset();
+        for (int i = 0; i < frameTuple.getFieldCount(); i++) {
+            dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
+            tb.addFieldEndOffset();
+        }
+        functionProxy.functionCall(tb, frameTuple);
+    }
+
+    /** Stages input tuple {@code tIndex} in {@code tb}; calls the function with it and {@code frameTuple} (or null). */
+    private void writeLeftResults(IFrameTupleAccessor leftAccessor, int tIndex, ITupleReference frameTuple)
+            throws Exception {
+        tb.reset();
+        final int dataStart = leftAccessor.getFieldSlotsLength() + leftAccessor.getTupleStartOffset(tIndex);
+        for (int f = 0; f < inputRecDesc.getFields().length; f++) {
+            final int fieldStart = leftAccessor.getFieldStartOffset(tIndex, f);
+            final int len = leftAccessor.getFieldEndOffset(tIndex, f) - fieldStart;
+            dos.write(leftAccessor.getBuffer().array(), dataStart + fieldStart, len);
+            tb.addFieldEndOffset();
+        }
+        functionProxy.functionCall(tb, frameTuple);
+    }
+
+    /** Stores the writer for output {@code index}; the record descriptor argument is not used. */
+    @Override
+    public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+        writers[index] = writer;
+    }
+}
\ No newline at end of file
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java
new file mode 100644
index 0000000..615a25b
--- /dev/null
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java
@@ -0,0 +1,257 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.std;
+
+import java.io.DataOutput;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
+import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+
+/**
+ * Merges the incoming tuples with a BTree scan opened without bounds and writes the union to the output: every
+ * input tuple is emitted, and every index tuple whose key matched no input tuple is emitted as well.
+ * NOTE(review): assumes the input is ordered like the index scan; the {@code isForward} argument is ignored.
+ */
+public class IndexNestedLoopSetUnionOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+    private TreeIndexDataflowHelper treeIndexOpHelper;
+    private FrameTupleAccessor accessor;
+
+    private ByteBuffer writeBuffer;
+    private FrameTupleAppender appender;
+    private ArrayTupleBuilder tb;
+    private DataOutput dos;
+
+    private BTree btree;
+    private RangePredicate rangePred;
+    private MultiComparator lowKeySearchCmp;
+    private ITreeIndexCursor cursor;
+    private ITreeIndexFrame cursorFrame;
+    protected ITreeIndexAccessor indexAccessor;
+
+    private final RecordDescriptor inputRecDesc;
+    private PermutingFrameTupleReference lowKey;
+    private PermutingFrameTupleReference highKey;
+
+    /** Index tuple the scan is positioned on; {@code null} when there is none (empty index or scan exhausted). */
+    private ITupleReference currentTopTuple;
+    private boolean match; // true once currentTopTuple has been paired with an input tuple
+
+    public IndexNestedLoopSetUnionOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
+            IHyracksTaskContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward,
+            int[] lowKeyFields, int[] highKeyFields) {
+        inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
+        treeIndexOpHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
+                opDesc, ctx, partition);
+        // a null or empty field list leaves the corresponding key reference unset (null)
+        if (lowKeyFields != null && lowKeyFields.length > 0) {
+            lowKey = new PermutingFrameTupleReference();
+            lowKey.setFieldPermutation(lowKeyFields);
+        }
+        if (highKeyFields != null && highKeyFields.length > 0) {
+            highKey = new PermutingFrameTupleReference();
+            highKey.setFieldPermutation(highKeyFields);
+        }
+    }
+
+    protected void setCursor() {
+        cursor = new BTreeRangeSearchCursor((IBTreeLeafFrame) cursorFrame, false);
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        accessor = new FrameTupleAccessor(treeIndexOpHelper.getHyracksTaskContext().getFrameSize(), inputRecDesc);
+        try {
+            treeIndexOpHelper.init(false);
+            btree = (BTree) treeIndexOpHelper.getIndex();
+            cursorFrame = btree.getLeafFrameFactory().createFrame();
+            setCursor();
+            writer.open();
+            // one binary comparator per index comparator factory, combined into the key comparator
+            int lowKeySearchFields = btree.getComparatorFactories().length;
+            IBinaryComparator[] lowKeySearchComparators = new IBinaryComparator[lowKeySearchFields];
+            for (int i = 0; i < lowKeySearchFields; i++) {
+                lowKeySearchComparators[i] = btree.getComparatorFactories()[i].createBinaryComparator();
+            }
+            lowKeySearchCmp = new MultiComparator(lowKeySearchComparators);
+            // output tuples are staged in tb and then appended to writeBuffer
+            writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
+            tb = new ArrayTupleBuilder(btree.getFieldCount());
+            dos = tb.getDataOutput();
+            appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
+            appender.reset(writeBuffer, true);
+            indexAccessor = btree.createAccessor();
+            // search with null low and high keys, both inclusive
+            rangePred = new RangePredicate(null, null, true, true, null, null);
+            rangePred.setLowKey(null, true);
+            rangePred.setHighKey(null, true);
+            cursor.reset();
+            indexAccessor.search(cursor, rangePred);
+            // position on the first index tuple, if there is one
+            if (cursor.hasNext()) {
+                cursor.next();
+                currentTopTuple = cursor.getTuple();
+                match = false;
+            }
+        } catch (Exception e) {
+            treeIndexOpHelper.deinit();
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /** Advances the merge for every tuple of {@code buffer}; any failure is rethrown as a HyracksDataException. */
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        accessor.reset(buffer);
+        int tupleCount = accessor.getTupleCount();
+        try {
+            for (int i = 0; i < tupleCount;) { // i moves on only after input tuple i has been written
+                if (lowKey != null) {
+                    lowKey.reset(accessor, i);
+                }
+                if (highKey != null) {
+                    highKey.reset(accessor, i);
+                }
+                // TODO: currently use low key only, check what they mean
+                if (currentTopTuple != null) {
+                    int cmp = compare(lowKey, currentTopTuple);
+                    if (cmp == 0) {
+                        outputMatch(i);
+                        i++;
+                    } else if (cmp > 0) {
+                        moveTreeCursor(); // index side lags behind: step it and compare tuple i again
+                    } else {
+                        writeLeftResults(accessor, i); // index side is ahead of this key
+                        i++;
+                    }
+                } else {
+                    writeLeftResults(accessor, i); // index scan exhausted
+                    i++;
+                }
+            }
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /** Writes input tuple {@code i} and records that the current index tuple has found a partner. */
+    private void outputMatch(int i) throws Exception {
+        writeLeftResults(accessor, i);
+        match = true;
+    }
+
+    /** Writes the current index tuple unless it was matched, then steps the scan to the next tuple. */
+    private void moveTreeCursor() throws Exception {
+        if (!match) {
+            writeRightResults(currentTopTuple);
+        }
+        if (cursor.hasNext()) {
+            cursor.next();
+            currentTopTuple = cursor.getTuple();
+            match = false;
+        } else {
+            currentTopTuple = null;
+        }
+    }
+
+    /** Emits the remaining index tuples, flushes the last frame and closes writer, cursor and index helper. */
+    @Override
+    public void close() throws HyracksDataException {
+        try {
+            try {
+                while (currentTopTuple != null) {
+                    moveTreeCursor();
+                }
+                if (appender.getTupleCount() > 0) {
+                    FrameUtils.flushFrame(writeBuffer, writer);
+                }
+                writer.close();
+            } finally {
+                if (cursor != null) { // still null when open() failed before setCursor()
+                    cursor.close();
+                }
+            }
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        } finally {
+            treeIndexOpHelper.deinit();
+        }
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        writer.fail();
+    }
+
+    /** Compares two tuples with the comparator that open() builds from the index's comparator factories. */
+    private int compare(ITupleReference left, ITupleReference right) throws Exception {
+        return lowKeySearchCmp.compare(left, right);
+    }
+
+    /** Copies every field of the index tuple into {@code tb} and appends the result to the output frame. */
+    private void writeRightResults(ITupleReference frameTuple) throws Exception {
+        tb.reset();
+        for (int i = 0; i < frameTuple.getFieldCount(); i++) {
+            dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
+            tb.addFieldEndOffset();
+        }
+        appendToOutput();
+    }
+
+    /** Copies the fields of input tuple {@code tIndex} into {@code tb} and appends the result to the output frame. */
+    private void writeLeftResults(IFrameTupleAccessor leftAccessor, int tIndex) throws Exception {
+        tb.reset();
+        final int dataStart = leftAccessor.getFieldSlotsLength() + leftAccessor.getTupleStartOffset(tIndex);
+        for (int f = 0; f < inputRecDesc.getFields().length; f++) {
+            final int fieldStart = leftAccessor.getFieldStartOffset(tIndex, f);
+            final int len = leftAccessor.getFieldEndOffset(tIndex, f) - fieldStart;
+            dos.write(leftAccessor.getBuffer().array(), dataStart + fieldStart, len);
+            tb.addFieldEndOffset();
+        }
+        appendToOutput();
+    }
+
+    /** Appends the tuple staged in {@code tb}; when the frame is full it is flushed and the append retried once. */
+    private void appendToOutput() throws Exception {
+        if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+            FrameUtils.flushFrame(writeBuffer, writer);
+            appender.reset(writeBuffer, true);
+            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                throw new IllegalStateException("Tuple does not fit into an empty output frame");
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/ProjectOperatorDescriptor.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/ProjectOperatorDescriptor.java
new file mode 100644
index 0000000..9f35e25
--- /dev/null
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/ProjectOperatorDescriptor.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+/** Operator that keeps only the fields listed in {@code projectFields}, in that order, of every input tuple. */
+public class ProjectOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+    private final int[] projectFields;
+
+    public ProjectOperatorDescriptor(JobSpecification spec, RecordDescriptor rDesc, int[] projectFields) {
+        super(spec, 1, 1);
+        this.recordDescriptors[0] = rDesc;
+        this.projectFields = projectFields;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+            throws HyracksDataException {
+        return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+            private final RecordDescriptor inputRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+            private final FrameTupleAccessor inputAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRd);
+            private final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+            private final ArrayTupleBuilder tb = new ArrayTupleBuilder(projectFields.length);
+            private final DataOutput dos = tb.getDataOutput();
+            private final ByteBuffer outFrame = ctx.allocateFrame();
+
+            @Override
+            public void open() throws HyracksDataException {
+                writer.open();
+                appender.reset(outFrame, true);
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer frame) throws HyracksDataException {
+                inputAccessor.reset(frame);
+                final int tuples = inputAccessor.getTupleCount();
+                try {
+                    for (int t = 0; t < tuples; t++) {
+                        tb.reset();
+                        final int dataStart = inputAccessor.getFieldSlotsLength() + inputAccessor.getTupleStartOffset(t);
+                        for (int field : projectFields) {
+                            final int fieldStart = inputAccessor.getFieldStartOffset(t, field);
+                            final int length = inputAccessor.getFieldEndOffset(t, field) - fieldStart;
+                            dos.write(inputAccessor.getBuffer().array(), dataStart + fieldStart, length);
+                            tb.addFieldEndOffset();
+                        }
+                        if (appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                            continue; // the projected tuple fitted into the current output frame
+                        }
+                        FrameUtils.flushFrame(outFrame, writer); // frame full: flush it, then retry once
+                        appender.reset(outFrame, true);
+                        if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                            throw new IllegalStateException();
+                        }
+                    }
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                writer.fail();
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                if (appender.getTupleCount() > 0) {
+                    FrameUtils.flushFrame(outFrame, writer);
+                }
+                writer.close();
+            }
+        };
+    }
+}
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/RuntimeHookOperatorDescriptor.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/RuntimeHookOperatorDescriptor.java
new file mode 100644
index 0000000..20f7aaa
--- /dev/null
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/RuntimeHookOperatorDescriptor.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.std;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHook;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+
+/** Pass-through operator that configures a runtime hook when opened and hands every frame on to its writer. */
+public class RuntimeHookOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+    private final IRuntimeHookFactory hookFactory;
+
+    /** Passes arities 1 and 1 to the superclass (presumably one input, one output) and keeps the hook factory. */
+    public RuntimeHookOperatorDescriptor(JobSpecification spec, IRuntimeHookFactory hookFactory) {
+        super(spec, 1, 1);
+        this.hookFactory = hookFactory;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+            private IRuntimeHook hook = hookFactory.createRuntimeHook(); // created along with this runtime object
+
+            @Override
+            public void open() throws HyracksDataException {
+                hook.configure(ctx); // the hook sees the task context before the writer is opened
+                writer.open();
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                FrameUtils.flushFrame(buffer, writer); // hand the incoming frame to the writer as it is
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                writer.fail();
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                writer.close();
+            }
+
+        };
+    }
+}
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorDescriptor.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorDescriptor.java
new file mode 100644
index 0000000..126fcb8
--- /dev/null
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorDescriptor.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.dataflow.std;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+
+/**
+ * Descriptor for the tree index bulk re-load operator. It keeps the arguments needed by
+ * {@link TreeIndexBulkReLoadOperatorNodePushable} and creates one such runtime per partition.
+ */
+public class TreeIndexBulkReLoadOperatorDescriptor extends AbstractTreeIndexOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    private final int[] fieldPermutation;
+    private final IStorageManagerInterface storageManager;
+    private final IIndexRegistryProvider<IIndex> treeIndexRegistryProvider;
+    private final IFileSplitProvider fileSplitProvider;
+    private final float fillFactor;
+
+    /**
+     * Passes the index configuration to the superclass and additionally remembers the storage
+     * manager, registry provider, file split provider, field permutation and fill factor so they
+     * can be handed to the runtime.
+     */
+    public TreeIndexBulkReLoadOperatorDescriptor(JobSpecification spec, IStorageManagerInterface storageManager,
+            IIndexRegistryProvider<IIndex> treeIndexRegistryProvider, IFileSplitProvider fileSplitProvider,
+            ITypeTraits[] typeTraits, IBinaryComparatorFactory[] comparatorFactories, int[] fieldPermutation,
+            float fillFactor, IIndexDataflowHelperFactory opHelperFactory) {
+        super(spec, 1, 0, null, storageManager, treeIndexRegistryProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, opHelperFactory, null, false, NoOpOperationCallbackProvider.INSTANCE);
+        // storage-related collaborators used by the runtime to drop the old index file
+        this.storageManager = storageManager;
+        this.treeIndexRegistryProvider = treeIndexRegistryProvider;
+        this.fileSplitProvider = fileSplitProvider;
+        // bulk-load parameters
+        this.fieldPermutation = fieldPermutation;
+        this.fillFactor = fillFactor;
+    }
+
+    /**
+     * Creates the bulk re-load runtime for the given partition; {@code nPartitions} is not used.
+     */
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        IOperatorNodePushable reloadRuntime = new TreeIndexBulkReLoadOperatorNodePushable(this, ctx, partition,
+                fieldPermutation, fillFactor, recordDescProvider, storageManager, treeIndexRegistryProvider,
+                fileSplitProvider);
+        return reloadRuntime;
+    }
+}
\ No newline at end of file
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorNodePushable.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorNodePushable.java
new file mode 100644
index 0000000..883fef4
--- /dev/null
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorNodePushable.java
@@ -0,0 +1,154 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.std;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoadContext;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexRegistry;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+
+/**
+ * Sink runtime that re-loads a tree index. On {@link #open()} it removes any existing mapping of
+ * the partition's index file (unregistering the index and deleting the file from the buffer
+ * cache), initializes the dataflow helper and begins a bulk load. Each tuple of every incoming
+ * frame is added to that bulk load, which is ended in {@link #close()}.
+ */
+public class TreeIndexBulkReLoadOperatorNodePushable extends AbstractUnaryInputSinkOperatorNodePushable {
+    private final TreeIndexDataflowHelper treeIndexOpHelper;
+    private FrameTupleAccessor accessor;
+    private IIndexBulkLoadContext bulkLoadCtx;
+
+    private IRecordDescriptorProvider recordDescProvider;
+    private PermutingFrameTupleReference tuple = new PermutingFrameTupleReference();
+
+    private final IStorageManagerInterface storageManager;
+    private final IIndexRegistryProvider<IIndex> treeIndexRegistryProvider;
+    private final IFileSplitProvider fileSplitProvider;
+    private final int partition;
+    private final float fillFactor;
+    private IHyracksTaskContext ctx;
+    private ITreeIndex index;
+
+    /**
+     * Creates the dataflow helper from the descriptor's helper factory and stores the arguments
+     * needed later by {@link #open()}.
+     *
+     * @param fieldPermutation
+     *            permutation applied to each incoming tuple before it is bulk loaded
+     * @param fillFactor
+     *            fill factor passed to {@code beginBulkLoad}
+     */
+    public TreeIndexBulkReLoadOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
+            int partition, int[] fieldPermutation, float fillFactor, IRecordDescriptorProvider recordDescProvider,
+            IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> treeIndexRegistryProvider,
+            IFileSplitProvider fileSplitProvider) {
+        treeIndexOpHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
+                opDesc, ctx, partition);
+        this.recordDescProvider = recordDescProvider;
+        tuple.setFieldPermutation(fieldPermutation);
+
+        this.storageManager = storageManager;
+        this.treeIndexRegistryProvider = treeIndexRegistryProvider;
+        this.fileSplitProvider = fileSplitProvider;
+        this.partition = partition;
+        this.ctx = ctx;
+        this.fillFactor = fillFactor;
+    }
+
+    /**
+     * Drops the existing index file mapping (if any) and then starts a fresh bulk load.
+     */
+    @Override
+    public void open() throws HyracksDataException {
+        initDrop();
+        init();
+    }
+
+    /**
+     * If this partition's index file is currently mapped, unregisters the index and deletes the
+     * file from the buffer cache. Does nothing when the file is not mapped.
+     *
+     * @throws HyracksDataException
+     *             wrapping any exception raised while dropping
+     */
+    private void initDrop() throws HyracksDataException {
+        try {
+            IndexRegistry<IIndex> treeIndexRegistry = treeIndexRegistryProvider.getRegistry(ctx);
+            IBufferCache bufferCache = storageManager.getBufferCache(ctx);
+            IFileMapProvider fileMapProvider = storageManager.getFileMapProvider(ctx);
+
+            FileReference f = fileSplitProvider.getFileSplits()[partition].getLocalFile();
+            int indexFileId = -1;
+            boolean fileIsMapped = false;
+            // check-and-lookup must happen under the same lock on the file map provider
+            synchronized (fileMapProvider) {
+                fileIsMapped = fileMapProvider.isMapped(f);
+                if (fileIsMapped) {
+                    indexFileId = fileMapProvider.lookupFileId(f);
+                }
+            }
+
+            /**
+             * delete the file if it is mapped
+             */
+            if (fileIsMapped) {
+                // Unregister tree instance.
+                synchronized (treeIndexRegistry) {
+                    treeIndexRegistry.unregister(indexFileId);
+                }
+
+                // remove name to id mapping
+                bufferCache.deleteFile(indexFileId, false);
+            }
+        } catch (Exception e) {
+            // Any failure while dropping is propagated to the caller.
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /**
+     * Sets up the frame accessor for the operator's input, initializes the dataflow helper, opens
+     * the index on the helper's file id and begins the bulk load.
+     *
+     * @throws HyracksDataException
+     *             wrapping any failure; the helper is de-initialized before rethrowing
+     */
+    private void init() throws HyracksDataException {
+        AbstractTreeIndexOperatorDescriptor opDesc = (AbstractTreeIndexOperatorDescriptor) treeIndexOpHelper
+                .getOperatorDescriptor();
+        RecordDescriptor recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
+        accessor = new FrameTupleAccessor(treeIndexOpHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+        try {
+            treeIndexOpHelper.init(true);
+            index = (ITreeIndex) treeIndexOpHelper.getIndex();
+            // open once; the original opened the same index on the same file id twice
+            index.open(treeIndexOpHelper.getIndexFileId());
+            bulkLoadCtx = index.beginBulkLoad(fillFactor);
+        } catch (Exception e) {
+            // cleanup in case of failure
+            treeIndexOpHelper.deinit();
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /**
+     * Adds every tuple of the frame, viewed through the field permutation, to the bulk load.
+     */
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        accessor.reset(buffer);
+        int tupleCount = accessor.getTupleCount();
+        for (int i = 0; i < tupleCount; i++) {
+            tuple.reset(accessor, i);
+            index.bulkLoadAddTuple(tuple, bulkLoadCtx);
+        }
+    }
+
+    /**
+     * Ends the bulk load and always de-initializes the dataflow helper.
+     */
+    @Override
+    public void close() throws HyracksDataException {
+        try {
+            // If open() failed before the bulk load began there is nothing to end; skipping avoids
+            // a NullPointerException that would hide the original failure.
+            if (index != null && bulkLoadCtx != null) {
+                index.endBulkLoad(bulkLoadCtx);
+            }
+        } finally {
+            treeIndexOpHelper.deinit();
+        }
+    }
+
+    /**
+     * Intentionally empty: no failure-specific handling is performed here.
+     */
+    @Override
+    public void fail() throws HyracksDataException {
+    }
+}
\ No newline at end of file
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
new file mode 100644
index 0000000..4b0f4a5
--- /dev/null
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.dataflow.util;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunction;
+import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
+
+/**
+ * Wraps an {@link IUpdateFunction} together with its output writers and optional pre/post runtime
+ * hooks, and drives the open / call / close life cycle of the function.
+ */
+public class FunctionProxy {
+
+    private final IUpdateFunction function;
+    private final IRuntimeHookFactory preHookFactory;
+    private final IRuntimeHookFactory postHookFactory;
+    private final IRecordDescriptorFactory inputRdFactory;
+    private final IHyracksTaskContext ctx;
+    private final IFrameWriter[] writers;
+    private TupleDeserializer tupleDe;
+    private RecordDescriptor inputRd;
+
+    /**
+     * Creates the function from its factory and stores the remaining collaborators. Either hook
+     * factory may be {@code null}, in which case the corresponding hook is skipped.
+     */
+    public FunctionProxy(IHyracksTaskContext ctx, IUpdateFunctionFactory functionFactory,
+            IRuntimeHookFactory preHookFactory, IRuntimeHookFactory postHookFactory,
+            IRecordDescriptorFactory inputRdFactory, IFrameWriter[] writers) {
+        this.ctx = ctx;
+        this.writers = writers;
+        this.inputRdFactory = inputRdFactory;
+        this.preHookFactory = preHookFactory;
+        this.postHookFactory = postHookFactory;
+        this.function = functionFactory.createFunction();
+    }
+
+    /**
+     * Opens the function: builds the input record descriptor and tuple deserializer, sets the
+     * thread's context class loader to this class's loader, opens all writers, configures the
+     * pre-hook (when present) and finally opens the function itself.
+     * 
+     * @throws HyracksDataException
+     */
+    public void functionOpen() throws HyracksDataException {
+        inputRd = inputRdFactory.createRecordDescriptor();
+        tupleDe = new TupleDeserializer(inputRd);
+        Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+        for (int w = 0; w < writers.length; w++) {
+            writers[w].open();
+        }
+        if (preHookFactory != null) {
+            preHookFactory.createRuntimeHook().configure(ctx);
+        }
+        function.open(ctx, inputRd, writers);
+    }
+
+    /**
+     * Feeds the function with the record deserialized from a tuple builder, then lets it update
+     * the referenced tuple.
+     * 
+     * @param tb
+     *            input data
+     * @param updateRef
+     *            update pointer
+     * @throws HyracksDataException
+     */
+    public void functionCall(ArrayTupleBuilder tb, ITupleReference updateRef) throws HyracksDataException {
+        function.process(tupleDe.deserializeRecord(tb));
+        function.update(updateRef);
+    }
+
+    /**
+     * Variant without a newly generated tuple: the record is deserialized from the referenced
+     * tuple itself, which is then also the update target.
+     * 
+     * @param updateRef
+     *            tuple used both as input and as update pointer
+     * @throws HyracksDataException
+     */
+    public void functionCall(ITupleReference updateRef) throws HyracksDataException {
+        function.process(tupleDe.deserializeRecord(updateRef));
+        function.update(updateRef);
+    }
+
+    /**
+     * Closes the function: configures the post-hook (when present), closes the function and then
+     * closes every writer in array order.
+     * 
+     * @throws HyracksDataException
+     */
+    public void functionClose() throws HyracksDataException {
+        if (postHookFactory != null) {
+            postHookFactory.createRuntimeHook().configure(ctx);
+        }
+        function.close();
+        for (int w = 0; w < writers.length; w++) {
+            writers[w].close();
+        }
+    }
+}
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/ResetableByteArrayInputStream.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/ResetableByteArrayInputStream.java
new file mode 100644
index 0000000..b6fd98a
--- /dev/null
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/ResetableByteArrayInputStream.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.util;
+
+import java.io.InputStream;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * An {@link InputStream} over a caller-supplied byte array that can be re-pointed at another
+ * array and start position with {@link #setByteArray(byte[], int)}. Reads run from the current
+ * position up to the end of the array. The array is not copied.
+ */
+public class ResetableByteArrayInputStream extends InputStream {
+    private static final Logger LOGGER = Logger.getLogger(ResetableByteArrayInputStream.class.getName());
+
+    private byte[] data;
+    private int position;
+
+    public ResetableByteArrayInputStream() {
+    }
+
+    /**
+     * Points the stream at {@code data}, with the next read starting at {@code position}.
+     *
+     * @param data
+     *            backing array; must be set before any read
+     * @param position
+     *            index of the first byte to read
+     */
+    public void setByteArray(byte[] data, int position) {
+        this.data = data;
+        this.position = position;
+    }
+
+    /**
+     * Reads one byte.
+     *
+     * @return the byte as a value in 0..255, or -1 when no bytes remain
+     */
+    @Override
+    public int read() {
+        int remaining = data.length - position;
+        int value = remaining > 0 ? (data[position++] & 0xff) : -1;
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("read(): value: " + value + " remaining: " + remaining + " position: " + position);
+        }
+        return value;
+    }
+
+    /**
+     * Copies up to {@code length} bytes into {@code bytes} starting at {@code offset}.
+     *
+     * @return the number of bytes copied, or -1 when no bytes remain
+     */
+    @Override
+    public int read(byte[] bytes, int offset, int length) {
+        int remaining = data.length - position;
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("read(bytes[], int, int): remaining: " + remaining + " offset: " + offset + " length: "
+                    + length + " position: " + position);
+        }
+        // "<=" rather than "==": a position past the end of the array must report end-of-stream,
+        // as read() does, instead of passing a negative length to System.arraycopy.
+        if (remaining <= 0) {
+            return -1;
+        }
+        int count = Math.min(length, remaining);
+        System.arraycopy(data, position, bytes, offset, count);
+        position += count;
+        return count;
+    }
+}
\ No newline at end of file
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/ResetableByteArrayOutputStream.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/ResetableByteArrayOutputStream.java
new file mode 100644
index 0000000..88521a4
--- /dev/null
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/ResetableByteArrayOutputStream.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.util;
+
+import java.io.OutputStream;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * An {@link OutputStream} that writes into a caller-supplied, fixed-size byte array and can be
+ * re-pointed at another array and start position with {@link #setByteArray(byte[], int)}. The
+ * array is never grown: a write that does not fit throws {@link IndexOutOfBoundsException}.
+ */
+public class ResetableByteArrayOutputStream extends OutputStream {
+    private static final Logger LOGGER = Logger.getLogger(ResetableByteArrayOutputStream.class.getName());
+
+    private byte[] data;
+    private int position;
+
+    public ResetableByteArrayOutputStream() {
+    }
+
+    /**
+     * Points the stream at {@code data}, with the next write going to index {@code position}.
+     *
+     * @param data
+     *            target array; must be set before any write
+     * @param position
+     *            index at which the next byte is stored
+     */
+    public void setByteArray(byte[] data, int position) {
+        this.data = data;
+        this.position = position;
+    }
+
+    /**
+     * Stores the low-order byte of {@code b} at the current position.
+     *
+     * @throws IndexOutOfBoundsException
+     *             if the array is already full
+     */
+    @Override
+    public void write(int b) {
+        int remaining = data.length - position;
+        // The previous check compared against data.length - 1 and so rejected a write into the
+        // last slot of the array even though it fits.
+        if (remaining < 1) {
+            throw new IndexOutOfBoundsException();
+        }
+        data[position] = (byte) b;
+        position++;
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("write(): value: " + b + " remaining: " + remaining + " position: " + position);
+        }
+    }
+
+    /**
+     * Copies {@code length} bytes from {@code bytes}, starting at {@code offset}, to the current
+     * position.
+     *
+     * @throws IndexOutOfBoundsException
+     *             if the bytes do not fit in the space left in the array
+     */
+    @Override
+    public void write(byte[] bytes, int offset, int length) {
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("write(bytes[], int, int) offset: " + offset + " length: " + length + " position: "
+                    + position);
+        }
+        // Same off-by-one fix as write(int): a write ending exactly at the end of the array is
+        // legal. Comparing against the remaining space also avoids int overflow in the sum.
+        if (length > data.length - position) {
+            throw new IndexOutOfBoundsException();
+        }
+        System.arraycopy(bytes, offset, data, position, length);
+        position += length;
+    }
+}
\ No newline at end of file
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java
new file mode 100644
index 0000000..5ae1d81
--- /dev/null
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.dataflow.util;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameConstants;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+/**
+ * Deserializes the fields of a tuple into Java objects using the per-field deserializers of a
+ * {@link RecordDescriptor}. The returned array is owned by this object and is overwritten by the
+ * next call, so callers must not hold on to it across calls.
+ */
+public class TupleDeserializer {
+    private static final Logger LOGGER = Logger.getLogger(TupleDeserializer.class.getName());
+
+    private final Object[] record;
+    private final RecordDescriptor recordDescriptor;
+    private final ResetableByteArrayInputStream bbis;
+    private final DataInputStream di;
+
+    /**
+     * @param recordDescriptor
+     *            supplies one deserializer per field; its field count fixes the record length
+     */
+    public TupleDeserializer(RecordDescriptor recordDescriptor) {
+        this.recordDescriptor = recordDescriptor;
+        this.bbis = new ResetableByteArrayInputStream();
+        this.di = new DataInputStream(bbis);
+        this.record = new Object[recordDescriptor.getFields().length];
+    }
+
+    /**
+     * Deserializes every field of {@code tupleRef}, positioning the input at each field's data
+     * array and start offset in turn.
+     *
+     * @return the shared record array, one object per field
+     * @throws HyracksDataException
+     *             if a field cannot be deserialized or, in frame-debug mode, the field magic
+     *             cannot be read or does not match
+     */
+    public Object[] deserializeRecord(ITupleReference tupleRef) throws HyracksDataException {
+        for (int i = 0; i < record.length; ++i) {
+            byte[] data = tupleRef.getFieldData(i);
+            int offset = tupleRef.getFieldStart(i);
+            bbis.setByteArray(data, offset);
+            record[i] = deserializeField(i);
+        }
+        return record;
+    }
+
+    /**
+     * Deserializes every field held in {@code tb}. Field {@code i} starts where field
+     * {@code i - 1} ends according to the builder's field end offsets; the first starts at 0.
+     *
+     * @return the shared record array, one object per field
+     * @throws HyracksDataException
+     *             if a field cannot be deserialized or, in frame-debug mode, the field magic
+     *             cannot be read or does not match
+     */
+    public Object[] deserializeRecord(ArrayTupleBuilder tb) throws HyracksDataException {
+        byte[] data = tb.getByteArray();
+        int[] offset = tb.getFieldEndOffsets();
+        int start = 0;
+        for (int i = 0; i < record.length; ++i) {
+            /**
+             * reset the input
+             */
+            bbis.setByteArray(data, start);
+            start = offset[i];
+
+            /**
+             * do deserialization
+             */
+            record[i] = deserializeField(i);
+        }
+        return record;
+    }
+
+    /**
+     * Deserializes field {@code i} from the current input position and, when frame debugging is
+     * enabled, verifies the magic value that follows it.
+     */
+    private Object deserializeField(int i) throws HyracksDataException {
+        Object instance = recordDescriptor.getFields()[i].deserialize(di);
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest(i + " " + instance);
+        }
+        if (FrameConstants.DEBUG_FRAME_IO) {
+            checkFieldMagic();
+        }
+        return instance;
+    }
+
+    /**
+     * Reads the next int and checks it against the frame field magic. Both a failed read and a
+     * mismatch are reported to the caller; previously the surrounding catch only printed a stack
+     * trace, so read failures were silently ignored, and the mismatch exception would have been
+     * swallowed too if HyracksDataException is an IOException subtype (not visible from here).
+     */
+    private void checkFieldMagic() throws HyracksDataException {
+        int magic;
+        try {
+            magic = di.readInt();
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+        if (magic != FrameConstants.FRAME_FIELD_MAGIC) {
+            throw new HyracksDataException("Field magic mismatch");
+        }
+    }
+}