Refactored Operator and Connector touchpoints

git-svn-id: https://hyracks.googlecode.com/svn/trunk/hyracks@31 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksContext.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksContext.java
new file mode 100644
index 0000000..8173785
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksContext.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.context;
+
+import edu.uci.ics.hyracks.api.resources.IResourceManager;
+
+public interface IHyracksContext {
+    public IResourceManager getResourceManager();
+
+    public int getFrameSize();
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityNode.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityNode.java
index 6a3a447..82d0e9e 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityNode.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityNode.java
@@ -16,22 +16,15 @@
 
 import java.io.Serializable;
 
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
-import edu.uci.ics.hyracks.api.job.JobPlan;
-import edu.uci.ics.hyracks.context.HyracksContext;
 
 public interface IActivityNode extends Serializable {
     public ActivityNodeId getActivityNodeId();
 
     public IOperatorDescriptor getOwner();
 
-    public boolean supportsPushInterface();
-
-    public boolean supportsPullInterface();
-
-    public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-            int partition, int nPartitions);
-
-    public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-            int partition, int nPartitions);
+    public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions);
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
index b3feef8..25be8b3 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
@@ -22,9 +22,9 @@
 import edu.uci.ics.hyracks.api.comm.IConnectionDemultiplexer;
 import edu.uci.ics.hyracks.api.comm.IFrameReader;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobPlan;
-import edu.uci.ics.hyracks.context.HyracksContext;
 
 /**
  * Connector that connects operators in a Job.
@@ -44,8 +44,8 @@
      * 
      * @param ctx
      *            Context
-     * @param plan
-     *            Job plan
+     * @param recordDesc
+     *            Record Descriptor
      * @param edwFactory
      *            Endpoint writer factory.
      * @param index
@@ -57,15 +57,16 @@
      * @return data writer.
      * @throws Exception
      */
