merged hyracks_asterix_stabilization r1702:1723

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1725 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/channels/IInputChannel.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/channels/IInputChannel.java
index f79a464..a8f2fda 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/channels/IInputChannel.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/channels/IInputChannel.java
@@ -16,6 +16,7 @@
 
 import java.nio.ByteBuffer;
 
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
 public interface IInputChannel {
@@ -29,7 +30,7 @@
 
     public void recycleBuffer(ByteBuffer buffer);
 
-    public void open() throws HyracksDataException;
+    public void open(IHyracksTaskContext ctx) throws HyracksDataException;
 
     public void close() throws HyracksDataException;
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
index 31c3d36..791f312 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
@@ -48,6 +48,7 @@
         ActivityClusterGraphBuilder acgb = new ActivityClusterGraphBuilder();
 
         final ActivityClusterGraph acg = acgb.inferActivityClusters(jobId, jag);
+        acg.setFrameSize(spec.getFrameSize());
         acg.setMaxReattempts(spec.getMaxReattempts());
         acg.setJobletEventListenerFactory(spec.getJobletEventListenerFactory());
         acg.setGlobalJobDataFactory(spec.getGlobalJobDataFactory());
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java
index 62112f5..9cb8f10 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java
@@ -19,9 +19,9 @@
 import edu.uci.ics.hyracks.api.io.IIOManager;
 
 public interface IHyracksCommonContext {
-    public ByteBuffer allocateFrame();
-
     public int getFrameSize();
 
     public IIOManager getIOManager();
+
+    public ByteBuffer allocateFrame();
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksJobletContext.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksJobletContext.java
index 2999648..fad4300 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksJobletContext.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksJobletContext.java
@@ -20,7 +20,7 @@
 import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
 import edu.uci.ics.hyracks.api.resources.IDeallocatableRegistry;
 
-public interface IHyracksJobletContext extends IHyracksCommonContext, IWorkspaceFileFactory, IDeallocatableRegistry {
+public interface IHyracksJobletContext extends IWorkspaceFileFactory, IDeallocatableRegistry {
     public INCApplicationContext getApplicationContext();
 
     public JobId getJobId();
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksRootContext.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksRootContext.java
index 2ad2e7f..a94c6de 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksRootContext.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksRootContext.java
@@ -14,5 +14,8 @@
  */
 package edu.uci.ics.hyracks.api.context;
 
-public interface IHyracksRootContext extends IHyracksCommonContext {
+import edu.uci.ics.hyracks.api.io.IIOManager;
+
+public interface IHyracksRootContext {
+    public IIOManager getIOManager();
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java
index 7285956..dd68747 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java
@@ -37,6 +37,8 @@
 
     private final Map<ConnectorDescriptorId, ActivityCluster> connectorMap;
 
+    private int frameSize;
+
     private int maxReattempts;
 
     private IJobletEventListenerFactory jobletEventListenerFactory;
@@ -48,6 +50,7 @@
         activityClusterMap = new HashMap<ActivityClusterId, ActivityCluster>();
         activityMap = new HashMap<ActivityId, ActivityCluster>();
         connectorMap = new HashMap<ConnectorDescriptorId, ActivityCluster>();
+        frameSize = 32768;
     }
 
     public Map<ActivityId, ActivityCluster> getActivityMap() {
@@ -79,6 +82,14 @@
         return version;
     }
 
+    public void setFrameSize(int frameSize) {
+        this.frameSize = frameSize;
+    }
+
+    public int getFrameSize() {
+        return frameSize;
+    }
+
     public void setMaxReattempts(int maxReattempts) {
         this.maxReattempts = maxReattempts;
     }
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
index 84ebedb..cf2eec2 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
@@ -56,6 +56,8 @@
 
     private IConnectorPolicyAssignmentPolicy connectorPolicyAssignmentPolicy;
 
+    private int frameSize;
+
     private int maxReattempts;
 
     private IJobletEventListenerFactory jobletEventListenerFactory;
@@ -77,6 +79,7 @@
         userConstraints = new HashSet<Constraint>();
         operatorIdCounter = 0;
         connectorIdCounter = 0;
+        frameSize = 32768;
         maxReattempts = 2;
     }
 
@@ -210,6 +213,14 @@
         this.connectorPolicyAssignmentPolicy = connectorPolicyAssignmentPolicy;
     }
 
+    public void setFrameSize(int frameSize) {
+        this.frameSize = frameSize;
+    }
+
+    public int getFrameSize() {
+        return frameSize;
+    }
+
     public void setMaxReattempts(int maxReattempts) {
         this.maxReattempts = maxReattempts;
     }
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/partitions/IPartition.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/partitions/IPartition.java
index 02eb891..c3dfad0 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/partitions/IPartition.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/partitions/IPartition.java
@@ -15,9 +15,12 @@
 package edu.uci.ics.hyracks.api.partitions;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.resources.IDeallocatable;
 
 public interface IPartition extends IDeallocatable {
+    public IHyracksTaskContext getTaskContext();
+
     public void writeTo(IFrameWriter writer);
 
     public boolean isReusable();
diff --git a/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java b/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
index 574f552..167eb4b 100644
--- a/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
@@ -37,9 +37,6 @@
     @Option(name = "-data-ip-address", usage = "IP Address to bind data listener", required = true)
     public String dataIPAddress;
 
-    @Option(name = "-frame-size", usage = "Frame Size to use for data communication (default: 32768)")
-    public int frameSize = 32768;
-
     @Option(name = "-iodevices", usage = "Comma separated list of IO Device mount points (default: One device in default temp folder)", required = false)
     public String ioDevices = System.getProperty("java.io.tmpdir");
 
@@ -55,6 +52,9 @@
     @Option(name = "-net-thread-count", usage = "Number of threads to use for Network I/O (default: 1)")
     public int nNetThreads = 1;
 
+    @Option(name = "-max-memory", usage = "Maximum memory usable at this Node Controller in bytes (default: -1 auto)")
+    public int maxMemory = -1;
+
     public void toCommandLine(List<String> cList) {
         cList.add("-cc-host");
         cList.add(ccHost);
@@ -66,8 +66,6 @@
         cList.add(nodeId);
         cList.add("-data-ip-address");
         cList.add(dataIPAddress);
-        cList.add("-frame-size");
-        cList.add(String.valueOf(frameSize));
         cList.add("-iodevices");
         cList.add(ioDevices);
         cList.add("-dcache-client-servers");
@@ -80,5 +78,7 @@
         cList.add(dcacheClientPath);
         cList.add("-net-thread-count");
         cList.add(String.valueOf(nNetThreads));
+        cList.add("-max-memory");
+        cList.add(String.valueOf(maxMemory));
     }
 }
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 63a45db..3855b4d 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -80,6 +80,8 @@
 
     private final IJobletEventListener jobletEventListener;
 
+    private final int frameSize;
+
     private JobStatus cleanupStatus;
 
     private boolean cleanupPending;
@@ -89,6 +91,7 @@
         this.nodeController = nodeController;
         this.appCtx = appCtx;
         this.jobId = jobId;
+        this.frameSize = acg.getFrameSize();
         this.acg = acg;
         partitionRequestMap = new HashMap<PartitionId, IPartitionCollector>();
         env = new OperatorEnvironmentImpl(nodeController.getId());
@@ -201,18 +204,15 @@
         });
     }
 
