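Added new networking layer.

Replaces the NIO selector-based ConnectionManager in hyracks-control-nc with a NetworkManager built on the MuxDemux multiplexed connections from the new hyracks-net dependency; NetworkInputChannel and NetworkOutputChannel now exchange buffers through a ChannelControlBlock instead of implementing INetworkChannel.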

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@984 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-nc/pom.xml b/hyracks-control-nc/pom.xml
index 64a409c..8c56958 100644
--- a/hyracks-control-nc/pom.xml
+++ b/hyracks-control-nc/pom.xml
@@ -1,9 +1,6 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
   <modelVersion>4.0.0</modelVersion>
-  <groupId>edu.uci.ics.hyracks</groupId>
   <artifactId>hyracks-control-nc</artifactId>
-  <version>0.2.0-SNAPSHOT</version>
-
   <parent>
     <groupId>edu.uci.ics.hyracks</groupId>
     <artifactId>hyracks</artifactId>
@@ -37,6 +34,11 @@
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks</groupId>
+  		<artifactId>hyracks-net</artifactId>
+  		<version>0.2.0-SNAPSHOT</version>
+  	</dependency>
   </dependencies>
   <reporting>
     <plugins>
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index d869515..d5fc27e 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -65,7 +65,7 @@
 import edu.uci.ics.hyracks.control.common.work.WorkQueue;
 import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
 import edu.uci.ics.hyracks.control.nc.io.IOManager;
-import edu.uci.ics.hyracks.control.nc.net.ConnectionManager;
+import edu.uci.ics.hyracks.control.nc.net.NetworkManager;
 import edu.uci.ics.hyracks.control.nc.partitions.PartitionManager;
 import edu.uci.ics.hyracks.control.nc.runtime.RootHyracksContext;
 import edu.uci.ics.hyracks.control.nc.work.AbortTasksWork;
@@ -91,7 +91,7 @@
 
     private final PartitionManager partitionManager;
 
-    private final ConnectionManager connectionManager;
+    private final NetworkManager netManager;
 
     private final WorkQueue queue;
 
@@ -131,9 +131,8 @@
         if (id == null) {
             throw new Exception("id not set");
         }
-        connectionManager = new ConnectionManager(ctx, getIpAddress(ncConfig));
         partitionManager = new PartitionManager(this);
-        connectionManager.setPartitionRequestListener(partitionManager);
+        netManager = new NetworkManager(ctx, getIpAddress(ncConfig), partitionManager);
 
         queue = new WorkQueue();
         jobletMap = new Hashtable<JobId, Joblet>();
@@ -166,7 +165,7 @@
     public void start() throws Exception {
         LOGGER.log(Level.INFO, "Starting NodeControllerService");
         ipc.start();
-        connectionManager.start();
+        netManager.start();
         IIPCHandle ccIPCHandle = ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort));
         this.ccs = new ClusterControllerRemoteProxy(ccIPCHandle);
         HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
@@ -174,9 +173,9 @@
             gcInfos[i] = new HeartbeatSchema.GarbageCollectorInfo(gcMXBeans.get(i).getName());
         }
         HeartbeatSchema hbSchema = new HeartbeatSchema(gcInfos);
-        this.nodeParameters = ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig,
-                connectionManager.getNetworkAddress(), osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(),
-                osMXBean.getAvailableProcessors(), hbSchema));
+        this.nodeParameters = ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netManager
+                .getNetworkAddress(), osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean
+                .getAvailableProcessors(), hbSchema));
         queue.start();
 
         heartbeatTask = new HeartbeatTask(ccs);
@@ -197,7 +196,7 @@
         LOGGER.log(Level.INFO, "Stopping NodeControllerService");
         partitionManager.close();
         heartbeatTask.cancel();
-        connectionManager.stop();
+        netManager.stop();
         queue.stop();
         LOGGER.log(Level.INFO, "Stopped NodeControllerService");
     }
@@ -218,8 +217,8 @@
         return jobletMap;
     }
 
-    public ConnectionManager getConnectionManager() {
-        return connectionManager;
+    public NetworkManager getNetworkManager() {
+        return netManager;
     }
 
     public PartitionManager getPartitionManager() {
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/ConnectionManager.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/ConnectionManager.java
deleted file mode 100644
index 6e38ef7..0000000
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/ConnectionManager.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.nc.net;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.nio.ByteBuffer;
-import java.nio.channels.AsynchronousCloseException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
-import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
-import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.partitions.PartitionId;
-import edu.uci.ics.hyracks.control.nc.partitions.IPartitionRequestListener;
-
-public class ConnectionManager {
-    private static final Logger LOGGER = Logger.getLogger(ConnectionManager.class.getName());
-
-    static final int INITIAL_MESSAGE_SIZE = 20;
-
-    private final IHyracksRootContext ctx;
-
-    private IPartitionRequestListener partitionRequestListener;
-
-    private final ServerSocketChannel serverChannel;
-
-    private volatile boolean stopped;
-
-    private final ConnectionListenerThread connectionListener;
-
-    private final DataListenerThread dataListener;
-
-    private final NetworkAddress networkAddress;
-
-    public ConnectionManager(IHyracksRootContext ctx, InetAddress inetAddress) throws IOException {
-        this.ctx = ctx;
-        serverChannel = ServerSocketChannel.open();
-        ServerSocket serverSocket = serverChannel.socket();
-        serverSocket.bind(new InetSocketAddress(inetAddress, 0), 0);
-        serverSocket.setReuseAddress(true);
-        stopped = false;
-        connectionListener = new ConnectionListenerThread();
-        dataListener = new DataListenerThread();
-        networkAddress = new NetworkAddress(serverSocket.getInetAddress(), serverSocket.getLocalPort());
-
-    }
-
-    public void setPartitionRequestListener(IPartitionRequestListener partitionRequestListener) {
-        this.partitionRequestListener = partitionRequestListener;
-    }
-
-    public void start() {
-        connectionListener.start();
-        dataListener.start();
-    }
-
-    public void stop() {
-        try {
-            stopped = true;
-            serverChannel.close();
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-    }
-
-    public void connect(INetworkChannel channel) throws IOException {
-        dataListener.addOutgoingConnection(channel);
-    }
-
-    private final class ConnectionListenerThread extends Thread {
-        public ConnectionListenerThread() {
-            super("Hyracks NC Connection Listener");
-            setDaemon(true);
-            setPriority(MAX_PRIORITY);
-        }
-
-        @Override
-        public void run() {
-            while (!stopped) {
-                try {
-                    SocketChannel sc = serverChannel.accept();
-                    dataListener.addIncomingConnection(sc);
-                } catch (AsynchronousCloseException e) {
-                    // do nothing
-                    if (!stopped) {
-                        e.printStackTrace();
-                    }
-                } catch (IOException e) {
-                    e.printStackTrace();
-                }
-            }
-        }
-    }
-
-    private final class DataListenerThread extends Thread {
-        private Selector selector;
-
-        private final List<SocketChannel> pendingIncomingConnections;
-        private final Set<SocketChannel> pendingNegotiations;
-        private final List<INetworkChannel> pendingOutgoingConnections;
-
-        public DataListenerThread() {
-            super("Hyracks Data Listener Thread");
-            setDaemon(true);
-            try {
-                selector = Selector.open();
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
-            pendingIncomingConnections = new ArrayList<SocketChannel>();
-            pendingNegotiations = new HashSet<SocketChannel>();
-            pendingOutgoingConnections = new ArrayList<INetworkChannel>();
-        }
-
-        synchronized void addIncomingConnection(SocketChannel sc) throws IOException {
-            pendingIncomingConnections.add(sc);
-            selector.wakeup();
-        }
-
-        synchronized void addOutgoingConnection(INetworkChannel channel) throws IOException {
-            pendingOutgoingConnections.add(channel);
-            selector.wakeup();
-        }
-
-        @Override
-        public void run() {
-            while (!stopped) {
-                try {
-                    if (LOGGER.isLoggable(Level.FINE)) {
-                        LOGGER.fine("Starting Select");
-                    }
-                    int n = selector.select();
-                    synchronized (this) {
-                        if (!pendingIncomingConnections.isEmpty()) {
-                            for (SocketChannel sc : pendingIncomingConnections) {
-                                sc.configureBlocking(false);
-                                sc.socket().setReuseAddress(true);
-                                SelectionKey scKey = sc.register(selector, SelectionKey.OP_READ);
-                                ByteBuffer buffer = ByteBuffer.allocate(INITIAL_MESSAGE_SIZE);
-                                scKey.attach(buffer);
-                                pendingNegotiations.add(sc);
-                            }
-                            pendingIncomingConnections.clear();
-                        }
-                        if (!pendingOutgoingConnections.isEmpty()) {
-                            for (INetworkChannel nc : pendingOutgoingConnections) {
-                                SocketChannel sc = SocketChannel.open();
-                                sc.configureBlocking(false);
-                                sc.socket().setReuseAddress(true);
-                                SelectionKey scKey = sc.register(selector, 0);
-                                scKey.attach(nc);
-                                nc.setSelectionKey(scKey);
-                                nc.notifyConnectionManagerRegistration();
-                            }
-                            pendingOutgoingConnections.clear();
-                        }
-                        if (LOGGER.isLoggable(Level.FINE)) {
-                            LOGGER.fine("Selector: " + n);
-                        }
-                        if (n > 0) {
-                            for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();) {
-                                SelectionKey key = i.next();
-                                i.remove();
-                                SocketChannel sc = (SocketChannel) key.channel();
-                                if (pendingNegotiations.contains(sc)) {
-                                    if (key.isReadable()) {
-                                        ByteBuffer buffer = (ByteBuffer) key.attachment();
-                                        sc.read(buffer);
-                                        buffer.flip();
-                                        if (buffer.remaining() >= INITIAL_MESSAGE_SIZE) {
-                                            PartitionId pid = readInitialMessage(buffer);
-                                            pendingNegotiations.remove(sc);
-                                            key.interestOps(0);
-                                            NetworkOutputChannel channel = new NetworkOutputChannel(ctx, 5);
-                                            channel.setSelectionKey(key);
-                                            key.attach(channel);
-                                            try {
-                                                partitionRequestListener.registerPartitionRequest(pid, channel);
-                                            } catch (HyracksException e) {
-                                                key.cancel();
-                                                sc.close();
-                                                channel.abort();
-                                            }
-                                        } else {
-                                            buffer.compact();
-                                        }
-                                    }
-                                } else {
-                                    INetworkChannel channel = (INetworkChannel) key.attachment();
-                                    boolean close = false;
-                                    boolean error = false;
-                                    try {
-                                        close = channel.dispatchNetworkEvent();
-                                    } catch (IOException e) {
-                                        e.printStackTrace();
-                                        error = true;
-                                    }
-                                    if (close || error) {
-                                        key.cancel();
-                                        sc.close();
-                                        if (error) {
-                                            channel.abort();
-                                        }
-                                    }
-                                }
-                            }
-                        }
-                    }
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
-            }
-        }
-
-        private PartitionId readInitialMessage(ByteBuffer buffer) {
-            JobId jobId = new JobId(buffer.getLong());
-            ConnectorDescriptorId cdid = new ConnectorDescriptorId(buffer.getInt());
-            int senderIndex = buffer.getInt();
-            int receiverIndex = buffer.getInt();
-            return new PartitionId(jobId, cdid, senderIndex, receiverIndex);
-        }
-    }
-
-    public NetworkAddress getNetworkAddress() {
-        return networkAddress;
-    }
-}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/INetworkChannel.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/INetworkChannel.java
index 61cd91f..1f4f070 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/INetworkChannel.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/INetworkChannel.java
@@ -1,7 +1,6 @@
 package edu.uci.ics.hyracks.control.nc.net;
 
 import java.io.IOException;
-import java.net.SocketAddress;
 import java.nio.channels.SelectionKey;
 
 public interface INetworkChannel {
@@ -9,10 +8,6 @@
 
     public void setSelectionKey(SelectionKey key);
 
-    public SelectionKey getSelectionKey();
-
-    public SocketAddress getRemoteAddress();
-
     public void abort();
 
     public void notifyConnectionManagerRegistration() throws IOException;
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
index 23cf514..6400288 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
@@ -14,65 +14,48 @@
  */
 package edu.uci.ics.hyracks.control.nc.net;
 
-import java.io.IOException;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
 import java.util.ArrayDeque;
 import java.util.Queue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.channels.IInputChannel;
 import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
-import edu.uci.ics.hyracks.api.comm.FrameHelper;
 import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
+import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
 
-public class NetworkInputChannel implements IInputChannel, INetworkChannel {
-    private static final Logger LOGGER = Logger.getLogger(NetworkInputChannel.class.getName());
+public class NetworkInputChannel implements IInputChannel {
+    private IHyracksRootContext ctx;
 
-    private final ConnectionManager connectionManager;
+    private final NetworkManager netManager;
 
     private final SocketAddress remoteAddress;
 
     private final PartitionId partitionId;
 
-    private final Queue<ByteBuffer> emptyQueue;
-
     private final Queue<ByteBuffer> fullQueue;
 
-    private SocketChannel socketChannel;
+    private final int nBuffers;
 
-    private SelectionKey key;
-
-    private ByteBuffer currentBuffer;
-
-    private boolean eos;
-
-    private boolean aborted;
+    private ChannelControlBlock ccb;
 
     private IInputChannelMonitor monitor;
 
     private Object attachment;
 
-    private ByteBuffer writeBuffer;
-
-    public NetworkInputChannel(IHyracksRootContext ctx, ConnectionManager connectionManager,
-            SocketAddress remoteAddress, PartitionId partitionId, int nBuffers) {
-        this.connectionManager = connectionManager;
+    public NetworkInputChannel(IHyracksRootContext ctx, NetworkManager netManager, SocketAddress remoteAddress,
+            PartitionId partitionId, int nBuffers) {
+        this.ctx = ctx;
+        this.netManager = netManager;
         this.remoteAddress = remoteAddress;
         this.partitionId = partitionId;
-        this.emptyQueue = new ArrayDeque<ByteBuffer>(nBuffers);
-        for (int i = 0; i < nBuffers; ++i) {
-            emptyQueue.add(ctx.allocateFrame());
-        }
         fullQueue = new ArrayDeque<ByteBuffer>(nBuffers);
-        aborted = false;
-        eos = false;
+        this.nBuffers = nBuffers;
     }
 
     @Override
@@ -96,29 +79,31 @@
     }
 
     @Override
-    public synchronized void recycleBuffer(ByteBuffer buffer) {
+    public void recycleBuffer(ByteBuffer buffer) {
         buffer.clear();
-        emptyQueue.add(buffer);
-        if (!eos && !aborted) {
-            int ops = key.interestOps();
-            if ((ops & SelectionKey.OP_READ) == 0) {
-                key.interestOps(ops | SelectionKey.OP_READ);
-                key.selector().wakeup();
-                if (currentBuffer == null) {
-                    currentBuffer = emptyQueue.poll();
-                }
-            }
-        }
+        ccb.getReadInterface().getEmptyBufferAcceptor().accept(buffer);
     }
 
     @Override
     public void open() throws HyracksDataException {
-        currentBuffer = emptyQueue.poll();
         try {
-            connectionManager.connect(this);
-        } catch (IOException e) {
+            ccb = netManager.connect(remoteAddress);
+        } catch (Exception e) {
             throw new HyracksDataException(e);
         }
+        ccb.getReadInterface().setFullBufferAcceptor(new ReadFullBufferAcceptor());
+        ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor());
+        for (int i = 0; i < nBuffers; ++i) {
+            ccb.getReadInterface().getEmptyBufferAcceptor().accept(ctx.allocateFrame());
+        }
+        ByteBuffer writeBuffer = ByteBuffer.allocate(NetworkManager.INITIAL_MESSAGE_SIZE);
+        writeBuffer.putLong(partitionId.getJobId().getId());
+        writeBuffer.putInt(partitionId.getConnectorDescriptorId().getId());
+        writeBuffer.putInt(partitionId.getSenderIndex());
+        writeBuffer.putInt(partitionId.getReceiverIndex());
+        writeBuffer.flip();
+        ccb.getWriteInterface().getFullBufferAcceptor().accept(writeBuffer);
+        ccb.getWriteInterface().getFullBufferAcceptor().close();
     }
 
     @Override
@@ -126,110 +111,28 @@
 
     }
 
-    @Override
-    public synchronized boolean dispatchNetworkEvent() throws IOException {
-        if (aborted) {
-            eos = true;
-            monitor.notifyFailure(this);
-            return true;
+    private class ReadFullBufferAcceptor implements ICloseableBufferAcceptor {
+        @Override
+        public void accept(ByteBuffer buffer) {
+            fullQueue.add(buffer);
+            monitor.notifyDataAvailability(NetworkInputChannel.this, 1);
         }
-        if (key.isConnectable()) {
-            if (socketChannel.finishConnect()) {
-                key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
-                prepareForWrite();
-            }
-        } else if (key.isWritable()) {
-            socketChannel.write(writeBuffer);
-            if (writeBuffer.remaining() == 0) {
-                key.interestOps(SelectionKey.OP_READ);
-            }
-        } else if (key.isReadable()) {
-            if (LOGGER.isLoggable(Level.FINER)) {
-                LOGGER.finer("Before read: " + currentBuffer.position() + " " + currentBuffer.limit());
-            }
-            int bytesRead = socketChannel.read(currentBuffer);
-            if (bytesRead < 0) {
-                eos = true;
-                monitor.notifyEndOfStream(this);
-                return true;
-            }
-            if (LOGGER.isLoggable(Level.FINER)) {
-                LOGGER.finer("After read: " + currentBuffer.position() + " " + currentBuffer.limit());
-            }
-            currentBuffer.flip();
-            int dataLen = currentBuffer.remaining();
-            if (dataLen >= currentBuffer.capacity() || aborted()) {
-                if (LOGGER.isLoggable(Level.FINEST)) {
-                    LOGGER.finest("NetworkInputChannel: frame received: sender = " + partitionId.getSenderIndex());
-                }
-                if (currentBuffer.getInt(FrameHelper.getTupleCountOffset(currentBuffer.capacity())) == 0) {
-                    eos = true;
-                    monitor.notifyEndOfStream(this);
-                    return true;
-                }
-                fullQueue.add(currentBuffer);
-                currentBuffer = emptyQueue.poll();
-                if (currentBuffer == null && key.isValid()) {
-                    int ops = key.interestOps();
-                    key.interestOps(ops & ~SelectionKey.OP_READ);
-                }
-                monitor.notifyDataAvailability(this, 1);
-                return false;
-            }
-            currentBuffer.compact();
+
+        @Override
+        public void close() {
+            monitor.notifyEndOfStream(NetworkInputChannel.this);
         }
-        return false;
+
+        @Override
+        public void error(int ecode) {
+            monitor.notifyFailure(NetworkInputChannel.this);
+        }
     }
 
-    private void prepareForConnect() {
-        key.interestOps(SelectionKey.OP_CONNECT);
-    }
-
-    private void prepareForWrite() {
-        writeBuffer = ByteBuffer.allocate(ConnectionManager.INITIAL_MESSAGE_SIZE);
-        writeBuffer.putLong(partitionId.getJobId().getId());
-        writeBuffer.putInt(partitionId.getConnectorDescriptorId().getId());
-        writeBuffer.putInt(partitionId.getSenderIndex());
-        writeBuffer.putInt(partitionId.getReceiverIndex());
-        writeBuffer.flip();
-
-        key.interestOps(SelectionKey.OP_WRITE);
-    }
-
-    @Override
-    public void setSelectionKey(SelectionKey key) {
-        this.key = key;
-        socketChannel = (SocketChannel) key.channel();
-    }
-
-    @Override
-    public SocketAddress getRemoteAddress() {
-        return remoteAddress;
-    }
-
-    @Override
-    public SelectionKey getSelectionKey() {
-        return key;
-    }
-
-    public PartitionId getPartitionId() {
-        return partitionId;
-    }
-
-    public void abort() {
-        aborted = true;
-    }
-
-    public boolean aborted() {
-        return aborted;
-    }
-
-    @Override
-    public void notifyConnectionManagerRegistration() throws IOException {
-        if (socketChannel.connect(remoteAddress)) {
-            prepareForWrite();
-        } else {
-            prepareForConnect();
+    private class WriteEmptyBufferAcceptor implements IBufferAcceptor {
+        @Override
+        public void accept(ByteBuffer buffer) {
+            // do nothing
         }
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
new file mode 100644
index 0000000..c47db54
--- /dev/null
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.net;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.nc.partitions.IPartitionRequestListener;
+import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
+import edu.uci.ics.hyracks.net.exceptions.NetException;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.IChannelOpenListener;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.MultiplexedConnection;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemux;
+
+public class NetworkManager {
+    static final int INITIAL_MESSAGE_SIZE = 20;
+
+    private final IHyracksRootContext ctx;
+
+    private final IPartitionRequestListener partitionRequestListener;
+
+    private final MuxDemux md;
+
+    private NetworkAddress networkAddress;
+
+    public NetworkManager(IHyracksRootContext ctx, InetAddress inetAddress,
+            IPartitionRequestListener partitionRequestListener) throws IOException {
+        this.ctx = ctx;
+        this.partitionRequestListener = partitionRequestListener;
+        md = new MuxDemux(new InetSocketAddress(inetAddress, 0), new ChannelOpenListener());
+    }
+
+    public void start() throws IOException {
+        md.start();
+        InetSocketAddress sockAddr = md.getLocalAddress();
+        networkAddress = new NetworkAddress(sockAddr.getAddress(), sockAddr.getPort());
+    }
+
+    public NetworkAddress getNetworkAddress() {
+        return networkAddress;
+    }
+
+    public void stop() {
+
+    }
+
+    public ChannelControlBlock connect(SocketAddress remoteAddress) throws InterruptedException, NetException {
+        MultiplexedConnection mConn = md.connect((InetSocketAddress) remoteAddress);
+        return mConn.openChannel();
+    }
+
+    private class ChannelOpenListener implements IChannelOpenListener {
+        @Override
+        public void channelOpened(ChannelControlBlock channel) {
+            channel.getReadInterface().setFullBufferAcceptor(new InitialBufferAcceptor(channel));
+            channel.getReadInterface().getEmptyBufferAcceptor().accept(ByteBuffer.allocate(INITIAL_MESSAGE_SIZE));
+        }
+    }
+
+    private class InitialBufferAcceptor implements ICloseableBufferAcceptor {
+        private final ChannelControlBlock ccb;
+
+        private NetworkOutputChannel noc;
+
+        public InitialBufferAcceptor(ChannelControlBlock ccb) {
+            this.ccb = ccb;
+        }
+
+        @Override
+        public void accept(ByteBuffer buffer) {
+            PartitionId pid = readInitialMessage(buffer);
+            noc = new NetworkOutputChannel(ctx, ccb, 5);
+            try {
+                partitionRequestListener.registerPartitionRequest(pid, noc);
+            } catch (HyracksException e) {
+                noc.abort();
+            }
+        }
+
+        @Override
+        public void close() {
+
+        }
+
+        @Override
+        public void error(int ecode) {
+            if (noc != null) {
+                noc.abort();
+            }
+        }
+    }
+
+    private static PartitionId readInitialMessage(ByteBuffer buffer) {
+        JobId jobId = new JobId(buffer.getLong());
+        ConnectorDescriptorId cdid = new ConnectorDescriptorId(buffer.getInt());
+        int senderIndex = buffer.getInt();
+        int receiverIndex = buffer.getInt();
+        return new PartitionId(jobId, cdid, senderIndex, receiverIndex);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
index 31ce924..1abcdf6 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
@@ -14,124 +14,44 @@
  */
 package edu.uci.ics.hyracks.control.nc.net;
 
-import java.io.IOException;
-import java.net.SocketAddress;
 import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
 import java.util.ArrayDeque;
 import java.util.Queue;
 
-import edu.uci.ics.hyracks.api.comm.FrameHelper;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
 
-public class NetworkOutputChannel implements INetworkChannel, IFrameWriter {
-    private final IHyracksRootContext ctx;
+public class NetworkOutputChannel implements IFrameWriter {
+    private final ChannelControlBlock ccb;
 
     private final Queue<ByteBuffer> emptyQueue;
 
-    private final Queue<ByteBuffer> fullQueue;
-
-    private SelectionKey key;
-
     private boolean aborted;
 
-    private boolean eos;
-
-    private boolean eosSent;
-
-    private boolean failed;
-
-    private ByteBuffer currentBuffer;
-
-    public NetworkOutputChannel(IHyracksRootContext ctx, int nBuffers) {
-        this.ctx = ctx;
+    public NetworkOutputChannel(IHyracksRootContext ctx, ChannelControlBlock ccb, int nBuffers) {
+        this.ccb = ccb; // the loop below pre-fills emptyQueue with nBuffers frames allocated from ctx
         emptyQueue = new ArrayDeque<ByteBuffer>(nBuffers);
         for (int i = 0; i < nBuffers; ++i) {
             emptyQueue.add(ctx.allocateFrame());
         }
-        fullQueue = new ArrayDeque<ByteBuffer>(nBuffers);
-    }
-
-    @Override
-    public synchronized boolean dispatchNetworkEvent() throws IOException {
-        if (failed || aborted) {
-            eos = true;
-            return true;
-        } else if (key.isWritable()) {
-            while (true) {
-                if (currentBuffer == null) {
-                    if (eosSent) {
-                        return true;
-                    }
-                    currentBuffer = fullQueue.poll();
-                    if (currentBuffer == null) {
-                        if (eos) {
-                            currentBuffer = emptyQueue.poll();
-                            currentBuffer.clear();
-                            currentBuffer.putInt(FrameHelper.getTupleCountOffset(ctx.getFrameSize()), 0);
-                            eosSent = true;
-                        } else {
-                            key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
-                            return false;
-                        }
-                    }
-                }
-                int bytesWritten = ((SocketChannel) key.channel()).write(currentBuffer);
-                if (bytesWritten < 0) {
-                    eos = true;
-                    return true;
-                }
-                if (currentBuffer.remaining() == 0) {
-                    emptyQueue.add(currentBuffer);
-                    notifyAll();
-                    currentBuffer = null;
-                    if (eosSent) {
-                        return true;
-                    }
-                } else {
-                    return false;
-                }
-            }
-        }
-        return false;
-    }
-
-    @Override
-    public void setSelectionKey(SelectionKey key) {
-        this.key = key;
-    }
-
-    @Override
-    public SelectionKey getSelectionKey() {
-        return key;
-    }
-
-    @Override
-    public SocketAddress getRemoteAddress() {
-        return ((SocketChannel) key.channel()).socket().getRemoteSocketAddress();
-    }
-
-    @Override
-    public synchronized void abort() {
-        aborted = true;
+        ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor()); // acceptor re-queues each buffer it is given
     }
 
     @Override
     public void open() throws HyracksDataException {
-        currentBuffer = null;
     }
 
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         ByteBuffer destBuffer = null;
         synchronized (this) {
-            if (aborted) {
-                throw new HyracksDataException("Connection has been aborted");
-            }
             while (true) {
+                if (aborted) {
+                    throw new HyracksDataException("Connection has been aborted");
+                }
                 destBuffer = emptyQueue.poll();
                 if (destBuffer != null) {
                     break;
@@ -148,26 +68,34 @@
         destBuffer.clear();
         destBuffer.put(buffer);
         destBuffer.flip();
-        synchronized (this) {
-            fullQueue.add(destBuffer);
-        }
-        key.interestOps(SelectionKey.OP_WRITE);
-        key.selector().wakeup();
+        ccb.getWriteInterface().getFullBufferAcceptor().accept(destBuffer);
     }
 
     @Override
     public void fail() throws HyracksDataException {
-        failed = true;
+        ccb.getWriteInterface().getFullBufferAcceptor().error(1); // reports error code 1 to the full-buffer acceptor, the same call abort() makes
     }
 
     @Override
-    public synchronized void close() throws HyracksDataException {
-        eos = true;
-        key.interestOps(SelectionKey.OP_WRITE);
-        key.selector().wakeup();
+    public void close() throws HyracksDataException {
+        ccb.getWriteInterface().getFullBufferAcceptor().close(); // closes the write interface's full-buffer acceptor; no local state is touched
     }
 
-    @Override
-    public void notifyConnectionManagerRegistration() throws IOException {
+    void abort() {
+        ccb.getWriteInterface().getFullBufferAcceptor().error(1); // report the error on the channel before flagging locally
+        synchronized (this) {
+            aborted = true;
+            notifyAll(); // wake every thread waiting on this object's monitor so it can observe the flag
+        }
+    }
+
+    private class WriteEmptyBufferAcceptor implements IBufferAcceptor { // puts accepted buffers back into the enclosing channel's emptyQueue
+        @Override
+        public void accept(ByteBuffer buffer) {
+            synchronized (NetworkOutputChannel.this) { // emptyQueue is guarded by the enclosing channel's monitor
+                emptyQueue.add(buffer);
+                NetworkOutputChannel.this.notifyAll(); // wake threads waiting on that monitor
+            }
+        }
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
index bfa21c9..bd40fea 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
@@ -48,7 +48,7 @@
         Joblet ji = jobletMap.get(pid.getJobId());
         if (ji != null) {
             PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(ncs.getRootContext(),
-                    ncs.getConnectionManager(), new InetSocketAddress(networkAddress.getIpAddress(),
+                    ncs.getNetworkManager(), new InetSocketAddress(networkAddress.getIpAddress(),
                             networkAddress.getPort()), pid, 1));
             ji.reportPartitionAvailability(channel);
         }