-    public IFrameWriter createSendSideWriter(HyracksContext ctx, JobPlan plan, IEndpointDataWriterFactory edwFactory,
-        int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException;
+    public IFrameWriter createSendSideWriter(IHyracksContext ctx, RecordDescriptor recordDesc,
+            IEndpointDataWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
+            throws HyracksDataException;
 
     /**
      * Factory metod to create the receive side reader that reads data from this connector.
      * 
      * @param ctx
      *            Context
-     * @param plan
+     * @param recordDesc
      *            Job plan
      * @param demux
      *            Connection Demultiplexer
@@ -78,8 +79,9 @@
      * @return data reader
      * @throws HyracksDataException
      */
-    public IFrameReader createReceiveSideReader(HyracksContext ctx, JobPlan plan, IConnectionDemultiplexer demux,
-        int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException;
+    public IFrameReader createReceiveSideReader(IHyracksContext ctx, RecordDescriptor recordDesc,
+            IConnectionDemultiplexer demux, int index, int nProducerPartitions, int nConsumerPartitions)
+            throws HyracksDataException;
 
     /**
      * Translate this connector descriptor to JSON.
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorNodePushable.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorNodePushable.java
index 13ac15c..f019b13 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorNodePushable.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorNodePushable.java
@@ -15,7 +15,8 @@
 package edu.uci.ics.hyracks.api.dataflow;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 
 public interface IOperatorNodePushable extends IFrameWriter {
-    public void setFrameWriter(int index, IFrameWriter writer);
+    public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc);
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IRecordDescriptorProvider.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IRecordDescriptorProvider.java
new file mode 100644
index 0000000..96d43a7
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IRecordDescriptorProvider.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+
+public interface IRecordDescriptorProvider {
+    public RecordDescriptor getInputRecordDescriptor(OperatorDescriptorId opId, int inputIndex);
+
+    public RecordDescriptor getOutputRecordDescriptor(OperatorDescriptorId opId, int outputIndex);
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/resources/IResourceManager.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/resources/IResourceManager.java
new file mode 100644
index 0000000..495db70
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/resources/IResourceManager.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.resources;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public interface IResourceManager {
+
+    public ByteBuffer allocateFrame();
+
+    public File createFile(String prefix, String suffix) throws IOException;
+
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionEntry.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionEntry.java
index ea8ede3..bfc79e7 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionEntry.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionEntry.java
@@ -24,7 +24,7 @@
 
 import edu.uci.ics.hyracks.api.comm.IConnectionEntry;
 import edu.uci.ics.hyracks.api.comm.IDataReceiveListener;
-import edu.uci.ics.hyracks.context.HyracksContext;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
 
 public class ConnectionEntry implements IConnectionEntry {
     private static final Logger LOGGER = Logger.getLogger(ConnectionEntry.class.getName());
@@ -47,7 +47,7 @@
 
     private boolean aborted;
 
-    public ConnectionEntry(HyracksContext ctx, SocketChannel socketChannel, SelectionKey key) {
+    public ConnectionEntry(IHyracksContext ctx, SocketChannel socketChannel, SelectionKey key) {
         this.socketChannel = socketChannel;
         readBuffer = ctx.getResourceManager().allocateFrame();
         readBuffer.clear();
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionManager.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionManager.java
index 55e2962..ce752f3 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionManager.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionManager.java
@@ -41,9 +41,9 @@
 import edu.uci.ics.hyracks.api.comm.IDataReceiveListenerFactory;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.comm.io.FrameHelper;
-import edu.uci.ics.hyracks.context.HyracksContext;
 
 public class ConnectionManager {
     private static final Logger LOGGER = Logger.getLogger(ConnectionManager.class.getName());
@@ -54,7 +54,7 @@
 
     private ServerSocketChannel serverSocketChannel;
 
-    private final HyracksContext ctx;
+    private final IHyracksContext ctx;
 
     private final Map<UUID, IDataReceiveListenerFactory> pendingConnectionReceivers;
 
@@ -70,7 +70,7 @@
 
     private ByteBuffer emptyFrame;
 
-    public ConnectionManager(HyracksContext ctx, InetAddress address) throws IOException {
+    public ConnectionManager(IHyracksContext ctx, InetAddress address) throws IOException {
         this.ctx = ctx;
         serverSocketChannel = ServerSocketChannel.open();
         ServerSocket serverSocket = serverSocketChannel.socket();
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/DemuxDataReceiveListenerFactory.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/DemuxDataReceiveListenerFactory.java
index 694e61a..af7c6fc 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/DemuxDataReceiveListenerFactory.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/DemuxDataReceiveListenerFactory.java
@@ -28,22 +28,22 @@
 import edu.uci.ics.hyracks.api.comm.IDataReceiveListener;
 import edu.uci.ics.hyracks.api.comm.IDataReceiveListenerFactory;
 import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.context.HyracksContext;
 
 public class DemuxDataReceiveListenerFactory implements IDataReceiveListenerFactory, IConnectionDemultiplexer,
     IDataReceiveListener {
     private static final Logger LOGGER = Logger.getLogger(DemuxDataReceiveListenerFactory.class.getName());
 
     private final NonDeterministicFrameReader frameReader;
-    private final HyracksContext ctx;
+    private final IHyracksContext ctx;
     private final BitSet readyBits;
     private IConnectionEntry senders[];
     private int openSenderCount;
     private UUID jobId;
     private UUID stageId;
 
-    public DemuxDataReceiveListenerFactory(HyracksContext ctx, UUID jobId, UUID stageId) {
+    public DemuxDataReceiveListenerFactory(IHyracksContext ctx, UUID jobId, UUID stageId) {
         frameReader = new NonDeterministicFrameReader(ctx, this);
         this.ctx = ctx;
         this.jobId = jobId;
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/NonDeterministicFrameReader.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/NonDeterministicFrameReader.java
index 91350735..0ddc5d6 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/NonDeterministicFrameReader.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/NonDeterministicFrameReader.java
@@ -21,19 +21,19 @@
 import edu.uci.ics.hyracks.api.comm.IConnectionDemultiplexer;
 import edu.uci.ics.hyracks.api.comm.IConnectionEntry;
 import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.comm.io.FrameHelper;
-import edu.uci.ics.hyracks.context.HyracksContext;
 
 public class NonDeterministicFrameReader implements IFrameReader {
     private static final Logger LOGGER = Logger.getLogger(NonDeterministicFrameReader.class.getName());
 
-    private final HyracksContext ctx;
+    private final IHyracksContext ctx;
     private final IConnectionDemultiplexer demux;
     private int lastReadSender;
     private boolean eos;
 
-    public NonDeterministicFrameReader(HyracksContext ctx, IConnectionDemultiplexer demux) {
+    public NonDeterministicFrameReader(IHyracksContext ctx, IConnectionDemultiplexer demux) {
         this.ctx = ctx;
         this.demux = demux;
     }
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/SortMergeFrameReader.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/SortMergeFrameReader.java
index 62372dc..5c97397 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/SortMergeFrameReader.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/SortMergeFrameReader.java
@@ -27,6 +27,7 @@
 import edu.uci.ics.hyracks.api.comm.IConnectionDemultiplexer;
 import edu.uci.ics.hyracks.api.comm.IConnectionEntry;
 import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -34,12 +35,11 @@
 import edu.uci.ics.hyracks.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.comm.io.FrameTuplePairComparator;
-import edu.uci.ics.hyracks.context.HyracksContext;
 
 public class SortMergeFrameReader implements IFrameReader {
     private static final Logger LOGGER = Logger.getLogger(SortMergeFrameReader.class.getName());
 
-    private final HyracksContext ctx;
+    private final IHyracksContext ctx;
     private final IConnectionDemultiplexer demux;
     private final FrameTuplePairComparator tpc;
     private final FrameTupleAppender appender;
@@ -50,7 +50,7 @@
     private int lastReadSender;
     private boolean first;
 
-    public SortMergeFrameReader(HyracksContext ctx, IConnectionDemultiplexer demux, int[] sortFields,
+    public SortMergeFrameReader(IHyracksContext ctx, IConnectionDemultiplexer demux, int[] sortFields,
             IBinaryComparator[] comparators, RecordDescriptor recordDescriptor) {
         this.ctx = ctx;
         this.demux = demux;
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameDeserializer.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameDeserializer.java
index 22d4961..c6b365b 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameDeserializer.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameDeserializer.java
@@ -21,10 +21,10 @@
 import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.comm.FrameConstants;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.comm.util.ByteBufferInputStream;
-import edu.uci.ics.hyracks.context.HyracksContext;
 
 public class FrameDeserializer {
     private static final Logger LOGGER = Logger.getLogger(FrameDeserializer.class.getName());
@@ -43,7 +43,7 @@
 
     private ByteBuffer buffer;
 
-    public FrameDeserializer(HyracksContext ctx, RecordDescriptor recordDescriptor) {
+    public FrameDeserializer(IHyracksContext ctx, RecordDescriptor recordDescriptor) {
         this.bbis = new ByteBufferInputStream();
         this.di = new DataInputStream(bbis);
         this.recordDescriptor = recordDescriptor;
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameDeserializingDataReader.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameDeserializingDataReader.java
index 31d37b7..08f3472 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameDeserializingDataReader.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameDeserializingDataReader.java
@@ -17,10 +17,10 @@
 import java.nio.ByteBuffer;
 
 import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.IOpenableDataReader;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.context.HyracksContext;
 
 public class FrameDeserializingDataReader implements IOpenableDataReader<Object[]> {
     private final ByteBuffer buffer;
@@ -33,7 +33,7 @@
 
     private final FrameDeserializer frameDeserializer;
 
-    public FrameDeserializingDataReader(HyracksContext ctx, IFrameReader frameReader, RecordDescriptor recordDescriptor) {
+    public FrameDeserializingDataReader(IHyracksContext ctx, IFrameReader frameReader, RecordDescriptor recordDescriptor) {
         buffer = ctx.getResourceManager().allocateFrame();
         this.frameReader = frameReader;
         this.frameDeserializer = new FrameDeserializer(ctx, recordDescriptor);
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameDeserializingDataWriter.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameDeserializingDataWriter.java
index e0a7250..413ded6 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameDeserializingDataWriter.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameDeserializingDataWriter.java
@@ -17,16 +17,16 @@
 import java.nio.ByteBuffer;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.context.HyracksContext;
 
 public class FrameDeserializingDataWriter implements IFrameWriter {
     private final IOpenableDataWriter<Object[]> writer;
     private final FrameDeserializer frameDeserializer;
 
-    public FrameDeserializingDataWriter(HyracksContext ctx, IOpenableDataWriter<Object[]> writer,
+    public FrameDeserializingDataWriter(IHyracksContext ctx, IOpenableDataWriter<Object[]> writer,
             RecordDescriptor recordDescriptor) {
         this.writer = writer;
         this.frameDeserializer = new FrameDeserializer(ctx, recordDescriptor);
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameHelper.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameHelper.java
index a82a051..52ecfce 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameHelper.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameHelper.java
@@ -14,10 +14,10 @@
  */
 package edu.uci.ics.hyracks.comm.io;
 
-import edu.uci.ics.hyracks.context.HyracksContext;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
 
 public class FrameHelper {
-    public static int getTupleCountOffset(HyracksContext ctx) {
+    public static int getTupleCountOffset(IHyracksContext ctx) {
         return ctx.getFrameSize() - 4;
     }
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTupleAccessor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTupleAccessor.java
index 05d83cd..06c5a17 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTupleAccessor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTupleAccessor.java
@@ -17,10 +17,10 @@
 import java.io.DataInputStream;
 import java.nio.ByteBuffer;
 
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.comm.util.ByteBufferInputStream;
-import edu.uci.ics.hyracks.context.HyracksContext;
 
 /**
  * FrameTupleCursor is used to navigate over tuples in a Frame.
@@ -33,12 +33,12 @@
  * @author vinayakb
  */
 public final class FrameTupleAccessor {
-    private final HyracksContext ctx;
+    private final IHyracksContext ctx;
     private final RecordDescriptor recordDescriptor;
 
     private ByteBuffer buffer;
 
-    public FrameTupleAccessor(HyracksContext ctx, RecordDescriptor recordDescriptor) {
+    public FrameTupleAccessor(IHyracksContext ctx, RecordDescriptor recordDescriptor) {
         this.ctx = ctx;
         this.recordDescriptor = recordDescriptor;
     }
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTupleAppender.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTupleAppender.java
index 70f0248..c57e589 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTupleAppender.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTupleAppender.java
@@ -16,10 +16,10 @@
 
 import java.nio.ByteBuffer;
 
-import edu.uci.ics.hyracks.context.HyracksContext;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
 
 public class FrameTupleAppender {
-    private final HyracksContext ctx;
+    private final IHyracksContext ctx;
 
     private ByteBuffer buffer;
 
@@ -27,7 +27,7 @@
 
     private int tupleDataEndOffset;
 
-    public FrameTupleAppender(HyracksContext ctx) {
+    public FrameTupleAppender(IHyracksContext ctx) {
         this.ctx = ctx;
     }
 
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/SerializingDataWriter.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/SerializingDataWriter.java
index 4103dd6..593eb3b 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/SerializingDataWriter.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/SerializingDataWriter.java
@@ -19,10 +19,10 @@
 import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.context.HyracksContext;
 
 public class SerializingDataWriter implements IOpenableDataWriter<Object[]> {
     private static final Logger LOGGER = Logger.getLogger(SerializingDataWriter.class.getName());
@@ -39,7 +39,7 @@
 
     private boolean open;
 
-    public SerializingDataWriter(HyracksContext ctx, RecordDescriptor recordDescriptor, IFrameWriter frameWriter) {
+    public SerializingDataWriter(IHyracksContext ctx, RecordDescriptor recordDescriptor, IFrameWriter frameWriter) {
         buffer = ctx.getResourceManager().allocateFrame();
         tb = new ArrayTupleBuilder(recordDescriptor.getFields().length);
         this.recordDescriptor = recordDescriptor;
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/context/HyracksContext.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/context/HyracksContext.java
index 7fe088d..b061919 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/context/HyracksContext.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/context/HyracksContext.java
@@ -14,10 +14,12 @@
  */
 package edu.uci.ics.hyracks.context;
 
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.resources.IResourceManager;
 import edu.uci.ics.hyracks.resources.ResourceManager;
 
-public class HyracksContext {
-    private final ResourceManager resourceManager;
+public class HyracksContext implements IHyracksContext {
+    private final IResourceManager resourceManager;
     private final int frameSize;
 
     public HyracksContext(int frameSize) {
@@ -25,10 +27,12 @@
         this.frameSize = frameSize;
     }
 
-    public ResourceManager getResourceManager() {
+    @Override
+    public IResourceManager getResourceManager() {
         return resourceManager;
     }
 
+    @Override
     public int getFrameSize() {
         return frameSize;
     }
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
index 758a2a0..682d6a2 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
@@ -38,6 +38,7 @@
 import edu.uci.ics.hyracks.api.comm.IConnectionDemultiplexer;
 import edu.uci.ics.hyracks.api.comm.IFrameReader;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.controller.IClusterController;
 import edu.uci.ics.hyracks.api.controller.INodeController;
 import edu.uci.ics.hyracks.api.controller.NodeCapability;
@@ -52,6 +53,8 @@
 import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.OperatorInstanceId;
 import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobPlan;
@@ -71,7 +74,7 @@
 
     private final String id;
 
-    private final HyracksContext ctx;
+    private final IHyracksContext ctx;
 
     private final NodeCapability nodeCapability;
 
@@ -162,71 +165,90 @@
     }
 
     @Override
-    public Map<PortInstanceId, Endpoint> initializeJobletPhase1(UUID jobId, JobPlan plan, UUID stageId, int attempt,
-            Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions)
+    public Map<PortInstanceId, Endpoint> initializeJobletPhase1(UUID jobId, final JobPlan plan, UUID stageId,
+            int attempt, Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions)
             throws Exception {
-        LOGGER.log(Level.INFO, String.valueOf(jobId) + "[" + id + ":" + stageId + "]: Initializing Joblet Phase 1");
+        try {
+            LOGGER.log(Level.INFO, String.valueOf(jobId) + "[" + id + ":" + stageId + "]: Initializing Joblet Phase 1");
 
-        final Joblet joblet = getLocalJoblet(jobId);
-
-        Stagelet stagelet = new Stagelet(joblet, stageId, attempt, id);
-        joblet.setStagelet(stageId, stagelet);
-
-        final Map<PortInstanceId, Endpoint> portMap = new HashMap<PortInstanceId, Endpoint>();
-        Map<OperatorInstanceId, OperatorRunnable> honMap = stagelet.getOperatorMap();
-
-        List<Endpoint> endpointList = new ArrayList<Endpoint>();
-
-        for (ActivityNodeId hanId : tasks.keySet()) {
-            IActivityNode han = plan.getActivityNodeMap().get(hanId);
-            if (LOGGER.isLoggable(Level.FINEST)) {
-                LOGGER.finest("Initializing " + hanId + " -> " + han);
-            }
-            IOperatorDescriptor op = han.getOwner();
-            List<IConnectorDescriptor> inputs = plan.getTaskInputs(hanId);
-            for (int i : tasks.get(hanId)) {
-                IOperatorNodePushable hon = han.createPushRuntime(ctx, plan, joblet.getEnvironment(op, i), i,
-                        opPartitions.get(op.getOperatorId()).size());
-                OperatorRunnable or = new OperatorRunnable(ctx, hon);
-                stagelet.setOperator(op.getOperatorId(), i, or);
-                if (inputs != null) {
-                    for (int j = 0; j < inputs.size(); ++j) {
-                        if (j >= 1) {
-                            throw new IllegalStateException();
-                        }
-                        IConnectorDescriptor conn = inputs.get(j);
-                        OperatorDescriptorId producerOpId = plan.getJobSpecification().getProducer(conn)
-                                .getOperatorId();
-                        OperatorDescriptorId consumerOpId = plan.getJobSpecification().getConsumer(conn)
-                                .getOperatorId();
-                        Endpoint endpoint = new Endpoint(connectionManager.getNetworkAddress(), i);
-                        endpointList.add(endpoint);
-                        DemuxDataReceiveListenerFactory drlf = new DemuxDataReceiveListenerFactory(ctx, jobId, stageId);
-                        connectionManager.acceptConnection(endpoint.getEndpointId(), drlf);
-                        PortInstanceId piId = new PortInstanceId(op.getOperatorId(), Direction.INPUT, plan
-                                .getTaskInputMap().get(hanId).get(j), i);
-                        if (LOGGER.isLoggable(Level.FINEST)) {
-                            LOGGER.finest("Created endpoint " + piId + " -> " + endpoint);
-                        }
-                        portMap.put(piId, endpoint);
-                        IFrameReader reader = createReader(conn, drlf, i, plan, stagelet, opPartitions
-                                .get(producerOpId).size(), opPartitions.get(consumerOpId).size());
-                        or.setFrameReader(reader);
-                    }
+            IRecordDescriptorProvider rdp = new IRecordDescriptorProvider() {
+                @Override
+                public RecordDescriptor getOutputRecordDescriptor(OperatorDescriptorId opId, int outputIndex) {
+                    return plan.getJobSpecification().getOperatorOutputRecordDescriptor(opId, outputIndex);
                 }
-                honMap.put(new OperatorInstanceId(op.getOperatorId(), i), or);
+
+                @Override
+                public RecordDescriptor getInputRecordDescriptor(OperatorDescriptorId opId, int inputIndex) {
+                    return plan.getJobSpecification().getOperatorInputRecordDescriptor(opId, inputIndex);
+                }
+            };
+
+            final Joblet joblet = getLocalJoblet(jobId);
+
+            Stagelet stagelet = new Stagelet(joblet, stageId, attempt, id);
+            joblet.setStagelet(stageId, stagelet);
+
+            final Map<PortInstanceId, Endpoint> portMap = new HashMap<PortInstanceId, Endpoint>();
+            Map<OperatorInstanceId, OperatorRunnable> honMap = stagelet.getOperatorMap();
+
+            List<Endpoint> endpointList = new ArrayList<Endpoint>();
+
+            for (ActivityNodeId hanId : tasks.keySet()) {
+                IActivityNode han = plan.getActivityNodeMap().get(hanId);
+                if (LOGGER.isLoggable(Level.FINEST)) {
+                    LOGGER.finest("Initializing " + hanId + " -> " + han);
+                }
+                IOperatorDescriptor op = han.getOwner();
+                List<IConnectorDescriptor> inputs = plan.getTaskInputs(hanId);
+                for (int i : tasks.get(hanId)) {
+                    IOperatorNodePushable hon = han.createPushRuntime(ctx, joblet.getEnvironment(op, i), rdp, i,
+                            opPartitions.get(op.getOperatorId()).size());
+                    OperatorRunnable or = new OperatorRunnable(ctx, hon);
+                    stagelet.setOperator(op.getOperatorId(), i, or);
+                    if (inputs != null) {
+                        for (int j = 0; j < inputs.size(); ++j) {
+                            if (j >= 1) {
+                                throw new IllegalStateException();
+                            }
+                            IConnectorDescriptor conn = inputs.get(j);
+                            OperatorDescriptorId producerOpId = plan.getJobSpecification().getProducer(conn)
+                                    .getOperatorId();
+                            OperatorDescriptorId consumerOpId = plan.getJobSpecification().getConsumer(conn)
+                                    .getOperatorId();
+                            Endpoint endpoint = new Endpoint(connectionManager.getNetworkAddress(), i);
+                            endpointList.add(endpoint);
+                            DemuxDataReceiveListenerFactory drlf = new DemuxDataReceiveListenerFactory(ctx, jobId,
+                                    stageId);
+                            connectionManager.acceptConnection(endpoint.getEndpointId(), drlf);
+                            PortInstanceId piId = new PortInstanceId(op.getOperatorId(), Direction.INPUT, plan
+                                    .getTaskInputMap().get(hanId).get(j), i);
+                            if (LOGGER.isLoggable(Level.FINEST)) {
+                                LOGGER.finest("Created endpoint " + piId + " -> " + endpoint);
+                            }
+                            portMap.put(piId, endpoint);
+                            IFrameReader reader = createReader(conn, drlf, i, plan, stagelet,
+                                    opPartitions.get(producerOpId).size(), opPartitions.get(consumerOpId).size());
+                            or.setFrameReader(reader);
+                        }
+                    }
+                    honMap.put(new OperatorInstanceId(op.getOperatorId(), i), or);
+                }
             }
+
+            stagelet.setEndpointList(endpointList);
+
+            return portMap;
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw e;
         }
-
-        stagelet.setEndpointList(endpointList);
-
-        return portMap;
     }
 
     private IFrameReader createReader(final IConnectorDescriptor conn, IConnectionDemultiplexer demux,
             final int receiverIndex, JobPlan plan, final Stagelet stagelet, int nProducerCount, int nConsumerCount)
             throws HyracksDataException {
-        final IFrameReader reader = conn.createReceiveSideReader(ctx, plan, demux, receiverIndex, nProducerCount,
+        final IFrameReader reader = conn.createReceiveSideReader(ctx, plan.getJobSpecification()
+                .getConnectorRecordDescriptor(conn), demux, receiverIndex, nProducerCount,
                 nConsumerCount);
 
         return plan.getJobFlags().contains(JobFlag.COLLECT_FRAME_COUNTS) ? new IFrameReader() {
@@ -262,52 +284,59 @@
     public void initializeJobletPhase2(UUID jobId, final JobPlan plan, UUID stageId,
             Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions,
             final Map<PortInstanceId, Endpoint> globalPortMap) throws Exception {
-        LOGGER.log(Level.INFO, String.valueOf(jobId) + "[" + id + ":" + stageId + "]: Initializing Joblet Phase 2");
-        final Joblet ji = getLocalJoblet(jobId);
-        Stagelet si = (Stagelet) ji.getStagelet(stageId);
-        final Map<OperatorInstanceId, OperatorRunnable> honMap = si.getOperatorMap();
+        try {
+            LOGGER.log(Level.INFO, String.valueOf(jobId) + "[" + id + ":" + stageId + "]: Initializing Joblet Phase 2");
+            final Joblet ji = getLocalJoblet(jobId);
+            Stagelet si = (Stagelet) ji.getStagelet(stageId);
+            final Map<OperatorInstanceId, OperatorRunnable> honMap = si.getOperatorMap();
 
-        final Stagelet stagelet = (Stagelet) ji.getStagelet(stageId);
+            final Stagelet stagelet = (Stagelet) ji.getStagelet(stageId);
 
-        final JobSpecification spec = plan.getJobSpecification();
+            final JobSpecification spec = plan.getJobSpecification();
 
-        for (ActivityNodeId hanId : tasks.keySet()) {
-            IActivityNode han = plan.getActivityNodeMap().get(hanId);
-            IOperatorDescriptor op = han.getOwner();
-            List<IConnectorDescriptor> outputs = plan.getTaskOutputs(hanId);
-            for (int i : tasks.get(hanId)) {
-                OperatorRunnable or = honMap.get(new OperatorInstanceId(op.getOperatorId(), i));
-                if (outputs != null) {
-                    for (int j = 0; j < outputs.size(); ++j) {
-                        final IConnectorDescriptor conn = outputs.get(j);
-                        OperatorDescriptorId producerOpId = plan.getJobSpecification().getProducer(conn)
-                                .getOperatorId();
-                        OperatorDescriptorId consumerOpId = plan.getJobSpecification().getConsumer(conn)
-                                .getOperatorId();
-                        final int senderIndex = i;
-                        IEndpointDataWriterFactory edwFactory = new IEndpointDataWriterFactory() {
-                            @Override
-                            public IFrameWriter createFrameWriter(int index) throws HyracksDataException {
-                                PortInstanceId piId = new PortInstanceId(spec.getConsumer(conn).getOperatorId(),
-                                        Direction.INPUT, spec.getConsumerInputIndex(conn), index);
-                                Endpoint ep = globalPortMap.get(piId);
-                                if (ep == null) {
-                                    LOGGER.info("Got null Endpoint for " + piId);
-                                    throw new NullPointerException();
+            for (ActivityNodeId hanId : tasks.keySet()) {
+                IActivityNode han = plan.getActivityNodeMap().get(hanId);
+                IOperatorDescriptor op = han.getOwner();
+                List<IConnectorDescriptor> outputs = plan.getTaskOutputs(hanId);
+                for (int i : tasks.get(hanId)) {
+                    OperatorRunnable or = honMap.get(new OperatorInstanceId(op.getOperatorId(), i));
+                    if (outputs != null) {
+                        for (int j = 0; j < outputs.size(); ++j) {
+                            final IConnectorDescriptor conn = outputs.get(j);
+                            OperatorDescriptorId producerOpId = plan.getJobSpecification().getProducer(conn)
+                                    .getOperatorId();
+                            OperatorDescriptorId consumerOpId = plan.getJobSpecification().getConsumer(conn)
+                                    .getOperatorId();
+                            final int senderIndex = i;
+                            IEndpointDataWriterFactory edwFactory = new IEndpointDataWriterFactory() {
+                                @Override
+                                public IFrameWriter createFrameWriter(int index) throws HyracksDataException {
+                                    PortInstanceId piId = new PortInstanceId(spec.getConsumer(conn).getOperatorId(),
+                                            Direction.INPUT, spec.getConsumerInputIndex(conn), index);
+                                    Endpoint ep = globalPortMap.get(piId);
+                                    if (ep == null) {
+                                        LOGGER.info("Got null Endpoint for " + piId);
+                                        throw new NullPointerException();
+                                    }
+                                    if (LOGGER.isLoggable(Level.FINEST)) {
+                                        LOGGER.finest("Probed endpoint " + piId + " -> " + ep);
+                                    }
+                                    return createWriter(connectionManager.connect(ep.getNetworkAddress(),
+                                            ep.getEndpointId(), senderIndex), plan, conn, senderIndex, index, stagelet);
                                 }
-                                if (LOGGER.isLoggable(Level.FINEST)) {
-                                    LOGGER.finest("Probed endpoint " + piId + " -> " + ep);
-                                }
-                                return createWriter(connectionManager.connect(ep.getNetworkAddress(),
-                                        ep.getEndpointId(), senderIndex), plan, conn, senderIndex, index, stagelet);
-                            }
-                        };
-                        or.setFrameWriter(j, conn.createSendSideWriter(ctx, plan, edwFactory, i,
-                                opPartitions.get(producerOpId).size(), opPartitions.get(consumerOpId).size()));
+                            };
+                            or.setFrameWriter(j, conn.createSendSideWriter(ctx, plan.getJobSpecification()
+                                    .getConnectorRecordDescriptor(conn), edwFactory, i,
+                                    opPartitions.get(producerOpId).size(), opPartitions.get(consumerOpId).size()), spec
+                                    .getConnectorRecordDescriptor(conn));
+                        }
                     }
+                    stagelet.installRunnable(new OperatorInstanceId(op.getOperatorId(), i));
                 }
-                stagelet.installRunnable(new OperatorInstanceId(op.getOperatorId(), i));
             }
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw e;
         }
     }
 
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/ExternalSortOperatorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/ExternalSortOperatorDescriptor.java
index 457a44c..9f94086 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/ExternalSortOperatorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/ExternalSortOperatorDescriptor.java
@@ -27,20 +27,19 @@
 
 import edu.uci.ics.hyracks.api.comm.IFrameReader;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePullable;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
-import edu.uci.ics.hyracks.api.job.JobPlan;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.context.HyracksContext;
 import edu.uci.ics.hyracks.coreops.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.coreops.base.AbstractOperatorDescriptor;
 import edu.uci.ics.hyracks.coreops.util.ReferenceEntry;
@@ -91,14 +90,8 @@
         }
 
         @Override
-        public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-                int partition, int nPartitions) {
-            return null;
-        }
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
-                final IOperatorEnvironment env, int partition, int nPartitions) {
+        public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, final IOperatorEnvironment env,
+                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
             final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
             for (int i = 0; i < comparatorFactories.length; ++i) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
@@ -278,22 +271,12 @@
                 }
 
                 @Override
-                public void setFrameWriter(int index, IFrameWriter writer) {
+                public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
                     throw new IllegalArgumentException();
                 }
             };
             return op;
         }
-
-        @Override
-        public boolean supportsPullInterface() {
-            return false;
-        }
-
-        @Override
-        public boolean supportsPushInterface() {
-            return false;
-        }
     }
 
     private class MergeActivity extends AbstractActivityNode {
@@ -305,14 +288,8 @@
         }
 
         @Override
-        public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-                int partition, int nPartitions) {
-            return null;
-        }
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
-                final IOperatorEnvironment env, int partition, int nPartitions) {
+        public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, final IOperatorEnvironment env,
+                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
             final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
             for (int i = 0; i < comparatorFactories.length; ++i) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
@@ -368,7 +345,7 @@
                 }
 
                 @Override
-                public void setFrameWriter(int index, IFrameWriter writer) {
+                public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
                     if (index != 0) {
                         throw new IllegalArgumentException();
                     }
@@ -472,16 +449,6 @@
             };
             return op;
         }
-
-        @Override
-        public boolean supportsPullInterface() {
-            return false;
-        }
-
-        @Override
-        public boolean supportsPushInterface() {
-            return true;
-        }
     }
 
     private Comparator<ReferenceEntry> createEntryComparator(final IBinaryComparator[] comparators) {
@@ -511,7 +478,7 @@
         };
     }
 
-    private void flushFrames(HyracksContext ctx, List<ByteBuffer> inFrames, ByteBuffer outFrame, long[] tPointers,
+    private void flushFrames(IHyracksContext ctx, List<ByteBuffer> inFrames, ByteBuffer outFrame, long[] tPointers,
             IFrameWriter writer) throws HyracksDataException {
         FrameTupleAccessor accessor = new FrameTupleAccessor(ctx, recordDescriptors[0]);
         FrameTupleAppender outFrameAppender = new FrameTupleAppender(ctx);
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/HashDataWriter.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/HashDataWriter.java
index 2c1e28a..76b2ae4 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/HashDataWriter.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/HashDataWriter.java
@@ -18,13 +18,13 @@
 import java.nio.ByteBuffer;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.IEndpointDataWriterFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.context.HyracksContext;
 
 public class HashDataWriter implements IFrameWriter {
     private final int consumerPartitionCount;
@@ -33,7 +33,7 @@
     private final FrameTupleAccessor tupleAccessor;
     private final ITuplePartitionComputer tpc;
 
-    public HashDataWriter(HyracksContext ctx, int consumerPartitionCount, IEndpointDataWriterFactory edwFactory,
+    public HashDataWriter(IHyracksContext ctx, int consumerPartitionCount, IEndpointDataWriterFactory edwFactory,
             RecordDescriptor recordDescriptor, ITuplePartitionComputer tpc) throws HyracksDataException {
         this.consumerPartitionCount = consumerPartitionCount;
         epWriters = new IFrameWriter[consumerPartitionCount];
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/InMemorySortOperatorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/InMemorySortOperatorDescriptor.java
index 1c7e6c7..f33b2b0 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/InMemorySortOperatorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/InMemorySortOperatorDescriptor.java
@@ -19,21 +19,20 @@
 import java.util.List;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePullable;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
-import edu.uci.ics.hyracks.api.job.JobPlan;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.context.HyracksContext;
 import edu.uci.ics.hyracks.coreops.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.coreops.base.AbstractOperatorDescriptor;
 
@@ -76,14 +75,8 @@
         }
 
         @Override
-        public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-                int partition, int nPartitions) {
-            return null;
-        }
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
-                final IOperatorEnvironment env, int partition, int nPartitions) {
+        public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, final IOperatorEnvironment env,
+                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
             final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
             for (int i = 0; i < comparatorFactories.length; ++i) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
@@ -212,22 +205,12 @@
                 }
 
                 @Override
-                public void setFrameWriter(int index, IFrameWriter writer) {
+                public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
                     throw new IllegalArgumentException();
                 }
             };
             return op;
         }
-
-        @Override
-        public boolean supportsPullInterface() {
-            return false;
-        }
-
-        @Override
-        public boolean supportsPushInterface() {
-            return false;
-        }
     }
 
     private class MergeActivity extends AbstractActivityNode {
@@ -239,14 +222,8 @@
         }
 
         @Override
-        public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-                int partition, int nPartitions) {
-            return null;
-        }
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
-                final IOperatorEnvironment env, int partition, int nPartitions) {
+        public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, final IOperatorEnvironment env,
+                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
             IOperatorNodePushable op = new IOperatorNodePushable() {
                 private IFrameWriter writer;
 
@@ -298,7 +275,7 @@
                 }
 
                 @Override
-                public void setFrameWriter(int index, IFrameWriter writer) {
+                public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
                     if (index != 0) {
                         throw new IllegalArgumentException();
                     }
@@ -307,15 +284,5 @@
             };
             return op;
         }
-
-        @Override
-        public boolean supportsPullInterface() {
-            return false;
-        }
-
-        @Override
-        public boolean supportsPushInterface() {
-            return true;
-        }
     }
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningConnectorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningConnectorDescriptor.java
index 5013812..5281042 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningConnectorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningConnectorDescriptor.java
@@ -17,13 +17,13 @@
 import edu.uci.ics.hyracks.api.comm.IConnectionDemultiplexer;
 import edu.uci.ics.hyracks.api.comm.IFrameReader;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.IEndpointDataWriterFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobPlan;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.comm.NonDeterministicFrameReader;
-import edu.uci.ics.hyracks.context.HyracksContext;
 import edu.uci.ics.hyracks.coreops.base.AbstractConnectorDescriptor;
 
 public class MToNHashPartitioningConnectorDescriptor extends AbstractConnectorDescriptor {
@@ -36,17 +36,18 @@
     }
 
     @Override
-    public IFrameWriter createSendSideWriter(HyracksContext ctx, JobPlan plan, IEndpointDataWriterFactory edwFactory,
-        int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
-        JobSpecification spec = plan.getJobSpecification();
-        final HashDataWriter hashWriter = new HashDataWriter(ctx, nConsumerPartitions, edwFactory, spec
-            .getConnectorRecordDescriptor(this), tpcf.createPartitioner());
+    public IFrameWriter createSendSideWriter(IHyracksContext ctx, RecordDescriptor recordDesc,
+            IEndpointDataWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
+            throws HyracksDataException {
+        final HashDataWriter hashWriter = new HashDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc,
+                tpcf.createPartitioner());
         return hashWriter;
     }
 
     @Override
-    public IFrameReader createReceiveSideReader(HyracksContext ctx, JobPlan plan, IConnectionDemultiplexer demux,
-        int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
+    public IFrameReader createReceiveSideReader(IHyracksContext ctx, RecordDescriptor recordDesc,
+            IConnectionDemultiplexer demux, int index, int nProducerPartitions, int nConsumerPartitions)
+            throws HyracksDataException {
         return new NonDeterministicFrameReader(ctx, demux);
     }
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningMergingConnectorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningMergingConnectorDescriptor.java
index 28cd1fa..ffc42a1 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningMergingConnectorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningMergingConnectorDescriptor.java
@@ -17,15 +17,15 @@
 import edu.uci.ics.hyracks.api.comm.IConnectionDemultiplexer;
 import edu.uci.ics.hyracks.api.comm.IFrameReader;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.IEndpointDataWriterFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobPlan;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.comm.SortMergeFrameReader;
-import edu.uci.ics.hyracks.context.HyracksContext;
 import edu.uci.ics.hyracks.coreops.base.AbstractConnectorDescriptor;
 
 public class MToNHashPartitioningMergingConnectorDescriptor extends AbstractConnectorDescriptor {
@@ -36,7 +36,7 @@
     private final IBinaryComparatorFactory[] comparatorFactories;
 
     public MToNHashPartitioningMergingConnectorDescriptor(JobSpecification spec, ITuplePartitionComputerFactory tpcf,
-        int[] sortFields, IBinaryComparatorFactory[] comparatorFactories) {
+            int[] sortFields, IBinaryComparatorFactory[] comparatorFactories) {
         super(spec);
         this.tpcf = tpcf;
         this.sortFields = sortFields;
@@ -44,22 +44,22 @@
     }
 
     @Override
-    public IFrameWriter createSendSideWriter(HyracksContext ctx, JobPlan plan, IEndpointDataWriterFactory edwFactory,
-        int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
-        JobSpecification spec = plan.getJobSpecification();
-        final HashDataWriter hashWriter = new HashDataWriter(ctx, nConsumerPartitions, edwFactory, spec
-            .getConnectorRecordDescriptor(this), tpcf.createPartitioner());
+    public IFrameWriter createSendSideWriter(IHyracksContext ctx, RecordDescriptor recordDesc,
+            IEndpointDataWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
+            throws HyracksDataException {
+        final HashDataWriter hashWriter = new HashDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc,
+                tpcf.createPartitioner());
         return hashWriter;
     }
 
     @Override
-    public IFrameReader createReceiveSideReader(HyracksContext ctx, JobPlan plan, IConnectionDemultiplexer demux,
-        int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
+    public IFrameReader createReceiveSideReader(IHyracksContext ctx, RecordDescriptor recordDesc,
+            IConnectionDemultiplexer demux, int index, int nProducerPartitions, int nConsumerPartitions)
+            throws HyracksDataException {
         IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
         for (int i = 0; i < comparatorFactories.length; ++i) {
             comparators[i] = comparatorFactories[i].createBinaryComparator();
         }
-        JobSpecification spec = plan.getJobSpecification();
-        return new SortMergeFrameReader(ctx, demux, sortFields, comparators, spec.getConnectorRecordDescriptor(this));
+        return new SortMergeFrameReader(ctx, demux, sortFields, comparators, recordDesc);
     }
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNRangePartitioningConnectorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNRangePartitioningConnectorDescriptor.java
index d3a8af9..c9d6ee1 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNRangePartitioningConnectorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNRangePartitioningConnectorDescriptor.java
@@ -20,29 +20,26 @@
 import edu.uci.ics.hyracks.api.comm.IConnectionDemultiplexer;
 import edu.uci.ics.hyracks.api.comm.IFrameReader;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.IEndpointDataWriterFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobPlan;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.comm.NonDeterministicFrameReader;
 import edu.uci.ics.hyracks.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.context.HyracksContext;
 import edu.uci.ics.hyracks.coreops.base.AbstractConnectorDescriptor;
 
 public class MToNRangePartitioningConnectorDescriptor extends AbstractConnectorDescriptor {
     private static final long serialVersionUID = 1L;
 
     private class RangeDataWriter implements IFrameWriter {
-        private final int consumerPartitionCount;
         private final IFrameWriter[] epWriters;
         private final FrameTupleAppender[] appenders;
         private final FrameTupleAccessor tupleAccessor;
 
-        public RangeDataWriter(HyracksContext ctx, int consumerPartitionCount, IFrameWriter[] epWriters,
-            FrameTupleAppender[] appenders, RecordDescriptor recordDescriptor) {
-            this.consumerPartitionCount = consumerPartitionCount;
+        public RangeDataWriter(IHyracksContext ctx, int consumerPartitionCount, IFrameWriter[] epWriters,
+                FrameTupleAppender[] appenders, RecordDescriptor recordDescriptor) {
             this.epWriters = epWriters;
             this.appenders = appenders;
             tupleAccessor = new FrameTupleAccessor(ctx, recordDescriptor);
@@ -105,9 +102,9 @@
     }
 
     @Override
-    public IFrameWriter createSendSideWriter(HyracksContext ctx, JobPlan plan, IEndpointDataWriterFactory edwFactory,
-        int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
-        JobSpecification spec = plan.getJobSpecification();
+    public IFrameWriter createSendSideWriter(IHyracksContext ctx, RecordDescriptor recordDesc,
+            IEndpointDataWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
+            throws HyracksDataException {
         final IFrameWriter[] epWriters = new IFrameWriter[nConsumerPartitions];
         final FrameTupleAppender[] appenders = new FrameTupleAppender[nConsumerPartitions];
         for (int i = 0; i < nConsumerPartitions; ++i) {
@@ -119,14 +116,15 @@
                 throw new HyracksDataException(e);
             }
         }
-        final RangeDataWriter rangeWriter = new RangeDataWriter(ctx, nConsumerPartitions, epWriters, appenders, spec
-            .getConnectorRecordDescriptor(this));
+        final RangeDataWriter rangeWriter = new RangeDataWriter(ctx, nConsumerPartitions, epWriters, appenders,
+                recordDesc);
         return rangeWriter;
     }
 
     @Override
-    public IFrameReader createReceiveSideReader(HyracksContext ctx, JobPlan plan, IConnectionDemultiplexer demux,
-        int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
+    public IFrameReader createReceiveSideReader(IHyracksContext ctx, RecordDescriptor recordDesc,
+            IConnectionDemultiplexer demux, int index, int nProducerPartitions, int nConsumerPartitions)
+            throws HyracksDataException {
         return new NonDeterministicFrameReader(ctx, demux);
     }
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNReplicatingConnectorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNReplicatingConnectorDescriptor.java
index 324c39f..988fb5c 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNReplicatingConnectorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNReplicatingConnectorDescriptor.java
@@ -19,12 +19,12 @@
 import edu.uci.ics.hyracks.api.comm.IConnectionDemultiplexer;
 import edu.uci.ics.hyracks.api.comm.IFrameReader;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.IEndpointDataWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobPlan;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.comm.NonDeterministicFrameReader;
-import edu.uci.ics.hyracks.context.HyracksContext;
 import edu.uci.ics.hyracks.coreops.base.AbstractConnectorDescriptor;
 
 public class MToNReplicatingConnectorDescriptor extends AbstractConnectorDescriptor {
@@ -35,7 +35,7 @@
     private static final long serialVersionUID = 1L;
 
     @Override
-    public IFrameWriter createSendSideWriter(HyracksContext ctx, JobPlan plan, IEndpointDataWriterFactory edwFactory,
+    public IFrameWriter createSendSideWriter(IHyracksContext ctx, RecordDescriptor recordDesc, IEndpointDataWriterFactory edwFactory,
         int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
         final IFrameWriter[] epWriters = new IFrameWriter[nConsumerPartitions];
         for (int i = 0; i < nConsumerPartitions; ++i) {
@@ -70,7 +70,7 @@
     }
 
     @Override
-    public IFrameReader createReceiveSideReader(HyracksContext ctx, JobPlan plan, IConnectionDemultiplexer demux,
+    public IFrameReader createReceiveSideReader(IHyracksContext ctx, RecordDescriptor recordDesc, IConnectionDemultiplexer demux,
         int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
         return new NonDeterministicFrameReader(ctx, demux);
     }
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MapperOperatorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MapperOperatorDescriptor.java
index 807e890..59cf71b 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MapperOperatorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MapperOperatorDescriptor.java
@@ -14,15 +14,14 @@
  */
 package edu.uci.ics.hyracks.coreops;
 
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePullable;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
-import edu.uci.ics.hyracks.api.job.JobPlan;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.context.HyracksContext;
 import edu.uci.ics.hyracks.coreops.base.AbstractSingleActivityOperatorDescriptor;
 import edu.uci.ics.hyracks.coreops.base.IOpenableDataWriterOperator;
 import edu.uci.ics.hyracks.coreops.util.DeserializedOperatorNodePushable;
@@ -70,24 +69,9 @@
     }
 
     @Override
-    public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-            int partition, int nPartitions) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-            int partition, int nPartitions) {
-        return new DeserializedOperatorNodePushable(ctx, new MapperOperator(), plan, getActivityNodeId());
-    }
-
-    @Override
-    public boolean supportsPullInterface() {
-        return false;
-    }
-
-    @Override
-    public boolean supportsPushInterface() {
-        return true;
+    public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        return new DeserializedOperatorNodePushable(ctx, new MapperOperator(),
+                recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
     }
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MaterializingOperatorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MaterializingOperatorDescriptor.java
index 9f06d7b..735af5a 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MaterializingOperatorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MaterializingOperatorDescriptor.java
@@ -21,16 +21,15 @@
 import java.nio.channels.FileChannel;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePullable;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
-import edu.uci.ics.hyracks.api.job.JobPlan;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.context.HyracksContext;
 import edu.uci.ics.hyracks.coreops.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.coreops.base.AbstractOperatorDescriptor;
 
@@ -62,14 +61,8 @@
         private static final long serialVersionUID = 1L;
 
         @Override
-        public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-                int partition, int nPartitions) {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
-                final IOperatorEnvironment env, int partition, int nPartitions) {
+        public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, final IOperatorEnvironment env,
+                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
             return new IOperatorNodePushable() {
                 private FileChannel out;
                 private int frameCount;
@@ -114,7 +107,7 @@
                 }
 
                 @Override
-                public void setFrameWriter(int index, IFrameWriter writer) {
+                public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
                     throw new IllegalArgumentException();
                 }
             };
@@ -124,30 +117,14 @@
         public IOperatorDescriptor getOwner() {
             return MaterializingOperatorDescriptor.this;
         }
-
-        @Override
-        public boolean supportsPullInterface() {
-            return false;
-        }
-
-        @Override
-        public boolean supportsPushInterface() {
-            return true;
-        }
     }
 
     private final class ReaderActivityNode extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
 
         @Override
-        public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-                int partition, int nPartitions) {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
-                final IOperatorEnvironment env, int partition, int nPartitions) {
+        public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, final IOperatorEnvironment env,
+                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
             return new IOperatorNodePushable() {
                 private IFrameWriter writer;
 
@@ -187,7 +164,7 @@
                 }
 
                 @Override
-                public void setFrameWriter(int index, IFrameWriter writer) {
+                public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
                     if (index != 0) {
                         throw new IllegalArgumentException();
                     }
@@ -200,15 +177,5 @@
         public IOperatorDescriptor getOwner() {
             return MaterializingOperatorDescriptor.this;
         }
-
-        @Override
-        public boolean supportsPullInterface() {
-            return false;
-        }
-
-        @Override
-        public boolean supportsPushInterface() {
-            return true;
-        }
     }
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/OneToOneConnectorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/OneToOneConnectorDescriptor.java
index fff3aa6..c2ce792 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/OneToOneConnectorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/OneToOneConnectorDescriptor.java
@@ -17,12 +17,12 @@
 import edu.uci.ics.hyracks.api.comm.IConnectionDemultiplexer;
 import edu.uci.ics.hyracks.api.comm.IFrameReader;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.IEndpointDataWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobPlan;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.comm.NonDeterministicFrameReader;
-import edu.uci.ics.hyracks.context.HyracksContext;
 import edu.uci.ics.hyracks.coreops.base.AbstractConnectorDescriptor;
 
 public class OneToOneConnectorDescriptor extends AbstractConnectorDescriptor {
@@ -33,14 +33,16 @@
     }
 
     @Override
-    public IFrameWriter createSendSideWriter(HyracksContext ctx, JobPlan plan, IEndpointDataWriterFactory edwFactory,
-        int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
+    public IFrameWriter createSendSideWriter(IHyracksContext ctx, RecordDescriptor recordDesc,
+            IEndpointDataWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
+            throws HyracksDataException {
         return edwFactory.createFrameWriter(index);
     }
 
     @Override
-    public IFrameReader createReceiveSideReader(HyracksContext ctx, JobPlan plan, IConnectionDemultiplexer demux,
-        int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
+    public IFrameReader createReceiveSideReader(IHyracksContext ctx, RecordDescriptor recordDesc,
+            IConnectionDemultiplexer demux, int index, int nProducerPartitions, int nConsumerPartitions)
+            throws HyracksDataException {
         return new NonDeterministicFrameReader(ctx, demux);
     }
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/PrinterOperatorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/PrinterOperatorDescriptor.java
index a07a2f9..c378961 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/PrinterOperatorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/PrinterOperatorDescriptor.java
@@ -14,14 +14,13 @@
  */
 package edu.uci.ics.hyracks.coreops;
 
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePullable;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
-import edu.uci.ics.hyracks.api.job.JobPlan;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.context.HyracksContext;
 import edu.uci.ics.hyracks.coreops.base.AbstractSingleActivityOperatorDescriptor;
 import edu.uci.ics.hyracks.coreops.base.IOpenableDataWriterOperator;
 import edu.uci.ics.hyracks.coreops.util.DeserializedOperatorNodePushable;
@@ -58,24 +57,9 @@
     }
 
     @Override
-    public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-            int partition, int nPartitions) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-            int partition, int nPartitions) {
-        return new DeserializedOperatorNodePushable(ctx, new PrinterOperator(), plan, getActivityNodeId());
-    }
-
-    @Override
-    public boolean supportsPullInterface() {
-        return false;
-    }
-
-    @Override
-    public boolean supportsPushInterface() {
-        return true;
+    public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        return new DeserializedOperatorNodePushable(ctx, new PrinterOperator(),
+                recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
     }
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/SplitVectorOperatorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/SplitVectorOperatorDescriptor.java
index 08ec2bb..f90eaea 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/SplitVectorOperatorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/SplitVectorOperatorDescriptor.java
@@ -17,18 +17,17 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
 import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePullable;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
-import edu.uci.ics.hyracks.api.job.JobPlan;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.context.HyracksContext;
 import edu.uci.ics.hyracks.coreops.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.coreops.base.AbstractOperatorDescriptor;
 import edu.uci.ics.hyracks.coreops.base.IOpenableDataWriterOperator;
@@ -49,14 +48,8 @@
         }
 
         @Override
-        public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-                int partition, int nPartitions) {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan,
-                final IOperatorEnvironment env, int partition, int nPartitions) {
+        public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, final IOperatorEnvironment env,
+                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
             IOpenableDataWriterOperator op = new IOpenableDataWriterOperator() {
                 private ArrayList<Object[]> buffer;
 
@@ -81,17 +74,8 @@
                     buffer.add(data);
                 }
             };
-            return new DeserializedOperatorNodePushable(ctx, op, plan, getActivityNodeId());
-        }
-
-        @Override
-        public boolean supportsPullInterface() {
-            return false;
-        }
-
-        @Override
-        public boolean supportsPushInterface() {
-            return true;
+            return new DeserializedOperatorNodePushable(ctx, op, recordDescProvider.getInputRecordDescriptor(
+                    getOperatorId(), 0));
         }
     }
 
@@ -104,14 +88,8 @@
         }
 
         @Override
-        public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-                int partition, int nPartitions) {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan,
-                final IOperatorEnvironment env, int partition, int nPartitions) {
+        public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, final IOperatorEnvironment env,
+                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
             IOpenableDataWriterOperator op = new IOpenableDataWriterOperator() {
                 private IOpenableDataWriter<Object[]> writer;
 
@@ -143,17 +121,8 @@
                     writer.close();
                 }
             };
-            return new DeserializedOperatorNodePushable(ctx, op, plan, getActivityNodeId());
-        }
-
-        @Override
-        public boolean supportsPullInterface() {
-            return false;
-        }
-
-        @Override
-        public boolean supportsPushInterface() {
-            return true;
+            return new DeserializedOperatorNodePushable(ctx, op, recordDescProvider.getInputRecordDescriptor(
+                    getOperatorId(), 0));
         }
     }
 
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/file/AbstractFileScanOperatorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/file/AbstractFileScanOperatorDescriptor.java
index a4571ee..b862e16 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/file/AbstractFileScanOperatorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/file/AbstractFileScanOperatorDescriptor.java
@@ -16,15 +16,14 @@
 
 import java.io.File;
 
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePullable;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
-import edu.uci.ics.hyracks.api.job.JobPlan;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.context.HyracksContext;
 import edu.uci.ics.hyracks.coreops.base.AbstractSingleActivityOperatorDescriptor;
 import edu.uci.ics.hyracks.coreops.base.IOpenableDataWriterOperator;
 import edu.uci.ics.hyracks.coreops.util.DeserializedOperatorNodePushable;
@@ -101,24 +100,8 @@
     }
 
     @Override
-    public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-            int partition, int nPartitions) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-            int partition, int nPartitions) {
-        return new DeserializedOperatorNodePushable(ctx, new FileScanOperator(partition), plan, getActivityNodeId());
-    }
-
-    @Override
-    public boolean supportsPullInterface() {
-        return false;
-    }
-
-    @Override
-    public boolean supportsPushInterface() {
-        return true;
+    public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        return new DeserializedOperatorNodePushable(ctx, new FileScanOperator(partition), null);
     }
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/file/AbstractFileWriteOperatorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/file/AbstractFileWriteOperatorDescriptor.java
index d04df29..84e30d3 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/file/AbstractFileWriteOperatorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/file/AbstractFileWriteOperatorDescriptor.java
@@ -16,14 +16,13 @@
 
 import java.io.File;
 
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePullable;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
-import edu.uci.ics.hyracks.api.job.JobPlan;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.context.HyracksContext;
 import edu.uci.ics.hyracks.coreops.base.AbstractSingleActivityOperatorDescriptor;
 import edu.uci.ics.hyracks.coreops.base.IOpenableDataWriterOperator;
 import edu.uci.ics.hyracks.coreops.util.DeserializedOperatorNodePushable;
@@ -88,24 +87,9 @@
     protected abstract IRecordWriter createRecordWriter(File file, int index) throws Exception;
 
     @Override
-    public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-            int partition, int nPartitions) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-            int partition, int nPartitions) {
-        return new DeserializedOperatorNodePushable(ctx, new FileWriteOperator(partition), plan, getActivityNodeId());
-    }
-
-    @Override
-    public boolean supportsPullInterface() {
-        return false;
-    }
-
-    @Override
-    public boolean supportsPushInterface() {
-        return true;
+    public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        return new DeserializedOperatorNodePushable(ctx, new FileWriteOperator(partition),
+                recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
     }
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/GroupingHashTable.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/GroupingHashTable.java
index b2dc234..faa6ac4 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/GroupingHashTable.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/GroupingHashTable.java
@@ -20,6 +20,7 @@
 import java.util.List;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
@@ -30,7 +31,6 @@
 import edu.uci.ics.hyracks.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.comm.io.FrameTuplePairComparator;
-import edu.uci.ics.hyracks.context.HyracksContext;
 
 class GroupingHashTable {
     /**
@@ -60,7 +60,7 @@
     }
 
     private static final int INIT_ACCUMULATORS_SIZE = 8;
-    private final HyracksContext ctx;
+    private final IHyracksContext ctx;
     private final FrameTupleAppender appender;
     private final List<ByteBuffer> buffers;
     private final Link[] table;
@@ -79,7 +79,7 @@
 
     private final FrameTupleAccessor storedKeysAccessor;
 
-    GroupingHashTable(HyracksContext ctx, int[] fields, IBinaryComparatorFactory[] comparatorFactories,
+    GroupingHashTable(IHyracksContext ctx, int[] fields, IBinaryComparatorFactory[] comparatorFactories,
         ITuplePartitionComputerFactory tpcf, IAccumulatingAggregatorFactory aggregatorFactory,
         RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, int tableSize) {
         this.ctx = ctx;
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/HashGroupOperatorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/HashGroupOperatorDescriptor.java
index 664c84d..6e91c7d 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/HashGroupOperatorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/HashGroupOperatorDescriptor.java
@@ -17,19 +17,18 @@
 import java.nio.ByteBuffer;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePullable;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
-import edu.uci.ics.hyracks.api.job.JobPlan;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.context.HyracksContext;
 import edu.uci.ics.hyracks.coreops.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.coreops.base.AbstractOperatorDescriptor;
 
@@ -73,24 +72,18 @@
         private static final long serialVersionUID = 1L;
 
         @Override
-        public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-                int partition, int nPartitions) {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
-                final IOperatorEnvironment env, int partition, int nPartitions) {
-            final RecordDescriptor rd0 = plan.getJobSpecification()
-                    .getOperatorInputRecordDescriptor(getOperatorId(), 0);
-            final FrameTupleAccessor accessor = new FrameTupleAccessor(ctx, rd0);
+        public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, final IOperatorEnvironment env,
+                final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+            final FrameTupleAccessor accessor = new FrameTupleAccessor(ctx,
+                    recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
             return new IOperatorNodePushable() {
                 private GroupingHashTable table;
 
                 @Override
                 public void open() throws HyracksDataException {
-                    table = new GroupingHashTable(ctx, keys, comparatorFactories, tpcf, aggregatorFactory, rd0,
-                            recordDescriptors[0], tableSize);
+                    table = new GroupingHashTable(ctx, keys, comparatorFactories, tpcf, aggregatorFactory,
+                            recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0), recordDescriptors[0],
+                            tableSize);
                 }
 
                 @Override
@@ -108,7 +101,7 @@
                 }
 
                 @Override
-                public void setFrameWriter(int index, IFrameWriter writer) {
+                public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
                     throw new IllegalArgumentException();
                 }
             };
@@ -118,30 +111,14 @@
         public IOperatorDescriptor getOwner() {
             return HashGroupOperatorDescriptor.this;
         }
-
-        @Override
-        public boolean supportsPullInterface() {
-            return false;
-        }
-
-        @Override
-        public boolean supportsPushInterface() {
-            return true;
-        }
     }
 
     private class OutputActivity extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
 
         @Override
-        public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-                int partition, int nPartitions) {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan,
-                final IOperatorEnvironment env, int partition, int nPartitions) {
+        public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, final IOperatorEnvironment env,
+                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
             return new IOperatorNodePushable() {
                 private IFrameWriter writer;
 
@@ -165,7 +142,7 @@
                 }
 
                 @Override
-                public void setFrameWriter(int index, IFrameWriter writer) {
+                public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
                     if (index != 0) {
                         throw new IllegalArgumentException();
                     }
@@ -178,15 +155,5 @@
         public IOperatorDescriptor getOwner() {
             return HashGroupOperatorDescriptor.this;
         }
-
-        @Override
-        public boolean supportsPullInterface() {
-            return false;
-        }
-
-        @Override
-        public boolean supportsPushInterface() {
-            return true;
-        }
     }
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/PreclusteredGroupOperatorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/PreclusteredGroupOperatorDescriptor.java
index e7e04a5..acfdafe 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/PreclusteredGroupOperatorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/PreclusteredGroupOperatorDescriptor.java
@@ -14,15 +14,14 @@
  */
 package edu.uci.ics.hyracks.coreops.group;
 
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePullable;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
-import edu.uci.ics.hyracks.api.job.JobPlan;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.context.HyracksContext;
 import edu.uci.ics.hyracks.coreops.base.AbstractSingleActivityOperatorDescriptor;
 import edu.uci.ics.hyracks.coreops.util.DeserializedOperatorNodePushable;
 
@@ -43,29 +42,13 @@
     }
 
     @Override
-    public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-            int partition, int nPartitions) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-            int partition, int nPartitions) {
+    public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
         IComparator[] comparators = new IComparator[comparatorFactories.length];
         for (int i = 0; i < comparatorFactories.length; ++i) {
             comparators[i] = comparatorFactories[i].createComparator();
         }
         return new DeserializedOperatorNodePushable(ctx, new PreclusteredGroupOperator(groupFields, comparators,
-                aggregator), plan, getActivityNodeId());
-    }
-
-    @Override
-    public boolean supportsPullInterface() {
-        return false;
-    }
-
-    @Override
-    public boolean supportsPushInterface() {
-        return true;
+                aggregator), recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
     }
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/AbstractHadoopFileScanOperatorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/AbstractHadoopFileScanOperatorDescriptor.java
index 90502b0..064743d 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/AbstractHadoopFileScanOperatorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/AbstractHadoopFileScanOperatorDescriptor.java
@@ -18,15 +18,14 @@
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.Reporter;
 
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePullable;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
-import edu.uci.ics.hyracks.api.job.JobPlan;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.context.HyracksContext;
 import edu.uci.ics.hyracks.coreops.base.AbstractSingleActivityOperatorDescriptor;
 import edu.uci.ics.hyracks.coreops.base.IOpenableDataWriterOperator;
 import edu.uci.ics.hyracks.coreops.file.IRecordReader;
@@ -142,24 +141,9 @@
     }
 
     @Override
-    public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-            int partition, int nPartitions) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-            int partition, int nPartitions) {
-        return new DeserializedOperatorNodePushable(ctx, new FileScanOperator(partition), plan, getActivityNodeId());
-    }
-
-    @Override
-    public boolean supportsPullInterface() {
-        return false;
-    }
-
-    @Override
-    public boolean supportsPushInterface() {
-        return true;
+    public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        return new DeserializedOperatorNodePushable(ctx, new FileScanOperator(partition),
+                recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
     }
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopMapperOperatorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopMapperOperatorDescriptor.java
index c144365..0e81de7 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopMapperOperatorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopMapperOperatorDescriptor.java
@@ -22,15 +22,14 @@
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePullable;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
-import edu.uci.ics.hyracks.api.job.JobPlan;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.context.HyracksContext;
 import edu.uci.ics.hyracks.coreops.base.IOpenableDataWriterOperator;
 import edu.uci.ics.hyracks.coreops.util.DeserializedOperatorNodePushable;
 import edu.uci.ics.hyracks.hadoop.util.DatatypeHelper;
@@ -133,28 +132,13 @@
     }
 
     @Override
-    public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-            int partition, int nPartitions) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-            int partition, int nPartitions) {
-        return new DeserializedOperatorNodePushable(ctx, new MapperOperator(), plan, getActivityNodeId());
+    public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        return new DeserializedOperatorNodePushable(ctx, new MapperOperator(),
+                recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
     }
 
     public Class<? extends Mapper> getMapperClass() {
         return mapperClass;
     }
-
-    @Override
-    public boolean supportsPullInterface() {
-        return false;
-    }
-
-    @Override
-    public boolean supportsPushInterface() {
-        return true;
-    }
 }
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopReducerOperatorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopReducerOperatorDescriptor.java
index e7bfa31..95bfaea 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopReducerOperatorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopReducerOperatorDescriptor.java
@@ -28,18 +28,17 @@
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.IDataReader;
 import edu.uci.ics.hyracks.api.dataflow.IDataWriter;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePullable;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
-import edu.uci.ics.hyracks.api.job.JobPlan;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.context.HyracksContext;
 import edu.uci.ics.hyracks.coreops.base.IOpenableDataWriterOperator;
 import edu.uci.ics.hyracks.coreops.group.IGroupAggregator;
 import edu.uci.ics.hyracks.coreops.group.PreclusteredGroupOperator;
@@ -206,14 +205,8 @@
     }
 
     @Override
