Refactored IOperatorNodePushable so it can support operators with multiple inputs

IOperatorNodePushable no longer extends IFrameWriter. It now exposes an explicit lifecycle (initialize/deinitialize), reports its input arity, hands out one IFrameWriter per input via getInputFrameWriter(index), and receives output writers through setOutputFrameWriter(index, writer, recordDesc). Unary-input/unary-output base classes were added in hyracks-dataflow-std, and the existing operators, OperatorRunnable, and the BTree dataflow runtimes were ported to the new contract.

git-svn-id: https://hyracks.googlecode.com/svn/trunk/hyracks@111 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorNodePushable.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorNodePushable.java
index f019b13..5487832 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorNodePushable.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorNodePushable.java
@@ -16,7 +16,16 @@
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
-public interface IOperatorNodePushable extends IFrameWriter {
-    public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc);
+public interface IOperatorNodePushable {
+    public void initialize() throws HyracksDataException;
+
+    public void deinitialize() throws HyracksDataException;
+
+    public int getInputArity();
+
+    public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc);
+
+    public IFrameWriter getInputFrameWriter(int index);
 }
\ No newline at end of file
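
(Illustrative sketch, not part of this patch.) Because the interface no longer extends IFrameWriter, an operator can now hand out a distinct frame writer for each of its inputs. The hypothetical two-input sink below shows the shape of the new contract; it relies only on the methods declared above and on IFrameWriter's open/nextFrame/close/flush as they are used elsewhere in this patch.

    package edu.uci.ics.hyracks.examples; // hypothetical package, illustration only

    import java.nio.ByteBuffer;

    import edu.uci.ics.hyracks.api.comm.IFrameWriter;
    import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
    import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
    import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;

    // Counts the frames arriving on each of its two inputs; it produces no output.
    public class TwoInputFrameCountingOperator implements IOperatorNodePushable {
        private final int[] frameCounts = new int[2];

        @Override
        public void initialize() throws HyracksDataException {
            // operator-wide setup goes here
        }

        @Override
        public void deinitialize() throws HyracksDataException {
            // operator-wide teardown goes here
        }

        @Override
        public int getInputArity() {
            return 2;
        }

        @Override
        public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
            throw new IllegalStateException(); // a pure sink has no outputs
        }

        @Override
        public IFrameWriter getInputFrameWriter(final int index) {
            // one IFrameWriter per input -- exactly what the old single-IFrameWriter
            // contract could not express
            return new IFrameWriter() {
                @Override
                public void open() throws HyracksDataException {
                }

                @Override
                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
                    frameCounts[index]++;
                }

                @Override
                public void close() throws HyracksDataException {
                }

                @Override
                public void flush() throws HyracksDataException {
                }
            };
        }
    }
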
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java
index 39d98b1..12670e7 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java
@@ -34,7 +34,7 @@
     }
 
     public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
-        opNode.setFrameWriter(index, writer, recordDesc);
+        opNode.setOutputFrameWriter(index, writer, recordDesc);
     }
 
     public void setFrameReader(IFrameReader reader) {
@@ -48,20 +48,23 @@
     @Override
     public void run() {
         try {
-            opNode.open();
+            opNode.initialize();
             if (reader != null) {
+                IFrameWriter writer = opNode.getInputFrameWriter(0);
+                writer.open();
                 reader.open();
                 while (reader.nextFrame(buffer)) {
                     if (abort) {
                         break;
                     }
                     buffer.flip();
-                    opNode.nextFrame(buffer);
+                    writer.nextFrame(buffer);
                     buffer.compact();
                 }
                 reader.close();
+                writer.close();
             }
-            opNode.close();
+            opNode.deinitialize();
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
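
(Illustrative sketch, not part of this patch.) OperatorRunnable above still feeds only input 0. A runtime that honors the full contract would walk every input reported by getInputArity(); the hypothetical helper below sketches that loop. It assumes IFrameReader lives in edu.uci.ics.hyracks.api.comm, that output writers were already wired via setOutputFrameWriter, and it feeds the inputs one after another for simplicity.

    package edu.uci.ics.hyracks.examples; // hypothetical package, illustration only

    import java.nio.ByteBuffer;

    import edu.uci.ics.hyracks.api.comm.IFrameReader; // assumed location
    import edu.uci.ics.hyracks.api.comm.IFrameWriter;
    import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;

    public class PushRuntimeDriver {
        // Pumps frames from one IFrameReader per input into the operator, then tears it down.
        public static void drive(IOperatorNodePushable op, IFrameReader[] readers, ByteBuffer buffer)
                throws Exception {
            op.initialize();
            for (int i = 0; i < op.getInputArity(); ++i) {
                IFrameWriter in = op.getInputFrameWriter(i);
                in.open();
                readers[i].open();
                while (readers[i].nextFrame(buffer)) {
                    buffer.flip();
                    in.nextFrame(buffer);
                    buffer.compact();
                }
                readers[i].close();
                in.close();
            }
            op.deinitialize();
        }
    }
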
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/accessors/FrameTupleReference.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/accessors/FrameTupleReference.java
index d9c9fb2..ff45eba 100644
--- a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/accessors/FrameTupleReference.java
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/accessors/FrameTupleReference.java
@@ -33,7 +33,7 @@
 
     @Override
     public int getFieldStart(int fIdx) {
-    	return fta.getTupleStartOffset(tIndex) + fta.getFieldSlotsLength() + fta.getFieldStartOffset(tIndex, fIdx);
+        return fta.getTupleStartOffset(tIndex) + fta.getFieldSlotsLength() + fta.getFieldStartOffset(tIndex, fIdx);
     }
 
     @Override
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryInputOperatorNodePushable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryInputOperatorNodePushable.java
new file mode 100644
index 0000000..182fa2a
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryInputOperatorNodePushable.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.base;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public abstract class AbstractUnaryInputOperatorNodePushable implements IOperatorNodePushable, IFrameWriter {
+    protected IFrameWriter writer;
+    protected RecordDescriptor recordDesc;
+
+    @Override
+    public final IFrameWriter getInputFrameWriter(int index) {
+        return this;
+    }
+
+    @Override
+    public final int getInputArity() {
+        return 1;
+    }
+
+    @Override
+    public void initialize() throws HyracksDataException {
+    }
+
+    @Override
+    public void deinitialize() throws HyracksDataException {
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryInputSinkOperatorNodePushable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryInputSinkOperatorNodePushable.java
new file mode 100644
index 0000000..12e8cb4
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryInputSinkOperatorNodePushable.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.base;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public abstract class AbstractUnaryInputSinkOperatorNodePushable extends AbstractUnaryInputOperatorNodePushable {
+    @Override
+    public final void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+        throw new IllegalStateException();
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryInputUnaryOutputOperatorNodePushable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryInputUnaryOutputOperatorNodePushable.java
new file mode 100644
index 0000000..e3767b3
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryInputUnaryOutputOperatorNodePushable.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.base;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+
+public abstract class AbstractUnaryInputUnaryOutputOperatorNodePushable extends AbstractUnaryOutputOperatorNodePushable
+        implements IFrameWriter {
+    @Override
+    public final IFrameWriter getInputFrameWriter(int index) {
+        return this;
+    }
+
+    @Override
+    public final int getInputArity() {
+        return 1;
+    }
+}
\ No newline at end of file
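
(Illustrative sketch, not part of this patch.) A concrete operator built on this base class only has to implement the IFrameWriter side of the contract; the output writer and record descriptor are inherited from AbstractUnaryOutputOperatorNodePushable. A hypothetical pass-through operator is about as small as it gets:

    package edu.uci.ics.hyracks.examples; // hypothetical package, illustration only

    import java.nio.ByteBuffer;

    import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
    import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;

    // Forwards every frame it receives to its single output, unchanged.
    public class IdentityOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
        @Override
        public void open() throws HyracksDataException {
            writer.open();
        }

        @Override
        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
            writer.nextFrame(buffer);
        }

        @Override
        public void close() throws HyracksDataException {
            writer.close();
        }

        @Override
        public void flush() throws HyracksDataException {
            writer.flush();
        }
    }
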
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryOutputOperatorNodePushable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryOutputOperatorNodePushable.java
index cfe1b65..8b97497e 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryOutputOperatorNodePushable.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryOutputOperatorNodePushable.java
@@ -17,14 +17,26 @@
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
 public abstract class AbstractUnaryOutputOperatorNodePushable implements IOperatorNodePushable {
     protected IFrameWriter writer;
     protected RecordDescriptor recordDesc;
 
     @Override
-    public final void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+    public final void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+        if (index != 0) {
+            throw new IllegalStateException();
+        }
         this.writer = writer;
         this.recordDesc = recordDesc;
     }
+
+    @Override
+    public void initialize() throws HyracksDataException {
+    }
+
+    @Override
+    public void deinitialize() throws HyracksDataException {
+    }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryOutputSourceOperatorNodePushable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryOutputSourceOperatorNodePushable.java
index f5180f8..db09e57 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryOutputSourceOperatorNodePushable.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryOutputSourceOperatorNodePushable.java
@@ -14,17 +14,16 @@
  */
 package edu.uci.ics.hyracks.dataflow.std.base;
 
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 
 public abstract class AbstractUnaryOutputSourceOperatorNodePushable extends AbstractUnaryOutputOperatorNodePushable {
     @Override
-    public final void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        throw new UnsupportedOperationException();
+    public IFrameWriter getInputFrameWriter(int index) {
+        throw new IllegalStateException();
     }
 
     @Override
-    public final void flush() throws HyracksDataException {
+    public final int getInputArity() {
+        return 0;
     }
 }
\ No newline at end of file
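
(Illustrative sketch, not part of this patch.) A source built on this base class produces its whole output inside initialize(), which is exactly the pattern FileScanOperatorDescriptor follows below. A hypothetical do-nothing source shows the minimal shape:

    package edu.uci.ics.hyracks.examples; // hypothetical package, illustration only

    import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
    import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;

    // Opens and closes its output without emitting any frames.
    public class EmptySourceOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
        @Override
        public void initialize() throws HyracksDataException {
            writer.open();
            try {
                // a real source would allocate a frame (e.g. via
                // ctx.getResourceManager().allocateFrame(), as other operators in this
                // patch do), fill it with tuples, and push it with writer.nextFrame(frame)
            } finally {
                writer.close();
            }
        }
    }
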
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java
index efc63d3..8b63690 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java
@@ -51,7 +51,7 @@
         final ITupleParser tp = tupleParserFactory.createTupleParser(ctx);
         return new AbstractUnaryOutputSourceOperatorNodePushable() {
             @Override
-            public void open() throws HyracksDataException {
+            public void initialize() throws HyracksDataException {
                 File f = split.getLocalFile();
                 writer.open();
                 try {
@@ -66,10 +66,6 @@
                     writer.close();
                 }
             }
-
-            @Override
-            public void close() throws HyracksDataException {
-            }
         };
     }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
index a568781..09c1c6a 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
@@ -16,7 +16,6 @@
 
 import java.nio.ByteBuffer;
 
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
@@ -31,6 +30,7 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
 public class HashGroupOperatorDescriptor extends AbstractOperatorDescriptor {
@@ -77,7 +77,7 @@
                 final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
             final FrameTupleAccessor accessor = new FrameTupleAccessor(ctx,
                     recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
-            return new IOperatorNodePushable() {
+            return new AbstractUnaryInputSinkOperatorNodePushable() {
                 private GroupingHashTable table;
 
                 @Override
@@ -102,11 +102,6 @@
                 }
 
                 @Override
-                public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
-                    throw new IllegalArgumentException();
-                }
-
-                @Override
                 public void flush() throws HyracksDataException {
                 }
             };
@@ -126,18 +121,13 @@
                 IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
             return new AbstractUnaryOutputSourceOperatorNodePushable() {
                 @Override
-                public void open() throws HyracksDataException {
+                public void initialize() throws HyracksDataException {
                     GroupingHashTable table = (GroupingHashTable) env.get(HASHTABLE);
                     writer.open();
                     table.write(writer);
                     writer.close();
                     env.set(HASHTABLE, null);
                 }
-
-                @Override
-                public void close() throws HyracksDataException {
-                    // do nothing
-                }
             };
         }
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
index c124cb3..221c22f 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
@@ -20,7 +20,6 @@
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
@@ -42,6 +41,7 @@
 import edu.uci.ics.hyracks.dataflow.common.data.partition.RepartitionComputerFactory;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
 public class GraceHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
@@ -119,7 +119,7 @@
             for (int i = 0; i < comparatorFactories.length; ++i) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
             }
-            IOperatorNodePushable op = new IOperatorNodePushable() {
+            IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
                 private final FrameTupleAccessor accessor0 = new FrameTupleAccessor(ctx,
                         recordDescProvider.getInputRecordDescriptor(getOperatorId(), operatorInputIndex));
 
@@ -133,11 +133,6 @@
                 private final int numPartitions = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));
 
                 @Override
-                public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
-                    throw new IllegalArgumentException();
-                }
-
-                @Override
                 public void close() throws HyracksDataException {
                     for (int i = 0; i < numPartitions; i++) {
                         try {
@@ -249,7 +244,7 @@
                 private int[] maxBufferRi;
 
                 @Override
-                public void open() throws HyracksDataException {
+                public void initialize() throws HyracksDataException {
                     channelsR = (FileChannel[]) env.get(SMALLRELATION);
                     channelsS = (FileChannel[]) env.get(LARGERELATION);
                     numPartitions = (Integer) env.get(NUM_PARTITION);
@@ -321,7 +316,7 @@
                 }
 
                 @Override
-                public void close() throws HyracksDataException {
+                public void deinitialize() throws HyracksDataException {
                     env.set(LARGERELATION, null);
                     env.set(SMALLRELATION, null);
                     env.set(NUM_PARTITION, null);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index 1da6b61..4381bd1 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -20,7 +20,6 @@
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
@@ -43,6 +42,8 @@
 import edu.uci.ics.hyracks.dataflow.common.data.partition.RepartitionComputerFactory;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 
 public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
     private static final String JOINER0 = "joiner0";
@@ -132,7 +133,7 @@
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
             }
 
-            IOperatorNodePushable op = new IOperatorNodePushable() {
+            IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
                 private InMemoryHashJoin joiner0;
                 private final FrameTupleAccessor accessor0 = new FrameTupleAccessor(ctx, rd0);
                 ITuplePartitionComputer hpc0 = new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories)
@@ -147,11 +148,6 @@
                 private int B;
 
                 @Override
-                public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
-                    throw new IllegalArgumentException();
-                }
-
-                @Override
                 public void close() throws HyracksDataException {
                     if (memoryForHashtable != 0)
                         build(inBuffer);
@@ -345,7 +341,7 @@
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
             }
 
-            IOperatorNodePushable op = new IOperatorNodePushable() {
+            IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
                 private InMemoryHashJoin joiner0;
                 private final FrameTupleAccessor accessor1 = new FrameTupleAccessor(ctx, rd1);
                 private ITuplePartitionComputerFactory hpcf0 = new FieldHashPartitionComputerFactory(keys0,
@@ -358,7 +354,6 @@
                 private final FrameTupleAppender ftap = new FrameTupleAppender(ctx);
                 private final ByteBuffer inBuffer = ctx.getResourceManager().allocateFrame();
                 private final ByteBuffer outBuffer = ctx.getResourceManager().allocateFrame();
-                private IFrameWriter writer;
                 private FileChannel[] channelsR;
                 private FileChannel[] channelsS;
                 private File filesS[];
@@ -550,14 +545,6 @@
                 }
 
                 @Override
-                public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
-                    if (index != 0) {
-                        throw new IllegalStateException();
-                    }
-                    this.writer = writer;
-                }
-
-                @Override
                 public void flush() throws HyracksDataException {
                     writer.flush();
                 }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index 7e64312..a987e68 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -16,7 +16,6 @@
 
 import java.nio.ByteBuffer;
 
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
@@ -36,6 +35,8 @@
 import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 
 public class InMemoryHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
     private static final String JOINER = "joiner";
@@ -86,7 +87,7 @@
             for (int i = 0; i < comparatorFactories.length; ++i) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
             }
-            IOperatorNodePushable op = new IOperatorNodePushable() {
+            IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
                 private InMemoryHashJoin joiner;
 
                 @Override
@@ -113,11 +114,6 @@
                 }
 
                 @Override
-                public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
-                    throw new IllegalArgumentException();
-                }
-
-                @Override
                 public void flush() throws HyracksDataException {
                 }
             };
@@ -136,8 +132,7 @@
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, final IOperatorEnvironment env,
                 IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
-            IOperatorNodePushable op = new IOperatorNodePushable() {
-                private IFrameWriter writer;
+            IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
                 private InMemoryHashJoin joiner;
 
                 @Override
@@ -159,14 +154,6 @@
                 }
 
                 @Override
-                public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
-                    if (index != 0) {
-                        throw new IllegalStateException();
-                    }
-                    this.writer = writer;
-                }
-
-                @Override
                 public void flush() throws HyracksDataException {
                     writer.flush();
                 }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
index 0bfe83d..2869f13 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
@@ -20,7 +20,6 @@
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
@@ -32,6 +31,7 @@
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
 public class MaterializingOperatorDescriptor extends AbstractOperatorDescriptor {
@@ -64,7 +64,7 @@
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, final IOperatorEnvironment env,
                 IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
-            return new IOperatorNodePushable() {
+            return new AbstractUnaryInputSinkOperatorNodePushable() {
                 private FileChannel out;
                 private int frameCount;
 
@@ -108,11 +108,6 @@
                 }
 
                 @Override
-                public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
-                    throw new IllegalArgumentException();
-                }
-
-                @Override
                 public void flush() throws HyracksDataException {
                 }
             };
@@ -132,7 +127,7 @@
                 IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
             return new AbstractUnaryOutputSourceOperatorNodePushable() {
                 @Override
-                public void open() throws HyracksDataException {
+                public void initialize() throws HyracksDataException {
                     try {
                         File inFile = new File((String) env.get(MATERIALIZED_FILE));
                         int frameCount = (Integer) env.get(FRAME_COUNT);
@@ -157,7 +152,7 @@
                 }
 
                 @Override
-                public void close() throws HyracksDataException {
+                public void deinitialize() throws HyracksDataException {
                     env.set(MATERIALIZED_FILE, null);
                 }
             };
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
index b35954f..c5149a4 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
@@ -1,16 +1,29 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package edu.uci.ics.hyracks.dataflow.std.misc;
 
 import java.nio.ByteBuffer;
 
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 
 public class NullSinkOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
     private static final long serialVersionUID = 1L;
@@ -22,7 +35,7 @@
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
-        return new IOperatorNodePushable() {
+        return new AbstractUnaryInputSinkOperatorNodePushable() {
             @Override
             public void open() throws HyracksDataException {
             }
@@ -36,10 +49,6 @@
             }
 
             @Override
-            public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
-            }
-
-            @Override
             public void flush() throws HyracksDataException {
             }
         };
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
index 4594685..1012098 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
@@ -43,6 +43,7 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
 import edu.uci.ics.hyracks.dataflow.std.util.ReferencedPriorityQueue;
@@ -98,7 +99,7 @@
             for (int i = 0; i < comparatorFactories.length; ++i) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
             }
-            IOperatorNodePushable op = new IOperatorNodePushable() {
+            IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
                 private final FrameTupleAccessor fta1 = new FrameTupleAccessor(ctx, recordDescriptors[0]);
                 private final FrameTupleAccessor fta2 = new FrameTupleAccessor(ctx, recordDescriptors[0]);
                 private List<ByteBuffer> inFrames;
@@ -273,11 +274,6 @@
                 }
 
                 @Override
-                public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
-                    throw new IllegalArgumentException();
-                }
-
-                @Override
                 public void flush() throws HyracksDataException {
                 }
             };
@@ -307,7 +303,7 @@
                 private FrameTupleAppender outFrameAppender;
 
                 @Override
-                public void open() throws HyracksDataException {
+                public void initialize() throws HyracksDataException {
                     inFrames = (List<ByteBuffer>) env.get(IN_FRAMES);
                     outFrame = ctx.getResourceManager().allocateFrame();
                     runs = (LinkedList<File>) env.get(RUNS);
@@ -339,11 +335,6 @@
                     env.set(RUNS, null);
                 }
 
-                @Override
-                public void close() throws HyracksDataException {
-                    // do nothing
-                }
-
                 // creates a new run from runs that can fit in memory.
                 private void doPass(LinkedList<File> runs, int passCount) throws ClassNotFoundException, Exception {
                     File newRun = null;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
index be81ecd..8664fdd 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
@@ -18,7 +18,6 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
@@ -35,6 +34,7 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
 public class InMemorySortOperatorDescriptor extends AbstractOperatorDescriptor {
@@ -82,7 +82,7 @@
             for (int i = 0; i < comparatorFactories.length; ++i) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
             }
-            IOperatorNodePushable op = new IOperatorNodePushable() {
+            IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
                 private List<ByteBuffer> buffers;
 
                 private final FrameTupleAccessor fta1 = new FrameTupleAccessor(ctx, recordDescriptors[0]);
@@ -206,11 +206,6 @@
                 }
 
                 @Override
-                public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
-                    throw new IllegalArgumentException();
-                }
-
-                @Override
                 public void flush() throws HyracksDataException {
                 }
             };
@@ -231,7 +226,7 @@
                 IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
             IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
                 @Override
-                public void open() throws HyracksDataException {
+                public void initialize() throws HyracksDataException {
                     List<ByteBuffer> buffers = (List<ByteBuffer>) env.get(BUFFERS);
                     long[] tPointers = (long[]) env.get(TPOINTERS);
                     FrameTupleAccessor accessor = new FrameTupleAccessor(ctx, recordDescriptors[0]);
@@ -266,11 +261,6 @@
                     frame.limit(frame.capacity());
                     writer.nextFrame(frame);
                 }
-
-                @Override
-                public void close() throws HyracksDataException {
-                    // do nothing
-                }
             };
             return op;
         }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/DeserializedOperatorNodePushable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/DeserializedOperatorNodePushable.java
index 037e546..b348677 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/DeserializedOperatorNodePushable.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/DeserializedOperatorNodePushable.java
@@ -18,14 +18,14 @@
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksContext;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.SerializingDataWriter;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
 import edu.uci.ics.hyracks.dataflow.std.base.IOpenableDataWriterOperator;
 
-public final class DeserializedOperatorNodePushable implements IOperatorNodePushable {
+public final class DeserializedOperatorNodePushable extends AbstractUnaryInputOperatorNodePushable {
     private final IHyracksContext ctx;
 
     private final IOpenableDataWriterOperator delegate;
@@ -40,7 +40,7 @@
     }
 
     @Override
-    public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+    public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
         delegate.setDataWriter(index, new SerializingDataWriter(ctx, recordDesc, writer));
     }
 
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/AbstractBTreeOperatorNodePushable.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/AbstractBTreeOperatorNodePushable.java
deleted file mode 100644
index 9dd430c..0000000
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/AbstractBTreeOperatorNodePushable.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.hyracks.storage.am.btree.dataflow;
-
-import java.io.DataOutput;
-import java.io.File;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.util.Random;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.context.IHyracksContext;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputOperatorNodePushable;
-import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeInteriorFrame;
-import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
-import edu.uci.ics.hyracks.storage.am.btree.api.IFieldAccessor;
-import edu.uci.ics.hyracks.storage.am.btree.frames.MetaDataFrame;
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
-import edu.uci.ics.hyracks.storage.am.btree.impls.MultiComparator;
-import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
-import edu.uci.ics.hyracks.storage.common.file.FileInfo;
-import edu.uci.ics.hyracks.storage.common.file.FileManager;
-
-public abstract class AbstractBTreeOperatorNodePushable extends AbstractUnaryOutputOperatorNodePushable {
-	
-	protected IBTreeInteriorFrame interiorFrame;
-	protected IBTreeLeafFrame leafFrame;
-	
-	protected BTree btree;
-	
-	protected AbstractBTreeOperatorDescriptor opDesc;	
-	protected IHyracksContext ctx;
-	
-	protected boolean createBTree;
-	
-	public AbstractBTreeOperatorNodePushable(AbstractBTreeOperatorDescriptor opDesc, final IHyracksContext ctx, boolean createBTree) {
-		this.opDesc = opDesc;
-		this.ctx = ctx;
-		this.createBTree = createBTree;
-	}
-	
-	public void init() throws Exception {				
-		IBufferCache bufferCache = opDesc.getBufferCacheProvider().getBufferCache();
-		FileManager fileManager = opDesc.getBufferCacheProvider().getFileManager();
-		
-        File f = new File(opDesc.getBtreeFileName());
-        RandomAccessFile raf = new RandomAccessFile(f, "rw");
-        
-        if(!f.exists() && !createBTree) {
-        	throw new Exception("Trying to open btree from file " + opDesc.getBtreeFileName() + " but file doesn't exist.");
-        }
-        
-        try {
-        	FileInfo fi = new FileInfo(opDesc.getBtreeFileId(), raf);
-        	fileManager.registerFile(fi);
-        }
-        catch (Exception e) {
-        }
-        
-        interiorFrame = opDesc.getInteriorFactory().getFrame();
-        leafFrame = opDesc.getLeafFactory().getFrame();    	
-        
-		BTreeRegistry btreeRegistry = opDesc.getBtreeRegistryProvider().getBTreeRegistry();
-		btree = btreeRegistry.get(opDesc.getBtreeFileId());
-        if(btree == null) {
-        	
-        	// create new btree and register it            
-            btreeRegistry.lock();
-            try {
-                // check if btree has already been registered by another thread
-                btree = btreeRegistry.get(opDesc.getBtreeFileId());                
-                if(btree == null) {                                    	                	                	
-                	// this thread should create and register the btee
-                	
-                	// start by building the multicomparator from the factories
-                	IFieldAccessor[] fields = new IFieldAccessor[opDesc.getFieldAccessorFactories().length];
-                	for(int i = 0; i < opDesc.getFieldAccessorFactories().length; i++) {
-                		fields[i] = opDesc.getFieldAccessorFactories()[i].getFieldAccessor();
-                	}
-                	
-                	IBinaryComparator[] comparators = new IBinaryComparator[opDesc.getComparatorFactories().length];
-                	for(int i = 0; i < opDesc.getComparatorFactories().length; i++) {
-                		comparators[i] = opDesc.getComparatorFactories()[i].createBinaryComparator();
-                	}
-                	
-                    MultiComparator cmp = new MultiComparator(comparators, fields);
-                	
-                	btree = new BTree(bufferCache, opDesc.getInteriorFactory(), opDesc.getLeafFactory(), cmp);
-                	if(createBTree) {
-                		MetaDataFrame metaFrame = new MetaDataFrame();                		
-                		btree.create(opDesc.getBtreeFileId(), leafFrame, metaFrame);
-                	}
-                	btree.open(opDesc.getBtreeFileId());
-                    btreeRegistry.register(opDesc.getBtreeFileId(), btree);
-                }                
-            }
-            finally {                        
-                btreeRegistry.unlock();
-            }
-        }                      
-	}	
-	
-	// debug
-	protected void fill() throws Exception {
-		
-		
-		// TODO: uncomment and fix
-		MetaDataFrame metaFrame = new MetaDataFrame();                		
-		btree.create(opDesc.getBtreeFileId(), leafFrame, metaFrame);
-		
-		Random rnd = new Random();
-		rnd.setSeed(50);				
-		
-		ByteBuffer frame = ctx.getResourceManager().allocateFrame();
-		FrameTupleAppender appender = new FrameTupleAppender(ctx);				
-		ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
-		DataOutput dos = tb.getDataOutput();
-
-		ISerializerDeserializer[] recDescSers = { IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE};
-		RecordDescriptor recDesc = new RecordDescriptor(recDescSers);
-		IFrameTupleAccessor accessor = new FrameTupleAccessor(ctx, recDesc);
-		accessor.reset(frame);
-		FrameTupleReference tuple = new FrameTupleReference();
-		
-		for (int i = 0; i < 10000; i++) {			
-			int f0 = rnd.nextInt() % 10000;
-			int f1 = 5;
-
-			tb.reset();
-        	IntegerSerializerDeserializer.INSTANCE.serialize(f0, dos);
-        	tb.addFieldEndOffset();
-        	IntegerSerializerDeserializer.INSTANCE.serialize(f1, dos);
-        	tb.addFieldEndOffset();        	
-        	        	
-        	appender.reset(frame, true);
-        	appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
-        	
-        	tuple.reset(accessor, 0);
-						
-			if (i % 1000 == 0) {
-				System.out.println("INSERTING " + i + " : " + f0 + " " + f1);            	
-			}
-
-			try {                                
-				btree.insert(tuple, leafFrame, interiorFrame, metaFrame);
-			} catch (Exception e) {
-			}
-		}
-		
-		/*
-        IFieldAccessor[] fields = new IFieldAccessor[2];
-        fields[0] = new Int32Accessor(); // key field
-        fields[1] = new Int32Accessor(); // value field
-
-        int keyLen = 1;
-        IBinaryComparator[] cmps = new IBinaryComparator[keyLen];
-        cmps[0] = IntegerBinaryComparatorFactory.INSTANCE.createBinaryComparator();
-        MultiComparator cmp = new MultiComparator(cmps, fields);		
-
-        ByteArrayAccessibleOutputStream lkbaaos = new ByteArrayAccessibleOutputStream();
-        DataOutputStream lkdos = new DataOutputStream(lkbaaos);    	    	    	
-        IntegerSerializerDeserializer.INSTANCE.serialize(-1000, lkdos);
-
-        ByteArrayAccessibleOutputStream hkbaaos = new ByteArrayAccessibleOutputStream();
-        DataOutputStream hkdos = new DataOutputStream(hkbaaos);    	    	    	
-        IntegerSerializerDeserializer.INSTANCE.serialize(1000, hkdos);
-
-        byte[] lowKey = lkbaaos.toByteArray();
-        byte[] highKey = hkbaaos.toByteArray();
-
-        IBinaryComparator[] searchCmps = new IBinaryComparator[1];
-        searchCmps[0] = IntegerBinaryComparatorFactory.INSTANCE.createBinaryComparator();
-        MultiComparator searchCmp = new MultiComparator(searchCmps, fields);
-
-        RangePredicate rangePred = new RangePredicate(true, lowKey, highKey, searchCmp);
-        btree.search(cursor, rangePred, leafFrame, interiorFrame);
-        try {
-            while (cursor.hasNext()) {
-            	cursor.next();
-                byte[] array = cursor.getPage().getBuffer().array();
-                int recOffset = cursor.getOffset();                
-                String rec = cmp.printRecord(array, recOffset);
-                System.out.println(rec);         
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-        } finally {
-            cursor.close();
-        }	
-        */	 
-	}
-	
-	protected byte[] buildBTreeRecordFromHyraxRecord(IFrameTupleAccessor accessor, int tupleId, int[] keyFields, int[] payloadFields) {
-		
-		// determine size of record
-		int btreeRecordSize = 0;			
-		for(int j = 0; j < keyFields.length; j++) {
-			btreeRecordSize += accessor.getFieldLength(tupleId, keyFields[j]);
-		}
-		for(int j = 0; j < payloadFields.length; j++) {
-			btreeRecordSize += accessor.getFieldLength(tupleId, payloadFields[j]);
-		}			
-		
-		// allocate record and copy fields
-		byte[] btreeRecord = new byte[btreeRecordSize];
-		int recRunner = 0;
-		for(int j = 0; j < keyFields.length; j++) {
-			int fieldStartOff = accessor.getTupleStartOffset(tupleId) + + accessor.getFieldSlotsLength() + accessor.getFieldStartOffset(tupleId, keyFields[j]);				
-			int fieldLength = accessor.getFieldLength(tupleId, keyFields[j]);						
-			System.arraycopy(accessor.getBuffer().array(), fieldStartOff, btreeRecord, recRunner, fieldLength);				
-			recRunner += fieldLength;
-		}
-		for(int j = 0; j < payloadFields.length; j++) {
-			int fieldStartOff = accessor.getTupleStartOffset(tupleId) + + accessor.getFieldSlotsLength() + accessor.getFieldStartOffset(tupleId, payloadFields[j]);
-			int fieldLength = accessor.getFieldLength(tupleId, payloadFields[j]);
-			System.arraycopy(accessor.getBuffer().array(), fieldStartOff, btreeRecord, recRunner, fieldLength);
-			recRunner += fieldLength;				
-		}						
-		
-		return btreeRecord;
-	}	
-}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorNodePushable.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorNodePushable.java
index 1b9db48..9120f15 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorNodePushable.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorNodePushable.java
@@ -12,7 +12,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package edu.uci.ics.hyracks.storage.am.btree.dataflow;
 
 import java.nio.ByteBuffer;
@@ -22,67 +21,70 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeMetaDataFrame;
 import edu.uci.ics.hyracks.storage.am.btree.frames.MetaDataFrame;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
 
-public class BTreeBulkLoadOperatorNodePushable extends AbstractBTreeOperatorNodePushable {
-		
+public class BTreeBulkLoadOperatorNodePushable extends AbstractUnaryInputSinkOperatorNodePushable {
     private float fillFactor;
-    
+    private final BTreeOpHelper btreeOpHelper;
     private FrameTupleAccessor accessor;
     private BTree.BulkLoadContext bulkLoadCtx;
-    
+
     private IRecordDescriptorProvider recordDescProvider;
-    
+
     private PermutingFrameTupleReference tuple = new PermutingFrameTupleReference();
-    
-	public BTreeBulkLoadOperatorNodePushable(AbstractBTreeOperatorDescriptor opDesc, IHyracksContext ctx, int[] fieldPermutation, float fillFactor, IRecordDescriptorProvider recordDescProvider) {
-		super(opDesc, ctx, true);
-		this.fillFactor = fillFactor;
-		this.recordDescProvider = recordDescProvider;
-		tuple.setFieldPermutation(fieldPermutation);
-	}
-	
-	@Override
-	public void close() throws HyracksDataException {
-		try {
-			btree.endBulkLoad(bulkLoadCtx);
-		} catch (Exception e) {
-			e.printStackTrace();
-		}			
-	}
-	
-	@Override
-	public void nextFrame(ByteBuffer buffer) throws HyracksDataException {		
-		accessor.reset(buffer);
-		                      
-		int tupleCount = accessor.getTupleCount();
-		for(int i = 0; i < tupleCount; i++) {
-			tuple.reset(accessor, i);			
-			try {
-				btree.bulkLoadAddRecord(bulkLoadCtx, tuple);
-			} catch (Exception e) {
-				e.printStackTrace();
-			}
-		}		
-	}
-	
-	@Override
-	public void open() throws HyracksDataException {		
-		RecordDescriptor recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getOperatorId(), 0);		
-		accessor = new FrameTupleAccessor(ctx, recDesc);
-		IBTreeMetaDataFrame metaFrame = new MetaDataFrame();		
-		try {
-			init();
-			btree.open(opDesc.getBtreeFileId());
-			bulkLoadCtx = btree.beginBulkLoad(fillFactor, leafFrame, interiorFrame, metaFrame);
-		} catch (Exception e) {
-			e.printStackTrace();
-		}
-	}
+
+    public BTreeBulkLoadOperatorNodePushable(AbstractBTreeOperatorDescriptor opDesc, IHyracksContext ctx,
+            int[] fieldPermutation, float fillFactor, IRecordDescriptorProvider recordDescProvider) {
+        btreeOpHelper = new BTreeOpHelper(opDesc, ctx, true);
+        this.fillFactor = fillFactor;
+        this.recordDescProvider = recordDescProvider;
+        tuple.setFieldPermutation(fieldPermutation);
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        try {
+            btreeOpHelper.getBTree().endBulkLoad(bulkLoadCtx);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        accessor.reset(buffer);
+
+        int tupleCount = accessor.getTupleCount();
+        for (int i = 0; i < tupleCount; i++) {
+            tuple.reset(accessor, i);
+            try {
+                btreeOpHelper.getBTree().bulkLoadAddRecord(bulkLoadCtx, tuple);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        AbstractBTreeOperatorDescriptor opDesc = btreeOpHelper.getOperatorDescriptor();
+        RecordDescriptor recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getOperatorId(), 0);
+        accessor = new FrameTupleAccessor(btreeOpHelper.getHyracksContext(), recDesc);
+        IBTreeMetaDataFrame metaFrame = new MetaDataFrame();
+        try {
+            btreeOpHelper.init();
+            btreeOpHelper.getBTree().open(opDesc.getBtreeFileId());
+            bulkLoadCtx = btreeOpHelper.getBTree().beginBulkLoad(fillFactor, btreeOpHelper.getLeafFrame(),
+                    btreeOpHelper.getInteriorFrame(), metaFrame);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
 
     @Override
     public void flush() throws HyracksDataException {
-    }    
-}
+    }
+}
\ No newline at end of file
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorNodePushable.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorNodePushable.java
index 373a27d..0c3b8d5 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorNodePushable.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorNodePushable.java
@@ -12,7 +12,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package edu.uci.ics.hyracks.storage.am.btree.dataflow;
 
 import java.io.DataOutput;
@@ -24,6 +23,7 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
 import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeMetaDataFrame;
 import edu.uci.ics.hyracks.storage.am.btree.api.IFieldIterator;
@@ -31,81 +31,69 @@
 import edu.uci.ics.hyracks.storage.am.btree.impls.DiskOrderScanCursor;
 import edu.uci.ics.hyracks.storage.am.btree.impls.MultiComparator;
 
-public class BTreeDiskOrderScanOperatorNodePushable extends AbstractBTreeOperatorNodePushable {
-	
-	public BTreeDiskOrderScanOperatorNodePushable(AbstractBTreeOperatorDescriptor opDesc, IHyracksContext ctx) {
-		super(opDesc, ctx, false);
-	}
-	
-	@Override
-	public void open() throws HyracksDataException {		
-		
-		IBTreeLeafFrame cursorFrame = opDesc.getLeafFactory().getFrame();
-		DiskOrderScanCursor cursor = new DiskOrderScanCursor(cursorFrame);
-		IBTreeMetaDataFrame metaFrame = new MetaDataFrame();
-		
-		try {
-			init();				
-			fill();
-			btree.diskOrderScan(cursor, cursorFrame, metaFrame);			
-		} catch (FileNotFoundException e1) {
-			e1.printStackTrace();
-		} catch (Exception e) {
-			e.printStackTrace();
-		}
-		
-		MultiComparator cmp = btree.getMultiComparator();
-		ByteBuffer frame = ctx.getResourceManager().allocateFrame();
-		FrameTupleAppender appender = new FrameTupleAppender(ctx);
-		appender.reset(frame, true);
-		ArrayTupleBuilder tb = new ArrayTupleBuilder(cmp.getFields().length);
-		DataOutput dos = tb.getDataOutput();
-		
-		try {
-			while(cursor.hasNext()) {
-				tb.reset();                		
-				cursor.next();
+public class BTreeDiskOrderScanOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
+    private final BTreeOpHelper btreeOpHelper;
 
-				IFieldIterator fieldIter = cursor.getFieldIterator();
-				for(int i = 0; i < cmp.getFields().length; i++) {					
-					int fieldLen = fieldIter.getFieldSize();
-					dos.write(fieldIter.getBuffer().array(), fieldIter.getFieldOff(), fieldLen);
-					tb.addFieldEndOffset();
-				}
-				
-				if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-					FrameUtils.flushFrame(frame, writer);
-					appender.reset(frame, true);
-					if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-						throw new IllegalStateException();
-					}
-				}
-
-				//int recOffset = cursor.getOffset();                
-				//String rec = cmp.printRecord(array, recOffset);
-				//System.out.println(rec);
-			}
-
-			if (appender.getTupleCount() > 0) {
-				FrameUtils.flushFrame(frame, writer);
-			}
-			writer.close();
-
-		} catch (Exception e) {					
-			e.printStackTrace();
-		}
-	}
-	
-	@Override
-    public final void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        throw new UnsupportedOperationException();
+    public BTreeDiskOrderScanOperatorNodePushable(AbstractBTreeOperatorDescriptor opDesc, IHyracksContext ctx) {
+        btreeOpHelper = new BTreeOpHelper(opDesc, ctx, false);
     }
-	
-	@Override
-	public void close() throws HyracksDataException {            	
-	}
 
     @Override
-    public void flush() throws HyracksDataException {
+    public void initialize() throws HyracksDataException {
+
+        IBTreeLeafFrame cursorFrame = btreeOpHelper.getOperatorDescriptor().getLeafFactory().getFrame();
+        DiskOrderScanCursor cursor = new DiskOrderScanCursor(cursorFrame);
+        IBTreeMetaDataFrame metaFrame = new MetaDataFrame();
+
+        try {
+            btreeOpHelper.init();
+            btreeOpHelper.fill();
+            btreeOpHelper.getBTree().diskOrderScan(cursor, cursorFrame, metaFrame);
+        } catch (FileNotFoundException e1) {
+            e1.printStackTrace();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+        MultiComparator cmp = btreeOpHelper.getBTree().getMultiComparator();
+        ByteBuffer frame = btreeOpHelper.getHyracksContext().getResourceManager().allocateFrame();
+        FrameTupleAppender appender = new FrameTupleAppender(btreeOpHelper.getHyracksContext());
+        appender.reset(frame, true);
+        ArrayTupleBuilder tb = new ArrayTupleBuilder(cmp.getFields().length);
+        DataOutput dos = tb.getDataOutput();
+
+        try {
+            while (cursor.hasNext()) {
+                tb.reset();
+                cursor.next();
+
+                IFieldIterator fieldIter = cursor.getFieldIterator();
+                for (int i = 0; i < cmp.getFields().length; i++) {
+                    int fieldLen = fieldIter.getFieldSize();
+                    dos.write(fieldIter.getBuffer().array(), fieldIter.getFieldOff(), fieldLen);
+                    tb.addFieldEndOffset();
+                }
+
+                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                    FrameUtils.flushFrame(frame, writer);
+                    appender.reset(frame, true);
+                    if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                        throw new IllegalStateException();
+                    }
+                }
+
+                //int recOffset = cursor.getOffset();                
+                //String rec = cmp.printRecord(array, recOffset);
+                //System.out.println(rec);
+            }
+
+            if (appender.getTupleCount() > 0) {
+                FrameUtils.flushFrame(frame, writer);
+            }
+            writer.close();
+
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
     }
-}
+}
\ No newline at end of file
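
The append-or-flush idiom in initialize() above (and in the other source-style pushables in this patch) can be read as the following sketch. It is not code from this patch, just the same calls factored into a hypothetical helper.

    // Hypothetical helper illustrating the append-or-flush idiom used above:
    // try to append the built tuple; if the current frame is full, flush it
    // downstream and retry on an empty frame.
    private static void appendOrFlush(ByteBuffer frame, FrameTupleAppender appender,
            ArrayTupleBuilder tb, IFrameWriter writer) throws HyracksDataException {
        if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
            FrameUtils.flushFrame(frame, writer);   // frame full: ship it downstream
            appender.reset(frame, true);            // start over on an empty frame
            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
                // a single tuple that does not fit into an empty frame cannot be emitted
                throw new IllegalStateException("tuple larger than frame size");
            }
        }
    }
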
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertUpdateDeleteOperatorNodePushable.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertUpdateDeleteOperatorNodePushable.java
index 09668fe..ad35a93 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertUpdateDeleteOperatorNodePushable.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertUpdateDeleteOperatorNodePushable.java
@@ -12,7 +12,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package edu.uci.ics.hyracks.storage.am.btree.dataflow;
 
 import java.nio.ByteBuffer;
@@ -23,82 +22,96 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeInteriorFrame;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
 import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeMetaDataFrame;
 import edu.uci.ics.hyracks.storage.am.btree.frames.MetaDataFrame;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeOp;
 
-public class BTreeInsertUpdateDeleteOperatorNodePushable extends AbstractBTreeOperatorNodePushable {
-		
+public class BTreeInsertUpdateDeleteOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+    private final BTreeOpHelper btreeOpHelper;
+
     private FrameTupleAccessor accessor;
-    
+
     private IRecordDescriptorProvider recordDescProvider;
-    
+
     private IBTreeMetaDataFrame metaFrame;
-    
+
     private BTreeOp op;
-    
+
     private PermutingFrameTupleReference tuple = new PermutingFrameTupleReference();
-    
-	public BTreeInsertUpdateDeleteOperatorNodePushable(AbstractBTreeOperatorDescriptor opDesc, IHyracksContext ctx, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, BTreeOp op) {
-		super(opDesc, ctx, false);		
-		this.recordDescProvider = recordDescProvider;
-		this.op = op;
-		tuple.setFieldPermutation(fieldPermutation);
-	}
-	
-	@Override
-	public void close() throws HyracksDataException {
-		writer.close();		
-	}
-	
-	@Override
-	public void nextFrame(ByteBuffer buffer) throws HyracksDataException {		
-		accessor.reset(buffer);
-		
-		int tupleCount = accessor.getTupleCount();
-		for(int i = 0; i < tupleCount; i++) {
-			tuple.reset(accessor, i);
-			try {
-				
-				switch(op) {
-				
-				case BTO_INSERT: {
-					btree.insert(tuple, leafFrame, interiorFrame, metaFrame);				
-				} break;
-				
-				case BTO_DELETE: {
-					btree.delete(tuple, leafFrame, interiorFrame, metaFrame);				
-				} break;
-				
-				default: {
-					throw new HyracksDataException("Unsupported operation " + op + " in BTree InsertUpdateDelete operator");
-				}
-				
-				}
-				
-			} catch (Exception e) {
-				e.printStackTrace();
-			}
-		}
-		
-		// pass a copy of the frame to next op
-		FrameUtils.flushFrame(buffer.duplicate(), writer);
-	}
-	
-	@Override
-	public void open() throws HyracksDataException {		
-		RecordDescriptor recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getOperatorId(), 0);		
-		accessor = new FrameTupleAccessor(ctx, recDesc);		
-		try {
-			init();
-			btree.open(opDesc.getBtreeFileId());
-			metaFrame = new MetaDataFrame();
-		} catch (Exception e) {
-			e.printStackTrace();
-		}
-	}
+
+    public BTreeInsertUpdateDeleteOperatorNodePushable(AbstractBTreeOperatorDescriptor opDesc, IHyracksContext ctx,
+            int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, BTreeOp op) {
+        btreeOpHelper = new BTreeOpHelper(opDesc, ctx, false);
+        this.recordDescProvider = recordDescProvider;
+        this.op = op;
+        tuple.setFieldPermutation(fieldPermutation);
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        writer.close();
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        final BTree btree = btreeOpHelper.getBTree();
+        final IBTreeLeafFrame leafFrame = btreeOpHelper.getLeafFrame();
+        final IBTreeInteriorFrame interiorFrame = btreeOpHelper.getInteriorFrame();
+
+        accessor.reset(buffer);
+
+        int tupleCount = accessor.getTupleCount();
+        for (int i = 0; i < tupleCount; i++) {
+            tuple.reset(accessor, i);
+            try {
+
+                switch (op) {
+
+                    case BTO_INSERT: {
+                        btree.insert(tuple, leafFrame, interiorFrame, metaFrame);
+                    }
+                        break;
+
+                    case BTO_DELETE: {
+                        btree.delete(tuple, leafFrame, interiorFrame, metaFrame);
+                    }
+                        break;
+
+                    default: {
+                        throw new HyracksDataException("Unsupported operation " + op
+                                + " in BTree InsertUpdateDelete operator");
+                    }
+
+                }
+
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+
+        // pass a copy of the frame to next op
+        FrameUtils.flushFrame(buffer.duplicate(), writer);
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        AbstractBTreeOperatorDescriptor opDesc = btreeOpHelper.getOperatorDescriptor();
+        RecordDescriptor recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getOperatorId(), 0);
+        accessor = new FrameTupleAccessor(btreeOpHelper.getHyracksContext(), recDesc);
+        try {
+            btreeOpHelper.init();
+            btreeOpHelper.getBTree().open(opDesc.getBtreeFileId());
+            metaFrame = new MetaDataFrame();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
 
     @Override
     public void flush() throws HyracksDataException {
-    }    
-}
+    }
+}
\ No newline at end of file
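
The FrameUtils.flushFrame(buffer.duplicate(), writer) call in nextFrame() above forwards the incoming frame on a duplicated buffer, so the downstream consumer can read it without disturbing the caller's position and limit. A small, standalone illustration of that ByteBuffer behaviour (plain JDK, not code from this patch):

    // ByteBuffer.duplicate() shares the underlying bytes but keeps its own
    // position, limit and mark, so the consumer can flip/read freely.
    ByteBuffer original = ByteBuffer.allocate(8);
    original.putInt(42);                       // original.position() == 4
    ByteBuffer copy = original.duplicate();
    copy.flip();                               // only the duplicate is flipped
    int value = copy.getInt();                 // reads 42
    assert original.position() == 4;           // caller's view is unchanged
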
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeOpHelper.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeOpHelper.java
new file mode 100644
index 0000000..68edce1
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeOpHelper.java
@@ -0,0 +1,229 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.dataflow;
+
+import java.io.DataOutput;
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeInteriorFrame;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
+import edu.uci.ics.hyracks.storage.am.btree.api.IFieldAccessor;
+import edu.uci.ics.hyracks.storage.am.btree.frames.MetaDataFrame;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.am.btree.impls.MultiComparator;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.file.FileInfo;
+import edu.uci.ics.hyracks.storage.common.file.FileManager;
+
+final class BTreeOpHelper {
+    private IBTreeInteriorFrame interiorFrame;
+    private IBTreeLeafFrame leafFrame;
+
+    private BTree btree;
+
+    private AbstractBTreeOperatorDescriptor opDesc;
+    private IHyracksContext ctx;
+
+    private boolean createBTree;
+
+    BTreeOpHelper(AbstractBTreeOperatorDescriptor opDesc, final IHyracksContext ctx, boolean createBTree) {
+        this.opDesc = opDesc;
+        this.ctx = ctx;
+        this.createBTree = createBTree;
+    }
+
+    void init() throws Exception {
+        IBufferCache bufferCache = opDesc.getBufferCacheProvider().getBufferCache();
+        FileManager fileManager = opDesc.getBufferCacheProvider().getFileManager();
+
+        File f = new File(opDesc.getBtreeFileName());
+        RandomAccessFile raf = new RandomAccessFile(f, "rw");
+
+        if (!f.exists() && !createBTree) {
+            throw new Exception("Trying to open btree from file " + opDesc.getBtreeFileName()
+                    + " but file doesn't exist.");
+        }
+
+        try {
+            FileInfo fi = new FileInfo(opDesc.getBtreeFileId(), raf);
+            fileManager.registerFile(fi);
+        } catch (Exception e) {
+            // exception ignored here (presumably the file may already be registered)
+        }
+
+        interiorFrame = opDesc.getInteriorFactory().getFrame();
+        leafFrame = opDesc.getLeafFactory().getFrame();
+
+        BTreeRegistry btreeRegistry = opDesc.getBtreeRegistryProvider().getBTreeRegistry();
+        btree = btreeRegistry.get(opDesc.getBtreeFileId());
+        if (btree == null) {
+
+            // create new btree and register it            
+            btreeRegistry.lock();
+            try {
+                // check if btree has already been registered by another thread
+                btree = btreeRegistry.get(opDesc.getBtreeFileId());
+                if (btree == null) {
+                    // this thread should create and register the btree
+
+                    // start by building the multicomparator from the factories
+                    IFieldAccessor[] fields = new IFieldAccessor[opDesc.getFieldAccessorFactories().length];
+                    for (int i = 0; i < opDesc.getFieldAccessorFactories().length; i++) {
+                        fields[i] = opDesc.getFieldAccessorFactories()[i].getFieldAccessor();
+                    }
+
+                    IBinaryComparator[] comparators = new IBinaryComparator[opDesc.getComparatorFactories().length];
+                    for (int i = 0; i < opDesc.getComparatorFactories().length; i++) {
+                        comparators[i] = opDesc.getComparatorFactories()[i].createBinaryComparator();
+                    }
+
+                    MultiComparator cmp = new MultiComparator(comparators, fields);
+
+                    btree = new BTree(bufferCache, opDesc.getInteriorFactory(), opDesc.getLeafFactory(), cmp);
+                    if (createBTree) {
+                        MetaDataFrame metaFrame = new MetaDataFrame();
+                        btree.create(opDesc.getBtreeFileId(), leafFrame, metaFrame);
+                    }
+                    btree.open(opDesc.getBtreeFileId());
+                    btreeRegistry.register(opDesc.getBtreeFileId(), btree);
+                }
+            } finally {
+                btreeRegistry.unlock();
+            }
+        }
+    }
+
+    // debug
+    void fill() throws Exception {
+
+        // TODO: uncomment and fix
+        MetaDataFrame metaFrame = new MetaDataFrame();
+        btree.create(opDesc.getBtreeFileId(), leafFrame, metaFrame);
+
+        Random rnd = new Random();
+        rnd.setSeed(50);
+
+        ByteBuffer frame = ctx.getResourceManager().allocateFrame();
+        FrameTupleAppender appender = new FrameTupleAppender(ctx);
+        ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
+        DataOutput dos = tb.getDataOutput();
+
+        ISerializerDeserializer[] recDescSers = { IntegerSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE };
+        RecordDescriptor recDesc = new RecordDescriptor(recDescSers);
+        IFrameTupleAccessor accessor = new FrameTupleAccessor(ctx, recDesc);
+        accessor.reset(frame);
+        FrameTupleReference tuple = new FrameTupleReference();
+
+        for (int i = 0; i < 10000; i++) {
+            int f0 = rnd.nextInt() % 10000;
+            int f1 = 5;
+
+            tb.reset();
+            IntegerSerializerDeserializer.INSTANCE.serialize(f0, dos);
+            tb.addFieldEndOffset();
+            IntegerSerializerDeserializer.INSTANCE.serialize(f1, dos);
+            tb.addFieldEndOffset();
+
+            appender.reset(frame, true);
+            appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
+
+            tuple.reset(accessor, 0);
+
+            if (i % 1000 == 0) {
+                System.out.println("INSERTING " + i + " : " + f0 + " " + f1);
+            }
+
+            try {
+                btree.insert(tuple, leafFrame, interiorFrame, metaFrame);
+            } catch (Exception e) {
+                // debug fill only: failed inserts (e.g. duplicate random keys) are ignored
+            }
+        }
+
+        /*
+        IFieldAccessor[] fields = new IFieldAccessor[2];
+        fields[0] = new Int32Accessor(); // key field
+        fields[1] = new Int32Accessor(); // value field
+
+        int keyLen = 1;
+        IBinaryComparator[] cmps = new IBinaryComparator[keyLen];
+        cmps[0] = IntegerBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+        MultiComparator cmp = new MultiComparator(cmps, fields);		
+
+        ByteArrayAccessibleOutputStream lkbaaos = new ByteArrayAccessibleOutputStream();
+        DataOutputStream lkdos = new DataOutputStream(lkbaaos);    	    	    	
+        IntegerSerializerDeserializer.INSTANCE.serialize(-1000, lkdos);
+
+        ByteArrayAccessibleOutputStream hkbaaos = new ByteArrayAccessibleOutputStream();
+        DataOutputStream hkdos = new DataOutputStream(hkbaaos);    	    	    	
+        IntegerSerializerDeserializer.INSTANCE.serialize(1000, hkdos);
+
+        byte[] lowKey = lkbaaos.toByteArray();
+        byte[] highKey = hkbaaos.toByteArray();
+
+        IBinaryComparator[] searchCmps = new IBinaryComparator[1];
+        searchCmps[0] = IntegerBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+        MultiComparator searchCmp = new MultiComparator(searchCmps, fields);
+
+        RangePredicate rangePred = new RangePredicate(true, lowKey, highKey, searchCmp);
+        btree.search(cursor, rangePred, leafFrame, interiorFrame);
+        try {
+            while (cursor.hasNext()) {
+            	cursor.next();
+                byte[] array = cursor.getPage().getBuffer().array();
+                int recOffset = cursor.getOffset();                
+                String rec = cmp.printRecord(array, recOffset);
+                System.out.println(rec);         
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            cursor.close();
+        }	
+        */
+    }
+
+    public BTree getBTree() {
+        return btree;
+    }
+
+    public IHyracksContext getHyracksContext() {
+        return ctx;
+    }
+
+    public AbstractBTreeOperatorDescriptor getOperatorDescriptor() {
+        return opDesc;
+    }
+
+    public IBTreeLeafFrame getLeafFrame() {
+        return leafFrame;
+    }
+
+    public IBTreeInteriorFrame getInteriorFrame() {
+        return interiorFrame;
+    }
+}
\ No newline at end of file
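
BTreeOpHelper.init() guards shared BTree creation with a check-lock-recheck sequence against the BTreeRegistry. A generic sketch of that idiom follows; the Registry interface and Supplier-based factory here are hypothetical, not Hyracks APIs.

    // Generic sketch of the check-lock-recheck idiom used in BTreeOpHelper.init().
    interface Registry<K, V> {
        V get(K key);
        void register(K key, V value);
        void lock();
        void unlock();
    }

    static <K, V> V getOrCreate(Registry<K, V> registry, K key, java.util.function.Supplier<V> factory) {
        V value = registry.get(key);
        if (value != null) {
            return value;                   // fast path: already registered
        }
        registry.lock();
        try {
            value = registry.get(key);      // recheck: another thread may have registered it
            if (value == null) {
                value = factory.get();      // construct exactly once, under the lock
                registry.register(key, value);
            }
            return value;
        } finally {
            registry.unlock();              // mirrors the finally block in init()
        }
    }
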
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
index 3bd3e36..2b78993 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
@@ -12,7 +12,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package edu.uci.ics.hyracks.storage.am.btree.dataflow;
 
 import java.io.DataOutput;
@@ -26,106 +25,102 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeCursor;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeInteriorFrame;
 import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
 import edu.uci.ics.hyracks.storage.am.btree.api.IFieldIterator;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
 import edu.uci.ics.hyracks.storage.am.btree.impls.MultiComparator;
 import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
 import edu.uci.ics.hyracks.storage.am.btree.impls.RangeSearchCursor;
 
-public class BTreeSearchOperatorNodePushable extends AbstractBTreeOperatorNodePushable {
+public class BTreeSearchOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
+    private BTreeOpHelper btreeOpHelper;
+    private boolean isForward;
+    private ITupleReference lowKey;
+    private ITupleReference highKey;
+    private int searchKeyFields;
 
-	private boolean isForward;
-	private ITupleReference lowKey;
-	private ITupleReference highKey;
-	private int searchKeyFields;
-	
-	public BTreeSearchOperatorNodePushable(AbstractBTreeOperatorDescriptor opDesc, IHyracksContext ctx, boolean isForward, ITupleReference lowKey, ITupleReference highKey, int searchKeyFields) {
-		super(opDesc, ctx, false);
-		this.isForward = isForward;
-		this.lowKey = lowKey;
-		this.highKey = highKey;
-		this.searchKeyFields = searchKeyFields;
-	}
-	
-	@Override
-	public void open() throws HyracksDataException {		
-		
-		IBTreeLeafFrame cursorFrame = opDesc.getLeafFactory().getFrame();
-		IBTreeCursor cursor = new RangeSearchCursor(cursorFrame);
-						
-		try {
-			init();										
-			fill();
-			
-			// construct range predicate
-			assert(searchKeyFields <= btree.getMultiComparator().getKeyLength());
-			IBinaryComparator[] searchComparators = new IBinaryComparator[searchKeyFields];
-			for(int i = 0; i < searchKeyFields; i++) {
-				searchComparators[i] = btree.getMultiComparator().getComparators()[i];
-			}			
-			MultiComparator searchCmp = new MultiComparator(searchComparators, btree.getMultiComparator().getFields());
-			RangePredicate rangePred = new RangePredicate(isForward, lowKey, highKey, searchCmp);
-			
-			btree.search(cursor, rangePred, leafFrame, interiorFrame);
-		} catch (FileNotFoundException e1) {
-			e1.printStackTrace();
-		} catch (Exception e) {
-			e.printStackTrace();
-		}
-		
-		MultiComparator cmp = btree.getMultiComparator();
-		ByteBuffer frame = ctx.getResourceManager().allocateFrame();
-		FrameTupleAppender appender = new FrameTupleAppender(ctx);
-		appender.reset(frame, true);
-		ArrayTupleBuilder tb = new ArrayTupleBuilder(cmp.getFields().length);
-		DataOutput dos = tb.getDataOutput();
-		
-		try {
-			while(cursor.hasNext()) {
-				tb.reset();                		
-				cursor.next();
-				
-				IFieldIterator fieldIter = cursor.getFieldIterator();
-				for(int i = 0; i < cmp.getFields().length; i++) {					
-					int fieldLen = fieldIter.getFieldSize();
-					dos.write(fieldIter.getBuffer().array(), fieldIter.getFieldOff(), fieldLen);
-					tb.addFieldEndOffset();
-				}
-
-				if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-					FrameUtils.flushFrame(frame, writer);
-					appender.reset(frame, true);
-					if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-						throw new IllegalStateException();
-					}
-				}
-
-				//int recOffset = cursor.getOffset();                
-				//String rec = cmp.printRecord(array, recOffset);
-				//System.out.println(rec);
-			}
-
-			if (appender.getTupleCount() > 0) {
-				FrameUtils.flushFrame(frame, writer);
-			}
-			writer.close();
-
-		} catch (Exception e) {					
-			e.printStackTrace();
-		}
-	}
-	
-	@Override
-    public final void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        throw new UnsupportedOperationException();
+    public BTreeSearchOperatorNodePushable(AbstractBTreeOperatorDescriptor opDesc, IHyracksContext ctx,
+            boolean isForward, ITupleReference lowKey, ITupleReference highKey, int searchKeyFields) {
+        btreeOpHelper = new BTreeOpHelper(opDesc, ctx, false);
+        this.isForward = isForward;
+        this.lowKey = lowKey;
+        this.highKey = highKey;
+        this.searchKeyFields = searchKeyFields;
     }
-	
-	@Override
-	public void close() throws HyracksDataException {            	
-	}
 
     @Override
-    public void flush() throws HyracksDataException {
+    public void initialize() throws HyracksDataException {
+        AbstractBTreeOperatorDescriptor opDesc = btreeOpHelper.getOperatorDescriptor();
+        BTree btree = btreeOpHelper.getBTree();
+        IBTreeLeafFrame leafFrame = btreeOpHelper.getLeafFrame();
+        IBTreeInteriorFrame interiorFrame = btreeOpHelper.getInteriorFrame();
+        IHyracksContext ctx = btreeOpHelper.getHyracksContext();
+
+        IBTreeLeafFrame cursorFrame = opDesc.getLeafFactory().getFrame();
+        IBTreeCursor cursor = new RangeSearchCursor(cursorFrame);
+
+        try {
+            btreeOpHelper.init();
+            btreeOpHelper.fill();
+
+            // construct range predicate
+            assert (searchKeyFields <= btree.getMultiComparator().getKeyLength());
+            IBinaryComparator[] searchComparators = new IBinaryComparator[searchKeyFields];
+            for (int i = 0; i < searchKeyFields; i++) {
+                searchComparators[i] = btree.getMultiComparator().getComparators()[i];
+            }
+            MultiComparator searchCmp = new MultiComparator(searchComparators, btree.getMultiComparator().getFields());
+            RangePredicate rangePred = new RangePredicate(isForward, lowKey, highKey, searchCmp);
+
+            btree.search(cursor, rangePred, leafFrame, interiorFrame);
+        } catch (FileNotFoundException e1) {
+            e1.printStackTrace();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+        MultiComparator cmp = btree.getMultiComparator();
+        ByteBuffer frame = ctx.getResourceManager().allocateFrame();
+        FrameTupleAppender appender = new FrameTupleAppender(ctx);
+        appender.reset(frame, true);
+        ArrayTupleBuilder tb = new ArrayTupleBuilder(cmp.getFields().length);
+        DataOutput dos = tb.getDataOutput();
+
+        try {
+            while (cursor.hasNext()) {
+                tb.reset();
+                cursor.next();
+
+                IFieldIterator fieldIter = cursor.getFieldIterator();
+                for (int i = 0; i < cmp.getFields().length; i++) {
+                    int fieldLen = fieldIter.getFieldSize();
+                    dos.write(fieldIter.getBuffer().array(), fieldIter.getFieldOff(), fieldLen);
+                    tb.addFieldEndOffset();
+                }
+
+                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                    FrameUtils.flushFrame(frame, writer);
+                    appender.reset(frame, true);
+                    if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                        throw new IllegalStateException();
+                    }
+                }
+
+                //int recOffset = cursor.getOffset();                
+                //String rec = cmp.printRecord(array, recOffset);
+                //System.out.println(rec);
+            }
+
+            if (appender.getTupleCount() > 0) {
+                FrameUtils.flushFrame(frame, writer);
+            }
+            writer.close();
+
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
     }
 }
\ No newline at end of file
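
BTreeSearchOperatorNodePushable takes its lowKey/highKey bounds as ITupleReference values. One way to build such a reference outside a dataflow pipeline is to assemble a one-tuple frame and wrap it, mirroring the calls used in BTreeOpHelper.fill(). A sketch, assuming `ctx` is an available IHyracksContext, the key is a single int field, and the surrounding method declares `throws Exception`:

    // Sketch: build a single-field int key tuple and expose it as an ITupleReference.
    ByteBuffer keyFrame = ctx.getResourceManager().allocateFrame();
    FrameTupleAppender keyAppender = new FrameTupleAppender(ctx);
    keyAppender.reset(keyFrame, true);

    ArrayTupleBuilder keyBuilder = new ArrayTupleBuilder(1);
    IntegerSerializerDeserializer.INSTANCE.serialize(-1000, keyBuilder.getDataOutput());
    keyBuilder.addFieldEndOffset();
    keyAppender.append(keyBuilder.getFieldEndOffsets(), keyBuilder.getByteArray(), 0, keyBuilder.getSize());

    ISerializerDeserializer[] keySers = { IntegerSerializerDeserializer.INSTANCE };
    FrameTupleAccessor keyAccessor = new FrameTupleAccessor(ctx, new RecordDescriptor(keySers));
    keyAccessor.reset(keyFrame);

    FrameTupleReference lowKey = new FrameTupleReference();
    lowKey.reset(keyAccessor, 0);   // usable as the lowKey constructor argument
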