Merged hyracks_dev_next -r 1287 into trunk

git-svn-id: https://hyracks.googlecode.com/svn/trunk@1288 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-net/pom.xml b/hyracks/hyracks-net/pom.xml
new file mode 100644
index 0000000..12004f7
--- /dev/null
+++ b/hyracks/hyracks-net/pom.xml
@@ -0,0 +1,31 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>hyracks-net</artifactId>
+  <parent>
+    <groupId>edu.uci.ics.hyracks</groupId>
+    <artifactId>hyracks</artifactId>
+    <version>0.2.0-SNAPSHOT</version>
+  </parent>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>2.0.2</version>
+        <configuration>
+          <source>1.6</source>
+          <target>1.6</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+  <dependencies>
+  <dependency>
+  	<groupId>junit</groupId>
+  	<artifactId>junit</artifactId>
+  	<version>4.8.1</version>
+  	<scope>test</scope>
+  </dependency>
+  </dependencies>
+</project>
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/buffers/IBufferAcceptor.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/buffers/IBufferAcceptor.java
new file mode 100644
index 0000000..2f27bf0
--- /dev/null
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/buffers/IBufferAcceptor.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.buffers;
+
+import java.nio.ByteBuffer;
+
+public interface IBufferAcceptor {
+    public void accept(ByteBuffer buffer);
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/buffers/ICloseableBufferAcceptor.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/buffers/ICloseableBufferAcceptor.java
new file mode 100644
index 0000000..c395ac9
--- /dev/null
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/buffers/ICloseableBufferAcceptor.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.buffers;
+
+public interface ICloseableBufferAcceptor extends IBufferAcceptor {
+    public void close();
+
+    public void error(int ecode);
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/exceptions/NetException.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/exceptions/NetException.java
new file mode 100644
index 0000000..ecd0373
--- /dev/null
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/exceptions/NetException.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.exceptions;
+
+public class NetException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    public NetException() {
+    }
+
+    public NetException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public NetException(String message) {
+        super(message);
+    }
+
+    public NetException(Throwable cause) {
+        super(cause);
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
new file mode 100644
index 0000000..4b55d4b
--- /dev/null
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
@@ -0,0 +1,347 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.muxdemux;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
+import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
+import edu.uci.ics.hyracks.net.exceptions.NetException;
+
+public class ChannelControlBlock {
+    private static final Logger LOGGER = Logger.getLogger(ChannelControlBlock.class.getName());
+
+    private final ChannelSet cSet;
+
+    private final int channelId;
+
+    private final ReadInterface ri;
+
+    private final WriteInterface wi;
+
+    private final AtomicBoolean localClose;
+
+    private final AtomicBoolean localCloseAck;
+
+    private final AtomicBoolean remoteClose;
+
+    ChannelControlBlock(ChannelSet cSet, int channelId) {
+        this.cSet = cSet;
+        this.channelId = channelId;
+        this.ri = new ReadInterface();
+        this.wi = new WriteInterface();
+        localClose = new AtomicBoolean();
+        localCloseAck = new AtomicBoolean();
+        remoteClose = new AtomicBoolean();
+    }
+
+    int getChannelId() {
+        return channelId;
+    }
+
+    public IChannelReadInterface getReadInterface() {
+        return ri;
+    }
+
+    public IChannelWriteInterface getWriteInterface() {
+        return wi;
+    }
+
+    private final class ReadInterface implements IChannelReadInterface {
+        private final Deque<ByteBuffer> riEmptyStack;
+
+        private final IBufferAcceptor eba = new IBufferAcceptor() {
+            @Override
+            public void accept(ByteBuffer buffer) {
+                int delta = buffer.remaining();
+                synchronized (ChannelControlBlock.this) {
+                    if (remoteClose.get()) {
+                        return;
+                    }
+                    riEmptyStack.push(buffer);
+                }
+                cSet.addPendingCredits(channelId, delta);
+            }
+        };
+
+        private ICloseableBufferAcceptor fba;
+
+        private volatile int credits;
+
+        private ByteBuffer currentReadBuffer;
+
+        ReadInterface() {
+            riEmptyStack = new ArrayDeque<ByteBuffer>();
+            credits = 0;
+        }
+
+        @Override
+        public void setFullBufferAcceptor(ICloseableBufferAcceptor fullBufferAcceptor) {
+            fba = fullBufferAcceptor;
+        }
+
+        @Override
+        public IBufferAcceptor getEmptyBufferAcceptor() {
+            return eba;
+        }
+
+        int read(SocketChannel sc, int size) throws IOException, NetException {
+            while (true) {
+                if (size <= 0) {
+                    return size;
+                }
+                if (ri.currentReadBuffer == null) {
+                    ri.currentReadBuffer = ri.riEmptyStack.poll();
+                    assert ri.currentReadBuffer != null;
+                }
+                int rSize = Math.min(size, ri.currentReadBuffer.remaining());
+                if (rSize > 0) {
+                    ri.currentReadBuffer.limit(ri.currentReadBuffer.position() + rSize);
+                    int len;
+                    try {
+                        len = sc.read(ri.currentReadBuffer);
+                        if (len < 0) {
+                            throw new NetException("Socket Closed");
+                        }
+                    } finally {
+                        ri.currentReadBuffer.limit(ri.currentReadBuffer.capacity());
+                    }
+                    size -= len;
+                    if (len < rSize) {
+                        return size;
+                    }
+                } else {
+                    return size;
+                }
+                if (ri.currentReadBuffer.remaining() <= 0) {
+                    flush();
+                }
+            }
+        }
+
+        void flush() {
+            if (currentReadBuffer != null) {
+                currentReadBuffer.flip();
+                fba.accept(ri.currentReadBuffer);
+                currentReadBuffer = null;
+            }
+        }
+    }
+
+    private final class WriteInterface implements IChannelWriteInterface {
+        private final Queue<ByteBuffer> wiFullQueue;
+
+        private boolean channelWritabilityState;
+
+        private final ICloseableBufferAcceptor fba = new ICloseableBufferAcceptor() {
+            @Override
+            public void accept(ByteBuffer buffer) {
+                synchronized (ChannelControlBlock.this) {
+                    wiFullQueue.add(buffer);
+                    adjustChannelWritability();
+                }
+            }
+
+            @Override
+            public void close() {
+                synchronized (ChannelControlBlock.this) {
+                    if (eos) {
+                        if (LOGGER.isLoggable(Level.WARNING)) {
+                            LOGGER.warning("Received duplicate close() on channel: " + channelId);
+                        }
+                        return;
+                    }
+                    eos = true;
+                    adjustChannelWritability();
+                }
+            }
+
+            @Override
+            public void error(int ecode) {
+                synchronized (ChannelControlBlock.this) {
+                    WriteInterface.this.ecode = ecode;
+                    adjustChannelWritability();
+                }
+            }
+        };
+
+        private IBufferAcceptor eba;
+
+        private int credits;
+
+        private boolean eos;
+
+        private boolean eosSent;
+
+        private int ecode;
+
+        private boolean ecodeSent;
+
+        private ByteBuffer currentWriteBuffer;
+
+        WriteInterface() {
+            wiFullQueue = new ArrayDeque<ByteBuffer>();
+            credits = 0;
+            eos = false;
+            eosSent = false;
+            ecode = -1;
+            ecodeSent = false;
+        }
+
+        @Override
+        public void setEmptyBufferAcceptor(IBufferAcceptor emptyBufferAcceptor) {
+            eba = emptyBufferAcceptor;
+        }
+
+        @Override
+        public ICloseableBufferAcceptor getFullBufferAcceptor() {
+            return fba;
+        }
+
+        void write(MultiplexedConnection.WriterState writerState) throws NetException {
+            if (currentWriteBuffer == null) {
+                currentWriteBuffer = wiFullQueue.poll();
+            }
+            if (currentWriteBuffer != null) {
+                int size = Math.min(currentWriteBuffer.remaining(), credits);
+                if (size > 0) {
+                    credits -= size;
+                    writerState.command.setChannelId(channelId);
+                    writerState.command.setCommandType(MuxDemuxCommand.CommandType.DATA);
+                    writerState.command.setData(size);
+                    writerState.reset(currentWriteBuffer, size, ChannelControlBlock.this);
+                } else {
+                    adjustChannelWritability();
+                }
+            } else if (ecode >= 0 && !ecodeSent) {
+                writerState.command.setChannelId(channelId);
+                writerState.command.setCommandType(MuxDemuxCommand.CommandType.ERROR);
+                writerState.command.setData(ecode);
+                writerState.reset(null, 0, null);
+                ecodeSent = true;
+                localClose.set(true);
+                adjustChannelWritability();
+            } else if (wi.eos && !wi.eosSent) {
+                writerState.command.setChannelId(channelId);
+                writerState.command.setCommandType(MuxDemuxCommand.CommandType.CLOSE_CHANNEL);
+                writerState.command.setData(0);
+                writerState.reset(null, 0, null);
+                eosSent = true;
+                localClose.set(true);
+                adjustChannelWritability();
+            }
+        }
+
+        void writeComplete() {
+            if (currentWriteBuffer.remaining() <= 0) {
+                currentWriteBuffer.clear();
+                eba.accept(currentWriteBuffer);
+                currentWriteBuffer = null;
+                adjustChannelWritability();
+            }
+        }
+
+        private boolean computeWritability() {
+            boolean writableDataPresent = currentWriteBuffer != null || !wiFullQueue.isEmpty();
+            if (writableDataPresent) {
+                return credits > 0;
+            }
+            if (eos && !eosSent) {
+                return true;
+            }
+            if (ecode >= 0 && !ecodeSent) {
+                return true;
+            }
+            return false;
+        }
+
+        void adjustChannelWritability() {
+            boolean writable = computeWritability();
+            if (writable) {
+                if (!channelWritabilityState) {
+                    cSet.markPendingWrite(channelId);
+                }
+            } else {
+                if (channelWritabilityState) {
+                    cSet.unmarkPendingWrite(channelId);
+                }
+            }
+            channelWritabilityState = writable;
+        }
+    }
+
+    synchronized void write(MultiplexedConnection.WriterState writerState) throws NetException {
+        wi.write(writerState);
+    }
+
+    synchronized void writeComplete() {
+        wi.writeComplete();
+    }
+
+    synchronized int read(SocketChannel sc, int size) throws IOException, NetException {
+        return ri.read(sc, size);
+    }
+
+    int getReadCredits() {
+        return ri.credits;
+    }
+
+    void setReadCredits(int credits) {
+        this.ri.credits = credits;
+    }
+
+    synchronized void addWriteCredits(int delta) {
+        wi.credits += delta;
+        wi.adjustChannelWritability();
+    }
+
+    synchronized void reportRemoteEOS() {
+        ri.flush();
+        ri.fba.close();
+        remoteClose.set(true);
+    }
+
+    boolean getRemoteEOS() {
+        return remoteClose.get();
+    }
+
+    synchronized void reportLocalEOSAck() {
+        localCloseAck.set(true);
+    }
+
+    synchronized void reportRemoteError(int ecode) {
+        ri.flush();
+        ri.fba.error(ecode);
+        remoteClose.set(true);
+    }
+
+    boolean completelyClosed() {
+        return localCloseAck.get() && remoteClose.get();
+    }
+
+    @Override
+    public String toString() {
+        return "Channel:" + channelId + "[localClose: " + localClose + " localCloseAck: " + localCloseAck
+                + " remoteClose: " + remoteClose + " readCredits: " + ri.credits + " writeCredits: " + wi.credits + "]";
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
new file mode 100644
index 0000000..ffdb5c6
--- /dev/null
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
@@ -0,0 +1,222 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.muxdemux;
+
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.net.exceptions.NetException;
+
+public class ChannelSet {
+    private static final Logger LOGGER = Logger.getLogger(ChannelSet.class.getName());
+
+    private static final int MAX_OPEN_CHANNELS = 1024;
+
+    private static final int INITIAL_SIZE = 16;
+
+    private final MultiplexedConnection mConn;
+
+    private ChannelControlBlock[] ccbArray;
+
+    private final BitSet allocationBitmap;
+
+    private final BitSet pendingChannelWriteBitmap;
+
+    private final BitSet pendingChannelCreditsBitmap;
+
+    private final BitSet pendingChannelSynBitmap;
+
+    private final BitSet pendingEOSAckBitmap;
+
+    private int openChannelCount;
+
+    private final IEventCounter pendingWriteEventsCounter;
+
+    ChannelSet(MultiplexedConnection mConn, IEventCounter pendingWriteEventsCounter) {
+        this.mConn = mConn;
+        ccbArray = new ChannelControlBlock[INITIAL_SIZE];
+        allocationBitmap = new BitSet();
+        pendingChannelWriteBitmap = new BitSet();
+        pendingChannelCreditsBitmap = new BitSet();
+        pendingChannelSynBitmap = new BitSet();
+        pendingEOSAckBitmap = new BitSet();
+        this.pendingWriteEventsCounter = pendingWriteEventsCounter;
+        openChannelCount = 0;
+    }
+
+    ChannelControlBlock allocateChannel() throws NetException {
+        synchronized (mConn) {
+            int idx = allocationBitmap.nextClearBit(0);
+            if (idx < 0 || idx == ccbArray.length) {
+                cleanupClosedChannels();
+                idx = allocationBitmap.nextClearBit(0);
+                if (idx < 0 || idx == ccbArray.length) {
+                    idx = ccbArray.length;
+                }
+            }
+            return createChannel(idx);
+        }
+    }
+
+    private void cleanupClosedChannels() {
+        for (int i = 0; i < ccbArray.length; ++i) {
+            ChannelControlBlock ccb = ccbArray[i];
+            if (ccb != null) {
+                if (ccb.completelyClosed()) {
+                    if (LOGGER.isLoggable(Level.FINE)) {
+                        LOGGER.fine("Cleaning free channel: " + ccb);
+                    }
+                    freeChannel(ccb);
+                }
+            }
+        }
+    }
+
+    ChannelControlBlock registerChannel(int channelId) throws NetException {
+        synchronized (mConn) {
+            return createChannel(channelId);
+        }
+    }
+
+    private void freeChannel(ChannelControlBlock channel) {
+        int idx = channel.getChannelId();
+        ccbArray[idx] = null;
+        allocationBitmap.clear(idx);
+        pendingChannelWriteBitmap.clear(idx);
+        pendingChannelCreditsBitmap.clear(idx);
+        pendingChannelSynBitmap.clear(idx);
+        pendingEOSAckBitmap.clear(idx);
+        --openChannelCount;
+    }
+
+    ChannelControlBlock getCCB(int channelId) {
+        return ccbArray[channelId];
+    }
+
+    BitSet getPendingChannelWriteBitmap() {
+        return pendingChannelWriteBitmap;
+    }
+
+    BitSet getPendingChannelCreditsBitmap() {
+        return pendingChannelCreditsBitmap;
+    }
+
+    BitSet getPendingChannelSynBitmap() {
+        return pendingChannelSynBitmap;
+    }
+
+    BitSet getPendingEOSAckBitmap() {
+        return pendingEOSAckBitmap;
+    }
+
+    int getOpenChannelCount() {
+        return openChannelCount;
+    }
+
+    void initiateChannelSyn(int channelId) {
+        synchronized (mConn) {
+            assert !pendingChannelSynBitmap.get(channelId);
+            pendingChannelSynBitmap.set(channelId);
+            pendingWriteEventsCounter.increment();
+        }
+    }
+
+    void addPendingCredits(int channelId, int delta) {
+        if (delta <= 0) {
+            return;
+        }
+        synchronized (mConn) {
+            ChannelControlBlock ccb = ccbArray[channelId];
+            if (ccb != null) {
+                if (ccb.getRemoteEOS()) {
+                    return;
+                }
+                int oldCredits = ccb.getReadCredits();
+                ccb.setReadCredits(oldCredits + delta);
+                if (oldCredits == 0) {
+                    assert !pendingChannelCreditsBitmap.get(channelId);
+                    pendingChannelCreditsBitmap.set(channelId);
+                    pendingWriteEventsCounter.increment();
+                }
+            }
+        }
+    }
+
+    void unmarkPendingCredits(int channelId) {
+        synchronized (mConn) {
+            if (pendingChannelCreditsBitmap.get(channelId)) {
+                pendingChannelCreditsBitmap.clear(channelId);
+                pendingWriteEventsCounter.decrement();
+            }
+        }
+    }
+
+    void markPendingWrite(int channelId) {
+        synchronized (mConn) {
+            assert !pendingChannelWriteBitmap.get(channelId);
+            pendingChannelWriteBitmap.set(channelId);
+            pendingWriteEventsCounter.increment();
+        }
+    }
+
+    void unmarkPendingWrite(int channelId) {
+        synchronized (mConn) {
+            assert pendingChannelWriteBitmap.get(channelId);
+            pendingChannelWriteBitmap.clear(channelId);
+            pendingWriteEventsCounter.decrement();
+        }
+    }
+
+    void markEOSAck(int channelId) {
+        synchronized (mConn) {
+            assert !pendingEOSAckBitmap.get(channelId);
+            pendingEOSAckBitmap.set(channelId);
+            pendingWriteEventsCounter.increment();
+        }
+    }
+
+    private ChannelControlBlock createChannel(int idx) throws NetException {
+        if (idx >= ccbArray.length) {
+            expand(idx);
+        }
+        if (idx > MAX_OPEN_CHANNELS) {
+            throw new NetException("More than " + MAX_OPEN_CHANNELS + " opened concurrently");
+        }
+        if (ccbArray[idx] != null) {
+            assert ccbArray[idx].completelyClosed();
+            if (ccbArray[idx].completelyClosed()) {
+                if (LOGGER.isLoggable(Level.FINE)) {
+                    LOGGER.fine("Cleaning free channel: " + ccbArray[idx]);
+                }
+                freeChannel(ccbArray[idx]);
+            }
+        }
+        assert idx < ccbArray.length;
+        assert !allocationBitmap.get(idx);
+        ChannelControlBlock channel = new ChannelControlBlock(this, idx);
+        ccbArray[idx] = channel;
+        allocationBitmap.set(idx);
+        ++openChannelCount;
+        return channel;
+    }
+
+    private void expand(int idx) {
+        while (idx >= ccbArray.length) {
+            ccbArray = Arrays.copyOf(ccbArray, ccbArray.length * 2);
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelOpenListener.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelOpenListener.java
new file mode 100644
index 0000000..0fc9b2a
--- /dev/null
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelOpenListener.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.muxdemux;
+
+public interface IChannelOpenListener {
+    public void channelOpened(ChannelControlBlock channel);
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelReadInterface.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelReadInterface.java
new file mode 100644
index 0000000..468a617
--- /dev/null
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelReadInterface.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.muxdemux;
+
+import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
+import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
+
+public interface IChannelReadInterface {
+    public void setFullBufferAcceptor(ICloseableBufferAcceptor fullBufferAcceptor);
+
+    public IBufferAcceptor getEmptyBufferAcceptor();
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelWriteInterface.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelWriteInterface.java
new file mode 100644
index 0000000..1e53d71
--- /dev/null
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelWriteInterface.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.muxdemux;
+
+import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
+import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
+
+public interface IChannelWriteInterface {
+    public void setEmptyBufferAcceptor(IBufferAcceptor emptyBufferAcceptor);
+
+    public ICloseableBufferAcceptor getFullBufferAcceptor();
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IEventCounter.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IEventCounter.java
new file mode 100644
index 0000000..148078c
--- /dev/null
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IEventCounter.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.muxdemux;
+
+public interface IEventCounter {
+    public void increment();
+
+    public void decrement();
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
new file mode 100644
index 0000000..c905f57
--- /dev/null
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
@@ -0,0 +1,384 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.muxdemux;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.BitSet;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.net.exceptions.NetException;
+import edu.uci.ics.hyracks.net.protocols.tcp.ITCPConnectionEventListener;
+import edu.uci.ics.hyracks.net.protocols.tcp.TCPConnection;
+
+public class MultiplexedConnection implements ITCPConnectionEventListener {
+    private static final Logger LOGGER = Logger.getLogger(MultiplexedConnection.class.getName());
+
+    private static final int MAX_CHUNKS_READ_PER_CYCLE = 4;
+
+    private final MuxDemux muxDemux;
+
+    private final IEventCounter pendingWriteEventsCounter;
+
+    private final ChannelSet cSet;
+
+    private final ReaderState readerState;
+
+    private final WriterState writerState;
+
+    private TCPConnection tcpConnection;
+
+    private int lastChannelWritten;
+
+    private int nConnectionAttempts;
+
+    private boolean connectionFailure;
+
+    public MultiplexedConnection(MuxDemux muxDemux) {
+        this.muxDemux = muxDemux;
+        pendingWriteEventsCounter = new IEventCounter() {
+            private int counter;
+
+            @Override
+            public synchronized void increment() {
+                ++counter;
+                if (counter == 1) {
+                    tcpConnection.enable(SelectionKey.OP_WRITE);
+                }
+            }
+
+            @Override
+            public synchronized void decrement() {
+                --counter;
+                if (counter == 0) {
+                    tcpConnection.disable(SelectionKey.OP_WRITE);
+                }
+                if (counter < 0) {
+                    throw new IllegalStateException();
+                }
+            }
+        };
+        cSet = new ChannelSet(this, pendingWriteEventsCounter);
+        readerState = new ReaderState();
+        writerState = new WriterState();
+        lastChannelWritten = -1;
+        connectionFailure = false;
+    }
+
+    int getConnectionAttempts() {
+        return nConnectionAttempts;
+    }
+
+    void setConnectionAttempts(int nConnectionAttempts) {
+        this.nConnectionAttempts = nConnectionAttempts;
+    }
+
+    synchronized void setTCPConnection(TCPConnection tcpConnection) {
+        this.tcpConnection = tcpConnection;
+        tcpConnection.enable(SelectionKey.OP_READ);
+        notifyAll();
+    }
+
+    synchronized void setConnectionFailure() {
+        this.connectionFailure = true;
+        notifyAll();
+    }
+
+    synchronized void waitUntilConnected() throws InterruptedException, NetException {
+        while (tcpConnection == null && !connectionFailure) {
+            wait();
+        }
+        if (connectionFailure) {
+            throw new NetException("Connection failure");
+        }
+    }
+
+    @Override
+    public void notifyIOReady(TCPConnection connection, boolean readable, boolean writable) throws IOException,
+            NetException {
+        if (readable) {
+            driveReaderStateMachine();
+        }
+        if (writable) {
+            driveWriterStateMachine();
+        }
+    }
+
+    public ChannelControlBlock openChannel() throws NetException, InterruptedException {
+        ChannelControlBlock channel = cSet.allocateChannel();
+        int channelId = channel.getChannelId();
+        cSet.initiateChannelSyn(channelId);
+        return channel;
+    }
+
+    class WriterState {
+        private final ByteBuffer writeBuffer;
+
+        final MuxDemuxCommand command;
+
+        private ByteBuffer pendingBuffer;
+
+        private int pendingWriteSize;
+
+        private ChannelControlBlock ccb;
+
+        public WriterState() {
+            writeBuffer = ByteBuffer.allocateDirect(MuxDemuxCommand.COMMAND_SIZE);
+            writeBuffer.flip();
+            command = new MuxDemuxCommand();
+            ccb = null;
+        }
+
+        boolean writePending() {
+            return writeBuffer.remaining() > 0 || (pendingBuffer != null && pendingWriteSize > 0);
+        }
+
+        void reset(ByteBuffer pendingBuffer, int pendingWriteSize, ChannelControlBlock ccb) {
+            writeBuffer.clear();
+            command.write(writeBuffer);
+            writeBuffer.flip();
+            this.pendingBuffer = pendingBuffer;
+            this.pendingWriteSize = pendingWriteSize;
+            this.ccb = ccb;
+        }
+
+        boolean performPendingWrite(SocketChannel sc) throws IOException {
+            int len = writeBuffer.remaining();
+            if (len > 0) {
+                int written = sc.write(writeBuffer);
+                muxDemux.getPerformanceCounters().addSignalingBytesWritten(written);
+                if (written < len) {
+                    return false;
+                }
+            }
+            if (pendingBuffer != null) {
+                if (pendingWriteSize > 0) {
+                    assert pendingWriteSize <= pendingBuffer.remaining();
+                    int oldLimit = pendingBuffer.limit();
+                    try {
+                        pendingBuffer.limit(pendingWriteSize + pendingBuffer.position());
+                        int written = sc.write(pendingBuffer);
+                        muxDemux.getPerformanceCounters().addPayloadBytesWritten(written);
+                        pendingWriteSize -= written;
+                    } finally {
+                        pendingBuffer.limit(oldLimit);
+                    }
+                }
+                if (pendingWriteSize > 0) {
+                    return false;
+                }
+                pendingBuffer = null;
+                pendingWriteSize = 0;
+            }
+            if (ccb != null) {
+                ccb.writeComplete();
+                ccb = null;
+            }
+            return true;
+        }
+    }
+
+    void driveWriterStateMachine() throws IOException, NetException {
+        SocketChannel sc = tcpConnection.getSocketChannel();
+        if (writerState.writePending()) {
+            if (!writerState.performPendingWrite(sc)) {
+                return;
+            }
+            pendingWriteEventsCounter.decrement();
+        }
+        int numCycles;
+
+        synchronized (MultiplexedConnection.this) {
+            numCycles = cSet.getOpenChannelCount();
+        }
+
+        for (int i = 0; i < numCycles; ++i) {
+            ChannelControlBlock writeCCB = null;
+            synchronized (MultiplexedConnection.this) {
+                BitSet pendingChannelSynBitmap = cSet.getPendingChannelSynBitmap();
+                for (int j = pendingChannelSynBitmap.nextSetBit(0); j >= 0; j = pendingChannelSynBitmap.nextSetBit(j)) {
+                    pendingChannelSynBitmap.clear(j);
+                    writerState.command.setChannelId(j);
+                    writerState.command.setCommandType(MuxDemuxCommand.CommandType.OPEN_CHANNEL);
+                    writerState.command.setData(0);
+                    writerState.reset(null, 0, null);
+                    if (!writerState.performPendingWrite(sc)) {
+                        return;
+                    }
+                    pendingWriteEventsCounter.decrement();
+                }
+                BitSet pendingChannelCreditsBitmap = cSet.getPendingChannelCreditsBitmap();
+                for (int j = pendingChannelCreditsBitmap.nextSetBit(0); j >= 0; j = pendingChannelCreditsBitmap
+                        .nextSetBit(j)) {
+                    writerState.command.setChannelId(j);
+                    writerState.command.setCommandType(MuxDemuxCommand.CommandType.ADD_CREDITS);
+                    ChannelControlBlock ccb = cSet.getCCB(j);
+                    int credits = ccb.getReadCredits();
+                    int effectiveCredits;
+                    if (credits <= MuxDemuxCommand.MAX_DATA_VALUE) {
+                        effectiveCredits = credits;
+                        ccb.setReadCredits(0);
+                        pendingChannelCreditsBitmap.clear(j);
+                    } else {
+                        effectiveCredits = MuxDemuxCommand.MAX_DATA_VALUE;
+                        ccb.setReadCredits(credits - effectiveCredits);
+                    }
+                    writerState.command.setData(effectiveCredits);
+                    writerState.reset(null, 0, null);
+                    if (!writerState.performPendingWrite(sc)) {
+                        return;
+                    }
+                    if (credits == effectiveCredits) {
+                        pendingWriteEventsCounter.decrement();
+                    }
+                }
+                BitSet pendingEOSAckBitmap = cSet.getPendingEOSAckBitmap();
+                for (int j = pendingEOSAckBitmap.nextSetBit(0); j >= 0; j = pendingEOSAckBitmap.nextSetBit(j)) {
+                    pendingEOSAckBitmap.clear(j);
+                    writerState.command.setChannelId(j);
+                    writerState.command.setCommandType(MuxDemuxCommand.CommandType.CLOSE_CHANNEL_ACK);
+                    writerState.command.setData(0);
+                    writerState.reset(null, 0, null);
+                    if (!writerState.performPendingWrite(sc)) {
+                        return;
+                    }
+                    pendingWriteEventsCounter.decrement();
+                }
+                BitSet pendingChannelWriteBitmap = cSet.getPendingChannelWriteBitmap();
+                lastChannelWritten = pendingChannelWriteBitmap.nextSetBit(lastChannelWritten + 1);
+                if (lastChannelWritten < 0) {
+                    lastChannelWritten = pendingChannelWriteBitmap.nextSetBit(0);
+                    if (lastChannelWritten < 0) {
+                        return;
+                    }
+                }
+                writeCCB = cSet.getCCB(lastChannelWritten);
+            }
+            writeCCB.write(writerState);
+            if (writerState.writePending()) {
+                pendingWriteEventsCounter.increment();
+                if (!writerState.performPendingWrite(sc)) {
+                    return;
+                }
+                pendingWriteEventsCounter.decrement();
+            }
+        }
+    }
+
+    class ReaderState {
+        private final ByteBuffer readBuffer;
+
+        final MuxDemuxCommand command;
+
+        private int pendingReadSize;
+
+        private ChannelControlBlock ccb;
+
+        ReaderState() {
+            readBuffer = ByteBuffer.allocateDirect(MuxDemuxCommand.COMMAND_SIZE);
+            command = new MuxDemuxCommand();
+        }
+
+        void reset() {
+            readBuffer.clear();
+            pendingReadSize = 0;
+            ccb = null;
+        }
+
+        private ChannelControlBlock getCCBInCommand() {
+            synchronized (MultiplexedConnection.this) {
+                return cSet.getCCB(command.getChannelId());
+            }
+        }
+    }
+
+    void driveReaderStateMachine() throws IOException, NetException {
+        SocketChannel sc = tcpConnection.getSocketChannel();
+        int chunksRead = 0;
+        while (chunksRead < MAX_CHUNKS_READ_PER_CYCLE) {
+            if (readerState.readBuffer.remaining() > 0) {
+                int read = sc.read(readerState.readBuffer);
+                if (read < 0) {
+                    throw new NetException("Socket Closed");
+                }
+                muxDemux.getPerformanceCounters().addSignalingBytesRead(read);
+                if (readerState.readBuffer.remaining() > 0) {
+                    return;
+                }
+                readerState.readBuffer.flip();
+                readerState.command.read(readerState.readBuffer);
+                if (LOGGER.isLoggable(Level.FINE)) {
+                    LOGGER.fine("Received command: " + readerState.command);
+                }
+                ChannelControlBlock ccb = null;
+                switch (readerState.command.getCommandType()) {
+                    case ADD_CREDITS: {
+                        ccb = readerState.getCCBInCommand();
+                        ccb.addWriteCredits(readerState.command.getData());
+                        break;
+                    }
+                    case CLOSE_CHANNEL: {
+                        ccb = readerState.getCCBInCommand();
+                        ccb.reportRemoteEOS();
+                        int channelId = ccb.getChannelId();
+                        cSet.markEOSAck(channelId);
+                        cSet.unmarkPendingCredits(channelId);
+                        break;
+                    }
+                    case CLOSE_CHANNEL_ACK: {
+                        ccb = readerState.getCCBInCommand();
+                        ccb.reportLocalEOSAck();
+                        break;
+                    }
+                    case DATA: {
+                        ccb = readerState.getCCBInCommand();
+                        readerState.pendingReadSize = readerState.command.getData();
+                        readerState.ccb = ccb;
+                        break;
+                    }
+                    case ERROR: {
+                        ccb = readerState.getCCBInCommand();
+                        ccb.reportRemoteError(readerState.command.getData());
+                        int channelId = ccb.getChannelId();
+                        cSet.markEOSAck(channelId);
+                        cSet.unmarkPendingCredits(channelId);
+                        break;
+                    }
+                    case OPEN_CHANNEL: {
+                        int channelId = readerState.command.getChannelId();
+                        ccb = cSet.registerChannel(channelId);
+                        muxDemux.getChannelOpenListener().channelOpened(ccb);
+                    }
+                }
+                if (LOGGER.isLoggable(Level.FINE)) {
+                    LOGGER.fine("Applied command: " + readerState.command + " on " + ccb);
+                }
+            }
+            if (readerState.pendingReadSize > 0) {
+                ++chunksRead;
+                int newPendingReadSize = readerState.ccb.read(sc, readerState.pendingReadSize);
+                muxDemux.getPerformanceCounters().addPayloadBytesRead(readerState.pendingReadSize - newPendingReadSize);
+                readerState.pendingReadSize = newPendingReadSize;
+                if (readerState.pendingReadSize > 0) {
+                    return;
+                }
+            }
+            readerState.reset();
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
new file mode 100644
index 0000000..8548bb8
--- /dev/null
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.muxdemux;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.net.exceptions.NetException;
+import edu.uci.ics.hyracks.net.protocols.tcp.ITCPConnectionListener;
+import edu.uci.ics.hyracks.net.protocols.tcp.TCPConnection;
+import edu.uci.ics.hyracks.net.protocols.tcp.TCPEndpoint;
+
+public class MuxDemux {
+    private final InetSocketAddress localAddress;
+
+    private final IChannelOpenListener channelOpenListener;
+
+    private final Map<InetSocketAddress, MultiplexedConnection> connectionMap;
+
+    private final TCPEndpoint tcpEndpoint;
+
+    private final MuxDemuxPerformanceCounters perfCounters;
+
+    public MuxDemux(InetSocketAddress localAddress, IChannelOpenListener listener, int nThreads) {
+        this.localAddress = localAddress;
+        this.channelOpenListener = listener;
+        connectionMap = new HashMap<InetSocketAddress, MultiplexedConnection>();
+        this.tcpEndpoint = new TCPEndpoint(new ITCPConnectionListener() {
+            @Override
+            public void connectionEstablished(TCPConnection connection) {
+                MultiplexedConnection mConn;
+                synchronized (MuxDemux.this) {
+                    mConn = connectionMap.get(connection.getRemoteAddress());
+                }
+                assert mConn != null;
+                mConn.setTCPConnection(connection);
+                connection.setEventListener(mConn);
+                connection.setAttachment(mConn);
+            }
+
+            @Override
+            public void acceptedConnection(TCPConnection connection) {
+                MultiplexedConnection mConn = new MultiplexedConnection(MuxDemux.this);
+                mConn.setTCPConnection(connection);
+                connection.setEventListener(mConn);
+                connection.setAttachment(mConn);
+            }
+
+            @Override
+            public void connectionFailure(InetSocketAddress remoteAddress) {
+                MultiplexedConnection mConn;
+                synchronized (MuxDemux.this) {
+                    mConn = connectionMap.get(remoteAddress);
+                    assert mConn != null;
+                    int nConnectionAttempts = mConn.getConnectionAttempts();
+                    if (nConnectionAttempts > 5) {
+                        connectionMap.remove(remoteAddress);
+                        mConn.setConnectionFailure();
+                    } else {
+                        mConn.setConnectionAttempts(nConnectionAttempts + 1);
+                        tcpEndpoint.initiateConnection(remoteAddress);
+                    }
+                }
+            }
+        }, nThreads);
+        perfCounters = new MuxDemuxPerformanceCounters();
+    }
+
+    public void start() throws IOException {
+        tcpEndpoint.start(localAddress);
+    }
+
+    public MultiplexedConnection connect(InetSocketAddress remoteAddress) throws InterruptedException, NetException {
+        MultiplexedConnection mConn = null;
+        synchronized (this) {
+            mConn = connectionMap.get(remoteAddress);
+            if (mConn == null) {
+                mConn = new MultiplexedConnection(this);
+                connectionMap.put(remoteAddress, mConn);
+                tcpEndpoint.initiateConnection(remoteAddress);
+            }
+        }
+        mConn.waitUntilConnected();
+        return mConn;
+    }
+
+    IChannelOpenListener getChannelOpenListener() {
+        return channelOpenListener;
+    }
+
+    public InetSocketAddress getLocalAddress() {
+        return tcpEndpoint.getLocalAddress();
+    }
+
+    public MuxDemuxPerformanceCounters getPerformanceCounters() {
+        return perfCounters;
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxCommand.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxCommand.java
new file mode 100644
index 0000000..2e2636b
--- /dev/null
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxCommand.java
@@ -0,0 +1,75 @@
+package edu.uci.ics.hyracks.net.protocols.muxdemux;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.net.exceptions.NetException;
+
+class MuxDemuxCommand {
+    static final int MAX_CHANNEL_ID = 0x3ff;
+
+    static final int COMMAND_SIZE = 4;
+
+    static final int MAX_DATA_VALUE = 0x7ffff;
+
+    enum CommandType {
+        OPEN_CHANNEL,
+        CLOSE_CHANNEL,
+        CLOSE_CHANNEL_ACK,
+        ERROR,
+        ADD_CREDITS,
+        DATA,
+    }
+
+    private int channelId;
+
+    private CommandType type;
+
+    private int data;
+
+    public int getChannelId() {
+        return channelId;
+    }
+
+    public void setChannelId(int channelId) throws NetException {
+        if (channelId > MAX_CHANNEL_ID) {
+            throw new NetException("channelId " + channelId + " exceeds " + MAX_CHANNEL_ID);
+        }
+        this.channelId = channelId;
+    }
+
+    public CommandType getCommandType() {
+        return type;
+    }
+
+    public void setCommandType(CommandType type) {
+        this.type = type;
+    }
+
+    public int getData() {
+        return data;
+    }
+
+    public void setData(int data) throws NetException {
+        if (data > MAX_DATA_VALUE) {
+            throw new NetException("data " + data + " exceeds " + MAX_DATA_VALUE);
+        }
+        this.data = data;
+    }
+
+    public void write(ByteBuffer buffer) {
+        int cmd = (channelId << 22) | (type.ordinal() << 19) | (data & 0x7ffff);
+        buffer.putInt(cmd);
+    }
+
+    public void read(ByteBuffer buffer) {
+        int cmd = buffer.getInt();
+        channelId = (cmd >> 22) & 0x3ff;
+        type = CommandType.values()[(cmd >> 19) & 0x7];
+        data = cmd & 0x7ffff;
+    }
+
+    @Override
+    public String toString() {
+        return channelId + ":" + type + ":" + data;
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxPerformanceCounters.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxPerformanceCounters.java
new file mode 100644
index 0000000..6bf21aa
--- /dev/null
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxPerformanceCounters.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.muxdemux;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class MuxDemuxPerformanceCounters {
+    private final AtomicLong payloadBytesRead;
+
+    private final AtomicLong payloadBytesWritten;
+
+    private final AtomicLong signalingBytesRead;
+
+    private final AtomicLong signalingBytesWritten;
+
+    public MuxDemuxPerformanceCounters() {
+        payloadBytesRead = new AtomicLong();
+        payloadBytesWritten = new AtomicLong();
+        signalingBytesRead = new AtomicLong();
+        signalingBytesWritten = new AtomicLong();
+    }
+
+    public void addPayloadBytesRead(long delta) {
+        payloadBytesRead.addAndGet(delta);
+    }
+
+    public long getPayloadBytesRead() {
+        return payloadBytesRead.get();
+    }
+
+    public void addPayloadBytesWritten(long delta) {
+        payloadBytesWritten.addAndGet(delta);
+    }
+
+    public long getPayloadBytesWritten() {
+        return payloadBytesWritten.get();
+    }
+
+    public void addSignalingBytesRead(long delta) {
+        signalingBytesRead.addAndGet(delta);
+    }
+
+    public long getSignalingBytesRead() {
+        return signalingBytesRead.get();
+    }
+
+    public void addSignalingBytesWritten(long delta) {
+        signalingBytesWritten.addAndGet(delta);
+    }
+
+    public long getSignalingBytesWritten() {
+        return signalingBytesWritten.get();
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/ITCPConnectionEventListener.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/ITCPConnectionEventListener.java
new file mode 100644
index 0000000..607bf31
--- /dev/null
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/ITCPConnectionEventListener.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.tcp;
+
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.net.exceptions.NetException;
+
+public interface ITCPConnectionEventListener {
+    public void notifyIOReady(TCPConnection connection, boolean readable, boolean writable) throws IOException,
+            NetException;
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/ITCPConnectionListener.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/ITCPConnectionListener.java
new file mode 100644
index 0000000..ead2a1f
--- /dev/null
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/ITCPConnectionListener.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.tcp;
+
+import java.net.InetSocketAddress;
+
+public interface ITCPConnectionListener {
+    public void acceptedConnection(TCPConnection connection);
+
+    public void connectionEstablished(TCPConnection connection);
+
+    public void connectionFailure(InetSocketAddress remoteAddress);
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPConnection.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPConnection.java
new file mode 100644
index 0000000..210508b
--- /dev/null
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPConnection.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.tcp;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+
+public class TCPConnection {
+    private final TCPEndpoint endpoint;
+
+    private final SocketChannel channel;
+
+    private final SelectionKey key;
+
+    private final Selector selector;
+
+    private ITCPConnectionEventListener eventListener;
+
+    private Object attachment;
+
+    public TCPConnection(TCPEndpoint endpoint, SocketChannel channel, SelectionKey key, Selector selector) {
+        this.endpoint = endpoint;
+        this.channel = channel;
+        this.key = key;
+        this.selector = selector;
+    }
+
+    public TCPEndpoint getEndpoint() {
+        return endpoint;
+    }
+
+    public SocketChannel getSocketChannel() {
+        return channel;
+    }
+
+    public InetSocketAddress getLocalAddress() {
+        return (InetSocketAddress) channel.socket().getLocalSocketAddress();
+    }
+
+    public InetSocketAddress getRemoteAddress() {
+        return (InetSocketAddress) channel.socket().getRemoteSocketAddress();
+    }
+
+    public void enable(int ops) {
+        key.interestOps(key.interestOps() | ops);
+        selector.wakeup();
+    }
+
+    public void disable(int ops) {
+        key.interestOps(key.interestOps() & ~(ops));
+        selector.wakeup();
+    }
+
+    public ITCPConnectionEventListener getEventListener() {
+        return eventListener;
+    }
+
+    public void setEventListener(ITCPConnectionEventListener eventListener) {
+        this.eventListener = eventListener;
+    }
+
+    public Object getAttachment() {
+        return attachment;
+    }
+
+    public void setAttachment(Object attachment) {
+        this.attachment = attachment;
+    }
+
+    public void close() {
+        key.cancel();
+        try {
+            channel.close();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
new file mode 100644
index 0000000..1a73bdc
--- /dev/null
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
@@ -0,0 +1,217 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.tcp;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class TCPEndpoint {
+    private final ITCPConnectionListener connectionListener;
+
+    private final int nThreads;
+
+    private ServerSocketChannel serverSocketChannel;
+
+    private InetSocketAddress localAddress;
+
+    private IOThread[] ioThreads;
+
+    private int nextThread;
+
+    public TCPEndpoint(ITCPConnectionListener connectionListener, int nThreads) {
+        this.connectionListener = connectionListener;
+        this.nThreads = nThreads;
+    }
+
+    public void start(InetSocketAddress localAddress) throws IOException {
+        serverSocketChannel = ServerSocketChannel.open();
+        ServerSocket serverSocket = serverSocketChannel.socket();
+        serverSocket.bind(localAddress);
+        this.localAddress = (InetSocketAddress) serverSocket.getLocalSocketAddress();
+        ioThreads = new IOThread[nThreads];
+        for (int i = 0; i < ioThreads.length; ++i) {
+            ioThreads[i] = new IOThread();
+        }
+        ioThreads[0].registerServerSocket(serverSocketChannel);
+        for (int i = 0; i < ioThreads.length; ++i) {
+            ioThreads[i].start();
+        }
+    }
+
+    private synchronized int getNextThread() {
+        int result = nextThread;
+        nextThread = (nextThread + 1) % nThreads;
+        return result;
+    }
+
+    public void initiateConnection(InetSocketAddress remoteAddress) {
+        int targetThread = getNextThread();
+        ioThreads[targetThread].initiateConnection(remoteAddress);
+    }
+
+    private void distributeIncomingConnection(SocketChannel channel) {
+        int targetThread = getNextThread();
+        ioThreads[targetThread].addIncomingConnection(channel);
+    }
+
+    public InetSocketAddress getLocalAddress() {
+        return localAddress;
+    }
+
+    private class IOThread extends Thread {
+        private final List<InetSocketAddress> pendingConnections;
+
+        private final List<InetSocketAddress> workingPendingConnections;
+
+        private final List<SocketChannel> incomingConnections;
+
+        private final List<SocketChannel> workingIncomingConnections;
+
+        private Selector selector;
+
+        public IOThread() throws IOException {
+            super("TCPEndpoint IO Thread");
+            setPriority(MAX_PRIORITY);
+            this.pendingConnections = new ArrayList<InetSocketAddress>();
+            this.workingPendingConnections = new ArrayList<InetSocketAddress>();
+            this.incomingConnections = new ArrayList<SocketChannel>();
+            this.workingIncomingConnections = new ArrayList<SocketChannel>();
+            selector = Selector.open();
+        }
+
+        @Override
+        public void run() {
+            while (true) {
+                try {
+                    int n = selector.select();
+                    collectOutstandingWork();
+                    if (!workingPendingConnections.isEmpty()) {
+                        for (InetSocketAddress address : workingPendingConnections) {
+                            SocketChannel channel = SocketChannel.open();
+                            channel.configureBlocking(false);
+                            boolean connect = false;
+                            boolean failure = false;
+                            try {
+                                connect = channel.connect(address);
+                            } catch (IOException e) {
+                                failure = true;
+                                connectionListener.connectionFailure(address);
+                            }
+                            if (!failure) {
+                                if (!connect) {
+                                    SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT);
+                                    key.attach(address);
+                                } else {
+                                    SelectionKey key = channel.register(selector, 0);
+                                    createConnection(key, channel);
+                                }
+                            }
+                        }
+                        workingPendingConnections.clear();
+                    }
+                    if (!workingIncomingConnections.isEmpty()) {
+                        for (SocketChannel channel : workingIncomingConnections) {
+                            channel.configureBlocking(false);
+                            SelectionKey sKey = channel.register(selector, 0);
+                            TCPConnection connection = new TCPConnection(TCPEndpoint.this, channel, sKey, selector);
+                            sKey.attach(connection);
+                            synchronized (connectionListener) {
+                                connectionListener.acceptedConnection(connection);
+                            }
+                        }
+                        workingIncomingConnections.clear();
+                    }
+                    if (n > 0) {
+                        Iterator<SelectionKey> i = selector.selectedKeys().iterator();
+                        while (i.hasNext()) {
+                            SelectionKey key = i.next();
+                            i.remove();
+                            SelectableChannel sc = key.channel();
+                            boolean readable = key.isReadable();
+                            boolean writable = key.isWritable();
+
+                            if (readable || writable) {
+                                TCPConnection connection = (TCPConnection) key.attachment();
+                                connection.getEventListener().notifyIOReady(connection, readable, writable);
+                            }
+                            if (key.isAcceptable()) {
+                                assert sc == serverSocketChannel;
+                                SocketChannel channel = serverSocketChannel.accept();
+                                distributeIncomingConnection(channel);
+                            } else if (key.isConnectable()) {
+                                SocketChannel channel = (SocketChannel) sc;
+                                boolean finishConnect = false;
+                                try {
+                                    finishConnect = channel.finishConnect();
+                                } catch (Exception e) {
+                                    e.printStackTrace();
+                                    key.cancel();
+                                    connectionListener.connectionFailure((InetSocketAddress) key.attachment());
+                                }
+                                if (finishConnect) {
+                                    createConnection(key, channel);
+                                }
+                            }
+                        }
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+
+        private void createConnection(SelectionKey key, SocketChannel channel) {
+            TCPConnection connection = new TCPConnection(TCPEndpoint.this, channel, key, selector);
+            key.attach(connection);
+            key.interestOps(0);
+            connectionListener.connectionEstablished(connection);
+        }
+
+        synchronized void initiateConnection(InetSocketAddress remoteAddress) {
+            pendingConnections.add(remoteAddress);
+            selector.wakeup();
+        }
+
+        synchronized void addIncomingConnection(SocketChannel channel) {
+            incomingConnections.add(channel);
+            selector.wakeup();
+        }
+
+        void registerServerSocket(ServerSocketChannel serverSocketChannel) throws IOException {
+            serverSocketChannel.configureBlocking(false);
+            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
+        }
+
+        private synchronized void collectOutstandingWork() {
+            if (!pendingConnections.isEmpty()) {
+                workingPendingConnections.addAll(pendingConnections);
+                pendingConnections.clear();
+            }
+            if (!incomingConnections.isEmpty()) {
+                workingIncomingConnections.addAll(incomingConnections);
+                incomingConnections.clear();
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-net/src/test/java/edu/uci/ics/hyracks/net/tests/NetTest.java b/hyracks/hyracks-net/src/test/java/edu/uci/ics/hyracks/net/tests/NetTest.java
new file mode 100644
index 0000000..31bc2df
--- /dev/null
+++ b/hyracks/hyracks-net/src/test/java/edu/uci/ics/hyracks/net/tests/NetTest.java
@@ -0,0 +1,213 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.tests;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import junit.framework.Assert;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
+import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.IChannelOpenListener;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.MultiplexedConnection;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemux;
+
+public class NetTest {
+    @Test
+    public void test() throws Exception {
+        AtomicBoolean failFlag = new AtomicBoolean();
+
+        MuxDemux md1 = createMuxDemux("md1", failFlag);
+        md1.start();
+        MuxDemux md2 = createMuxDemux("md2", failFlag);
+        md2.start();
+        InetSocketAddress md2Address = md2.getLocalAddress();
+
+        MultiplexedConnection md1md2 = md1.connect(md2Address);
+
+        Thread t1 = createThread(md1md2, 1);
+        Thread t2 = createThread(md1md2, -1);
+        t1.start();
+        t2.start();
+
+        t1.join();
+        t2.join();
+
+        Assert.assertFalse("Failure flag was set to true", failFlag.get());
+    }
+
+    private Thread createThread(final MultiplexedConnection md1md2, final int factor) {
+        return new Thread() {
+            @Override
+            public void run() {
+                try {
+                    ChannelControlBlock md1md2c1 = md1md2.openChannel();
+
+                    final Semaphore sem = new Semaphore(1);
+                    sem.acquire();
+                    md1md2c1.getWriteInterface().setEmptyBufferAcceptor(new IBufferAcceptor() {
+                        @Override
+                        public void accept(ByteBuffer buffer) {
+                        }
+                    });
+
+                    md1md2c1.getReadInterface().setFullBufferAcceptor(new ICloseableBufferAcceptor() {
+                        @Override
+                        public void accept(ByteBuffer buffer) {
+                        }
+
+                        @Override
+                        public void error(int ecode) {
+                        }
+
+                        @Override
+                        public void close() {
+                            sem.release();
+                        }
+                    });
+
+                    ICloseableBufferAcceptor fba = md1md2c1.getWriteInterface().getFullBufferAcceptor();
+                    for (int i = 0; i < 10000; ++i) {
+                        ByteBuffer buffer = ByteBuffer.allocate(1024);
+                        for (int j = 0; j < 256; ++j) {
+                            buffer.putInt(factor * (i + j));
+                        }
+                        buffer.flip();
+                        fba.accept(buffer);
+                    }
+                    fba.close();
+                    sem.acquire();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        };
+
+    }
+
+    private MuxDemux createMuxDemux(final String label, final AtomicBoolean failFlag) {
+        IChannelOpenListener md1OpenListener = new IChannelOpenListener() {
+            @Override
+            public void channelOpened(final ChannelControlBlock channel) {
+                final ChannelIO cio = new ChannelIO(label, channel);
+                channel.getReadInterface().setFullBufferAcceptor(cio.rifba);
+                channel.getWriteInterface().setEmptyBufferAcceptor(cio.wieba);
+
+                final IBufferAcceptor rieba = channel.getReadInterface().getEmptyBufferAcceptor();
+                for (int i = 0; i < 50; ++i) {
+                    rieba.accept(ByteBuffer.allocate(1024));
+                }
+                new Thread() {
+                    private int prevTotal = 0;
+
+                    @Override
+                    public void run() {
+                        while (true) {
+                            ByteBuffer fbuf = null;
+                            synchronized (channel) {
+                                while (!cio.eos && cio.ecode == 0 && cio.rifq.isEmpty()) {
+                                    try {
+                                        channel.wait();
+                                    } catch (InterruptedException e) {
+                                        e.printStackTrace();
+                                    }
+                                }
+                                if (!cio.rifq.isEmpty()) {
+                                    fbuf = cio.rifq.poll();
+                                } else if (cio.ecode != 0) {
+                                    throw new RuntimeException("Error: " + cio.ecode);
+                                } else if (cio.eos) {
+                                    channel.getWriteInterface().getFullBufferAcceptor().close();
+                                    return;
+                                }
+                            }
+                            int counter = 0;
+                            while (fbuf.remaining() > 0) {
+                                counter += fbuf.getInt();
+                            }
+                            if (prevTotal != 0) {
+                                if (Math.abs(counter - prevTotal) != 256) {
+                                    failFlag.set(true);
+                                }
+                            }
+                            prevTotal = counter;
+                            fbuf.compact();
+                            rieba.accept(fbuf);
+                        }
+                    }
+                }.start();
+            }
+        };
+        return new MuxDemux(new InetSocketAddress("127.0.0.1", 0), md1OpenListener, 1);
+    }
+
+    private class ChannelIO {
+        private ChannelControlBlock channel;
+
+        private Queue<ByteBuffer> rifq;
+
+        private Queue<ByteBuffer> wieq;
+
+        private boolean eos;
+
+        private int ecode;
+
+        private ICloseableBufferAcceptor rifba;
+
+        private IBufferAcceptor wieba;
+
+        public ChannelIO(final String label, ChannelControlBlock channel) {
+            this.channel = channel;
+            this.rifq = new LinkedList<ByteBuffer>();
+            this.wieq = new LinkedList<ByteBuffer>();
+
+            rifba = new ICloseableBufferAcceptor() {
+                @Override
+                public void accept(ByteBuffer buffer) {
+                    rifq.add(buffer);
+                    ChannelIO.this.channel.notifyAll();
+                }
+
+                @Override
+                public void error(int ecode) {
+                    ChannelIO.this.ecode = ecode;
+                    ChannelIO.this.channel.notifyAll();
+                }
+
+                @Override
+                public void close() {
+                    eos = true;
+                    ChannelIO.this.channel.notifyAll();
+                }
+            };
+
+            wieba = new IBufferAcceptor() {
+                @Override
+                public void accept(ByteBuffer buffer) {
+                    wieq.add(buffer);
+                    ChannelIO.this.channel.notifyAll();
+                }
+            };
+        }
+    }
+}
\ No newline at end of file