-    public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-            int partition, int nPartitions) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-            int partition, int nPartitions) {
+    public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
         try {
             if (this.comparatorFactory == null) {
                 String comparatorClassName = getJobConfMap().get(comparatorClassKey);
@@ -235,7 +228,8 @@
             }
             IOpenableDataWriterOperator op = new PreclusteredGroupOperator(new int[] { 0 },
                     new IComparator[] { comparatorFactory.createComparator() }, new ReducerAggregator(createReducer()));
-            return new DeserializedOperatorNodePushable(ctx, op, plan, getActivityNodeId());
+            return new DeserializedOperatorNodePushable(ctx, op, recordDescProvider.getInputRecordDescriptor(
+                    getOperatorId(), 0));
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -250,16 +244,6 @@
     }
 
     @Override
-    public boolean supportsPullInterface() {
-        return false;
-    }
-
-    @Override
-    public boolean supportsPushInterface() {
-        return true;
-    }
-
-    @Override
     public RecordDescriptor getRecordDescriptor(JobConf conf) {
         String outputKeyClassName = conf.get("mapred.output.key.class");
         String outputValueClassName = conf.get("mapred.output.value.class");
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/GraceHashJoinOperatorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/GraceHashJoinOperatorDescriptor.java
index c38b9ce..c9605e6 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/GraceHashJoinOperatorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/GraceHashJoinOperatorDescriptor.java
@@ -21,24 +21,23 @@
 import java.nio.channels.FileChannel;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePullable;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
-import edu.uci.ics.hyracks.api.job.JobPlan;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.comm.io.FrameTuplePairComparator;
 import edu.uci.ics.hyracks.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.context.HyracksContext;
 import edu.uci.ics.hyracks.coreops.FieldHashPartitionComputerFactory;
 import edu.uci.ics.hyracks.coreops.RepartitionComputerFactory;
 import edu.uci.ics.hyracks.coreops.base.AbstractActivityNode;
@@ -113,22 +112,15 @@
         }
 
         @Override
-        public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-                int partition, int nPartitions) {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
-                final IOperatorEnvironment env, int partition, final int nPartitions) {
-            final RecordDescriptor rd0 = plan.getJobSpecification().getOperatorInputRecordDescriptor(getOperatorId(),
-                    operatorInputIndex);
+        public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, final IOperatorEnvironment env,
+                final IRecordDescriptorProvider recordDescProvider, int partition, final int nPartitions) {
             final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
             for (int i = 0; i < comparatorFactories.length; ++i) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
             }
             IOperatorNodePushable op = new IOperatorNodePushable() {
-                private final FrameTupleAccessor accessor0 = new FrameTupleAccessor(ctx, rd0);
+                private final FrameTupleAccessor accessor0 = new FrameTupleAccessor(ctx,
+                        recordDescProvider.getInputRecordDescriptor(getOperatorId(), operatorInputIndex));
 
                 ITuplePartitionComputer hpc = new FieldHashPartitionComputerFactory(keys, hashFunctionFactories)
                         .createPartitioner();
@@ -140,7 +132,7 @@
                 private final int numPartitions = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));
 
                 @Override