-    @Override
-    public ByteBuffer allocateFrame() {
-        return appCtx.getRootContext().allocateFrame();
+    ByteBuffer allocateFrame() {
+        return ByteBuffer.allocate(getFrameSize());
     }
 
-    @Override
-    public int getFrameSize() {
-        return appCtx.getRootContext().getFrameSize();
+    int getFrameSize() {
+        return frameSize;
     }
 
-    @Override
-    public IIOManager getIOManager() {
+    IIOManager getIOManager() {
         return appCtx.getRootContext().getIOManager();
     }
 
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index ecdd839..58a173c 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -129,12 +129,12 @@
         NodeControllerIPCI ipci = new NodeControllerIPCI();
         ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, 0), ipci,
                 new CCNCFunctions.SerializerDeserializer());
-        this.ctx = new RootHyracksContext(ncConfig.frameSize, new IOManager(getDevices(ncConfig.ioDevices), executor));
+        this.ctx = new RootHyracksContext(new IOManager(getDevices(ncConfig.ioDevices), executor));
         if (id == null) {
             throw new Exception("id not set");
         }
         partitionManager = new PartitionManager(this);
-        netManager = new NetworkManager(ctx, getIpAddress(ncConfig), partitionManager, ncConfig.nNetThreads);
+        netManager = new NetworkManager(getIpAddress(ncConfig), partitionManager, ncConfig.nNetThreads);
 
         queue = new WorkQueue();
         jobletMap = new Hashtable<JobId, Joblet>();
@@ -260,7 +260,7 @@
         return executor;
     }
 
