Merged fullstack_asterix_stabilization -r 2933:3157

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_ioc@3164 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-control/hyracks-control-nc/pom.xml b/hyracks/hyracks-control/hyracks-control-nc/pom.xml
index 874cbab..e163d2b 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/pom.xml
+++ b/hyracks/hyracks-control/hyracks-control-nc/pom.xml
@@ -15,8 +15,9 @@
         <artifactId>maven-compiler-plugin</artifactId>
         <version>2.0.2</version>
         <configuration>
-          <source>1.6</source>
-          <target>1.6</target>
+          <source>1.7</source>
+          <target>1.7</target>
+          <fork>true</fork>
         </configuration>
       </plugin>
     </plugins>
@@ -34,6 +35,11 @@
   		<artifactId>hyracks-net</artifactId>
   		<version>0.2.3-SNAPSHOT</version>
   	</dependency>
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks</groupId>
+  		<artifactId>hyracks-comm</artifactId>
+  		<version>0.2.3-SNAPSHOT</version>
+  	</dependency>
   </dependencies>
   <reporting>
     <plugins>
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 53a4d73..10e0dba 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -46,6 +46,7 @@
 import edu.uci.ics.hyracks.api.application.INCApplicationEntryPoint;
 import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
 import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
 import edu.uci.ics.hyracks.api.io.IODeviceHandle;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
@@ -62,7 +63,9 @@
 import edu.uci.ics.hyracks.control.common.work.FutureValue;
 import edu.uci.ics.hyracks.control.common.work.WorkQueue;
 import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
+import edu.uci.ics.hyracks.control.nc.dataset.DatasetPartitionManager;
 import edu.uci.ics.hyracks.control.nc.io.IOManager;
+import edu.uci.ics.hyracks.control.nc.net.DatasetNetworkManager;
 import edu.uci.ics.hyracks.control.nc.net.NetworkManager;
 import edu.uci.ics.hyracks.control.nc.partitions.PartitionManager;
 import edu.uci.ics.hyracks.control.nc.runtime.RootHyracksContext;
@@ -93,6 +96,10 @@
 
     private final NetworkManager netManager;
 
+    private final IDatasetPartitionManager datasetPartitionManager;
+
+    private final DatasetNetworkManager datasetNetworkManager;
+
     private final WorkQueue queue;
 
     private final Timer timer;
@@ -141,7 +148,11 @@
             throw new Exception("id not set");
         }
         partitionManager = new PartitionManager(this);
-        netManager = new NetworkManager(getIpAddress(ncConfig), partitionManager, ncConfig.nNetThreads);
+        netManager = new NetworkManager(getIpAddress(ncConfig.dataIPAddress), partitionManager, ncConfig.nNetThreads);
+
+        datasetPartitionManager = new DatasetPartitionManager(this, executor, ncConfig.resultManagerMemory);
+        datasetNetworkManager = new DatasetNetworkManager(getIpAddress(ncConfig.datasetIPAddress),
+                datasetPartitionManager, ncConfig.nNetThreads);
 
         queue = new WorkQueue();
         jobletMap = new Hashtable<JobId, Joblet>();
@@ -212,6 +223,7 @@
 
         startApplication();
 
+        datasetNetworkManager.start();
         IIPCHandle ccIPCHandle = ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort));
         this.ccs = new ClusterControllerRemoteProxy(ccIPCHandle);
         HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
@@ -220,10 +232,11 @@
         }
         HeartbeatSchema hbSchema = new HeartbeatSchema(gcInfos);
         ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netManager.getNetworkAddress(),
-                osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean.getAvailableProcessors(),
-                runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean
-                        .getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(),
-                runtimeMXBean.getInputArguments(), runtimeMXBean.getSystemProperties(), hbSchema));
+                datasetNetworkManager.getNetworkAddress(), osMXBean.getName(), osMXBean.getArch(), osMXBean
+                        .getVersion(), osMXBean.getAvailableProcessors(), runtimeMXBean.getVmName(), runtimeMXBean
+                        .getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean.getClassPath(), runtimeMXBean
+                        .getLibraryPath(), runtimeMXBean.getBootClassPath(), runtimeMXBean.getInputArguments(),
+                runtimeMXBean.getSystemProperties(), hbSchema));
 
         synchronized (this) {
             while (registrationPending) {
@@ -270,8 +283,10 @@
         LOGGER.log(Level.INFO, "Stopping NodeControllerService");
         executor.shutdownNow();
         partitionManager.close();
+        datasetPartitionManager.close();
         heartbeatTask.cancel();
         netManager.stop();
+        datasetNetworkManager.stop();
         queue.stop();
         LOGGER.log(Level.INFO, "Stopped NodeControllerService");
     }
@@ -292,6 +307,10 @@
         return netManager;
     }
 
+    public DatasetNetworkManager getDatasetNetworkManager() {
+        return datasetNetworkManager;
+    }
+
     public PartitionManager getPartitionManager() {
         return partitionManager;
     }
@@ -316,8 +335,7 @@
         return queue;
     }
 