-                public void setFrameWriter(int index, IFrameWriter writer) {
+                public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
                     throw new IllegalArgumentException();
                 }
 
@@ -229,39 +221,20 @@
         public IOperatorDescriptor getOwner() {
             return GraceHashJoinOperatorDescriptor.this;
         }
-
-        @Override
-        public boolean supportsPullInterface() {
-            return false;
-        }
-
-        @Override
-        public boolean supportsPushInterface() {
-            return true;
-        }
-
     }
 
     private class JoinActivityNode extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
 
         @Override
-        public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-                int partition, int nPartitions) {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
-                final IOperatorEnvironment env, int partition, int nPartitions) {
-            final RecordDescriptor rd0 = plan.getJobSpecification()
-                    .getOperatorInputRecordDescriptor(getOperatorId(), 0);
-            final RecordDescriptor rd1 = plan.getJobSpecification()
-                    .getOperatorInputRecordDescriptor(getOperatorId(), 1);
+        public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, final IOperatorEnvironment env,
+                final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
             final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
             for (int i = 0; i < comparatorFactories.length; ++i) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
             }
+            final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
+            final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
 
             IOperatorNodePushable op = new IOperatorNodePushable() {
                 private InMemoryHashJoin joiner;
@@ -357,7 +330,7 @@
                 }
 
                 @Override