-    public NCConfig getConfiguration() throws Exception {
+    public NCConfig getConfiguration() {
         return ncConfig;
     }
 
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
index 257e2a8..1d5af84 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
@@ -23,7 +23,7 @@
 
 import edu.uci.ics.hyracks.api.channels.IInputChannel;
 import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
-import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
 import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
@@ -33,8 +33,6 @@
 public class NetworkInputChannel implements IInputChannel {
     private static final Logger LOGGER = Logger.getLogger(NetworkInputChannel.class.getName());
 
-    private IHyracksRootContext ctx;
-
     private final NetworkManager netManager;
 
     private final SocketAddress remoteAddress;
@@ -51,9 +49,8 @@
 
     private Object attachment;
 
-    public NetworkInputChannel(IHyracksRootContext ctx, NetworkManager netManager, SocketAddress remoteAddress,
-            PartitionId partitionId, int nBuffers) {
-        this.ctx = ctx;
+    public NetworkInputChannel(NetworkManager netManager, SocketAddress remoteAddress, PartitionId partitionId,
+            int nBuffers) {
         this.netManager = netManager;
         this.remoteAddress = remoteAddress;
         this.partitionId = partitionId;
@@ -88,7 +85,7 @@
     }
 
     @Override
-    public void open() throws HyracksDataException {
+    public void open(IHyracksTaskContext ctx) throws HyracksDataException {
         try {
             ccb = netManager.connect(remoteAddress);
         } catch (Exception e) {
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
index d328a74..b805595 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
@@ -23,12 +23,11 @@
 import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
-import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
-import edu.uci.ics.hyracks.control.nc.partitions.IPartitionRequestListener;
+import edu.uci.ics.hyracks.control.nc.partitions.PartitionManager;
 import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
 import edu.uci.ics.hyracks.net.exceptions.NetException;
 import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
@@ -44,18 +43,14 @@
 
     static final int INITIAL_MESSAGE_SIZE = 20;
 
-    private final IHyracksRootContext ctx;
-
-    private final IPartitionRequestListener partitionRequestListener;
+    private final PartitionManager partitionManager;
 
     private final MuxDemux md;
 
     private NetworkAddress networkAddress;
 
-    public NetworkManager(IHyracksRootContext ctx, InetAddress inetAddress,
-            IPartitionRequestListener partitionRequestListener, int nThreads) throws IOException {
-        this.ctx = ctx;
-        this.partitionRequestListener = partitionRequestListener;
+    public NetworkManager(InetAddress inetAddress, PartitionManager partitionManager, int nThreads) throws IOException {
+        this.partitionManager = partitionManager;
         md = new MuxDemux(new InetSocketAddress(inetAddress, 0), new ChannelOpenListener(), nThreads,
                 MAX_CONNECTION_ATTEMPTS);
     }
@@ -102,9 +97,9 @@
             if (LOGGER.isLoggable(Level.FINE)) {
                 LOGGER.fine("Received initial partition request: " + pid + " on channel: " + ccb);
             }
-            noc = new NetworkOutputChannel(ctx, ccb, 5);
+            noc = new NetworkOutputChannel(ccb, 1);
             try {
-                partitionRequestListener.registerPartitionRequest(pid, noc);
+                partitionManager.registerPartitionRequest(pid, noc);
             } catch (HyracksException e) {
                 noc.abort();
             }
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
index 185768e..9024e18 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
@@ -19,7 +19,7 @@
 import java.util.Deque;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
 import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
@@ -27,17 +27,23 @@
 public class NetworkOutputChannel implements IFrameWriter {
     private final ChannelControlBlock ccb;
 
+    private final int nBuffers;
+
     private final Deque<ByteBuffer> emptyStack;
 
     private boolean aborted;
 
-    public NetworkOutputChannel(IHyracksRootContext ctx, ChannelControlBlock ccb, int nBuffers) {
+    public NetworkOutputChannel(ChannelControlBlock ccb, int nBuffers) {
         this.ccb = ccb;
+        this.nBuffers = nBuffers;
         emptyStack = new ArrayDeque<ByteBuffer>(nBuffers);
+        ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor());
+    }
+
+    public void setTaskContext(IHyracksTaskContext ctx) {
         for (int i = 0; i < nBuffers; ++i) {
             emptyStack.push(ByteBuffer.allocateDirect(ctx.getFrameSize()));
         }
-        ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor());
     }
 
     @Override
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/IPartitionRequestListener.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/IPartitionRequestListener.java
deleted file mode 100644
index 20ed49c..0000000
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/IPartitionRequestListener.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.nc.partitions;
-
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.partitions.PartitionId;
-
-public interface IPartitionRequestListener {
-    public void registerPartitionRequest(PartitionId partitionId, IFrameWriter writer) throws HyracksException;
-}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartition.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartition.java
index e75fe1c..ae4fd2b 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartition.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartition.java
@@ -18,7 +18,7 @@
 import java.util.concurrent.Executor;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.io.IFileHandle;
@@ -27,7 +27,7 @@
 import edu.uci.ics.hyracks.control.nc.io.IOManager;
 
 public class MaterializedPartition implements IPartition {
-    private final IHyracksRootContext ctx;
+    private final IHyracksTaskContext ctx;
 
     private final FileReference partitionFile;
 
@@ -35,7 +35,7 @@
 
     private final IOManager ioManager;
 
-    public MaterializedPartition(IHyracksRootContext ctx, FileReference partitionFile, Executor executor,
+    public MaterializedPartition(IHyracksTaskContext ctx, FileReference partitionFile, Executor executor,
             IOManager ioManager) {
         this.ctx = ctx;
         this.partitionFile = partitionFile;
@@ -44,6 +44,11 @@
     }
 
     @Override
+    public IHyracksTaskContext getTaskContext() {
+        return ctx;
+    }
+
+    @Override
     public void deallocate() {
         partitionFile.delete();
     }
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
index 0ea3124..16e31f7 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
@@ -21,12 +21,14 @@
 import edu.uci.ics.hyracks.api.channels.IInputChannel;
 import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.partitions.IPartition;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
 
 public class MaterializedPartitionInputChannel implements IInputChannel {
+    private final int nBuffers;
+
     private final Queue<ByteBuffer> emptyQueue;
 
     private final Queue<ByteBuffer> fullQueue;
@@ -41,12 +43,9 @@
 
     private Object attachment;
 
-    public MaterializedPartitionInputChannel(IHyracksRootContext ctx, int nBuffers, PartitionId pid,
-            PartitionManager manager) {
+    public MaterializedPartitionInputChannel(int nBuffers, PartitionId pid, PartitionManager manager) {
+        this.nBuffers = nBuffers;
         this.emptyQueue = new ArrayDeque<ByteBuffer>(nBuffers);
-        for (int i = 0; i < nBuffers; ++i) {
-            emptyQueue.add(ctx.allocateFrame());
-        }
         fullQueue = new ArrayDeque<ByteBuffer>(nBuffers);
         this.pid = pid;
         this.manager = manager;
@@ -83,7 +82,10 @@
     }
 
     @Override
-    public void open() throws HyracksDataException {
+    public void open(IHyracksTaskContext ctx) throws HyracksDataException {
+        for (int i = 0; i < nBuffers; ++i) {
+            emptyQueue.add(ctx.allocateFrame());
+        }
         IPartition partition = manager.getPartition(pid);
         partition.writeTo(writer);
     }
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
index 1f19cbe..7bd0eb1 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
@@ -20,7 +20,7 @@
 import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
@@ -33,7 +33,7 @@
 public class MaterializedPartitionWriter implements IFrameWriter {
     private static final Logger LOGGER = Logger.getLogger(MaterializedPartitionWriter.class.getName());
 
-    private final IHyracksRootContext ctx;
+    private final IHyracksTaskContext ctx;
 
     private final PartitionManager manager;
 
@@ -51,7 +51,7 @@
 
     private boolean failed;
 
-    public MaterializedPartitionWriter(IHyracksRootContext ctx, PartitionManager manager, PartitionId pid,
+    public MaterializedPartitionWriter(IHyracksTaskContext ctx, PartitionManager manager, PartitionId pid,
             TaskAttemptId taId, Executor executor) {
         this.ctx = ctx;
         this.manager = manager;
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
index 08f6ea4..62320c5 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
@@ -20,7 +20,7 @@
 import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
@@ -34,7 +34,7 @@
 public class MaterializingPipelinedPartition implements IFrameWriter, IPartition {
     private static final Logger LOGGER = Logger.getLogger(MaterializingPipelinedPartition.class.getName());
 
-    private final IHyracksRootContext ctx;
+    private final IHyracksTaskContext ctx;
 
     private final Executor executor;
 
@@ -56,7 +56,7 @@
 
     private boolean failed;
 
-    public MaterializingPipelinedPartition(IHyracksRootContext ctx, PartitionManager manager, PartitionId pid,
+    public MaterializingPipelinedPartition(IHyracksTaskContext ctx, PartitionManager manager, PartitionId pid,
             TaskAttemptId taId, Executor executor) {
         this.ctx = ctx;
         this.executor = executor;
@@ -67,6 +67,11 @@
     }
 
     @Override
+    public IHyracksTaskContext getTaskContext() {
+        return ctx;
+    }
+
+    @Override
     public void deallocate() {
         fRef.delete();
     }
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
index ce40fe6..45c091a 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
@@ -21,7 +21,6 @@
 import java.util.List;
 import java.util.Map;
 
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
@@ -34,9 +33,10 @@
 import edu.uci.ics.hyracks.control.nc.NodeControllerService;
 import edu.uci.ics.hyracks.control.nc.io.IOManager;
 import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory;
+import edu.uci.ics.hyracks.control.nc.net.NetworkOutputChannel;
 import edu.uci.ics.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
 
-public class PartitionManager implements IPartitionRequestListener {
+public class PartitionManager {
     private final NodeControllerService ncs;
 
     private final Map<PartitionId, List<IPartition>> partitionMap;
@@ -93,12 +93,12 @@
         }
     }
 
-    @Override
-    public synchronized void registerPartitionRequest(PartitionId partitionId, IFrameWriter writer)
+    public synchronized void registerPartitionRequest(PartitionId partitionId, NetworkOutputChannel writer)
             throws HyracksException {
         List<IPartition> pList = partitionMap.get(partitionId);
         if (pList != null && !pList.isEmpty()) {
             IPartition partition = pList.get(0);
+            writer.setTaskContext(partition.getTaskContext());
             partition.writeTo(writer);
             if (!partition.isReusable()) {
                 partitionMap.remove(partitionId);
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java
index 09dca9e..e427cf3 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java
@@ -17,6 +17,7 @@
 import java.nio.ByteBuffer;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.partitions.IPartition;
@@ -24,6 +25,8 @@
 import edu.uci.ics.hyracks.control.common.job.PartitionState;
 
 public class PipelinedPartition implements IFrameWriter, IPartition {
+    private final IHyracksTaskContext ctx;
+
     private final PartitionManager manager;
 
     private final PartitionId pid;
@@ -36,13 +39,19 @@
 
     private boolean failed;
 
-    public PipelinedPartition(PartitionManager manager, PartitionId pid, TaskAttemptId taId) {
+    public PipelinedPartition(IHyracksTaskContext ctx, PartitionManager manager, PartitionId pid, TaskAttemptId taId) {
+        this.ctx = ctx;
         this.manager = manager;
         this.pid = pid;
         this.taId = taId;
     }
 
     @Override
+    public IHyracksTaskContext getTaskContext() {
+        return ctx;
+    }
+
+    @Override
     public boolean isReusable() {
         return false;
     }
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
index f314e09..b1e58fc 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
@@ -26,7 +26,7 @@
 import edu.uci.ics.hyracks.api.comm.IFrameReader;
 import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
 import edu.uci.ics.hyracks.api.comm.PartitionChannel;
-import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -35,7 +35,7 @@
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
 
 public class ReceiveSideMaterializingCollector implements IPartitionCollector {
-    private final IHyracksRootContext ctx;
+    private final IHyracksTaskContext ctx;
 
     private PartitionManager manager;
 
@@ -45,7 +45,7 @@
 
     private final Executor executor;
 
-    public ReceiveSideMaterializingCollector(IHyracksRootContext ctx, PartitionManager manager,
+    public ReceiveSideMaterializingCollector(IHyracksTaskContext ctx, PartitionManager manager,
             IPartitionCollector collector, TaskAttemptId taId, Executor executor) {
         this.ctx = ctx;
         this.manager = manager;
@@ -123,7 +123,7 @@
             IInputChannel channel = pc.getInputChannel();
             try {
                 channel.registerMonitor(this);
-                channel.open();
+                channel.open(ctx);
                 mpw.open();
                 while (true) {
                     if (nAvailableFrames.get() > 0) {
@@ -150,7 +150,7 @@
                 mpw.close();
                 channel.close();
                 delegate.addPartitions(Collections.singleton(new PartitionChannel(pid,
-                        new MaterializedPartitionInputChannel(ctx, 1, pid, manager))));
+                        new MaterializedPartitionInputChannel(1, pid, manager))));
             } catch (HyracksException e) {
             }
         }
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/RootHyracksContext.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/RootHyracksContext.java
index 045fa52..9e42277 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/RootHyracksContext.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/RootHyracksContext.java
@@ -14,33 +14,18 @@
  */
 package edu.uci.ics.hyracks.control.nc.runtime;
 
-import java.nio.ByteBuffer;
-
 import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
 import edu.uci.ics.hyracks.api.io.IIOManager;
 
 public class RootHyracksContext implements IHyracksRootContext {
-    private final int frameSize;
-
     private final IIOManager ioManager;
 
-    public RootHyracksContext(int frameSize, IIOManager ioManager) {
-        this.frameSize = frameSize;
+    public RootHyracksContext(IIOManager ioManager) {
         this.ioManager = ioManager;
     }
 
     @Override
-    public int getFrameSize() {
-        return frameSize;
-    }
-
-    @Override
     public IIOManager getIOManager() {
         return ioManager;
     }
-
-    @Override
-    public ByteBuffer allocateFrame() {
-        return ByteBuffer.allocate(frameSize);
-    }
 }
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
index 2cf43da5b..bb9669d 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
@@ -46,9 +46,9 @@
             Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
             Joblet ji = jobletMap.get(pid.getJobId());
             if (ji != null) {
-                PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(ncs.getRootContext(),
-                        ncs.getNetworkManager(), new InetSocketAddress(InetAddress.getByAddress(networkAddress
-                                .getIpAddress()), networkAddress.getPort()), pid, 5));
+                PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(ncs.getNetworkManager(),
+                        new InetSocketAddress(InetAddress.getByAddress(networkAddress.getIpAddress()),
+                                networkAddress.getPort()), pid, 5));
                 ji.reportPartitionAvailability(channel);
             }
         } catch (Exception e) {
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
index 44be4b3..0c0fa3d 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
@@ -187,34 +187,32 @@
         IPartitionCollector collector = conn.createPartitionCollector(task, recordDesc, partition,
                 td.getInputPartitionCounts()[i], td.getPartitionCount());
         if (cPolicy.materializeOnReceiveSide()) {
-            return new ReceiveSideMaterializingCollector(ncs.getRootContext(), ncs.getPartitionManager(), collector,
+            return new ReceiveSideMaterializingCollector(task, ncs.getPartitionManager(), collector,
                     task.getTaskAttemptId(), ncs.getExecutor());
         } else {
             return collector;
         }
     }
 
-    private IPartitionWriterFactory createPartitionWriterFactory(IHyracksTaskContext ctx, IConnectorPolicy cPolicy,
-            final JobId jobId, final IConnectorDescriptor conn, final int senderIndex, final TaskAttemptId taId,
-            EnumSet<JobFlag> flags) {
+    private IPartitionWriterFactory createPartitionWriterFactory(final IHyracksTaskContext ctx,
+            IConnectorPolicy cPolicy, final JobId jobId, final IConnectorDescriptor conn, final int senderIndex,
+            final TaskAttemptId taId, EnumSet<JobFlag> flags) {
         IPartitionWriterFactory factory;
         if (cPolicy.materializeOnSendSide()) {
             if (cPolicy.consumerWaitsForProducerToFinish()) {
                 factory = new IPartitionWriterFactory() {
                     @Override
                     public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
-                        return new MaterializedPartitionWriter(ncs.getRootContext(), ncs.getPartitionManager(),
-                                new PartitionId(jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId,
-                                ncs.getExecutor());
+                        return new MaterializedPartitionWriter(ctx, ncs.getPartitionManager(), new PartitionId(jobId,
+                                conn.getConnectorId(), senderIndex, receiverIndex), taId, ncs.getExecutor());
                     }
                 };
             } else {
                 factory = new IPartitionWriterFactory() {
                     @Override
                     public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
-                        return new MaterializingPipelinedPartition(ncs.getRootContext(), ncs.getPartitionManager(),
-                                new PartitionId(jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId,
-                                ncs.getExecutor());
+                        return new MaterializingPipelinedPartition(ctx, ncs.getPartitionManager(), new PartitionId(
+                                jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId, ncs.getExecutor());
                     }
                 };
             }
@@ -222,7 +220,7 @@
             factory = new IPartitionWriterFactory() {
                 @Override
                 public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
-                    return new PipelinedPartition(ncs.getPartitionManager(), new PartitionId(jobId,
+                    return new PipelinedPartition(ctx, ncs.getPartitionManager(), new PartitionId(jobId,
                             conn.getConnectorId(), senderIndex, receiverIndex), taId);
                 }
             };
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameDeserializingDataWriter.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameDeserializingDataWriter.java
index 2ec4b47..b095332 100644
--- a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameDeserializingDataWriter.java
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameDeserializingDataWriter.java
@@ -17,7 +17,7 @@
 import java.nio.ByteBuffer;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -26,7 +26,7 @@
     private final IOpenableDataWriter<Object[]> writer;
     private final FrameDeserializer frameDeserializer;
 
-    public FrameDeserializingDataWriter(IHyracksRootContext ctx, IOpenableDataWriter<Object[]> writer,
+    public FrameDeserializingDataWriter(IHyracksTaskContext ctx, IOpenableDataWriter<Object[]> writer,
             RecordDescriptor recordDescriptor) {
         this.writer = writer;
         this.frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), recordDescriptor);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/PartitionCollector.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/PartitionCollector.java
index da5b0bc..23df4bb 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/PartitionCollector.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/PartitionCollector.java
@@ -51,7 +51,7 @@
             PartitionId pid = pc.getPartitionId();
             IInputChannel channel = pc.getInputChannel();
             pa.addPartition(pid, channel);
-            channel.open();
+            channel.open(ctx);
         }
     }
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTMemMgr.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTMemMgr.java
index d111954..0c569f2 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTMemMgr.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTMemMgr.java
@@ -16,7 +16,7 @@
 
 import java.nio.ByteBuffer;
 
-import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -33,7 +33,7 @@
  */
 public class BSTMemMgr implements IMemoryManager {
 
-    private final IHyracksCommonContext ctx;
+    private final IHyracksTaskContext ctx;
     public static int frameSize;
 
     private ByteBuffer[] frames;
@@ -50,7 +50,7 @@
     private Slot[] parentRes;
     private int lastFrame;
 
-    public BSTMemMgr(IHyracksCommonContext ctx, int memSize) {
+    public BSTMemMgr(IHyracksTaskContext ctx, int memSize) {
         this.ctx = ctx;
         frameSize = ctx.getFrameSize();
         convertBuffer = ByteBuffer.allocate(4);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
index 3de9742..680b98e 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
@@ -19,7 +19,7 @@
 import java.util.List;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
@@ -31,7 +31,7 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 
 public class FrameSorter {
-    private final IHyracksCommonContext ctx;
+    private final IHyracksTaskContext ctx;
     private final int[] sortFields;
     private final INormalizedKeyComputer nkc;
     private final IBinaryComparator[] comparators;
@@ -48,7 +48,7 @@
     private int[] tPointers;
     private int tupleCount;
 
-    public FrameSorter(IHyracksCommonContext ctx, int[] sortFields,
+    public FrameSorter(IHyracksTaskContext ctx, int[] sortFields,
             INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDescriptor) {
         this.ctx = ctx;
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/impls/InvertedIndex.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/impls/InvertedIndex.java
index d65952a..c5f9c2f 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/impls/InvertedIndex.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/impls/InvertedIndex.java
@@ -448,11 +448,6 @@
         private final int FRAME_SIZE = 32768;
 
         @Override
-        public ByteBuffer allocateFrame() {
-            return ByteBuffer.allocate(FRAME_SIZE);
-        }
-
-        @Override
         public int getFrameSize() {
             return FRAME_SIZE;
         }
@@ -461,6 +456,11 @@
         public IIOManager getIOManager() {
             return null;
         }
+
+        @Override
+        public ByteBuffer allocateFrame() {
+            return ByteBuffer.allocate(FRAME_SIZE);
+        }
     }
 
     @Override
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/config/AccessMethodTestsConfig.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/config/AccessMethodTestsConfig.java
index f87e532..0f59d63 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/config/AccessMethodTestsConfig.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/config/AccessMethodTestsConfig.java
@@ -20,10 +20,10 @@
  */
 public class AccessMethodTestsConfig {
     // Test params for RTree, LSMRTree and LSMRTreeWithAntiMatterTuples.
-    public static final int RTREE_NUM_TUPLES_TO_INSERT = 10000;
+    public static final int RTREE_NUM_TUPLES_TO_INSERT = 100;
     public static final int RTREE_NUM_INSERT_ROUNDS = 2;
     public static final int RTREE_NUM_DELETE_ROUNDS = 2;
-    public static final int RTREE_MULTITHREAD_NUM_OPERATIONS = 10000;
+    public static final int RTREE_MULTITHREAD_NUM_OPERATIONS = 100;
     public static final boolean RTREE_TEST_RSTAR_POLICY = true;
     // Test params for LSMRTree and LSMRTreeWithAntiMatterTuples.
     public static final int LSM_RTREE_BULKLOAD_ROUNDS = 5;
@@ -31,11 +31,11 @@
     public static final boolean LSM_RTREE_TEST_RSTAR_POLICY = false;
 
     // Test params for BTree, LSMBTree.
-    public static final int BTREE_NUM_TUPLES_TO_INSERT = 10000;
+    public static final int BTREE_NUM_TUPLES_TO_INSERT = 100;
     public static final int BTREE_NUM_INSERT_ROUNDS = 3;
     public static final int BTREE_NUM_DELETE_ROUNDS = 3;
     public static final int BTREE_NUM_UPDATE_ROUNDS = 3;
-    public static final int BTREE_MULTITHREAD_NUM_OPERATIONS = 10000;
+    public static final int BTREE_MULTITHREAD_NUM_OPERATIONS = 100;
     // Test params for LSMBTree only.
     public static final int LSM_BTREE_BULKLOAD_ROUNDS = 5;
     public static final int LSM_BTREE_MAX_TREES_TO_MERGE = 10;
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestJobletContext.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestJobletContext.java
index 7231f67..7612db9 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestJobletContext.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestJobletContext.java
@@ -29,27 +29,26 @@
 import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory;
 
 public class TestJobletContext implements IHyracksJobletContext {
+    private final int frameSize;
     private final INCApplicationContext appContext;
     private JobId jobId;
     private WorkspaceFileFactory fileFactory;
 
-    public TestJobletContext(INCApplicationContext appContext, JobId jobId) throws HyracksException {
+    public TestJobletContext(int frameSize, INCApplicationContext appContext, JobId jobId) throws HyracksException {
+        this.frameSize = frameSize;
         this.appContext = appContext;
         this.jobId = jobId;
         fileFactory = new WorkspaceFileFactory(this, (IOManager) getIOManager());
     }
 
-    @Override
     public ByteBuffer allocateFrame() {
-        return appContext.getRootContext().allocateFrame();
+        return ByteBuffer.allocate(frameSize);
     }
 
-    @Override
     public int getFrameSize() {
-        return appContext.getRootContext().getFrameSize();
+        return frameSize;
     }
 
-    @Override
     public IIOManager getIOManager() {
         return appContext.getRootContext().getIOManager();
     }
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestRootContext.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestRootContext.java
index 93409ef..e195036 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestRootContext.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestRootContext.java
@@ -15,7 +15,6 @@
 package edu.uci.ics.hyracks.test.support;
 
 import java.io.File;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Executors;
@@ -27,27 +26,15 @@
 import edu.uci.ics.hyracks.control.nc.io.IOManager;
 
 public class TestRootContext implements IHyracksRootContext {
-    private int frameSize;
     private IOManager ioManager;
 
-    public TestRootContext(int frameSize) throws HyracksException {
-        this.frameSize = frameSize;
+    public TestRootContext() throws HyracksException {
         List<IODeviceHandle> devices = new ArrayList<IODeviceHandle>();
         devices.add(new IODeviceHandle(new File(System.getProperty("java.io.tmpdir")), "."));
         ioManager = new IOManager(devices, Executors.newCachedThreadPool());
     }
 
     @Override
-    public ByteBuffer allocateFrame() {
-        return ByteBuffer.allocate(frameSize);
-    }
-
-    @Override
-    public int getFrameSize() {
-        return frameSize;
-    }
-
-    @Override
     public IIOManager getIOManager() {
         return ioManager;
     }
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
index e18c0b8..c122b25 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
@@ -30,11 +30,11 @@
 import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory;
 
 public class TestTaskContext implements IHyracksTaskContext {
-    private final IHyracksJobletContext jobletContext;
+    private final TestJobletContext jobletContext;
     private final TaskAttemptId taskId;
     private WorkspaceFileFactory fileFactory;
 
-    public TestTaskContext(IHyracksJobletContext jobletContext, TaskAttemptId taskId) throws HyracksException {
+    public TestTaskContext(TestJobletContext jobletContext, TaskAttemptId taskId) throws HyracksException {
         this.jobletContext = jobletContext;
         this.taskId = taskId;
         fileFactory = new WorkspaceFileFactory(this, (IOManager) getIOManager());
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestUtils.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestUtils.java
index 5d488c1..71da634 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestUtils.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestUtils.java
@@ -15,7 +15,6 @@
 package edu.uci.ics.hyracks.test.support;
 
 import edu.uci.ics.hyracks.api.application.INCApplicationContext;
-import edu.uci.ics.hyracks.api.context.IHyracksJobletContext;
 import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
@@ -28,9 +27,9 @@
 public class TestUtils {
     public static IHyracksTaskContext create(int frameSize) {
         try {
-            IHyracksRootContext rootCtx = new TestRootContext(frameSize);
+            IHyracksRootContext rootCtx = new TestRootContext();
             INCApplicationContext appCtx = new TestNCApplicationContext(rootCtx, null);
-            IHyracksJobletContext jobletCtx = new TestJobletContext(appCtx, new JobId(0));
+            TestJobletContext jobletCtx = new TestJobletContext(frameSize, appCtx, new JobId(0));
             TaskAttemptId tid = new TaskAttemptId(new TaskId(new ActivityId(new OperatorDescriptorId(0), 0), 0), 0);
             IHyracksTaskContext taskCtx = new TestTaskContext(jobletCtx, tid);
             return taskCtx;