-    private static InetAddress getIpAddress(NCConfig ncConfig) throws Exception {
-        String ipaddrStr = ncConfig.dataIPAddress;
+    private static InetAddress getIpAddress(String ipaddrStr) throws Exception {
         ipaddrStr = ipaddrStr.trim();
         Pattern pattern = Pattern.compile("(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})");
         Matcher m = pattern.matcher(ipaddrStr);
@@ -374,6 +392,12 @@
             hbData.netSignalingBytesRead = netPC.getSignalingBytesRead();
             hbData.netSignalingBytesWritten = netPC.getSignalingBytesWritten();
 
+            MuxDemuxPerformanceCounters datasetNetPC = datasetNetworkManager.getPerformanceCounters();
+            hbData.datasetNetPayloadBytesRead = datasetNetPC.getPayloadBytesRead();
+            hbData.datasetNetPayloadBytesWritten = datasetNetPC.getPayloadBytesWritten();
+            hbData.datasetNetSignalingBytesRead = datasetNetPC.getSignalingBytesRead();
+            hbData.datasetNetSignalingBytesWritten = datasetNetPC.getSignalingBytesWritten();
+
             IPCPerformanceCounters ipcPC = ipc.getPerformanceCounters();
             hbData.ipcMessagesSent = ipcPC.getMessageSentCount();
             hbData.ipcMessageBytesSent = ipcPC.getMessageBytesSent();
@@ -468,4 +492,8 @@
     public void sendApplicationMessageToCC(byte[] data, String nodeId) throws Exception {
         ccs.sendApplicationMessageToCC(data, nodeId);
     }
+
+    public IDatasetPartitionManager getDatasetPartitionManager() {
+        return datasetPartitionManager;
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index 310878f..d6ea111 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -34,6 +34,7 @@
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
+import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.io.FileReference;
@@ -348,6 +349,11 @@
     }
 
     @Override
+    public IDatasetPartitionManager getDatasetPartitionManager() {
+        return ncs.getDatasetPartitionManager();
+    }
+
+    @Override
     public void sendApplicationMessageToCC(byte[] message, String nodeId) throws Exception {
         this.ncs.sendApplicationMessageToCC(message, nodeId);
     }
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java
new file mode 100644
index 0000000..cecd677
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java
@@ -0,0 +1,237 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.dataset;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionWriter;
+import edu.uci.ics.hyracks.api.dataset.Page;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.partitions.ResultSetPartitionId;
+
+public class DatasetMemoryManager {
+    private final Set<Page> availPages;
+
+    private final LeastRecentlyUsedList leastRecentlyUsedList;
+
+    private final Map<ResultSetPartitionId, PartitionNode> resultPartitionNodesMap;
+
+    private final static int FRAME_SIZE = 32768;
+
+    public DatasetMemoryManager(int availableMemory) {
+        availPages = new HashSet<Page>();
+
+        // Atleast have one page for temporarily storing the results.
+        if (availableMemory <= 0)
+            availableMemory = FRAME_SIZE;
+
+        while (availableMemory >= FRAME_SIZE) {
+            /* TODO(madhusudancs): Should we have some way of accounting this memory usage by using Hyrack's allocateFrame()
+             * instead of direct ByteBuffer.allocate()?
+             */
+            availPages.add(new Page(ByteBuffer.allocate(FRAME_SIZE)));
+            availableMemory -= FRAME_SIZE;
+        }
+
+        leastRecentlyUsedList = new LeastRecentlyUsedList();
+        resultPartitionNodesMap = new HashMap<ResultSetPartitionId, PartitionNode>();
+    }
+
+    public Page requestPage(ResultSetPartitionId resultSetPartitionId, IDatasetPartitionWriter dpw)
+            throws OutOfMemoryError, HyracksDataException {
+        Page page;
+        if (availPages.isEmpty()) {
+            page = evictPage();
+        } else {
+            page = getAvailablePage();
+        }
+
+        page.clear();
+
+        /*
+         * It is extremely important to update the reference after obtaining the page because, in the cases where memory
+         * manager is allocated only one page of memory, the front of the LRU list should not be created by the
+         * update reference call before a page is pushed on to the element of the LRU list. So we first obtain the page,
+         * then make a updateReference call which in turn creates a new node in the LRU list and then add the page to it.
+         */
+        PartitionNode pn = updateReference(resultSetPartitionId, dpw);
+        pn.add(page);
+        return page;
+    }
+
+    public void pageReferenced(ResultSetPartitionId resultSetPartitionId) {
+        // When a page is referenced the dataset partition writer should already be known, so we pass null.
+        updateReference(resultSetPartitionId, null);
+    }
+
+    public int getPageSize() {
+        return FRAME_SIZE;
+    }
+
+    protected void insertPartitionNode(ResultSetPartitionId resultSetPartitionId, PartitionNode pn) {
+        leastRecentlyUsedList.add(pn);
+        resultPartitionNodesMap.put(resultSetPartitionId, pn);
+    }
+
+    protected synchronized PartitionNode updateReference(ResultSetPartitionId resultSetPartitionId,
+            IDatasetPartitionWriter dpw) {
+        PartitionNode pn = null;
+
+        if (!resultPartitionNodesMap.containsKey(resultSetPartitionId)) {
+            if (dpw != null) {
+                pn = new PartitionNode(resultSetPartitionId, dpw);
+                insertPartitionNode(resultSetPartitionId, pn);
+            }
+            return pn;
+        }
+        pn = resultPartitionNodesMap.get(resultSetPartitionId);
+        leastRecentlyUsedList.remove(pn);
+        insertPartitionNode(resultSetPartitionId, pn);
+
+        return pn;
+    }
+
+    protected synchronized Page evictPage() throws HyracksDataException {
+        PartitionNode pn = leastRecentlyUsedList.getFirst();
+        IDatasetPartitionWriter dpw = pn.getDatasetPartitionWriter();
+        Page page = dpw.returnPage();
+
+        /* If the partition holding the pages breaks the contract by not returning the page or it has no page, just take
+         * away all the pages allocated to it and add to the available pages set.
+         */
+        if (page == null) {
+            availPages.addAll(pn);
+            pn.clear();
+            resultPartitionNodesMap.remove(pn.getResultSetPartitionId());
+            leastRecentlyUsedList.remove(pn);
+
+            /* Based on the assumption that if the dataset partition writer returned a null page, it should be lying about
+             * the number of pages it holds in which case we just evict all the pages it holds and should thus be able to
+             * add all those pages to available set and we have at least one page to allocate back.
+             */
+            page = getAvailablePage();
+        } else {
+            pn.remove(page);
+
+            // If the partition no more holds any pages, remove it from the linked list and the hash map.
+            if (pn.isEmpty()) {
+                resultPartitionNodesMap.remove(pn.getResultSetPartitionId());
+                leastRecentlyUsedList.remove(pn);
+            }
+        }
+
+        return page;
+    }
+
+    protected synchronized Page getAvailablePage() {
+        Iterator<Page> iter = availPages.iterator();
+        Page page = iter.next();
+        iter.remove();
+        return page;
+    }
+
+    private class LeastRecentlyUsedList {
+        private PartitionNode head;
+
+        private PartitionNode tail;
+
+        public LeastRecentlyUsedList() {
+            head = null;
+            tail = null;
+        }
+
+        public void add(PartitionNode node) {
+            if (head == null) {
+                head = tail = node;
+                return;
+            }
+            tail.setNext(node);
+            node.setPrev(tail);
+            tail = node;
+        }
+
+        public void remove(PartitionNode node) {
+            if ((node == head) && (node == tail)) {
+                head = tail = null;
+                return;
+            } else if (node == head) {
+                head = head.getNext();
+                head.setPrev(null);
+                return;
+            } else if (node == tail) {
+                tail = tail.getPrev();
+                tail.setNext(null);
+                return;
+            } else {
+                PartitionNode prev = node.getPrev();
+                PartitionNode next = node.getNext();
+                prev.setNext(next);
+                next.setPrev(prev);
+            }
+        }
+
+        public PartitionNode getFirst() {
+            return head;
+        }
+    }
+
+    private class PartitionNode extends HashSet<Page> {
+        private static final long serialVersionUID = 1L;
+
+        private final ResultSetPartitionId resultSetPartitionId;
+
+        private final IDatasetPartitionWriter datasetPartitionWriter;
+
+        private PartitionNode prev;
+
+        private PartitionNode next;
+
+        public PartitionNode(ResultSetPartitionId resultSetPartitionId, IDatasetPartitionWriter datasetPartitionWriter) {
+            this.resultSetPartitionId = resultSetPartitionId;
+            this.datasetPartitionWriter = datasetPartitionWriter;
+            prev = null;
+            next = null;
+        }
+
+        public ResultSetPartitionId getResultSetPartitionId() {
+            return resultSetPartitionId;
+        }
+
+        public IDatasetPartitionWriter getDatasetPartitionWriter() {
+            return datasetPartitionWriter;
+        }
+
+        public void setPrev(PartitionNode node) {
+            prev = node;
+        }
+
+        public PartitionNode getPrev() {
+            return prev;
+        }
+
+        public void setNext(PartitionNode node) {
+            next = node;
+        }
+
+        public PartitionNode getNext() {
+            return next;
+        }
+    }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
new file mode 100644
index 0000000..1cad54b
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.dataset;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
+import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionReader;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.io.IOManager;
+import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory;
+import edu.uci.ics.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
+
+public class DatasetPartitionManager implements IDatasetPartitionManager {
+    private final NodeControllerService ncs;
+
+    private final Executor executor;
+
+    private final Map<JobId, ResultState[]> partitionResultStateMap;
+
+    private final DefaultDeallocatableRegistry deallocatableRegistry;
+
+    private final IWorkspaceFileFactory fileFactory;
+
+    private final DatasetMemoryManager datasetMemoryManager;
+
+    public DatasetPartitionManager(NodeControllerService ncs, Executor executor, int availableMemory) {
+        this.ncs = ncs;
+        this.executor = executor;
+        partitionResultStateMap = new HashMap<JobId, ResultState[]>();
+        deallocatableRegistry = new DefaultDeallocatableRegistry();
+        fileFactory = new WorkspaceFileFactory(deallocatableRegistry, (IOManager) ncs.getRootContext().getIOManager());
+        datasetMemoryManager = new DatasetMemoryManager(availableMemory);
+    }
+
+    @Override
+    public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult,
+            int partition, int nPartitions) throws HyracksException {
+        DatasetPartitionWriter dpw = null;
+        JobId jobId = ctx.getJobletContext().getJobId();
+        try {
+            ncs.getClusterController().registerResultPartitionLocation(jobId, rsId, orderedResult, partition,
+                    nPartitions, ncs.getDatasetNetworkManager().getNetworkAddress());
+            dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, partition, datasetMemoryManager);
+
+            ResultState[] resultStates = partitionResultStateMap.get(jobId);
+            if (resultStates == null) {
+                resultStates = new ResultState[nPartitions];
+                partitionResultStateMap.put(jobId, resultStates);
+            }
+            resultStates[partition] = dpw.getResultState();
+        } catch (Exception e) {
+            throw new HyracksException(e);
+        }
+
+        return dpw;
+    }
+
+    @Override
+    public void reportPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws HyracksException {
+        try {
+            ncs.getClusterController().reportResultPartitionWriteCompletion(jobId, rsId, partition);
+        } catch (Exception e) {
+            throw new HyracksException(e);
+        }
+    }
+
+    @Override
+    public void reportPartitionFailure(JobId jobId, ResultSetId rsId, int partition) throws HyracksException {
+        try {
+            ncs.getClusterController().reportResultPartitionFailure(jobId, rsId, partition);
+        } catch (Exception e) {
+            throw new HyracksException(e);
+        }
+    }
+
+    @Override
+    public void initializeDatasetPartitionReader(JobId jobId, int partition, IFrameWriter writer)
+            throws HyracksException {
+        ResultState[] resultStates = partitionResultStateMap.get(jobId);
+
+        if (resultStates == null) {
+            throw new HyracksException("Unknown JobId " + jobId);
+        }
+
+        ResultState resultState = resultStates[partition];
+        if (resultState == null) {
+            throw new HyracksException("No DatasetPartitionWriter for partition " + partition);
+        }
+
+        IDatasetPartitionReader dpr = new DatasetPartitionReader(datasetMemoryManager, executor, resultState);
+        dpr.writeTo(writer);
+    }
+
+    @Override
+    public IWorkspaceFileFactory getFileFactory() {
+        return fileFactory;
+    }
+
+    @Override
+    public void close() {
+        deallocatableRegistry.close();
+    }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java
new file mode 100644
index 0000000..296c502
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.dataset;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionReader;
+import edu.uci.ics.hyracks.api.dataset.Page;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.IFileHandle;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.comm.channels.NetworkOutputChannel;
+
+public class DatasetPartitionReader implements IDatasetPartitionReader {
+    private static final Logger LOGGER = Logger.getLogger(DatasetPartitionReader.class.getName());
+
+    private final DatasetMemoryManager datasetMemoryManager;
+
+    private final Executor executor;
+
+    private final ResultState resultState;
+
+    private IFileHandle fileHandle;
+
+    public DatasetPartitionReader(DatasetMemoryManager datasetMemoryManager, Executor executor, ResultState resultState) {
+        this.datasetMemoryManager = datasetMemoryManager;
+        this.executor = executor;
+        this.resultState = resultState;
+    }
+
+    private long read(long offset, ByteBuffer buffer) throws HyracksDataException {
+        long readSize = 0;
+        synchronized (resultState) {
+            while (offset >= resultState.getSize() && !resultState.getEOS()) {
+                try {
+                    resultState.wait();
+                } catch (InterruptedException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+        }
+
+        if (offset >= resultState.getSize() && resultState.getEOS()) {
+            return readSize;
+        }
+
+        if (offset < resultState.getPersistentSize()) {
+            readSize = resultState.getIOManager().syncRead(fileHandle, offset, buffer);
+        }
+
+        if (readSize < buffer.capacity()) {
+            long localPageOffset = offset - resultState.getPersistentSize();
+            int localPageIndex = (int) (localPageOffset / datasetMemoryManager.getPageSize());
+            int pageOffset = (int) (localPageOffset % datasetMemoryManager.getPageSize());
+            Page page = resultState.getPage(localPageIndex);
+            readSize += buffer.remaining();
+            buffer.put(page.getBuffer().array(), pageOffset, buffer.remaining());
+        }
+
+        datasetMemoryManager.pageReferenced(resultState.getResultSetPartitionId());
+        return readSize;
+    }
+
+    @Override
+    public void writeTo(final IFrameWriter writer) {
+        executor.execute(new Runnable() {
+            @Override
+            public void run() {
+                NetworkOutputChannel channel = (NetworkOutputChannel) writer;
+                channel.setFrameSize(resultState.getFrameSize());
+                try {
+                    fileHandle = resultState.getIOManager().open(resultState.getValidFileReference(),
+                            IIOManager.FileReadWriteMode.READ_ONLY, IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+                    channel.open();
+                    try {
+                        long offset = 0;
+                        ByteBuffer buffer = ByteBuffer.allocate(resultState.getFrameSize());
+                        while (true) {
+                            buffer.clear();
+                            long size = read(offset, buffer);
+                            if (size <= 0) {
+                                break;
+                            } else if (size < buffer.limit()) {
+                                throw new HyracksDataException("Premature end of file - readSize: " + size
+                                        + " buffer limit: " + buffer.limit());
+                            }
+                            offset += size;
+                            buffer.flip();
+                            channel.nextFrame(buffer);
+                        }
+                    } finally {
+                        channel.close();
+                        resultState.getIOManager().close(fileHandle);
+                    }
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                } catch (HyracksDataException e) {
+                    throw new RuntimeException(e);
+                }
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("result reading successful(" + resultState.getResultSetPartitionId() + ")");
+                }
+            }
+        });
+    }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
new file mode 100644
index 0000000..f6ae540
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.dataset;
+
+import java.nio.ByteBuffer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
+import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionWriter;
+import edu.uci.ics.hyracks.api.dataset.Page;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IFileHandle;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.partitions.ResultSetPartitionId;
+
+public class DatasetPartitionWriter implements IDatasetPartitionWriter {
+    private static final Logger LOGGER = Logger.getLogger(DatasetPartitionWriter.class.getName());
+
+    private static final String FILE_PREFIX = "result_";
+
+    private final IDatasetPartitionManager manager;
+
+    private final JobId jobId;
+
+    private final ResultSetId resultSetId;
+
+    private final int partition;
+
+    private final DatasetMemoryManager datasetMemoryManager;
+
+    private final ResultSetPartitionId resultSetPartitionId;
+
+    private final ResultState resultState;
+
+    private IFileHandle fileHandle;
+
+    public DatasetPartitionWriter(IHyracksTaskContext ctx, IDatasetPartitionManager manager, JobId jobId,
+            ResultSetId rsId, int partition, DatasetMemoryManager datasetMemoryManager) {
+        this.manager = manager;
+        this.jobId = jobId;
+        this.resultSetId = rsId;
+        this.partition = partition;
+        this.datasetMemoryManager = datasetMemoryManager;
+
+        resultSetPartitionId = new ResultSetPartitionId(jobId, rsId, partition);
+        resultState = new ResultState(resultSetPartitionId, ctx.getIOManager(), ctx.getFrameSize());
+    }
+
+    public ResultState getResultState() {
+        return resultState;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("open(" + partition + ")");
+        }
+        String fName = FILE_PREFIX + String.valueOf(partition);
+        FileReference fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(fName);
+        fileHandle = resultState.getIOManager().open(fRef, IIOManager.FileReadWriteMode.READ_WRITE,
+                IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+        resultState.init(fRef);
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        int srcOffset = 0;
+        Page destPage = resultState.getLastPage();
+
+        while (srcOffset < buffer.limit()) {
+            if ((destPage == null) || (destPage.getBuffer().remaining() <= 0)) {
+                destPage = datasetMemoryManager.requestPage(resultSetPartitionId, this);
+                resultState.addPage(destPage);
+            }
+            int srcLength = Math.min(buffer.limit() - srcOffset, destPage.getBuffer().remaining());
+            destPage.getBuffer().put(buffer.array(), srcOffset, srcLength);
+            srcOffset += srcLength;
+            resultState.incrementSize(srcLength);
+        }
+
+        synchronized (resultState) {
+            resultState.notifyAll();
+        }
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        try {
+            manager.reportPartitionFailure(jobId, resultSetId, partition);
+        } catch (HyracksException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("close(" + partition + ")");
+        }
+
+        try {
+            synchronized (resultState) {
+                resultState.setEOS(true);
+                resultState.notifyAll();
+            }
+            manager.reportPartitionWriteCompletion(jobId, resultSetId, partition);
+        } catch (HyracksException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public Page returnPage() throws HyracksDataException {
+        Page page = resultState.removePage(0);
+
+        IIOManager ioManager = resultState.getIOManager();
+
+        // If we do not have any pages to be given back close the write channel since we don't write any more, return null.
+        if (page == null) {
+            ioManager.close(fileHandle);
+            return null;
+        }
+
+        page.getBuffer().flip();
+
+        long delta = ioManager.syncWrite(fileHandle, resultState.getPersistentSize(), page.getBuffer());
+        resultState.incrementPersistentSize(delta);
+        return page;
+    }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
new file mode 100644
index 0000000..3db3fd9
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
@@ -0,0 +1,173 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.dataset;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
+import edu.uci.ics.hyracks.api.dataset.Page;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.partitions.ResultSetPartitionId;
+
+public class ResultState implements IStateObject {
+    private final ResultSetPartitionId resultSetPartitionId;
+
+    private final int frameSize;
+
+    private final IIOManager ioManager;
+
+    private final AtomicBoolean eos;
+
+    private final AtomicBoolean readEOS;
+
+    private final List<Page> localPageList;
+
+    private FileReference fileRef;
+
+    private long size;
+
+    private long persistentSize;
+
+    ResultState(ResultSetPartitionId resultSetPartitionId, IIOManager ioManager, int frameSize) {
+        this.resultSetPartitionId = resultSetPartitionId;
+        this.ioManager = ioManager;
+        this.frameSize = frameSize;
+        eos = new AtomicBoolean(false);
+        readEOS = new AtomicBoolean(false);
+        localPageList = new ArrayList<Page>();
+    }
+
+    public synchronized void init(FileReference fileRef) {
+        this.fileRef = fileRef;
+
+        size = 0;
+        persistentSize = 0;
+        notifyAll();
+    }
+
+    public ResultSetPartitionId getResultSetPartitionId() {
+        return resultSetPartitionId;
+    }
+
+    public int getFrameSize() {
+        return frameSize;
+    }
+
+    public IIOManager getIOManager() {
+        return ioManager;
+    }
+
+    public synchronized void incrementSize(long delta) {
+        size += delta;
+    }
+
+    public synchronized long getSize() {
+        return size;
+    }
+
+    public synchronized void incrementPersistentSize(long delta) {
+        persistentSize += delta;
+    }
+
+    public synchronized long getPersistentSize() {
+        return persistentSize;
+    }
+
+    public void setEOS(boolean eos) {
+        this.eos.set(eos);
+    }
+
+    public boolean getEOS() {
+        return eos.get();
+    }
+
+    public boolean getReadEOS() {
+        return readEOS.get();
+    }
+
+    public synchronized void addPage(Page page) {
+        localPageList.add(page);
+    }
+
+    public synchronized Page removePage(int index) {
+        Page page = null;
+        if (!localPageList.isEmpty()) {
+            page = localPageList.remove(index);
+        }
+        return page;
+    }
+
+    public synchronized Page getPage(int index) {
+        Page page = null;
+        if (!localPageList.isEmpty()) {
+            page = localPageList.get(index);
+        }
+        return page;
+    }
+
+    public synchronized Page getLastPage() {
+        Page page = null;
+        if (!localPageList.isEmpty()) {
+            page = localPageList.get(localPageList.size() - 1);
+        }
+        return page;
+    }
+
+    public synchronized Page getFirstPage() {
+        Page page = null;
+        if (!localPageList.isEmpty()) {
+            page = localPageList.get(0);
+        }
+        return page;
+    }
+
+    public synchronized FileReference getValidFileReference() throws InterruptedException {
+        while (fileRef == null)
+            wait();
+        return fileRef;
+    }
+
+    @Override
+    public JobId getJobId() {
+        return resultSetPartitionId.getJobId();
+    }
+
+    @Override
+    public Object getId() {
+        return resultSetPartitionId;
+    }
+
+    @Override
+    public long getMemoryOccupancy() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void toBytes(DataOutput out) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void fromBytes(DataInput in) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java
new file mode 100644
index 0000000..5b8b333
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.net;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.comm.channels.IChannelConnectionFactory;
+import edu.uci.ics.hyracks.comm.channels.NetworkOutputChannel;
+import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
+import edu.uci.ics.hyracks.net.exceptions.NetException;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.IChannelOpenListener;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.MultiplexedConnection;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemux;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
+
+public class DatasetNetworkManager implements IChannelConnectionFactory {
+    private static final Logger LOGGER = Logger.getLogger(DatasetNetworkManager.class.getName());
+
+    private static final int MAX_CONNECTION_ATTEMPTS = 5;
+
+    static final int INITIAL_MESSAGE_SIZE = 20;
+
+    private final IDatasetPartitionManager partitionManager;
+
+    private final MuxDemux md;
+
+    private NetworkAddress networkAddress;
+
+    public DatasetNetworkManager(InetAddress inetAddress, IDatasetPartitionManager partitionManager, int nThreads)
+            throws IOException {
+        this.partitionManager = partitionManager;
+        md = new MuxDemux(new InetSocketAddress(inetAddress, 0), new ChannelOpenListener(), nThreads,
+                MAX_CONNECTION_ATTEMPTS);
+    }
+
+    public void start() throws IOException {
+        md.start();
+        InetSocketAddress sockAddr = md.getLocalAddress();
+        networkAddress = new NetworkAddress(sockAddr.getAddress().getAddress(), sockAddr.getPort());
+    }
+
+    public NetworkAddress getNetworkAddress() {
+        return networkAddress;
+    }
+
+    public void stop() {
+
+    }
+
+    public ChannelControlBlock connect(SocketAddress remoteAddress) throws InterruptedException, NetException {
+        MultiplexedConnection mConn = md.connect((InetSocketAddress) remoteAddress);
+        return mConn.openChannel();
+    }
+
+    private class ChannelOpenListener implements IChannelOpenListener {
+        @Override
+        public void channelOpened(ChannelControlBlock channel) {
+            channel.getReadInterface().setFullBufferAcceptor(new InitialBufferAcceptor(channel));
+            channel.getReadInterface().getEmptyBufferAcceptor().accept(ByteBuffer.allocate(INITIAL_MESSAGE_SIZE));
+        }
+    }
+
+    private class InitialBufferAcceptor implements ICloseableBufferAcceptor {
+        private final ChannelControlBlock ccb;
+
+        private NetworkOutputChannel noc;
+
+        public InitialBufferAcceptor(ChannelControlBlock ccb) {
+            this.ccb = ccb;
+        }
+
+        @Override
+        public void accept(ByteBuffer buffer) {
+            JobId jobId = new JobId(buffer.getLong());
+            int partition = buffer.getInt();
+            if (LOGGER.isLoggable(Level.FINE)) {
+                LOGGER.fine("Received initial dataset partition read request for JobId: " + jobId + " partition: "
+                        + partition + " on channel: " + ccb);
+            }
+            noc = new NetworkOutputChannel(ccb, 1);
+            try {
+                partitionManager.initializeDatasetPartitionReader(jobId, partition, noc);
+            } catch (HyracksException e) {
+                noc.abort();
+            }
+        }
+
+        @Override
+        public void close() {
+
+        }
+
+        @Override
+        public void error(int ecode) {
+            if (noc != null) {
+                noc.abort();
+            }
+        }
+    }
+
+    public MuxDemuxPerformanceCounters getPerformanceCounters() {
+        return md.getPerformanceCounters();
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
deleted file mode 100644
index 1d5af84..0000000
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.nc.net;
-
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
-import java.util.Queue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import edu.uci.ics.hyracks.api.channels.IInputChannel;
-import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.partitions.PartitionId;
-import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
-import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
-import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
-
-public class NetworkInputChannel implements IInputChannel {
-    private static final Logger LOGGER = Logger.getLogger(NetworkInputChannel.class.getName());
-
-    private final NetworkManager netManager;
-
-    private final SocketAddress remoteAddress;
-
-    private final PartitionId partitionId;
-
-    private final Queue<ByteBuffer> fullQueue;
-
-    private final int nBuffers;
-
-    private ChannelControlBlock ccb;
-
-    private IInputChannelMonitor monitor;
-
-    private Object attachment;
-
-    public NetworkInputChannel(NetworkManager netManager, SocketAddress remoteAddress, PartitionId partitionId,
-            int nBuffers) {
-        this.netManager = netManager;
-        this.remoteAddress = remoteAddress;
-        this.partitionId = partitionId;
-        fullQueue = new ArrayDeque<ByteBuffer>(nBuffers);
-        this.nBuffers = nBuffers;
-    }
-
-    @Override
-    public void registerMonitor(IInputChannelMonitor monitor) {
-        this.monitor = monitor;
-    }
-
-    @Override
-    public void setAttachment(Object attachment) {
-        this.attachment = attachment;
-    }
-
-    @Override
-    public Object getAttachment() {
-        return attachment;
-    }
-
-    @Override
-    public synchronized ByteBuffer getNextBuffer() {
-        return fullQueue.poll();
-    }
-
-    @Override
-    public void recycleBuffer(ByteBuffer buffer) {
-        buffer.clear();
-        ccb.getReadInterface().getEmptyBufferAcceptor().accept(buffer);
-    }
-
-    @Override
-    public void open(IHyracksTaskContext ctx) throws HyracksDataException {
-        try {
-            ccb = netManager.connect(remoteAddress);
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-        ccb.getReadInterface().setFullBufferAcceptor(new ReadFullBufferAcceptor());
-        ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor());
-        for (int i = 0; i < nBuffers; ++i) {
-            ccb.getReadInterface().getEmptyBufferAcceptor().accept(ctx.allocateFrame());
-        }
-        ByteBuffer writeBuffer = ByteBuffer.allocate(NetworkManager.INITIAL_MESSAGE_SIZE);
-        writeBuffer.putLong(partitionId.getJobId().getId());
-        writeBuffer.putInt(partitionId.getConnectorDescriptorId().getId());
-        writeBuffer.putInt(partitionId.getSenderIndex());
-        writeBuffer.putInt(partitionId.getReceiverIndex());
-        writeBuffer.flip();
-        if (LOGGER.isLoggable(Level.FINE)) {
-            LOGGER.fine("Sending partition request: " + partitionId + " on channel: " + ccb);
-        }
-        ccb.getWriteInterface().getFullBufferAcceptor().accept(writeBuffer);
-        ccb.getWriteInterface().getFullBufferAcceptor().close();
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-
-    }
-
-    private class ReadFullBufferAcceptor implements ICloseableBufferAcceptor {
-        @Override
-        public void accept(ByteBuffer buffer) {
-            fullQueue.add(buffer);
-            monitor.notifyDataAvailability(NetworkInputChannel.this, 1);
-        }
-
-        @Override
-        public void close() {
-            monitor.notifyEndOfStream(NetworkInputChannel.this);
-        }
-
-        @Override
-        public void error(int ecode) {
-            monitor.notifyFailure(NetworkInputChannel.this);
-        }
-    }
-
-    private class WriteEmptyBufferAcceptor implements IBufferAcceptor {
-        @Override
-        public void accept(ByteBuffer buffer) {
-            // do nothing
-        }
-    }
-}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
index b805595..c8e4e94 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
@@ -27,6 +27,8 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.comm.channels.IChannelConnectionFactory;
+import edu.uci.ics.hyracks.comm.channels.NetworkOutputChannel;
 import edu.uci.ics.hyracks.control.nc.partitions.PartitionManager;
 import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
 import edu.uci.ics.hyracks.net.exceptions.NetException;
@@ -36,7 +38,7 @@
 import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemux;
 import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
 
-public class NetworkManager {
+public class NetworkManager implements IChannelConnectionFactory {
     private static final Logger LOGGER = Logger.getLogger(NetworkManager.class.getName());
 
     private static final int MAX_CONNECTION_ATTEMPTS = 5;
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
deleted file mode 100644
index 9024e18..0000000
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.nc.net;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
-import java.util.Deque;
-
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
-import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
-
-public class NetworkOutputChannel implements IFrameWriter {
-    private final ChannelControlBlock ccb;
-
-    private final int nBuffers;
-
-    private final Deque<ByteBuffer> emptyStack;
-
-    private boolean aborted;
-
-    public NetworkOutputChannel(ChannelControlBlock ccb, int nBuffers) {
-        this.ccb = ccb;
-        this.nBuffers = nBuffers;
-        emptyStack = new ArrayDeque<ByteBuffer>(nBuffers);
-        ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor());
-    }
-
-    public void setTaskContext(IHyracksTaskContext ctx) {
-        for (int i = 0; i < nBuffers; ++i) {
-            emptyStack.push(ByteBuffer.allocateDirect(ctx.getFrameSize()));
-        }
-    }
-
-    @Override
-    public void open() throws HyracksDataException {
-    }
-
-    @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        ByteBuffer destBuffer = null;
-        synchronized (this) {
-            while (true) {
-                if (aborted) {
-                    throw new HyracksDataException("Connection has been aborted");
-                }
-                destBuffer = emptyStack.poll();
-                if (destBuffer != null) {
-                    break;
-                }
-                try {
-                    wait();
-                } catch (InterruptedException e) {
-                    throw new HyracksDataException(e);
-                }
-            }
-        }
-        buffer.position(0);
-        buffer.limit(destBuffer.capacity());
-        destBuffer.clear();
-        destBuffer.put(buffer);
-        destBuffer.flip();
-        ccb.getWriteInterface().getFullBufferAcceptor().accept(destBuffer);
-    }
-
-    @Override
-    public void fail() throws HyracksDataException {
-        ccb.getWriteInterface().getFullBufferAcceptor().error(1);
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        ccb.getWriteInterface().getFullBufferAcceptor().close();
-    }
-
-    void abort() {
-        ccb.getWriteInterface().getFullBufferAcceptor().error(1);
-        synchronized (NetworkOutputChannel.this) {
-            aborted = true;
-            NetworkOutputChannel.this.notifyAll();
-        }
-    }
-
-    private class WriteEmptyBufferAcceptor implements IBufferAcceptor {
-        @Override
-        public void accept(ByteBuffer buffer) {
-            synchronized (NetworkOutputChannel.this) {
-                emptyStack.push(buffer);
-                NetworkOutputChannel.this.notifyAll();
-            }
-        }
-    }
-}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
index 16e31f7..ba6e6c3 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
@@ -21,7 +21,7 @@
 import edu.uci.ics.hyracks.api.channels.IInputChannel;
 import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.partitions.IPartition;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
@@ -82,7 +82,7 @@
     }
 
     @Override
-    public void open(IHyracksTaskContext ctx) throws HyracksDataException {
+    public void open(IHyracksCommonContext ctx) throws HyracksDataException {
         for (int i = 0; i < nBuffers; ++i) {
             emptyQueue.add(ctx.allocateFrame());
         }
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
index 45c091a..ea88a75 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
@@ -28,12 +28,12 @@
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.partitions.IPartition;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.comm.channels.NetworkOutputChannel;
 import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
 import edu.uci.ics.hyracks.control.common.job.PartitionState;
 import edu.uci.ics.hyracks.control.nc.NodeControllerService;
 import edu.uci.ics.hyracks.control.nc.io.IOManager;
 import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory;
-import edu.uci.ics.hyracks.control.nc.net.NetworkOutputChannel;
 import edu.uci.ics.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
 
 public class PartitionManager {
@@ -98,7 +98,7 @@
         List<IPartition> pList = partitionMap.get(partitionId);
         if (pList != null && !pList.isEmpty()) {
             IPartition partition = pList.get(0);
-            writer.setTaskContext(partition.getTaskContext());
+            writer.setFrameSize(partition.getTaskContext().getFrameSize());
             partition.writeTo(writer);
             if (!partition.isReusable()) {
                 partitionMap.remove(partitionId);
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
index bb9669d..7ed9d11 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
@@ -22,10 +22,10 @@
 import edu.uci.ics.hyracks.api.comm.PartitionChannel;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.comm.channels.NetworkInputChannel;
 import edu.uci.ics.hyracks.control.common.work.AbstractWork;
 import edu.uci.ics.hyracks.control.nc.Joblet;
 import edu.uci.ics.hyracks.control.nc.NodeControllerService;
-import edu.uci.ics.hyracks.control.nc.net.NetworkInputChannel;
 
 public class ReportPartitionAvailabilityWork extends AbstractWork {
     private final NodeControllerService ncs;