-                public void setFrameWriter(int index, IFrameWriter writer) {
+                public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
                     if (index != 0) {
                         throw new IllegalStateException();
                     }
@@ -371,15 +344,5 @@
         public IOperatorDescriptor getOwner() {
             return GraceHashJoinOperatorDescriptor.this;
         }
-
-        @Override
-        public boolean supportsPullInterface() {
-            return false;
-        }
-
-        @Override
-        public boolean supportsPushInterface() {
-            return true;
-        }
     }
 }
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/HybridHashJoinOperatorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/HybridHashJoinOperatorDescriptor.java
index 1462f52..82f4506 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/HybridHashJoinOperatorDescriptor.java
@@ -21,25 +21,24 @@
 import java.nio.channels.FileChannel;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePullable;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
-import edu.uci.ics.hyracks.api.job.JobPlan;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.comm.io.FrameTuplePairComparator;
 import edu.uci.ics.hyracks.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.context.HyracksContext;
 import edu.uci.ics.hyracks.coreops.FieldHashPartitionComputerFactory;
 import edu.uci.ics.hyracks.coreops.RepartitionComputerFactory;
 import edu.uci.ics.hyracks.coreops.base.AbstractActivityNode;
@@ -124,18 +123,10 @@
         }
 
         @Override
-        public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-                int partition, int nPartitions) {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
-                final IOperatorEnvironment env, int partition, final int nPartitions) {
-            final RecordDescriptor rd0 = plan.getJobSpecification()
-                    .getOperatorInputRecordDescriptor(getOperatorId(), 0);
-            final RecordDescriptor rd1 = plan.getJobSpecification()
-                    .getOperatorInputRecordDescriptor(getOperatorId(), 1);
+        public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, final IOperatorEnvironment env,
+                IRecordDescriptorProvider recordDescProvider, int partition, final int nPartitions) {
+            final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
+            final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
             final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
             for (int i = 0; i < comparatorFactories.length; ++i) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
@@ -156,7 +147,7 @@
                 private int B;
 
                 @Override
-                public void setFrameWriter(int index, IFrameWriter writer) {
+                public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
                     throw new IllegalArgumentException();
                 }
 
@@ -330,17 +321,6 @@
         public IOperatorDescriptor getOwner() {
             return HybridHashJoinOperatorDescriptor.this;
         }
-
-        @Override
-        public boolean supportsPullInterface() {
-            return false;
-        }
-
-        @Override
-        public boolean supportsPushInterface() {
-            return true;
-        }
-
     }
 
     private class PartitionAndJoinActivityNode extends AbstractActivityNode {
@@ -353,18 +333,10 @@
         }
 
         @Override
