Added new networking layer.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@984 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-nc/pom.xml b/hyracks-control-nc/pom.xml
index 64a409c..8c56958 100644
--- a/hyracks-control-nc/pom.xml
+++ b/hyracks-control-nc/pom.xml
@@ -1,9 +1,6 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
   <modelVersion>4.0.0</modelVersion>
-  <groupId>edu.uci.ics.hyracks</groupId>
   <artifactId>hyracks-control-nc</artifactId>
-  <version>0.2.0-SNAPSHOT</version>
-
   <parent>
     <groupId>edu.uci.ics.hyracks</groupId>
     <artifactId>hyracks</artifactId>
@@ -37,6 +34,11 @@
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks</groupId>
+  		<artifactId>hyracks-net</artifactId>
+  		<version>0.2.0-SNAPSHOT</version>
+  	</dependency>
   </dependencies>
   <reporting>
     <plugins>
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index d869515..d5fc27e 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -65,7 +65,7 @@
 import edu.uci.ics.hyracks.control.common.work.WorkQueue;
 import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
 import edu.uci.ics.hyracks.control.nc.io.IOManager;
-import edu.uci.ics.hyracks.control.nc.net.ConnectionManager;
+import edu.uci.ics.hyracks.control.nc.net.NetworkManager;
 import edu.uci.ics.hyracks.control.nc.partitions.PartitionManager;
 import edu.uci.ics.hyracks.control.nc.runtime.RootHyracksContext;
 import edu.uci.ics.hyracks.control.nc.work.AbortTasksWork;
@@ -91,7 +91,7 @@
 
     private final PartitionManager partitionManager;
 
-    private final ConnectionManager connectionManager;
+    private final NetworkManager netManager;
 
     private final WorkQueue queue;
 
@@ -131,9 +131,8 @@
         if (id == null) {
             throw new Exception("id not set");
         }
-        connectionManager = new ConnectionManager(ctx, getIpAddress(ncConfig));
         partitionManager = new PartitionManager(this);
-        connectionManager.setPartitionRequestListener(partitionManager);
+        netManager = new NetworkManager(ctx, getIpAddress(ncConfig), partitionManager);
 
         queue = new WorkQueue();
         jobletMap = new Hashtable<JobId, Joblet>();
@@ -166,7 +165,7 @@
     public void start() throws Exception {
         LOGGER.log(Level.INFO, "Starting NodeControllerService");
         ipc.start();
-        connectionManager.start();
+        netManager.start();
         IIPCHandle ccIPCHandle = ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort));
         this.ccs = new ClusterControllerRemoteProxy(ccIPCHandle);
         HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
@@ -174,9 +173,9 @@
             gcInfos[i] = new HeartbeatSchema.GarbageCollectorInfo(gcMXBeans.get(i).getName());
         }
         HeartbeatSchema hbSchema = new HeartbeatSchema(gcInfos);
-        this.nodeParameters = ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig,
-                connectionManager.getNetworkAddress(), osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(),
-                osMXBean.getAvailableProcessors(), hbSchema));
+        this.nodeParameters = ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netManager
+                .getNetworkAddress(), osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean
+                .getAvailableProcessors(), hbSchema));
         queue.start();
 
         heartbeatTask = new HeartbeatTask(ccs);
@@ -197,7 +196,7 @@
         LOGGER.log(Level.INFO, "Stopping NodeControllerService");
         partitionManager.close();
         heartbeatTask.cancel();
-        connectionManager.stop();
+        netManager.stop();
         queue.stop();
         LOGGER.log(Level.INFO, "Stopped NodeControllerService");
     }
@@ -218,8 +217,8 @@
         return jobletMap;
     }
 