-        public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-                int partition, int nPartitions) {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
-                final IOperatorEnvironment env, int partition, final int nPartitions) {
-            final RecordDescriptor rd0 = plan.getJobSpecification()
-                    .getOperatorInputRecordDescriptor(getOperatorId(), 0);
-            final RecordDescriptor rd1 = plan.getJobSpecification()
-                    .getOperatorInputRecordDescriptor(getOperatorId(), 1);
+        public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, final IOperatorEnvironment env,
+                IRecordDescriptorProvider recordDescProvider, int partition, final int nPartitions) {
+            final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
+            final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
             final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
             for (int i = 0; i < comparatorFactories.length; ++i) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
@@ -575,7 +547,7 @@
                 }
 
                 @Override
-                public void setFrameWriter(int index, IFrameWriter writer) {
+                public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
                     if (index != 0) {
                         throw new IllegalStateException();
                     }
@@ -589,15 +561,5 @@
         public IOperatorDescriptor getOwner() {
             return HybridHashJoinOperatorDescriptor.this;
         }
-
-        @Override
-        public boolean supportsPullInterface() {
-            return false;
-        }
-
-        @Override
-        public boolean supportsPushInterface() {
-            return true;
-        }
     }
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/InMemoryHashJoin.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/InMemoryHashJoin.java
index 0962895..ab896fa 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/InMemoryHashJoin.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/InMemoryHashJoin.java
@@ -20,12 +20,12 @@
 import java.util.List;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.comm.io.FrameTuplePairComparator;
-import edu.uci.ics.hyracks.context.HyracksContext;
 
 public class InMemoryHashJoin {
     private final Link[] table;
@@ -38,7 +38,7 @@
     private final FrameTuplePairComparator tpComparator;
     private final ByteBuffer outBuffer;
 
-    public InMemoryHashJoin(HyracksContext ctx, int tableSize, FrameTupleAccessor accessor0,
+    public InMemoryHashJoin(IHyracksContext ctx, int tableSize, FrameTupleAccessor accessor0,
             ITuplePartitionComputer tpc0, FrameTupleAccessor accessor1, ITuplePartitionComputer tpc1,
             FrameTuplePairComparator comparator) {
         table = new Link[tableSize];
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/InMemoryHashJoinOperatorDescriptor.java
index 51fc7c1..cc1cdb7 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/InMemoryHashJoinOperatorDescriptor.java
@@ -17,23 +17,22 @@
 import java.nio.ByteBuffer;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePullable;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
-import edu.uci.ics.hyracks.api.job.JobPlan;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.comm.io.FrameTuplePairComparator;
 import edu.uci.ics.hyracks.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.context.HyracksContext;
 import edu.uci.ics.hyracks.coreops.FieldHashPartitionComputerFactory;
 import edu.uci.ics.hyracks.coreops.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.coreops.base.AbstractOperatorDescriptor;
@@ -79,18 +78,10 @@
         private static final long serialVersionUID = 1L;
 
         @Override
-        public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-                int partition, int nPartitions) {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
-                final IOperatorEnvironment env, int partition, int nPartitions) {
-            final RecordDescriptor rd0 = plan.getJobSpecification()
-                    .getOperatorInputRecordDescriptor(getOperatorId(), 0);
-            final RecordDescriptor rd1 = plan.getJobSpecification()
-                    .getOperatorInputRecordDescriptor(getOperatorId(), 1);
+        public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, final IOperatorEnvironment env,
+                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+            final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
+            final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
             final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
             for (int i = 0; i < comparatorFactories.length; ++i) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
@@ -122,7 +113,7 @@
                 }
 
                 @Override
-                public void setFrameWriter(int index, IFrameWriter writer) {
+                public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
                     throw new IllegalArgumentException();
                 }
             };
@@ -133,30 +124,14 @@
         public IOperatorDescriptor getOwner() {
             return InMemoryHashJoinOperatorDescriptor.this;
         }
-
-        @Override
-        public boolean supportsPullInterface() {
-            return false;
-        }
-
-        @Override
-        public boolean supportsPushInterface() {
-            return true;
-        }
     }
 
     private class HashProbeActivityNode extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
 
         @Override