-    public ConnectionManager getConnectionManager() {
-        return connectionManager;
+    public NetworkManager getNetworkManager() {
+        return netManager;
     }
 
     public PartitionManager getPartitionManager() {
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/ConnectionManager.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/ConnectionManager.java
deleted file mode 100644
index 6e38ef7..0000000
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/ConnectionManager.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.nc.net;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.nio.ByteBuffer;
-import java.nio.channels.AsynchronousCloseException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
-import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
-import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.partitions.PartitionId;
-import edu.uci.ics.hyracks.control.nc.partitions.IPartitionRequestListener;
-
-public class ConnectionManager {
-    private static final Logger LOGGER = Logger.getLogger(ConnectionManager.class.getName());
-
-    static final int INITIAL_MESSAGE_SIZE = 20;
-
-    private final IHyracksRootContext ctx;
-
-    private IPartitionRequestListener partitionRequestListener;
-
-    private final ServerSocketChannel serverChannel;
-
-    private volatile boolean stopped;
-
-    private final ConnectionListenerThread connectionListener;
-
-    private final DataListenerThread dataListener;
-
-    private final NetworkAddress networkAddress;
-
-    public ConnectionManager(IHyracksRootContext ctx, InetAddress inetAddress) throws IOException {
-        this.ctx = ctx;
-        serverChannel = ServerSocketChannel.open();
-        ServerSocket serverSocket = serverChannel.socket();
-        serverSocket.bind(new InetSocketAddress(inetAddress, 0), 0);
-        serverSocket.setReuseAddress(true);
-        stopped = false;
-        connectionListener = new ConnectionListenerThread();
-        dataListener = new DataListenerThread();
-        networkAddress = new NetworkAddress(serverSocket.getInetAddress(), serverSocket.getLocalPort());
-
-    }
-
-    public void setPartitionRequestListener(IPartitionRequestListener partitionRequestListener) {
-        this.partitionRequestListener = partitionRequestListener;
-    }
-
-    public void start() {
-        connectionListener.start();
-        dataListener.start();
-    }
-
-    public void stop() {
-        try {
-            stopped = true;
-            serverChannel.close();
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-    }
-
-    public void connect(INetworkChannel channel) throws IOException {
-        dataListener.addOutgoingConnection(channel);
-    }
-
-    private final class ConnectionListenerThread extends Thread {
-        public ConnectionListenerThread() {
-            super("Hyracks NC Connection Listener");
-            setDaemon(true);
-            setPriority(MAX_PRIORITY);
-        }
-
-        @Override
-        public void run() {
-            while (!stopped) {
-                try {
-                    SocketChannel sc = serverChannel.accept();
-                    dataListener.addIncomingConnection(sc);
-                } catch (AsynchronousCloseException e) {
-                    // do nothing
-                    if (!stopped) {
-                        e.printStackTrace();
-                    }
-                } catch (IOException e) {
-                    e.printStackTrace();
-                }
-            }
-        }
-    }
-
-    private final class DataListenerThread extends Thread {
-        private Selector selector;
-
-        private final List<SocketChannel> pendingIncomingConnections;
-        private final Set<SocketChannel> pendingNegotiations;
-        private final List<INetworkChannel> pendingOutgoingConnections;
-
-        public DataListenerThread() {
-            super("Hyracks Data Listener Thread");
-            setDaemon(true);
-            try {
-                selector = Selector.open();
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
-            pendingIncomingConnections = new ArrayList<SocketChannel>();
-            pendingNegotiations = new HashSet<SocketChannel>();
-            pendingOutgoingConnections = new ArrayList<INetworkChannel>();
-        }
-
-        synchronized void addIncomingConnection(SocketChannel sc) throws IOException {
-            pendingIncomingConnections.add(sc);
-            selector.wakeup();
-        }
-
-        synchronized void addOutgoingConnection(INetworkChannel channel) throws IOException {
-            pendingOutgoingConnections.add(channel);
-            selector.wakeup();
-        }
-
-        @Override
-        public void run() {
-            while (!stopped) {
-                try {
-                    if (LOGGER.isLoggable(Level.FINE)) {
-                        LOGGER.fine("Starting Select");
-                    }
-                    int n = selector.select();
-                    synchronized (this) {
-                        if (!pendingIncomingConnections.isEmpty()) {
-                            for (SocketChannel sc : pendingIncomingConnections) {
-                                sc.configureBlocking(false);
-                                sc.socket().setReuseAddress(true);
-                                SelectionKey scKey = sc.register(selector, SelectionKey.OP_READ);
-                                ByteBuffer buffer = ByteBuffer.allocate(INITIAL_MESSAGE_SIZE);
-                                scKey.attach(buffer);
-                                pendingNegotiations.add(sc);
-                            }
-                            pendingIncomingConnections.clear();
-                        }
-                        if (!pendingOutgoingConnections.isEmpty()) {
-                            for (INetworkChannel nc : pendingOutgoingConnections) {
-                                SocketChannel sc = SocketChannel.open();
-                                sc.configureBlocking(false);
-                                sc.socket().setReuseAddress(true);
-                                SelectionKey scKey = sc.register(selector, 0);
-                                scKey.attach(nc);
-                                nc.setSelectionKey(scKey);
-                                nc.notifyConnectionManagerRegistration();
-                            }
-                            pendingOutgoingConnections.clear();
-                        }
-                        if (LOGGER.isLoggable(Level.FINE)) {
-                            LOGGER.fine("Selector: " + n);
-                        }
-                        if (n > 0) {
-                            for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();) {
-                                SelectionKey key = i.next();
-                                i.remove();
-                                SocketChannel sc = (SocketChannel) key.channel();
-                                if (pendingNegotiations.contains(sc)) {
-                                    if (key.isReadable()) {
-                                        ByteBuffer buffer = (ByteBuffer) key.attachment();
-                                        sc.read(buffer);
-                                        buffer.flip();
-                                        if (buffer.remaining() >= INITIAL_MESSAGE_SIZE) {
-                                            PartitionId pid = readInitialMessage(buffer);
-                                            pendingNegotiations.remove(sc);
-                                            key.interestOps(0);
-                                            NetworkOutputChannel channel = new NetworkOutputChannel(ctx, 5);
-                                            channel.setSelectionKey(key);
-                                            key.attach(channel);
-                                            try {
-                                                partitionRequestListener.registerPartitionRequest(pid, channel);
-                                            } catch (HyracksException e) {
-                                                key.cancel();
-                                                sc.close();
-                                                channel.abort();
-                                            }
-                                        } else {
-                                            buffer.compact();
-                                        }
-                                    }
-                                } else {
-                                    INetworkChannel channel = (INetworkChannel) key.attachment();
-                                    boolean close = false;
-                                    boolean error = false;
-                                    try {
-                                        close = channel.dispatchNetworkEvent();
-                                    } catch (IOException e) {
-                                        e.printStackTrace();
-                                        error = true;
-                                    }
-                                    if (close || error) {
-                                        key.cancel();
-                                        sc.close();
-                                        if (error) {
-                                            channel.abort();
-                                        }
-                                    }
-                                }
-                            }
-                        }
-                    }
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
-            }
-        }
-
-        private PartitionId readInitialMessage(ByteBuffer buffer) {
-            JobId jobId = new JobId(buffer.getLong());
-            ConnectorDescriptorId cdid = new ConnectorDescriptorId(buffer.getInt());
-            int senderIndex = buffer.getInt();
-            int receiverIndex = buffer.getInt();
-            return new PartitionId(jobId, cdid, senderIndex, receiverIndex);
-        }
-    }
-
-    public NetworkAddress getNetworkAddress() {
-        return networkAddress;
-    }
-}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/INetworkChannel.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/INetworkChannel.java
index 61cd91f..1f4f070 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/INetworkChannel.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/INetworkChannel.java
@@ -1,7 +1,6 @@
 package edu.uci.ics.hyracks.control.nc.net;
 
 import java.io.IOException;
-import java.net.SocketAddress;
 import java.nio.channels.SelectionKey;
 
 public interface INetworkChannel {
@@ -9,10 +8,6 @@
 
     public void setSelectionKey(SelectionKey key);
 
-    public SelectionKey getSelectionKey();
-
-    public SocketAddress getRemoteAddress();
-
     public void abort();
 
     public void notifyConnectionManagerRegistration() throws IOException;
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
index 23cf514..6400288 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
@@ -14,65 +14,48 @@
  */
 package edu.uci.ics.hyracks.control.nc.net;
 
-import java.io.IOException;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
 import java.util.ArrayDeque;
 import java.util.Queue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.channels.IInputChannel;
 import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
-import edu.uci.ics.hyracks.api.comm.FrameHelper;
 import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
+import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
 
-public class NetworkInputChannel implements IInputChannel, INetworkChannel {
-    private static final Logger LOGGER = Logger.getLogger(NetworkInputChannel.class.getName());
+public class NetworkInputChannel implements IInputChannel {
+    private IHyracksRootContext ctx;
 
-    private final ConnectionManager connectionManager;
+    private final NetworkManager netManager;
 
     private final SocketAddress remoteAddress;
 
     private final PartitionId partitionId;
 
-    private final Queue<ByteBuffer> emptyQueue;
-
     private final Queue<ByteBuffer> fullQueue;
 
-    private SocketChannel socketChannel;
+    private final int nBuffers;
 
-    private SelectionKey key;
-
-    private ByteBuffer currentBuffer;
-
-    private boolean eos;
-
-    private boolean aborted;
+    private ChannelControlBlock ccb;
 
     private IInputChannelMonitor monitor;
 
     private Object attachment;
 
-    private ByteBuffer writeBuffer;
-
-    public NetworkInputChannel(IHyracksRootContext ctx, ConnectionManager connectionManager,
-            SocketAddress remoteAddress, PartitionId partitionId, int nBuffers) {
-        this.connectionManager = connectionManager;
+    public NetworkInputChannel(IHyracksRootContext ctx, NetworkManager netManager, SocketAddress remoteAddress,
+            PartitionId partitionId, int nBuffers) {
+        this.ctx = ctx;
+        this.netManager = netManager;
         this.remoteAddress = remoteAddress;
         this.partitionId = partitionId;
-        this.emptyQueue = new ArrayDeque<ByteBuffer>(nBuffers);
-        for (int i = 0; i < nBuffers; ++i) {
-            emptyQueue.add(ctx.allocateFrame());
-        }
         fullQueue = new ArrayDeque<ByteBuffer>(nBuffers);
-        aborted = false;
-        eos = false;
+        this.nBuffers = nBuffers;
     }
 
     @Override
@@ -96,29 +79,31 @@
     }
 
     @Override
-    public synchronized void recycleBuffer(ByteBuffer buffer) {
+    public void recycleBuffer(ByteBuffer buffer) {
         buffer.clear();
-        emptyQueue.add(buffer);
-        if (!eos && !aborted) {
-            int ops = key.interestOps();
-            if ((ops & SelectionKey.OP_READ) == 0) {
-                key.interestOps(ops | SelectionKey.OP_READ);
-                key.selector().wakeup();
-                if (currentBuffer == null) {
-                    currentBuffer = emptyQueue.poll();
-                }
-            }
-        }
+        ccb.getReadInterface().getEmptyBufferAcceptor().accept(buffer);
     }
 
     @Override
     public void open() throws HyracksDataException {
-        currentBuffer = emptyQueue.poll();
         try {
-            connectionManager.connect(this);
-        } catch (IOException e) {
+            ccb = netManager.connect(remoteAddress);
+        } catch (Exception e) {
             throw new HyracksDataException(e);
         }
+        ccb.getReadInterface().setFullBufferAcceptor(new ReadFullBufferAcceptor());
+        ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor());
+        for (int i = 0; i < nBuffers; ++i) {
+            ccb.getReadInterface().getEmptyBufferAcceptor().accept(ctx.allocateFrame());
+        }
+        ByteBuffer writeBuffer = ByteBuffer.allocate(NetworkManager.INITIAL_MESSAGE_SIZE);
+        writeBuffer.putLong(partitionId.getJobId().getId());
+        writeBuffer.putInt(partitionId.getConnectorDescriptorId().getId());
+        writeBuffer.putInt(partitionId.getSenderIndex());
+        writeBuffer.putInt(partitionId.getReceiverIndex());
+        writeBuffer.flip();
+        ccb.getWriteInterface().getFullBufferAcceptor().accept(writeBuffer);
+        ccb.getWriteInterface().getFullBufferAcceptor().close();
     }
 
     @Override
@@ -126,110 +111,28 @@
 
     }
 
-    @Override
-    public synchronized boolean dispatchNetworkEvent() throws IOException {
-        if (aborted) {
-            eos = true;
-            monitor.notifyFailure(this);
-            return true;
+    private class ReadFullBufferAcceptor implements ICloseableBufferAcceptor {
+        @Override
+        public void accept(ByteBuffer buffer) {
+            fullQueue.add(buffer);
+            monitor.notifyDataAvailability(NetworkInputChannel.this, 1);
         }
-        if (key.isConnectable()) {
-            if (socketChannel.finishConnect()) {
-                key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
-                prepareForWrite();
-            }
-        } else if (key.isWritable()) {
-            socketChannel.write(writeBuffer);
-            if (writeBuffer.remaining() == 0) {
-                key.interestOps(SelectionKey.OP_READ);
-            }
-        } else if (key.isReadable()) {
-            if (LOGGER.isLoggable(Level.FINER)) {
-                LOGGER.finer("Before read: " + currentBuffer.position() + " " + currentBuffer.limit());
-            }
-            int bytesRead = socketChannel.read(currentBuffer);
-            if (bytesRead < 0) {
-                eos = true;
-                monitor.notifyEndOfStream(this);
-                return true;
-            }
-            if (LOGGER.isLoggable(Level.FINER)) {
-                LOGGER.finer("After read: " + currentBuffer.position() + " " + currentBuffer.limit());
-            }
-            currentBuffer.flip();
-            int dataLen = currentBuffer.remaining();
-            if (dataLen >= currentBuffer.capacity() || aborted()) {
-                if (LOGGER.isLoggable(Level.FINEST)) {
-                    LOGGER.finest("NetworkInputChannel: frame received: sender = " + partitionId.getSenderIndex());
-                }
-                if (currentBuffer.getInt(FrameHelper.getTupleCountOffset(currentBuffer.capacity())) == 0) {
-                    eos = true;
-                    monitor.notifyEndOfStream(this);
-                    return true;
-                }
-                fullQueue.add(currentBuffer);
-                currentBuffer = emptyQueue.poll();
-                if (currentBuffer == null && key.isValid()) {
-                    int ops = key.interestOps();
-                    key.interestOps(ops & ~SelectionKey.OP_READ);
-                }
-                monitor.notifyDataAvailability(this, 1);
-                return false;
-            }
-            currentBuffer.compact();
+
+        @Override
+        public void close() {
+            monitor.notifyEndOfStream(NetworkInputChannel.this);
         }
-        return false;
+
+        @Override
+        public void error(int ecode) {
+            monitor.notifyFailure(NetworkInputChannel.this);
+        }
     }
 
-    private void prepareForConnect() {
-        key.interestOps(SelectionKey.OP_CONNECT);
-    }
-
-    private void prepareForWrite() {
-        writeBuffer = ByteBuffer.allocate(ConnectionManager.INITIAL_MESSAGE_SIZE);
-        writeBuffer.putLong(partitionId.getJobId().getId());
-        writeBuffer.putInt(partitionId.getConnectorDescriptorId().getId());
-        writeBuffer.putInt(partitionId.getSenderIndex());
-        writeBuffer.putInt(partitionId.getReceiverIndex());
-        writeBuffer.flip();
-
-        key.interestOps(SelectionKey.OP_WRITE);
-    }
-
-    @Override
-    public void setSelectionKey(SelectionKey key) {
-        this.key = key;
-        socketChannel = (SocketChannel) key.channel();
-    }
-
-    @Override
-    public SocketAddress getRemoteAddress() {
-        return remoteAddress;
-    }
-
-    @Override
-    public SelectionKey getSelectionKey() {
-        return key;
-    }
-
-    public PartitionId getPartitionId() {
-        return partitionId;
-    }
-
-    public void abort() {
-        aborted = true;
-    }
-
-    public boolean aborted() {
-        return aborted;
-    }
-
-    @Override
-    public void notifyConnectionManagerRegistration() throws IOException {
-        if (socketChannel.connect(remoteAddress)) {
-            prepareForWrite();
-        } else {
-            prepareForConnect();
+    private class WriteEmptyBufferAcceptor implements IBufferAcceptor {
+        @Override
+        public void accept(ByteBuffer buffer) {
+            // do nothing
         }
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
new file mode 100644
index 0000000..c47db54
--- /dev/null
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.net;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.nc.partitions.IPartitionRequestListener;
+import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
+import edu.uci.ics.hyracks.net.exceptions.NetException;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.IChannelOpenListener;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.MultiplexedConnection;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemux;
+
+public class NetworkManager {
+    static final int INITIAL_MESSAGE_SIZE = 20;
+
+    private final IHyracksRootContext ctx;
+
+    private final IPartitionRequestListener partitionRequestListener;
+
+    private final MuxDemux md;
+
+    private NetworkAddress networkAddress;
+
+    public NetworkManager(IHyracksRootContext ctx, InetAddress inetAddress,
+            IPartitionRequestListener partitionRequestListener) throws IOException {
+        this.ctx = ctx;
+        this.partitionRequestListener = partitionRequestListener;
+        md = new MuxDemux(new InetSocketAddress(inetAddress, 0), new ChannelOpenListener());
+    }
+
+    public void start() throws IOException {
+        md.start();
+        InetSocketAddress sockAddr = md.getLocalAddress();
+        networkAddress = new NetworkAddress(sockAddr.getAddress(), sockAddr.getPort());
+    }
+
+    public NetworkAddress getNetworkAddress() {
+        return networkAddress;
+    }
+
+    public void stop() {
+
+    }
+
+    public ChannelControlBlock connect(SocketAddress remoteAddress) throws InterruptedException, NetException {
+        MultiplexedConnection mConn = md.connect((InetSocketAddress) remoteAddress);
+        return mConn.openChannel();
+    }
+
+    private class ChannelOpenListener implements IChannelOpenListener {
+        @Override
+        public void channelOpened(ChannelControlBlock channel) {
+            channel.getReadInterface().setFullBufferAcceptor(new InitialBufferAcceptor(channel));
+            channel.getReadInterface().getEmptyBufferAcceptor().accept(ByteBuffer.allocate(INITIAL_MESSAGE_SIZE));
+        }
+    }
+
+    private class InitialBufferAcceptor implements ICloseableBufferAcceptor {
+        private final ChannelControlBlock ccb;
+
+        private NetworkOutputChannel noc;
+
+        public InitialBufferAcceptor(ChannelControlBlock ccb) {
+            this.ccb = ccb;
+        }
+
+        @Override
+        public void accept(ByteBuffer buffer) {
+            PartitionId pid = readInitialMessage(buffer);
+            noc = new NetworkOutputChannel(ctx, ccb, 5);
+            try {
+                partitionRequestListener.registerPartitionRequest(pid, noc);
+            } catch (HyracksException e) {
+                noc.abort();
+            }
+        }
+
+        @Override
+        public void close() {
+
+        }
+
+        @Override
+        public void error(int ecode) {
+            if (noc != null) {
+                noc.abort();
+            }
+        }
+    }
+
+    private static PartitionId readInitialMessage(ByteBuffer buffer) {
+        JobId jobId = new JobId(buffer.getLong());
+        ConnectorDescriptorId cdid = new ConnectorDescriptorId(buffer.getInt());
+        int senderIndex = buffer.getInt();
+        int receiverIndex = buffer.getInt();
+        return new PartitionId(jobId, cdid, senderIndex, receiverIndex);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
index 31ce924..1abcdf6 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
@@ -14,124 +14,44 @@
  */
 package edu.uci.ics.hyracks.control.nc.net;
 
-import java.io.IOException;
-import java.net.SocketAddress;
 import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
 import java.util.ArrayDeque;
 import java.util.Queue;
 
-import edu.uci.ics.hyracks.api.comm.FrameHelper;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
 
-public class NetworkOutputChannel implements INetworkChannel, IFrameWriter {
-    private final IHyracksRootContext ctx;
+public class NetworkOutputChannel implements IFrameWriter {
+    private final ChannelControlBlock ccb;
 
     private final Queue<ByteBuffer> emptyQueue;
 
-    private final Queue<ByteBuffer> fullQueue;
-
-    private SelectionKey key;
-
     private boolean aborted;
 
-    private boolean eos;
-
-    private boolean eosSent;
-
-    private boolean failed;
-
-    private ByteBuffer currentBuffer;
-
-    public NetworkOutputChannel(IHyracksRootContext ctx, int nBuffers) {
-        this.ctx = ctx;
+    public NetworkOutputChannel(IHyracksRootContext ctx, ChannelControlBlock ccb, int nBuffers) {
+        this.ccb = ccb;
         emptyQueue = new ArrayDeque<ByteBuffer>(nBuffers);
         for (int i = 0; i < nBuffers; ++i) {
             emptyQueue.add(ctx.allocateFrame());
         }
-        fullQueue = new ArrayDeque<ByteBuffer>(nBuffers);
-    }
-
-    @Override
-    public synchronized boolean dispatchNetworkEvent() throws IOException {
-        if (failed || aborted) {
-            eos = true;
-            return true;
-        } else if (key.isWritable()) {
-            while (true) {
-                if (currentBuffer == null) {
-                    if (eosSent) {
-                        return true;
-                    }
-                    currentBuffer = fullQueue.poll();
-                    if (currentBuffer == null) {
-                        if (eos) {
-                            currentBuffer = emptyQueue.poll();
-                            currentBuffer.clear();
-                            currentBuffer.putInt(FrameHelper.getTupleCountOffset(ctx.getFrameSize()), 0);
-                            eosSent = true;
-                        } else {
-                            key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
-                            return false;
-                        }
-                    }
-                }
-                int bytesWritten = ((SocketChannel) key.channel()).write(currentBuffer);
-                if (bytesWritten < 0) {
-                    eos = true;
-                    return true;
-                }
-                if (currentBuffer.remaining() == 0) {
-                    emptyQueue.add(currentBuffer);
-                    notifyAll();
-                    currentBuffer = null;
-                    if (eosSent) {
-                        return true;
-                    }
-                } else {
-                    return false;
-                }
-            }
-        }
-        return false;
-    }
-
-    @Override
-    public void setSelectionKey(SelectionKey key) {
-        this.key = key;
-    }
-
-    @Override
-    public SelectionKey getSelectionKey() {
-        return key;
-    }
-
-    @Override
-    public SocketAddress getRemoteAddress() {
-        return ((SocketChannel) key.channel()).socket().getRemoteSocketAddress();
-    }
-
-    @Override
-    public synchronized void abort() {
-        aborted = true;
+        ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor());
     }
 
     @Override
     public void open() throws HyracksDataException {
-        currentBuffer = null;
     }
 
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         ByteBuffer destBuffer = null;
         synchronized (this) {
-            if (aborted) {
-                throw new HyracksDataException("Connection has been aborted");
-            }
             while (true) {
+                if (aborted) {
+                    throw new HyracksDataException("Connection has been aborted");
+                }
                 destBuffer = emptyQueue.poll();
                 if (destBuffer != null) {
                     break;
@@ -148,26 +68,34 @@
         destBuffer.clear();
         destBuffer.put(buffer);
         destBuffer.flip();
-        synchronized (this) {
-            fullQueue.add(destBuffer);
-        }
-        key.interestOps(SelectionKey.OP_WRITE);
-        key.selector().wakeup();
+        ccb.getWriteInterface().getFullBufferAcceptor().accept(destBuffer);
     }
 
     @Override
     public void fail() throws HyracksDataException {
-        failed = true;
+        ccb.getWriteInterface().getFullBufferAcceptor().error(1);
     }
 
     @Override
-    public synchronized void close() throws HyracksDataException {
-        eos = true;
-        key.interestOps(SelectionKey.OP_WRITE);
-        key.selector().wakeup();
+    public void close() throws HyracksDataException {
+        ccb.getWriteInterface().getFullBufferAcceptor().close();
     }
 
-    @Override
-    public void notifyConnectionManagerRegistration() throws IOException {
+    void abort() {
+        ccb.getWriteInterface().getFullBufferAcceptor().error(1);
+        synchronized (NetworkOutputChannel.this) {
+            aborted = true;
+            NetworkOutputChannel.this.notifyAll();
+        }
+    }
+
+    private class WriteEmptyBufferAcceptor implements IBufferAcceptor {
+        @Override
+        public void accept(ByteBuffer buffer) {
+            synchronized (NetworkOutputChannel.this) {
+                emptyQueue.add(buffer);
+                NetworkOutputChannel.this.notifyAll();
+            }
+        }
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
index bfa21c9..bd40fea 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
@@ -48,7 +48,7 @@
         Joblet ji = jobletMap.get(pid.getJobId());
         if (ji != null) {
             PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(ncs.getRootContext(),
-                    ncs.getConnectionManager(), new InetSocketAddress(networkAddress.getIpAddress(),
+                    ncs.getNetworkManager(), new InetSocketAddress(networkAddress.getIpAddress(),
                             networkAddress.getPort()), pid, 1));
             ji.reportPartitionAvailability(channel);
         }
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
index cbbe718..3fb4d60 100644
--- a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
@@ -59,6 +59,7 @@
         this.system = system;
         this.networkThread = new NetworkThread();
         this.serverSocketChannel = ServerSocketChannel.open();
+        serverSocketChannel.socket().setReuseAddress(true);
         serverSocketChannel.configureBlocking(false);
         ServerSocket socket = serverSocketChannel.socket();
         socket.bind(socketAddress);
diff --git a/hyracks-ipc/src/test/java/edu/uci/ics/ipc/tests/IPCTest.java b/hyracks-ipc/src/test/java/edu/uci/ics/hyracks/ipc/tests/IPCTest.java
similarity index 98%
rename from hyracks-ipc/src/test/java/edu/uci/ics/ipc/tests/IPCTest.java
rename to hyracks-ipc/src/test/java/edu/uci/ics/hyracks/ipc/tests/IPCTest.java
index 1fc1f6f..24cea1d 100644
--- a/hyracks-ipc/src/test/java/edu/uci/ics/ipc/tests/IPCTest.java
+++ b/hyracks-ipc/src/test/java/edu/uci/ics/hyracks/ipc/tests/IPCTest.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.ipc.tests;
+package edu.uci.ics.hyracks.ipc.tests;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
diff --git a/hyracks-net/pom.xml b/hyracks-net/pom.xml
new file mode 100644
index 0000000..12004f7
--- /dev/null
+++ b/hyracks-net/pom.xml
@@ -0,0 +1,31 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>hyracks-net</artifactId>
+  <parent>
+    <groupId>edu.uci.ics.hyracks</groupId>
+    <artifactId>hyracks</artifactId>
+    <version>0.2.0-SNAPSHOT</version>
+  </parent>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>2.0.2</version>
+        <configuration>
+          <source>1.6</source>
+          <target>1.6</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+  <dependencies>
+  <dependency>
+  	<groupId>junit</groupId>
+  	<artifactId>junit</artifactId>
+  	<version>4.8.1</version>
+  	<scope>test</scope>
+  </dependency>
+  </dependencies>
+</project>
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/buffers/IBufferAcceptor.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/buffers/IBufferAcceptor.java
new file mode 100644
index 0000000..2f27bf0
--- /dev/null
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/buffers/IBufferAcceptor.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.buffers;
+
+import java.nio.ByteBuffer;
+
+public interface IBufferAcceptor {
+    public void accept(ByteBuffer buffer);
+}
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/buffers/ICloseableBufferAcceptor.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/buffers/ICloseableBufferAcceptor.java
new file mode 100644
index 0000000..c395ac9
--- /dev/null
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/buffers/ICloseableBufferAcceptor.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.buffers;
+
+public interface ICloseableBufferAcceptor extends IBufferAcceptor {
+    public void close();
+
+    public void error(int ecode);
+}
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/exceptions/NetException.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/exceptions/NetException.java
new file mode 100644
index 0000000..ecd0373
--- /dev/null
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/exceptions/NetException.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.exceptions;
+
+public class NetException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    public NetException() {
+    }
+
+    public NetException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public NetException(String message) {
+        super(message);
+    }
+
+    public NetException(Throwable cause) {
+        super(cause);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
new file mode 100644
index 0000000..9a7a38b
--- /dev/null
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
@@ -0,0 +1,303 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.muxdemux;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
+import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
+
+public class ChannelControlBlock {
+    private final ChannelSet cSet;
+
+    private final int channelId;
+
+    private final ReadInterface ri;
+
+    private final WriteInterface wi;
+
+    private final AtomicBoolean localClose;
+
+    private final AtomicBoolean remoteClose;
+
+    ChannelControlBlock(ChannelSet cSet, int channelId) {
+        this.cSet = cSet;
+        this.channelId = channelId;
+        this.ri = new ReadInterface();
+        this.wi = new WriteInterface();
+        localClose = new AtomicBoolean();
+        remoteClose = new AtomicBoolean();
+    }
+
+    int getChannelId() {
+        return channelId;
+    }
+
+    public IChannelReadInterface getReadInterface() {
+        return ri;
+    }
+
+    public IChannelWriteInterface getWriteInterface() {
+        return wi;
+    }
+
+    private final class ReadInterface implements IChannelReadInterface {
+        private final Queue<ByteBuffer> riEmptyQueue;
+
+        private final IBufferAcceptor eba = new IBufferAcceptor() {
+            @Override
+            public void accept(ByteBuffer buffer) {
+                int delta;
+                synchronized (ChannelControlBlock.this) {
+                    riEmptyQueue.add(buffer);
+                    delta = buffer.remaining();
+                }
+                credits.addAndGet(delta);
+                if (delta != 0) {
+                    cSet.markPendingCredits(channelId);
+                }
+            }
+        };
+
+        private ICloseableBufferAcceptor fba;
+
+        private final AtomicInteger credits;
+
+        private ByteBuffer currentReadBuffer;
+
+        ReadInterface() {
+            riEmptyQueue = new LinkedList<ByteBuffer>();
+            credits = new AtomicInteger();
+        }
+
+        @Override
+        public void setFullBufferAcceptor(ICloseableBufferAcceptor fullBufferAcceptor) {
+            fba = fullBufferAcceptor;
+        }
+
+        @Override
+        public IBufferAcceptor getEmptyBufferAcceptor() {
+            return eba;
+        }
+
+        int read(SocketChannel sc, int size) throws IOException {
+            while (true) {
+                if (size <= 0) {
+                    return size;
+                }
+                if (ri.currentReadBuffer == null) {
+                    ri.currentReadBuffer = ri.riEmptyQueue.poll();
+                    assert ri.currentReadBuffer != null;
+                }
+                int rSize = Math.min(size, ri.currentReadBuffer.remaining());
+                if (rSize > 0) {
+                    ri.currentReadBuffer.limit(ri.currentReadBuffer.position() + rSize);
+                    int len;
+                    try {
+                        len = sc.read(ri.currentReadBuffer);
+                    } finally {
+                        ri.currentReadBuffer.limit(ri.currentReadBuffer.capacity());
+                    }
+                    size -= len;
+                    if (len < rSize) {
+                        return size;
+                    }
+                } else {
+                    return size;
+                }
+                if (ri.currentReadBuffer.remaining() <= 0) {
+                    flush();
+                }
+            }
+        }
+
+        void flush() {
+            if (currentReadBuffer != null) {
+                currentReadBuffer.flip();
+                fba.accept(ri.currentReadBuffer);
+                currentReadBuffer = null;
+            }
+        }
+    }
+
+    private final class WriteInterface implements IChannelWriteInterface {
+        private final Queue<ByteBuffer> wiFullQueue;
+
+        private int channelWriteEventCount;
+
+        private final ICloseableBufferAcceptor fba = new ICloseableBufferAcceptor() {
+            @Override
+            public void accept(ByteBuffer buffer) {
+                synchronized (ChannelControlBlock.this) {
+                    wiFullQueue.add(buffer);
+                    incrementLocalWriteEventCount();
+                }
+            }
+
+            @Override
+            public void close() {
+                synchronized (ChannelControlBlock.this) {
+                    if (eos) {
+                        return;
+                    }
+                    eos = true;
+                    incrementLocalWriteEventCount();
+                }
+            }
+
+            @Override
+            public void error(int ecode) {
+                synchronized (ChannelControlBlock.this) {
+                    WriteInterface.this.ecode = ecode;
+                    incrementLocalWriteEventCount();
+                }
+            }
+        };
+
+        private IBufferAcceptor eba;
+
+        private final AtomicInteger credits;
+
+        private boolean eos;
+
+        private boolean eosSent;
+
+        private int ecode;
+
+        private boolean ecodeSent;
+
+        private ByteBuffer currentWriteBuffer;
+
+        WriteInterface() {
+            wiFullQueue = new LinkedList<ByteBuffer>();
+            credits = new AtomicInteger();
+            eos = false;
+            eosSent = false;
+            ecode = -1;
+            ecodeSent = false;
+        }
+
+        @Override
+        public void setEmptyBufferAcceptor(IBufferAcceptor emptyBufferAcceptor) {
+            eba = emptyBufferAcceptor;
+        }
+
+        @Override
+        public ICloseableBufferAcceptor getFullBufferAcceptor() {
+            return fba;
+        }
+
+        void write(MultiplexedConnection.WriterState writerState) {
+            if (currentWriteBuffer == null) {
+                currentWriteBuffer = wiFullQueue.poll();
+            }
+            if (currentWriteBuffer != null) {
+                int size = Math.min(currentWriteBuffer.remaining(), credits.get());
+                if (size > 0) {
+                    credits.addAndGet(-size);
+                    writerState.command.setChannelId(channelId);
+                    writerState.command.setCommandType(MuxDemuxCommand.CommandType.DATA);
+                    writerState.command.setData(size);
+                    writerState.reset(currentWriteBuffer, size, ChannelControlBlock.this);
+                }
+            } else if (ecode >= 0 && !ecodeSent) {
+                decrementLocalWriteEventCount();
+                writerState.command.setChannelId(channelId);
+                writerState.command.setCommandType(MuxDemuxCommand.CommandType.ERROR);
+                writerState.command.setData(ecode);
+                writerState.reset(null, 0, null);
+                ecodeSent = true;
+                localClose.set(true);
+            } else if (wi.eos && !wi.eosSent) {
+                decrementLocalWriteEventCount();
+                writerState.command.setChannelId(channelId);
+                writerState.command.setCommandType(MuxDemuxCommand.CommandType.CLOSE_CHANNEL);
+                writerState.command.setData(0);
+                writerState.reset(null, 0, null);
+                eosSent = true;
+                localClose.set(true);
+            }
+        }
+
+        void writeComplete() {
+            if (currentWriteBuffer.remaining() <= 0) {
+                currentWriteBuffer.clear();
+                eba.accept(currentWriteBuffer);
+                decrementLocalWriteEventCount();
+                currentWriteBuffer = null;
+            }
+        }
+
+        void incrementLocalWriteEventCount() {
+            ++channelWriteEventCount;
+            if (channelWriteEventCount == 1) {
+                cSet.markPendingWrite(channelId);
+            }
+        }
+
+        void decrementLocalWriteEventCount() {
+            --channelWriteEventCount;
+            if (channelWriteEventCount == 0) {
+                cSet.unmarkPendingWrite(channelId);
+            }
+        }
+    }
+
+    synchronized void write(MultiplexedConnection.WriterState writerState) {
+        wi.write(writerState);
+    }
+
+    synchronized void writeComplete() {
+        wi.writeComplete();
+    }
+
+    synchronized int read(SocketChannel sc, int size) throws IOException {
+        return ri.read(sc, size);
+    }
+
+    void addReadCredits(int delta) {
+        ri.credits.addAndGet(delta);
+    }
+
+    int getAndResetReadCredits() {
+        return ri.credits.getAndSet(0);
+    }
+
+    void addWriteCredits(int delta) {
+        wi.credits.addAndGet(delta);
+    }
+
+    synchronized void reportRemoteEOS() {
+        ri.flush();
+        ri.fba.close();
+        remoteClose.set(true);
+    }
+
+    synchronized void reportRemoteError(int ecode) {
+        ri.flush();
+        ri.fba.error(ecode);
+        remoteClose.set(true);
+    }
+
+    boolean completelyClosed() {
+        return localClose.get() && remoteClose.get();
+    }
+}
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
new file mode 100644
index 0000000..48f008b
--- /dev/null
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
@@ -0,0 +1,157 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.muxdemux;
+
+import java.util.Arrays;
+import java.util.BitSet;
+
+public class ChannelSet {
+    private static final int INITIAL_SIZE = 16;
+
+    private final MultiplexedConnection mConn;
+
+    private ChannelControlBlock[] ccbArray;
+
+    private final BitSet allocationBitmap;
+
+    private final BitSet pendingChannelWriteBitmap;
+
+    private final BitSet pendingChannelCreditsBitmap;
+
+    private final BitSet pendingChannelSynBitmap;
+
+    private int openChannelCount;
+
+    private final IEventCounter pendingWriteEventsCounter;
+
+    ChannelSet(MultiplexedConnection mConn, IEventCounter pendingWriteEventsCounter) {
+        this.mConn = mConn;
+        ccbArray = new ChannelControlBlock[INITIAL_SIZE];
+        allocationBitmap = new BitSet();
+        pendingChannelWriteBitmap = new BitSet();
+        pendingChannelCreditsBitmap = new BitSet();
+        pendingChannelSynBitmap = new BitSet();
+        this.pendingWriteEventsCounter = pendingWriteEventsCounter;
+        openChannelCount = 0;
+    }
+
+    ChannelControlBlock allocateChannel() {
+        synchronized (mConn) {
+            int idx = allocationBitmap.nextClearBit(0);
+            if (idx < 0) {
+                cleanupClosedChannels();
+                idx = allocationBitmap.nextClearBit(0);
+                if (idx < 0) {
+                    idx = ccbArray.length;
+                }
+            }
+            return createChannel(idx);
+        }
+    }
+
+    private void cleanupClosedChannels() {
+        for (int i = 0; i < ccbArray.length; ++i) {
+            ChannelControlBlock ccb = ccbArray[i];
+            if (ccb != null) {
+                if (ccb.completelyClosed()) {
+                    freeChannel(ccb);
+                }
+            }
+        }
+    }
+
+    ChannelControlBlock registerChannel(int channelId) {
+        return createChannel(channelId);
+    }
+
+    private void freeChannel(ChannelControlBlock channel) {
+        int idx = channel.getChannelId();
+        ccbArray[idx] = null;
+        allocationBitmap.clear(idx);
+        --openChannelCount;
+    }
+
+    ChannelControlBlock getCCB(int channelId) {
+        return ccbArray[channelId];
+    }
+
+    BitSet getPendingChannelWriteBitmap() {
+        return pendingChannelWriteBitmap;
+    }
+
+    BitSet getPendingChannelCreditsBitmap() {
+        return pendingChannelCreditsBitmap;
+    }
+
+    BitSet getPendingChannelSynBitmap() {
+        return pendingChannelSynBitmap;
+    }
+
+    int getOpenChannelCount() {
+        return openChannelCount;
+    }
+
+    void initiateChannelSyn(int channelId) {
+        synchronized (mConn) {
+            assert !pendingChannelSynBitmap.get(channelId);
+            pendingChannelSynBitmap.set(channelId);
+            pendingWriteEventsCounter.increment();
+        }
+    }
+
+    void markPendingCredits(int channelId) {
+        synchronized (mConn) {
+            if (!pendingChannelCreditsBitmap.get(channelId)) {
+                pendingChannelCreditsBitmap.set(channelId);
+                pendingWriteEventsCounter.increment();
+            }
+        }
+    }
+
+    void markPendingWrite(int channelId) {
+        synchronized (mConn) {
+            assert !pendingChannelWriteBitmap.get(channelId);
+            pendingChannelWriteBitmap.set(channelId);
+            pendingWriteEventsCounter.increment();
+        }
+    }
+
+    public void unmarkPendingWrite(int channelId) {
+        synchronized (mConn) {
+            assert pendingChannelWriteBitmap.get(channelId);
+            pendingChannelWriteBitmap.clear(channelId);
+            pendingWriteEventsCounter.decrement();
+        }
+    }
+
+    private ChannelControlBlock createChannel(int idx) {
+        if (idx >= ccbArray.length) {
+            expand(idx);
+        }
+        assert idx < ccbArray.length;
+        assert !allocationBitmap.get(idx);
+        ChannelControlBlock channel = new ChannelControlBlock(this, idx);
+        ccbArray[idx] = channel;
+        allocationBitmap.set(idx);
+        ++openChannelCount;
+        return channel;
+    }
+
+    private void expand(int idx) {
+        while (idx >= ccbArray.length) {
+            ccbArray = Arrays.copyOf(ccbArray, ccbArray.length * 2);
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelOpenListener.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelOpenListener.java
new file mode 100644
index 0000000..0fc9b2a
--- /dev/null
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelOpenListener.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.muxdemux;
+
+public interface IChannelOpenListener {
+    public void channelOpened(ChannelControlBlock channel);
+}
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelReadInterface.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelReadInterface.java
new file mode 100644
index 0000000..468a617
--- /dev/null
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelReadInterface.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.muxdemux;
+
+import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
+import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
+
+public interface IChannelReadInterface {
+    public void setFullBufferAcceptor(ICloseableBufferAcceptor fullBufferAcceptor);
+
+    public IBufferAcceptor getEmptyBufferAcceptor();
+}
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelWriteInterface.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelWriteInterface.java
new file mode 100644
index 0000000..1e53d71
--- /dev/null
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelWriteInterface.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.muxdemux;
+
+import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
+import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
+
+public interface IChannelWriteInterface {
+    public void setEmptyBufferAcceptor(IBufferAcceptor emptyBufferAcceptor);
+
+    public ICloseableBufferAcceptor getFullBufferAcceptor();
+}
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IEventCounter.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IEventCounter.java
new file mode 100644
index 0000000..148078c
--- /dev/null
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IEventCounter.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.muxdemux;
+
+public interface IEventCounter {
+    public void increment();
+
+    public void decrement();
+}
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
new file mode 100644
index 0000000..a99cb6a
--- /dev/null
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
@@ -0,0 +1,310 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.muxdemux;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.BitSet;
+
+import edu.uci.ics.hyracks.net.exceptions.NetException;
+import edu.uci.ics.hyracks.net.protocols.tcp.ITCPConnectionEventListener;
+import edu.uci.ics.hyracks.net.protocols.tcp.TCPConnection;
+
+public class MultiplexedConnection implements ITCPConnectionEventListener {
+    private final MuxDemux muxDemux;
+
+    private final IEventCounter pendingWriteEventsCounter;
+
+    private final ChannelSet cSet;
+
+    private final ReaderState readerState;
+
+    private final WriterState writerState;
+
+    private TCPConnection tcpConnection;
+
+    private int lastChannelWritten;
+
+    public MultiplexedConnection(MuxDemux muxDemux) {
+        this.muxDemux = muxDemux;
+        pendingWriteEventsCounter = new IEventCounter() {
+            private int counter;
+
+            @Override
+            public synchronized void increment() {
+                ++counter;
+                if (counter == 1) {
+                    tcpConnection.enable(SelectionKey.OP_WRITE);
+                }
+            }
+
+            @Override
+            public synchronized void decrement() {
+                --counter;
+                if (counter == 0) {
+                    tcpConnection.disable(SelectionKey.OP_WRITE);
+                }
+                if (counter < 0) {
+                    throw new IllegalStateException();
+                }
+            }
+        };
+        cSet = new ChannelSet(this, pendingWriteEventsCounter);
+        readerState = new ReaderState();
+        writerState = new WriterState();
+        lastChannelWritten = -1;
+    }
+
+    synchronized void setTCPConnection(TCPConnection tcpConnection) {
+        this.tcpConnection = tcpConnection;
+        tcpConnection.enable(SelectionKey.OP_READ);
+        notifyAll();
+    }
+
+    synchronized void waitUntilConnected() throws InterruptedException {
+        while (tcpConnection == null) {
+            wait();
+        }
+    }
+
+    @Override
+    public void notifyIOReady(TCPConnection connection, boolean readable, boolean writable) throws IOException {
+        if (readable) {
+            driveReaderStateMachine();
+        }
+        if (writable) {
+            driveWriterStateMachine();
+        }
+    }
+
+    public ChannelControlBlock openChannel() throws NetException, InterruptedException {
+        ChannelControlBlock channel = cSet.allocateChannel();
+        int channelId = channel.getChannelId();
+        cSet.initiateChannelSyn(channelId);
+        return channel;
+    }
+
+    class WriterState {
+        private final ByteBuffer writeBuffer;
+
+        final MuxDemuxCommand command;
+
+        private ByteBuffer pendingBuffer;
+
+        private int pendingWriteSize;
+
+        private ChannelControlBlock ccb;
+
+        public WriterState() {
+            writeBuffer = ByteBuffer.allocate(MuxDemuxCommand.COMMAND_SIZE);
+            writeBuffer.flip();
+            command = new MuxDemuxCommand();
+            ccb = null;
+        }
+
+        boolean writePending() {
+            return writeBuffer.remaining() > 0 || (pendingBuffer != null && pendingWriteSize > 0);
+        }
+
+        void reset(ByteBuffer pendingBuffer, int pendingWriteSize, ChannelControlBlock ccb) {
+            writeBuffer.clear();
+            command.write(writeBuffer);
+            writeBuffer.flip();
+            this.pendingBuffer = pendingBuffer;
+            this.pendingWriteSize = pendingWriteSize;
+            this.ccb = ccb;
+        }
+
+        boolean performPendingWrite(SocketChannel sc) throws IOException {
+            int len = writeBuffer.remaining();
+            if (len > 0) {
+                int written = sc.write(writeBuffer);
+                if (written < len) {
+                    return false;
+                }
+            }
+            if (pendingBuffer != null) {
+                if (pendingWriteSize > 0) {
+                    assert pendingWriteSize <= pendingBuffer.remaining();
+                    int oldLimit = pendingBuffer.limit();
+                    try {
+                        pendingBuffer.limit(pendingWriteSize);
+                        int written = sc.write(pendingBuffer);
+                        pendingWriteSize -= written;
+                    } finally {
+                        pendingBuffer.limit(oldLimit);
+                    }
+                }
+                if (pendingWriteSize > 0) {
+                    return false;
+                }
+                pendingBuffer = null;
+                pendingWriteSize = 0;
+            }
+            if (ccb != null) {
+                ccb.writeComplete();
+                ccb = null;
+            }
+            return true;
+        }
+    }
+
+    void driveWriterStateMachine() throws IOException {
+        SocketChannel sc = tcpConnection.getSocketChannel();
+        if (writerState.writePending()) {
+            if (!writerState.performPendingWrite(sc)) {
+                return;
+            }
+        }
+        int numCycles;
+
+        synchronized (MultiplexedConnection.this) {
+            numCycles = cSet.getOpenChannelCount();
+        }
+
+        for (int i = 0; i < numCycles; ++i) {
+            ChannelControlBlock writeCCB = null;
+            synchronized (MultiplexedConnection.this) {
+                BitSet pendingChannelSynBitmap = cSet.getPendingChannelSynBitmap();
+                for (int j = pendingChannelSynBitmap.nextSetBit(0); j >= 0; j = pendingChannelSynBitmap.nextSetBit(j)) {
+                    pendingChannelSynBitmap.clear(j);
+                    pendingWriteEventsCounter.decrement();
+                    writerState.command.setChannelId(j);
+                    writerState.command.setCommandType(MuxDemuxCommand.CommandType.OPEN_CHANNEL);
+                    writerState.command.setData(0);
+                    writerState.reset(null, 0, null);
+                    if (!writerState.performPendingWrite(sc)) {
+                        return;
+                    }
+                }
+                BitSet pendingChannelCreditsBitmap = cSet.getPendingChannelCreditsBitmap();
+                for (int j = pendingChannelCreditsBitmap.nextSetBit(0); j >= 0; j = pendingChannelCreditsBitmap
+                        .nextSetBit(j)) {
+                    pendingChannelCreditsBitmap.clear(j);
+                    pendingWriteEventsCounter.decrement();
+                    writerState.command.setChannelId(j);
+                    writerState.command.setCommandType(MuxDemuxCommand.CommandType.ADD_CREDITS);
+                    ChannelControlBlock ccb = cSet.getCCB(j);
+                    int credits = ccb.getAndResetReadCredits();
+                    writerState.command.setData(credits);
+                    writerState.reset(null, 0, null);
+                    if (!writerState.performPendingWrite(sc)) {
+                        return;
+                    }
+                }
+                BitSet pendingChannelWriteBitmap = cSet.getPendingChannelWriteBitmap();
+                lastChannelWritten = pendingChannelWriteBitmap.nextSetBit(lastChannelWritten + 1);
+                if (lastChannelWritten < 0) {
+                    lastChannelWritten = pendingChannelWriteBitmap.nextSetBit(0);
+                    if (lastChannelWritten < 0) {
+                        return;
+                    }
+                }
+                writeCCB = cSet.getCCB(lastChannelWritten);
+            }
+            writeCCB.write(writerState);
+            if (writerState.writePending()) {
+                if (!writerState.performPendingWrite(sc)) {
+                    return;
+                }
+            }
+        }
+    }
+
+    class ReaderState {
+        private final ByteBuffer readBuffer;
+
+        final MuxDemuxCommand command;
+
+        private int pendingReadSize;
+
+        private ChannelControlBlock ccb;
+
+        ReaderState() {
+            readBuffer = ByteBuffer.allocate(MuxDemuxCommand.COMMAND_SIZE);
+            command = new MuxDemuxCommand();
+        }
+
+        void reset() {
+            readBuffer.clear();
+            pendingReadSize = 0;
+            ccb = null;
+        }
+    }
+
+    void driveReaderStateMachine() throws IOException {
+        SocketChannel sc = tcpConnection.getSocketChannel();
+        if (readerState.readBuffer.remaining() > 0) {
+            sc.read(readerState.readBuffer);
+            if (readerState.readBuffer.remaining() > 0) {
+                return;
+            }
+            readerState.readBuffer.flip();
+            readerState.command.read(readerState.readBuffer);
+            switch (readerState.command.getCommandType()) {
+                case ADD_CREDITS: {
+                    ChannelControlBlock ccb;
+                    synchronized (MultiplexedConnection.this) {
+                        ccb = cSet.getCCB(readerState.command.getChannelId());
+                    }
+                    ccb.addWriteCredits(readerState.command.getData());
+                    break;
+                }
+                case CLOSE_CHANNEL: {
+                    ChannelControlBlock ccb;
+                    synchronized (MultiplexedConnection.this) {
+                        ccb = cSet.getCCB(readerState.command.getChannelId());
+                    }
+                    ccb.reportRemoteEOS();
+                    break;
+                }
+                case DATA: {
+                    ChannelControlBlock ccb;
+                    synchronized (MultiplexedConnection.this) {
+                        ccb = cSet.getCCB(readerState.command.getChannelId());
+                    }
+                    readerState.pendingReadSize = readerState.command.getData();
+                    readerState.ccb = ccb;
+                    break;
+                }
+                case ERROR: {
+                    ChannelControlBlock ccb;
+                    synchronized (MultiplexedConnection.this) {
+                        ccb = cSet.getCCB(readerState.command.getChannelId());
+                    }
+                    ccb.reportRemoteError(readerState.command.getData());
+                    break;
+                }
+                case OPEN_CHANNEL: {
+                    int channelId = readerState.command.getChannelId();
+                    ChannelControlBlock ccb;
+                    synchronized (MultiplexedConnection.this) {
+                        ccb = cSet.registerChannel(channelId);
+                    }
+                    muxDemux.getChannelOpenListener().channelOpened(ccb);
+                }
+            }
+        }
+        if (readerState.pendingReadSize > 0) {
+            readerState.pendingReadSize = readerState.ccb.read(sc, readerState.pendingReadSize);
+            if (readerState.pendingReadSize > 0) {
+                return;
+            }
+        }
+        readerState.reset();
+    }
+}
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
new file mode 100644
index 0000000..74f1be2
--- /dev/null
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.muxdemux;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.net.protocols.tcp.ITCPConnectionListener;
+import edu.uci.ics.hyracks.net.protocols.tcp.TCPConnection;
+import edu.uci.ics.hyracks.net.protocols.tcp.TCPEndpoint;
+
+public class MuxDemux {
+    private final InetSocketAddress localAddress;
+
+    private final IChannelOpenListener channelOpenListener;
+
+    private final Map<InetSocketAddress, MultiplexedConnection> connectionMap;
+
+    private final TCPEndpoint tcpEndpoint;
+
+    public MuxDemux(InetSocketAddress localAddress, IChannelOpenListener listener) {
+        this.localAddress = localAddress;
+        this.channelOpenListener = listener;
+        connectionMap = new HashMap<InetSocketAddress, MultiplexedConnection>();
+        this.tcpEndpoint = new TCPEndpoint(new ITCPConnectionListener() {
+            @Override
+            public void connectionEstablished(TCPConnection connection) {
+                MultiplexedConnection mConn;
+                synchronized (MuxDemux.this) {
+                    mConn = connectionMap.get(connection.getRemoteAddress());
+                }
+                assert mConn != null;
+                mConn.setTCPConnection(connection);
+                connection.setEventListener(mConn);
+                connection.setAttachment(mConn);
+            }
+
+            @Override
+            public void acceptedConnection(TCPConnection connection) {
+                MultiplexedConnection mConn = new MultiplexedConnection(MuxDemux.this);
+                mConn.setTCPConnection(connection);
+                connection.setEventListener(mConn);
+                connection.setAttachment(mConn);
+            }
+        });
+    }
+
+    public void start() throws IOException {
+        tcpEndpoint.start(localAddress);
+    }
+
+    public MultiplexedConnection connect(InetSocketAddress remoteAddress) throws InterruptedException {
+        MultiplexedConnection mConn = null;
+        synchronized (this) {
+            mConn = connectionMap.get(remoteAddress);
+            if (mConn == null) {
+                mConn = new MultiplexedConnection(this);
+                connectionMap.put(remoteAddress, mConn);
+                tcpEndpoint.initiateConnection(remoteAddress);
+            }
+        }
+        mConn.waitUntilConnected();
+        return mConn;
+    }
+
+    IChannelOpenListener getChannelOpenListener() {
+        return channelOpenListener;
+    }
+
+    public InetSocketAddress getLocalAddress() {
+        return tcpEndpoint.getLocalAddress();
+    }
+}
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxCommand.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxCommand.java
new file mode 100644
index 0000000..c32214c
--- /dev/null
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxCommand.java
@@ -0,0 +1,57 @@
+package edu.uci.ics.hyracks.net.protocols.muxdemux;
+
+import java.nio.ByteBuffer;
+
+class MuxDemuxCommand {
+    static final int COMMAND_SIZE = 4;
+
+    enum CommandType {
+        OPEN_CHANNEL,
+        CLOSE_CHANNEL,
+        ERROR,
+        ADD_CREDITS,
+        DATA,
+    }
+
+    private int channelId;
+
+    private CommandType type;
+
+    private int data;
+
+    public int getChannelId() {
+        return channelId;
+    }
+
+    public void setChannelId(int channelId) {
+        this.channelId = channelId;
+    }
+
+    public CommandType getCommandType() {
+        return type;
+    }
+
+    public void setCommandType(CommandType type) {
+        this.type = type;
+    }
+
+    public int getData() {
+        return data;
+    }
+
+    public void setData(int data) {
+        this.data = data;
+    }
+
+    public void write(ByteBuffer buffer) {
+        int cmd = (channelId << 22) | (type.ordinal() << 19) | (data & 0x7ffff);
+        buffer.putInt(cmd);
+    }
+
+    public void read(ByteBuffer buffer) {
+        int cmd = buffer.getInt();
+        channelId = (cmd >> 22) & 0x3ff;
+        type = CommandType.values()[(cmd >> 19) & 0x7];
+        data = cmd & 0x7ffff;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/ITCPConnectionEventListener.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/ITCPConnectionEventListener.java
new file mode 100644
index 0000000..4d894bd
--- /dev/null
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/ITCPConnectionEventListener.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.tcp;
+
+import java.io.IOException;
+
+public interface ITCPConnectionEventListener {
+    public void notifyIOReady(TCPConnection connection, boolean readable, boolean writable) throws IOException;
+}
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/ITCPConnectionListener.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/ITCPConnectionListener.java
new file mode 100644
index 0000000..cdaabf4
--- /dev/null
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/ITCPConnectionListener.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.tcp;
+
+public interface ITCPConnectionListener {
+    public void acceptedConnection(TCPConnection connection);
+
+    public void connectionEstablished(TCPConnection connection);
+}
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPConnection.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPConnection.java
new file mode 100644
index 0000000..210508b
--- /dev/null
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPConnection.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.tcp;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+
+public class TCPConnection {
+    private final TCPEndpoint endpoint;
+
+    private final SocketChannel channel;
+
+    private final SelectionKey key;
+
+    private final Selector selector;
+
+    private ITCPConnectionEventListener eventListener;
+
+    private Object attachment;
+
+    public TCPConnection(TCPEndpoint endpoint, SocketChannel channel, SelectionKey key, Selector selector) {
+        this.endpoint = endpoint;
+        this.channel = channel;
+        this.key = key;
+        this.selector = selector;
+    }
+
+    public TCPEndpoint getEndpoint() {
+        return endpoint;
+    }
+
+    public SocketChannel getSocketChannel() {
+        return channel;
+    }
+
+    public InetSocketAddress getLocalAddress() {
+        return (InetSocketAddress) channel.socket().getLocalSocketAddress();
+    }
+
+    public InetSocketAddress getRemoteAddress() {
+        return (InetSocketAddress) channel.socket().getRemoteSocketAddress();
+    }
+
+    public void enable(int ops) {
+        key.interestOps(key.interestOps() | ops);
+        selector.wakeup();
+    }
+
+    public void disable(int ops) {
+        key.interestOps(key.interestOps() & ~(ops));
+        selector.wakeup();
+    }
+
+    public ITCPConnectionEventListener getEventListener() {
+        return eventListener;
+    }
+
+    public void setEventListener(ITCPConnectionEventListener eventListener) {
+        this.eventListener = eventListener;
+    }
+
+    public Object getAttachment() {
+        return attachment;
+    }
+
+    public void setAttachment(Object attachment) {
+        this.attachment = attachment;
+    }
+
+    public void close() {
+        key.cancel();
+        try {
+            channel.close();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
new file mode 100644
index 0000000..5f92a11
--- /dev/null
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
@@ -0,0 +1,146 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.tcp;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class TCPEndpoint {
+    private final ITCPConnectionListener connectionListener;
+
+    private final List<InetSocketAddress>[] pendingConnections;
+
+    private ServerSocketChannel serverSocketChannel;
+
+    private InetSocketAddress localAddress;
+
+    private Selector selector;
+
+    private IOThread ioThread;
+
+    private int writerIndex;
+
+    private int readerIndex;
+
+    public TCPEndpoint(ITCPConnectionListener connectionListener) {
+        this.connectionListener = connectionListener;
+        this.pendingConnections = new List[] { new ArrayList<InetSocketAddress>(), new ArrayList<InetSocketAddress>() };
+        writerIndex = 0;
+        readerIndex = 1;
+    }
+
+    public void start(InetSocketAddress localAddress) throws IOException {
+        serverSocketChannel = ServerSocketChannel.open();
+        ServerSocket serverSocket = serverSocketChannel.socket();
+        serverSocket.bind(localAddress);
+        this.localAddress = (InetSocketAddress) serverSocket.getLocalSocketAddress();
+        serverSocketChannel.configureBlocking(false);
+        selector = Selector.open();
+        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
+        ioThread = new IOThread();
+        ioThread.start();
+    }
+
+    public synchronized void initiateConnection(InetSocketAddress remoteAddress) {
+        pendingConnections[writerIndex].add(remoteAddress);
+        selector.wakeup();
+    }
+
+    private synchronized void swapReadersAndWriters() {
+        int temp = readerIndex;
+        readerIndex = writerIndex;
+        writerIndex = temp;
+    }
+
+    public InetSocketAddress getLocalAddress() {
+        return localAddress;
+    }
+
+    private class IOThread extends Thread {
+        public IOThread() {
+            super("TCPEndpoint IO Thread");
+        }
+
+        @Override
+        public void run() {
+            while (true) {
+                try {
+                    int n = selector.select();
+                    swapReadersAndWriters();
+                    if (!pendingConnections[readerIndex].isEmpty()) {
+                        for (InetSocketAddress address : pendingConnections[readerIndex]) {
+                            SocketChannel channel = SocketChannel.open();
+                            channel.configureBlocking(false);
+                            if (!channel.connect(address)) {
+                                channel.register(selector, SelectionKey.OP_CONNECT);
+                            } else {
+                                SelectionKey key = channel.register(selector, 0);
+                                createConnection(key, channel);
+                            }
+                        }
+                        pendingConnections[readerIndex].clear();
+                    }
+                    if (n > 0) {
+                        Iterator<SelectionKey> i = selector.selectedKeys().iterator();
+                        while (i.hasNext()) {
+                            SelectionKey key = i.next();
+                            i.remove();
+                            SelectableChannel sc = key.channel();
+                            boolean readable = key.isReadable();
+                            boolean writable = key.isWritable();
+
+                            if (readable || writable) {
+                                TCPConnection connection = (TCPConnection) key.attachment();
+                                connection.getEventListener().notifyIOReady(connection, readable, writable);
+                            }
+                            if (key.isAcceptable()) {
+                                assert sc == serverSocketChannel;
+                                SocketChannel channel = serverSocketChannel.accept();
+                                channel.configureBlocking(false);
+                                SelectionKey sKey = channel.register(selector, 0);
+                                TCPConnection connection = new TCPConnection(TCPEndpoint.this, channel, sKey, selector);
+                                sKey.attach(connection);
+                                connectionListener.acceptedConnection(connection);
+                            } else if (key.isConnectable()) {
+                                SocketChannel channel = (SocketChannel) sc;
+                                if (channel.finishConnect()) {
+                                    createConnection(key, channel);
+                                }
+                            }
+                        }
+                    }
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+
+        private void createConnection(SelectionKey key, SocketChannel channel) {
+            TCPConnection connection = new TCPConnection(TCPEndpoint.this, channel, key, selector);
+            key.attach(connection);
+            key.interestOps(0);
+            connectionListener.connectionEstablished(connection);
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-net/src/test/java/edu/uci/ics/hyracks/net/tests/NetTest.java b/hyracks-net/src/test/java/edu/uci/ics/hyracks/net/tests/NetTest.java
new file mode 100644
index 0000000..6052443
--- /dev/null
+++ b/hyracks-net/src/test/java/edu/uci/ics/hyracks/net/tests/NetTest.java
@@ -0,0 +1,207 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.tests;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.Semaphore;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
+import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.IChannelOpenListener;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.MultiplexedConnection;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemux;
+
+public class NetTest {
+    @Test
+    public void test() throws Exception {
+        MuxDemux md1 = createMuxDemux("md1");
+        md1.start();
+        InetSocketAddress md1Address = md1.getLocalAddress();
+        System.err.println("md1Address: " + md1Address);
+
+        MuxDemux md2 = createMuxDemux("md2");
+        md2.start();
+        InetSocketAddress md2Address = md2.getLocalAddress();
+        System.err.println("md2Address: " + md2Address);
+
+        System.err.println("Started");
+
+        MultiplexedConnection md1md2 = md1.connect(md2Address);
+
+        Thread t1 = createThread(md1md2, 1);
+        Thread t2 = createThread(md1md2, -1);
+        t1.start();
+        t2.start();
+
+        t1.join();
+        t2.join();
+    }
+
+    private Thread createThread(final MultiplexedConnection md1md2, final int factor) {
+        return new Thread() {
+            @Override
+            public void run() {
+                try {
+                    ChannelControlBlock md1md2c1 = md1md2.openChannel();
+
+                    final Semaphore sem = new Semaphore(1);
+                    sem.acquire();
+                    md1md2c1.getWriteInterface().setEmptyBufferAcceptor(new IBufferAcceptor() {
+                        @Override
+                        public void accept(ByteBuffer buffer) {
+                        }
+                    });
+
+                    md1md2c1.getReadInterface().setFullBufferAcceptor(new ICloseableBufferAcceptor() {
+                        @Override
+                        public void accept(ByteBuffer buffer) {
+                        }
+
+                        @Override
+                        public void error(int ecode) {
+                        }
+
+                        @Override
+                        public void close() {
+                            sem.release();
+                        }
+                    });
+
+                    ICloseableBufferAcceptor fba = md1md2c1.getWriteInterface().getFullBufferAcceptor();
+                    for (int i = 0; i < 10000; ++i) {
+                        ByteBuffer buffer = ByteBuffer.allocate(1024);
+                        for (int j = 0; j < 256; ++j) {
+                            buffer.putInt(factor * (i + j));
+                        }
+                        buffer.flip();
+                        fba.accept(buffer);
+                    }
+                    fba.close();
+                    sem.acquire();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        };
+
+    }
+
+    private MuxDemux createMuxDemux(final String label) {
+        IChannelOpenListener md1OpenListener = new IChannelOpenListener() {
+            @Override
+            public void channelOpened(final ChannelControlBlock channel) {
+                final ChannelIO cio = new ChannelIO(label, channel);
+                System.err.println(label + ": Channel Opened");
+                channel.getReadInterface().setFullBufferAcceptor(cio.rifba);
+                channel.getWriteInterface().setEmptyBufferAcceptor(cio.wieba);
+
+                final IBufferAcceptor rieba = channel.getReadInterface().getEmptyBufferAcceptor();
+                for (int i = 0; i < 50; ++i) {
+                    rieba.accept(ByteBuffer.allocate(1024));
+                }
+                new Thread() {
+                    @Override
+                    public void run() {
+                        while (true) {
+                            ByteBuffer fbuf = null;
+                            synchronized (channel) {
+                                while (!cio.eos && cio.ecode == 0 && cio.rifq.isEmpty()) {
+                                    try {
+                                        channel.wait();
+                                    } catch (InterruptedException e) {
+                                        e.printStackTrace();
+                                    }
+                                }
+                                if (!cio.rifq.isEmpty()) {
+                                    fbuf = cio.rifq.poll();
+                                } else if (cio.ecode != 0) {
+                                    throw new RuntimeException("Error: " + cio.ecode);
+                                } else if (cio.eos) {
+                                    channel.getWriteInterface().getFullBufferAcceptor().close();
+                                    System.err.println("Channel Closed");
+                                    return;
+                                }
+                            }
+                            int counter = 0;
+                            while (fbuf.remaining() > 0) {
+                                counter += fbuf.getInt();
+                            }
+                            fbuf.compact();
+                            rieba.accept(fbuf);
+                            System.err.println("Received: total: " + counter);
+                        }
+                    }
+                }.start();
+            }
+        };
+        return new MuxDemux(new InetSocketAddress("127.0.0.1", 0), md1OpenListener);
+    }
+
+    private class ChannelIO {
+        private ChannelControlBlock channel;
+
+        private Queue<ByteBuffer> rifq;
+
+        private Queue<ByteBuffer> wieq;
+
+        private boolean eos;
+
+        private int ecode;
+
+        private ICloseableBufferAcceptor rifba;
+
+        private IBufferAcceptor wieba;
+
+        public ChannelIO(final String label, ChannelControlBlock channel) {
+            this.channel = channel;
+            this.rifq = new LinkedList<ByteBuffer>();
+            this.wieq = new LinkedList<ByteBuffer>();
+
+            rifba = new ICloseableBufferAcceptor() {
+                @Override
+                public void accept(ByteBuffer buffer) {
+                    rifq.add(buffer);
+                    ChannelIO.this.channel.notifyAll();
+                }
+
+                @Override
+                public void error(int ecode) {
+                    ChannelIO.this.ecode = ecode;
+                    ChannelIO.this.channel.notifyAll();
+                }
+
+                @Override
+                public void close() {
+                    eos = true;
+                    ChannelIO.this.channel.notifyAll();
+                }
+            };
+
+            wieba = new IBufferAcceptor() {
+                @Override
+                public void accept(ByteBuffer buffer) {
+                    wieq.add(buffer);
+                    ChannelIO.this.channel.notifyAll();
+                }
+            };
+        }
+    }
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 890f42d..a68885b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -86,6 +86,7 @@
     <module>hyracks-dataflow-hadoop</module>
     <module>hyracks-control-common</module>
     <module>hyracks-control-cc</module>
+    <module>hyracks-net</module>
     <module>hyracks-control-nc</module>
     <module>hyracks-data</module>
     <module>hyracks-cli</module>