-        public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-                int partition, int nPartitions) {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
-                final IOperatorEnvironment env, int partition, int nPartitions) {
+        public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, final IOperatorEnvironment env,
+                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
             IOperatorNodePushable op = new IOperatorNodePushable() {
                 private IFrameWriter writer;
                 private InMemoryHashJoin joiner;
@@ -180,7 +155,7 @@
                 }
 
                 @Override
-                public void setFrameWriter(int index, IFrameWriter writer) {
+                public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
                     if (index != 0) {
                         throw new IllegalStateException();
                     }
@@ -194,15 +169,5 @@
         public IOperatorDescriptor getOwner() {
             return InMemoryHashJoinOperatorDescriptor.this;
         }
-
-        @Override
-        public boolean supportsPullInterface() {
-            return false;
-        }
-
-        @Override
-        public boolean supportsPushInterface() {
-            return true;
-        }
     }
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/util/DeserializedOperatorNodePushable.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/util/DeserializedOperatorNodePushable.java
index 610cc51..58f6a9f 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/util/DeserializedOperatorNodePushable.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/util/DeserializedOperatorNodePushable.java
@@ -15,43 +15,33 @@
 package edu.uci.ics.hyracks.coreops.util;
 
 import java.nio.ByteBuffer;
-import java.util.List;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobPlan;
 import edu.uci.ics.hyracks.comm.io.FrameDeserializer;
 import edu.uci.ics.hyracks.comm.io.SerializingDataWriter;
-import edu.uci.ics.hyracks.context.HyracksContext;
 import edu.uci.ics.hyracks.coreops.base.IOpenableDataWriterOperator;
 
 public final class DeserializedOperatorNodePushable implements IOperatorNodePushable {
-    private final HyracksContext ctx;
+    private final IHyracksContext ctx;
 
     private final IOpenableDataWriterOperator delegate;
 
-    private final JobPlan plan;
-
-    private final ActivityNodeId aid;
-
     private final FrameDeserializer deserializer;
 
-    public DeserializedOperatorNodePushable(HyracksContext ctx, IOpenableDataWriterOperator delegate, JobPlan plan,
-            ActivityNodeId aid) {
+    public DeserializedOperatorNodePushable(IHyracksContext ctx, IOpenableDataWriterOperator delegate,
+            RecordDescriptor inRecordDesc) {
         this.ctx = ctx;
         this.delegate = delegate;
-        this.plan = plan;
-        this.aid = aid;
-        List<Integer> inList = plan.getTaskInputMap().get(aid);
-        deserializer = inList == null ? null : new FrameDeserializer(ctx, plan.getTaskInputRecordDescriptor(aid, 0));
+        deserializer = inRecordDesc == null ? null : new FrameDeserializer(ctx, inRecordDesc);
     }
 
     @Override
-    public void setFrameWriter(int index, IFrameWriter writer) {
-        delegate.setDataWriter(index, new SerializingDataWriter(ctx, plan.getTaskOutputRecordDescriptor(aid, index),
-                writer));
+    public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+        delegate.setDataWriter(index, new SerializingDataWriter(ctx, recordDesc, writer));
     }
 
     @Override
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/util/ReferencedPriorityQueue.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/util/ReferencedPriorityQueue.java
index f64320b..5f7dc54 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/util/ReferencedPriorityQueue.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/util/ReferencedPriorityQueue.java
@@ -18,12 +18,12 @@
 import java.util.BitSet;
 import java.util.Comparator;
 
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.context.HyracksContext;
 
 public class ReferencedPriorityQueue {
-    private final HyracksContext ctx;
+    private final IHyracksContext ctx;
     private final RecordDescriptor recordDescriptor;
     private final ReferenceEntry entries[];
     private final int size;
@@ -32,7 +32,7 @@
 
     private final Comparator<ReferenceEntry> comparator;
 
-    public ReferencedPriorityQueue(HyracksContext ctx, RecordDescriptor recordDescriptor, int initSize,
+    public ReferencedPriorityQueue(IHyracksContext ctx, RecordDescriptor recordDescriptor, int initSize,
             Comparator<ReferenceEntry> comparator) throws IOException {
         this.ctx = ctx;
         this.recordDescriptor = recordDescriptor;
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/resources/ResourceManager.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/resources/ResourceManager.java
index 38be3ff..eff017a 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/resources/ResourceManager.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/resources/ResourceManager.java
@@ -18,19 +18,22 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-import edu.uci.ics.hyracks.context.HyracksContext;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.resources.IResourceManager;
 
-public final class ResourceManager {
-    private final HyracksContext ctx;
+public final class ResourceManager implements IResourceManager {
+    private final IHyracksContext ctx;
 
-    public ResourceManager(HyracksContext ctx) {
+    public ResourceManager(IHyracksContext ctx) {
         this.ctx = ctx;
     }
 
+    @Override
     public ByteBuffer allocateFrame() {
         return ByteBuffer.allocate(ctx.getFrameSize());
     }
 
+    @Override
     public File createFile(String prefix, String suffix) throws IOException {
         return File.createTempFile(prefix, suffix);
     }
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/runtime/OperatorRunnable.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/runtime/OperatorRunnable.java
index 149f867..6ea3bf1 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/runtime/OperatorRunnable.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/runtime/OperatorRunnable.java
@@ -18,8 +18,9 @@
 
 import edu.uci.ics.hyracks.api.comm.IFrameReader;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
-import edu.uci.ics.hyracks.context.HyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 
 public class OperatorRunnable implements Runnable {
     private IOperatorNodePushable opNode;
@@ -27,13 +28,13 @@
     private ByteBuffer buffer;
     private volatile boolean abort;
 
-    public OperatorRunnable(HyracksContext ctx, IOperatorNodePushable opNode) {
+    public OperatorRunnable(IHyracksContext ctx, IOperatorNodePushable opNode) {
         this.opNode = opNode;
         buffer = ctx.getResourceManager().allocateFrame();
     }
 
-    public void setFrameWriter(int index, IFrameWriter writer) {
-        opNode.setFrameWriter(index, writer);
+    public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+        opNode.setFrameWriter(index, writer, recordDesc);
     }
 
     public void setFrameReader(IFrameReader reader) {
diff --git a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java
index 1467de5..92f53af 100644
--- a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java
+++ b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java
@@ -24,6 +24,7 @@
 
 import edu.uci.ics.hyracks.api.comm.IFrameReader;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.IDataWriter;
 import edu.uci.ics.hyracks.api.dataflow.IOpenableDataReader;
 import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
@@ -40,7 +41,7 @@
     private static final String DBLP_FILE = "data/dblp.txt";
 
     private static class SerDeserRunner {
-        private final HyracksContext ctx;
+        private final IHyracksContext ctx;
         private static final int FRAME_SIZE = 32768;
         private RecordDescriptor rDes;
         private List<ByteBuffer> buffers;