Merged hyracks_dev_next -r 1287 into trunk
git-svn-id: https://hyracks.googlecode.com/svn/trunk@1288 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-net/pom.xml b/hyracks/hyracks-net/pom.xml
new file mode 100644
index 0000000..12004f7
--- /dev/null
+++ b/hyracks/hyracks-net/pom.xml
@@ -0,0 +1,31 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>hyracks-net</artifactId>
+ <parent>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks</artifactId>
+ <version>0.2.0-SNAPSHOT</version>
+ </parent>
+ <!-- Build: compile the sources at the Java 1.6 source and target level. -->
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies> <!-- JUnit is the only declared dependency and is limited to test scope. -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.8.1</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/buffers/IBufferAcceptor.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/buffers/IBufferAcceptor.java
new file mode 100644
index 0000000..2f27bf0
--- /dev/null
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/buffers/IBufferAcceptor.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.buffers;
+
+import java.nio.ByteBuffer;
+/** Callback through which a {@link ByteBuffer} is handed from one party to another. */
+public interface IBufferAcceptor {
+ public void accept(ByteBuffer buffer); // receives the buffer being handed over
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/buffers/ICloseableBufferAcceptor.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/buffers/ICloseableBufferAcceptor.java
new file mode 100644
index 0000000..c395ac9
--- /dev/null
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/buffers/ICloseableBufferAcceptor.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.buffers;
+/** An {@link IBufferAcceptor} that can additionally be told that the buffer stream has ended or failed. */
+public interface ICloseableBufferAcceptor extends IBufferAcceptor {
+ public void close(); // signals end of stream; ChannelControlBlock in this patch uses it for EOS
+ /** Reports a failure identified by the integer code {@code ecode}. */
+ public void error(int ecode);
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/exceptions/NetException.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/exceptions/NetException.java
new file mode 100644
index 0000000..ecd0373
--- /dev/null
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/exceptions/NetException.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.exceptions;
+/** Checked exception used by the networking code in this patch, e.g. for closed sockets and connection failures. */
+public class NetException extends Exception {
+ private static final long serialVersionUID = 1L;
+ /** Creates an exception without message or cause. */
+ public NetException() {
+ }
+ /** Creates an exception with the given message and cause. */
+ public NetException(String message, Throwable cause) {
+ super(message, cause);
+ }
+ /** Creates an exception with the given message. */
+ public NetException(String message) {
+ super(message);
+ }
+ /** Creates an exception that wraps the given cause. */
+ public NetException(Throwable cause) {
+ super(cause);
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
new file mode 100644
index 0000000..4b55d4b
--- /dev/null
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
@@ -0,0 +1,347 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.muxdemux;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
+import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
+import edu.uci.ics.hyracks.net.exceptions.NetException;
+/** State of one multiplexed channel: a read half, a write half, their flow-control credits and the close flags. */
+public class ChannelControlBlock {
+ private static final Logger LOGGER = Logger.getLogger(ChannelControlBlock.class.getName());
+ // Channel set that created this block; it is told about pending credits and pending writes.
+ private final ChannelSet cSet;
+ // Identifier given by the creator; stamped on every command prepared for this channel.
+ private final int channelId;
+ // Receiving half of the channel.
+ private final ReadInterface ri;
+ // Sending half of the channel.
+ private final WriteInterface wi;
+ // Close flags: local CLOSE_CHANNEL/ERROR prepared, reportLocalEOSAck() called, remote EOS/error reported.
+ private final AtomicBoolean localClose;
+ private final AtomicBoolean localCloseAck;
+ private final AtomicBoolean remoteClose;
+ /** Creates the block for channel {@code channelId} of {@code cSet}; all close flags start out false. */
+ ChannelControlBlock(ChannelSet cSet, int channelId) {
+ this.cSet = cSet;
+ this.channelId = channelId;
+ this.ri = new ReadInterface();
+ this.wi = new WriteInterface();
+ localClose = new AtomicBoolean();
+ localCloseAck = new AtomicBoolean();
+ remoteClose = new AtomicBoolean();
+ }
+ /** Returns the identifier of this channel. */
+ int getChannelId() {
+ return channelId;
+ }
+ /** Returns the read half, through which a consumer supplies empty buffers and receives filled ones. */
+ public IChannelReadInterface getReadInterface() {
+ return ri;
+ }
+ /** Returns the write half, through which a producer submits full buffers and gets written ones back. */
+ public IChannelWriteInterface getWriteInterface() {
+ return wi;
+ }
+ /** Read half: fills caller-supplied empty buffers from the socket and passes them to the full-buffer acceptor. */
+ private final class ReadInterface implements IChannelReadInterface {
+ private final Deque<ByteBuffer> riEmptyStack;
+ // Takes empty buffers: stacks each one and reports its remaining() bytes as pending credits; ignored after remote close.
+ private final IBufferAcceptor eba = new IBufferAcceptor() {
+ @Override
+ public void accept(ByteBuffer buffer) {
+ int delta = buffer.remaining();
+ synchronized (ChannelControlBlock.this) {
+ if (remoteClose.get()) {
+ return;
+ }
+ riEmptyStack.push(buffer);
+ }
+ cSet.addPendingCredits(channelId, delta);
+ }
+ };
+ // Consumer callback; dereferenced without a null check by flush(), reportRemoteEOS() and reportRemoteError().
+ private ICloseableBufferAcceptor fba;
+ // Read-side credit counter, exposed through getReadCredits()/setReadCredits().
+ private volatile int credits;
+ // Buffer currently being filled by read(); null when none is in progress.
+ private ByteBuffer currentReadBuffer;
+ /** Starts with no empty buffers and zero credits. */
+ ReadInterface() {
+ riEmptyStack = new ArrayDeque<ByteBuffer>();
+ credits = 0;
+ }
+ /** Sets the acceptor that receives filled buffers as well as close and error notifications. */
+ @Override
+ public void setFullBufferAcceptor(ICloseableBufferAcceptor fullBufferAcceptor) {
+ fba = fullBufferAcceptor;
+ }
+ /** Returns the acceptor through which empty buffers are supplied to this channel. */
+ @Override
+ public IBufferAcceptor getEmptyBufferAcceptor() {
+ return eba;
+ }
+ /** Reads up to {@code size} bytes from {@code sc} into the empty buffers; returns the count still unread. */
+ int read(SocketChannel sc, int size) throws IOException, NetException {
+ while (true) {
+ if (size <= 0) {
+ return size;
+ }
+ if (ri.currentReadBuffer == null) {
+ ri.currentReadBuffer = ri.riEmptyStack.poll();
+ if (ri.currentReadBuffer == null) { // was an assert: fail with the declared exception instead of an NPE when -ea is off
+ throw new NetException("No empty read buffer available on channel: " + channelId);
+ }
+ }
+ int rSize = Math.min(size, ri.currentReadBuffer.remaining());
+ if (rSize > 0) {
+ ri.currentReadBuffer.limit(ri.currentReadBuffer.position() + rSize);
+ int len;
+ try {
+ len = sc.read(ri.currentReadBuffer);
+ if (len < 0) {
+ throw new NetException("Socket Closed");
+ }
+ } finally {
+ ri.currentReadBuffer.limit(ri.currentReadBuffer.capacity());
+ }
+ size -= len;
+ if (len < rSize) {
+ return size;
+ }
+ } else {
+ return size;
+ }
+ if (ri.currentReadBuffer.remaining() <= 0) {
+ flush();
+ }
+ }
+ }
+ /** Flips the buffer in progress, if any, and hands it to the full-buffer acceptor. */
+ void flush() {
+ if (currentReadBuffer != null) {
+ currentReadBuffer.flip();
+ fba.accept(ri.currentReadBuffer);
+ currentReadBuffer = null;
+ }
+ }
+ }
+ /** Write half: queues full buffers and turns them, plus error and close requests, into commands. */
+ private final class WriteInterface implements IChannelWriteInterface {
+ private final Queue<ByteBuffer> wiFullQueue;
+ // Last writability value reported to cSet, so that mark/unmark calls are issued only on transitions.
+ private boolean channelWritabilityState;
+ // Producer-facing acceptor; every callback runs under the ChannelControlBlock monitor.
+ private final ICloseableBufferAcceptor fba = new ICloseableBufferAcceptor() {
+ @Override
+ public void accept(ByteBuffer buffer) {
+ synchronized (ChannelControlBlock.this) {
+ wiFullQueue.add(buffer);
+ adjustChannelWritability();
+ }
+ }
+ // Requests end-of-stream; a repeated call only logs a warning.
+ @Override
+ public void close() {
+ synchronized (ChannelControlBlock.this) {
+ if (eos) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Received duplicate close() on channel: " + channelId);
+ }
+ return;
+ }
+ eos = true;
+ adjustChannelWritability();
+ }
+ }
+ // Records the error code to send; write() only sends codes that are >= 0.
+ @Override
+ public void error(int ecode) {
+ synchronized (ChannelControlBlock.this) {
+ WriteInterface.this.ecode = ecode;
+ adjustChannelWritability();
+ }
+ }
+ };
+ // Receives buffers back once they have been written completely.
+ private IBufferAcceptor eba;
+ // Bytes this channel may still send; raised by addWriteCredits(), lowered by write().
+ private int credits;
+ // eos / eosSent: close requested / CLOSE_CHANNEL command prepared.
+ private boolean eos;
+
+ private boolean eosSent;
+ // ecode / ecodeSent: pending error code (-1 = none) / ERROR command prepared.
+ private int ecode;
+
+ private boolean ecodeSent;
+ // Buffer currently being drained by write(); null when none is in progress.
+ private ByteBuffer currentWriteBuffer;
+ /** Starts with an empty queue, zero credits and neither a close nor an error pending. */
+ WriteInterface() {
+ wiFullQueue = new ArrayDeque<ByteBuffer>();
+ credits = 0;
+ eos = false;
+ eosSent = false;
+ ecode = -1;
+ ecodeSent = false;
+ }
+ /** Sets the acceptor that gets buffers back after they have been fully written. */
+ @Override
+ public void setEmptyBufferAcceptor(IBufferAcceptor emptyBufferAcceptor) {
+ eba = emptyBufferAcceptor;
+ }
+ /** Returns the acceptor through which full buffers, close and error are submitted. */
+ @Override
+ public ICloseableBufferAcceptor getFullBufferAcceptor() {
+ return fba;
+ }
+ /** Prepares the next command for this channel in {@code writerState}: DATA first, then ERROR, then CLOSE_CHANNEL. */
+ void write(MultiplexedConnection.WriterState writerState) throws NetException {
+ if (currentWriteBuffer == null) {
+ currentWriteBuffer = wiFullQueue.poll();
+ }
+ if (currentWriteBuffer != null) {
+ int size = Math.min(currentWriteBuffer.remaining(), credits);
+ if (size > 0) {
+ credits -= size;
+ writerState.command.setChannelId(channelId);
+ writerState.command.setCommandType(MuxDemuxCommand.CommandType.DATA);
+ writerState.command.setData(size);
+ writerState.reset(currentWriteBuffer, size, ChannelControlBlock.this);
+ } else {
+ adjustChannelWritability();
+ }
+ } else if (ecode >= 0 && !ecodeSent) {
+ writerState.command.setChannelId(channelId);
+ writerState.command.setCommandType(MuxDemuxCommand.CommandType.ERROR);
+ writerState.command.setData(ecode);
+ writerState.reset(null, 0, null);
+ ecodeSent = true;
+ localClose.set(true);
+ adjustChannelWritability();
+ } else if (wi.eos && !wi.eosSent) {
+ writerState.command.setChannelId(channelId);
+ writerState.command.setCommandType(MuxDemuxCommand.CommandType.CLOSE_CHANNEL);
+ writerState.command.setData(0);
+ writerState.reset(null, 0, null);
+ eosSent = true;
+ localClose.set(true);
+ adjustChannelWritability();
+ }
+ }
+ /** Called after a DATA write finished; recycles the current buffer once it has no bytes left. */
+ void writeComplete() {
+ if (currentWriteBuffer.remaining() <= 0) {
+ currentWriteBuffer.clear();
+ eba.accept(currentWriteBuffer);
+ currentWriteBuffer = null;
+ adjustChannelWritability();
+ }
+ }
+ /** Pending data is writable only with credits; without data, an unsent close or error makes the channel writable. */
+ private boolean computeWritability() {
+ boolean writableDataPresent = currentWriteBuffer != null || !wiFullQueue.isEmpty();
+ if (writableDataPresent) {
+ return credits > 0;
+ }
+ if (eos && !eosSent) {
+ return true;
+ }
+ if (ecode >= 0 && !ecodeSent) {
+ return true;
+ }
+ return false;
+ }
+ /** Marks or unmarks this channel as pending-write in cSet when the computed writability changes. */
+ void adjustChannelWritability() {
+ boolean writable = computeWritability();
+ if (writable) {
+ if (!channelWritabilityState) {
+ cSet.markPendingWrite(channelId);
+ }
+ } else {
+ if (channelWritabilityState) {
+ cSet.unmarkPendingWrite(channelId);
+ }
+ }
+ channelWritabilityState = writable;
+ }
+ }
+ /** Synchronized entry point that delegates to the write half's write(). */
+ synchronized void write(MultiplexedConnection.WriterState writerState) throws NetException {
+ wi.write(writerState);
+ }
+ /** Synchronized entry point that delegates to the write half's writeComplete(). */
+ synchronized void writeComplete() {
+ wi.writeComplete();
+ }
+ /** Synchronized entry point that delegates to the read half's read(); returns the bytes still unread. */
+ synchronized int read(SocketChannel sc, int size) throws IOException, NetException {
+ return ri.read(sc, size);
+ }
+ /** Returns the read-side credit counter. */
+ int getReadCredits() {
+ return ri.credits;
+ }
+ /** Overwrites the read-side credit counter. */
+ void setReadCredits(int credits) {
+ this.ri.credits = credits;
+ }
+ /** Adds {@code delta} write credits and re-evaluates writability. */
+ synchronized void addWriteCredits(int delta) {
+ wi.credits += delta;
+ wi.adjustChannelWritability();
+ }
+ /** Remote end-of-stream: flushes the partial read buffer, closes the consumer and sets remoteClose. */
+ synchronized void reportRemoteEOS() {
+ ri.flush();
+ ri.fba.close();
+ remoteClose.set(true);
+ }
+ /** Returns whether remoteClose is set (by a remote EOS or a remote error). */
+ boolean getRemoteEOS() {
+ return remoteClose.get();
+ }
+ /** Records that the local close has been acknowledged. */
+ synchronized void reportLocalEOSAck() {
+ localCloseAck.set(true);
+ }
+ /** Remote error: flushes the partial read buffer, reports {@code ecode} to the consumer and sets remoteClose. */
+ synchronized void reportRemoteError(int ecode) {
+ ri.flush();
+ ri.fba.error(ecode);
+ remoteClose.set(true);
+ }
+ /** True once the local close has been acknowledged and the remote side has closed. */
+ boolean completelyClosed() {
+ return localCloseAck.get() && remoteClose.get();
+ }
+ /** Debug summary of the close flags and both credit counters. */
+ @Override
+ public String toString() {
+ return "Channel:" + channelId + "[localClose: " + localClose + " localCloseAck: " + localCloseAck
+ + " remoteClose: " + remoteClose + " readCredits: " + ri.credits + " writeCredits: " + wi.credits + "]";
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
new file mode 100644
index 0000000..ffdb5c6
--- /dev/null
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
@@ -0,0 +1,222 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.muxdemux;
+
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.net.exceptions.NetException;
+/** Table of the channels of one MultiplexedConnection plus bitmaps of the channels that have pending work. */
+public class ChannelSet {
+ private static final Logger LOGGER = Logger.getLogger(ChannelSet.class.getName());
+ // Largest channel index accepted by createChannel().
+ private static final int MAX_OPEN_CHANNELS = 1024;
+ // Initial length of ccbArray; expand() doubles it on demand.
+ private static final int INITIAL_SIZE = 16;
+ // Owning connection; used as the monitor by the synchronized blocks in this class.
+ private final MultiplexedConnection mConn;
+ // Channel blocks indexed by channel id; null where no channel exists.
+ private ChannelControlBlock[] ccbArray;
+ // Bit i is set while channel id i is allocated.
+ private final BitSet allocationBitmap;
+ // Channels marked through markPendingWrite() and not yet unmarked.
+ private final BitSet pendingChannelWriteBitmap;
+ // Channels with accumulated read credits that still have to be announced.
+ private final BitSet pendingChannelCreditsBitmap;
+ // Channels for which initiateChannelSyn() was called and the open has not been written yet.
+ private final BitSet pendingChannelSynBitmap;
+ // Channels for which markEOSAck() was called and the acknowledgement has not been written yet.
+ private final BitSet pendingEOSAckBitmap;
+ // Number of channels created and not yet freed.
+ private int openChannelCount;
+ // Counter that is incremented/decremented as pending bits are set/cleared by the mark/unmark methods.
+ private final IEventCounter pendingWriteEventsCounter;
+ /** Creates an empty set for {@code mConn} that reports pending write events to the given counter. */
+ ChannelSet(MultiplexedConnection mConn, IEventCounter pendingWriteEventsCounter) {
+ this.mConn = mConn;
+ ccbArray = new ChannelControlBlock[INITIAL_SIZE];
+ allocationBitmap = new BitSet();
+ pendingChannelWriteBitmap = new BitSet();
+ pendingChannelCreditsBitmap = new BitSet();
+ pendingChannelSynBitmap = new BitSet();
+ pendingEOSAckBitmap = new BitSet();
+ this.pendingWriteEventsCounter = pendingWriteEventsCounter;
+ openChannelCount = 0;
+ }
+ /** Creates a channel at the lowest free index, reclaiming completely closed channels before growing the table. */
+ ChannelControlBlock allocateChannel() throws NetException {
+ synchronized (mConn) {
+ int idx = allocationBitmap.nextClearBit(0);
+ if (idx < 0 || idx == ccbArray.length) {
+ cleanupClosedChannels();
+ idx = allocationBitmap.nextClearBit(0);
+ if (idx < 0 || idx == ccbArray.length) {
+ idx = ccbArray.length;
+ }
+ }
+ return createChannel(idx);
+ }
+ }
+ /** Frees every channel whose completelyClosed() is true. */
+ private void cleanupClosedChannels() {
+ for (int i = 0; i < ccbArray.length; ++i) {
+ ChannelControlBlock ccb = ccbArray[i];
+ if (ccb != null) {
+ if (ccb.completelyClosed()) {
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Cleaning free channel: " + ccb);
+ }
+ freeChannel(ccb);
+ }
+ }
+ }
+ }
+ /** Creates a channel under the caller-chosen id {@code channelId}. */
+ ChannelControlBlock registerChannel(int channelId) throws NetException {
+ synchronized (mConn) {
+ return createChannel(channelId);
+ }
+ }
+ // NOTE(review): clears pending bits without touching pendingWriteEventsCounter - confirm none can still be set here.
+ private void freeChannel(ChannelControlBlock channel) {
+ int idx = channel.getChannelId();
+ ccbArray[idx] = null;
+ allocationBitmap.clear(idx);
+ pendingChannelWriteBitmap.clear(idx);
+ pendingChannelCreditsBitmap.clear(idx);
+ pendingChannelSynBitmap.clear(idx);
+ pendingEOSAckBitmap.clear(idx);
+ --openChannelCount;
+ }
+ /** Returns the block stored at {@code channelId} (possibly null); the index is not range-checked. */
+ ChannelControlBlock getCCB(int channelId) {
+ return ccbArray[channelId];
+ }
+ /** Returns the live (not copied) pending-write bitmap. */
+ BitSet getPendingChannelWriteBitmap() {
+ return pendingChannelWriteBitmap;
+ }
+ /** Returns the live (not copied) pending-credits bitmap. */
+ BitSet getPendingChannelCreditsBitmap() {
+ return pendingChannelCreditsBitmap;
+ }
+ /** Returns the live (not copied) pending-open bitmap. */
+ BitSet getPendingChannelSynBitmap() {
+ return pendingChannelSynBitmap;
+ }
+ /** Returns the live (not copied) pending-EOS-acknowledgement bitmap. */
+ BitSet getPendingEOSAckBitmap() {
+ return pendingEOSAckBitmap;
+ }
+ /** Returns the number of channels created and not yet freed. */
+ int getOpenChannelCount() {
+ return openChannelCount;
+ }
+ /** Marks {@code channelId} as having an open request to send and counts one pending write event. */
+ void initiateChannelSyn(int channelId) {
+ synchronized (mConn) {
+ assert !pendingChannelSynBitmap.get(channelId);
+ pendingChannelSynBitmap.set(channelId);
+ pendingWriteEventsCounter.increment();
+ }
+ }
+ /** Adds {@code delta} read credits to the channel; the first credits after zero also mark it pending. */
+ void addPendingCredits(int channelId, int delta) {
+ if (delta <= 0) {
+ return;
+ }
+ synchronized (mConn) {
+ ChannelControlBlock ccb = ccbArray[channelId];
+ if (ccb != null) {
+ if (ccb.getRemoteEOS()) {
+ return;
+ }
+ int oldCredits = ccb.getReadCredits();
+ ccb.setReadCredits(oldCredits + delta);
+ if (oldCredits == 0) {
+ assert !pendingChannelCreditsBitmap.get(channelId);
+ pendingChannelCreditsBitmap.set(channelId);
+ pendingWriteEventsCounter.increment();
+ }
+ }
+ }
+ }
+ /** Clears the pending-credits mark of the channel, if set, and releases its pending write event. */
+ void unmarkPendingCredits(int channelId) {
+ synchronized (mConn) {
+ if (pendingChannelCreditsBitmap.get(channelId)) {
+ pendingChannelCreditsBitmap.clear(channelId);
+ pendingWriteEventsCounter.decrement();
+ }
+ }
+ }
+ /** Marks the channel as having something to write and counts one pending write event. */
+ void markPendingWrite(int channelId) {
+ synchronized (mConn) {
+ assert !pendingChannelWriteBitmap.get(channelId);
+ pendingChannelWriteBitmap.set(channelId);
+ pendingWriteEventsCounter.increment();
+ }
+ }
+ /** Clears the pending-write mark of the channel and releases its pending write event. */
+ void unmarkPendingWrite(int channelId) {
+ synchronized (mConn) {
+ assert pendingChannelWriteBitmap.get(channelId);
+ pendingChannelWriteBitmap.clear(channelId);
+ pendingWriteEventsCounter.decrement();
+ }
+ }
+ /** Marks the channel as owing an EOS acknowledgement and counts one pending write event. */
+ void markEOSAck(int channelId) {
+ synchronized (mConn) {
+ assert !pendingEOSAckBitmap.get(channelId);
+ pendingEOSAckBitmap.set(channelId);
+ pendingWriteEventsCounter.increment();
+ }
+ }
+ /** Installs a new block at {@code idx}; throws NetException if {@code idx} exceeds MAX_OPEN_CHANNELS. */
+ private ChannelControlBlock createChannel(int idx) throws NetException {
+ if (idx > MAX_OPEN_CHANNELS) { // checked before expand() so an oversized id cannot force a huge allocation
+ throw new NetException("More than " + MAX_OPEN_CHANNELS + " opened concurrently");
+ }
+ if (idx >= ccbArray.length) {
+ expand(idx);
+ }
+ if (ccbArray[idx] != null) {
+ assert ccbArray[idx].completelyClosed();
+ if (ccbArray[idx].completelyClosed()) {
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Cleaning free channel: " + ccbArray[idx]);
+ }
+ freeChannel(ccbArray[idx]);
+ }
+ }
+ assert idx < ccbArray.length;
+ assert !allocationBitmap.get(idx);
+ ChannelControlBlock channel = new ChannelControlBlock(this, idx);
+ ccbArray[idx] = channel;
+ allocationBitmap.set(idx);
+ ++openChannelCount;
+ return channel;
+ }
+ /** Doubles ccbArray until {@code idx} is a valid index. */
+ private void expand(int idx) {
+ while (idx >= ccbArray.length) {
+ ccbArray = Arrays.copyOf(ccbArray, ccbArray.length * 2);
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelOpenListener.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelOpenListener.java
new file mode 100644
index 0000000..0fc9b2a
--- /dev/null
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelOpenListener.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.muxdemux;
+/** Listener that is handed the {@link ChannelControlBlock} of a channel that has been opened. */
+public interface IChannelOpenListener {
+ public void channelOpened(ChannelControlBlock channel); // presumably fired for remotely opened channels - confirm in MuxDemux
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelReadInterface.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelReadInterface.java
new file mode 100644
index 0000000..468a617
--- /dev/null
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelReadInterface.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.muxdemux;
+
+import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
+import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
+/** Consumer-facing side of a channel: supply empty buffers, receive filled ones. */
+public interface IChannelReadInterface {
+ public void setFullBufferAcceptor(ICloseableBufferAcceptor fullBufferAcceptor); // callback for filled buffers, close and error
+ /** Returns the acceptor through which the consumer hands empty buffers to the channel. */
+ public IBufferAcceptor getEmptyBufferAcceptor();
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelWriteInterface.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelWriteInterface.java
new file mode 100644
index 0000000..1e53d71
--- /dev/null
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelWriteInterface.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.muxdemux;
+
+import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
+import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
+/** Producer-facing side of a channel: submit full buffers, get them back once written. */
+public interface IChannelWriteInterface {
+ public void setEmptyBufferAcceptor(IBufferAcceptor emptyBufferAcceptor); // callback that receives buffers after they were written
+ /** Returns the acceptor through which the producer submits full buffers, close and error. */
+ public ICloseableBufferAcceptor getFullBufferAcceptor();
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IEventCounter.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IEventCounter.java
new file mode 100644
index 0000000..148078c
--- /dev/null
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IEventCounter.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.muxdemux;
+/** Counter of outstanding events; MultiplexedConnection in this patch uses it to toggle write interest on the socket. */
+public interface IEventCounter {
+ public void increment(); // one more outstanding event
+ /** One outstanding event fewer. */
+ public void decrement();
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
new file mode 100644
index 0000000..c905f57
--- /dev/null
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
@@ -0,0 +1,384 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.muxdemux;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.BitSet;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.net.exceptions.NetException;
+import edu.uci.ics.hyracks.net.protocols.tcp.ITCPConnectionEventListener;
+import edu.uci.ics.hyracks.net.protocols.tcp.TCPConnection;
+
+public class MultiplexedConnection implements ITCPConnectionEventListener {
+ private static final Logger LOGGER = Logger.getLogger(MultiplexedConnection.class.getName());
+
+ private static final int MAX_CHUNKS_READ_PER_CYCLE = 4;
+
+ private final MuxDemux muxDemux;
+
+ private final IEventCounter pendingWriteEventsCounter;
+
+ private final ChannelSet cSet;
+
+ private final ReaderState readerState;
+
+ private final WriterState writerState;
+
+ private TCPConnection tcpConnection;
+
+ private int lastChannelWritten;
+
+ private int nConnectionAttempts;
+
+ private boolean connectionFailure;
+ /** Creates an endpoint without a TCP connection; the connection is attached later through setTCPConnection(). */
+ public MultiplexedConnection(MuxDemux muxDemux) {
+ this.muxDemux = muxDemux;
+ pendingWriteEventsCounter = new IEventCounter() {
+ private int counter;
+ // First pending event: enable write interest on the TCP connection.
+ @Override
+ public synchronized void increment() {
+ ++counter;
+ if (counter == 1) {
+ tcpConnection.enable(SelectionKey.OP_WRITE);
+ }
+ }
+ // Last pending event gone: disable write interest; dropping below zero is reported as an illegal state.
+ @Override
+ public synchronized void decrement() {
+ --counter;
+ if (counter == 0) {
+ tcpConnection.disable(SelectionKey.OP_WRITE);
+ }
+ if (counter < 0) {
+ throw new IllegalStateException();
+ }
+ }
+ };
+ cSet = new ChannelSet(this, pendingWriteEventsCounter);
+ readerState = new ReaderState();
+ writerState = new WriterState();
+ lastChannelWritten = -1;
+ connectionFailure = false;
+ }
+ /** Returns the connection-attempt count last stored with setConnectionAttempts(). */
+ int getConnectionAttempts() {
+ return nConnectionAttempts;
+ }
+ /** Stores the connection-attempt count. */
+ void setConnectionAttempts(int nConnectionAttempts) {
+ this.nConnectionAttempts = nConnectionAttempts;
+ }
+ /** Attaches the TCP connection, enables read interest on it and wakes threads blocked in waitUntilConnected(). */
+ synchronized void setTCPConnection(TCPConnection tcpConnection) {
+ this.tcpConnection = tcpConnection;
+ tcpConnection.enable(SelectionKey.OP_READ);
+ notifyAll();
+ }
+ /** Records that connecting failed and wakes threads blocked in waitUntilConnected(). */
+ synchronized void setConnectionFailure() {
+ this.connectionFailure = true;
+ notifyAll();
+ }
+ /** Blocks until a TCP connection is attached or a failure is recorded; throws NetException on failure. */
+ synchronized void waitUntilConnected() throws InterruptedException, NetException {
+ while (tcpConnection == null && !connectionFailure) {
+ wait();
+ }
+ if (connectionFailure) {
+ throw new NetException("Connection failure");
+ }
+ }
+ /** I/O readiness callback: runs the reader state machine first, then the writer state machine. */
+ @Override
+ public void notifyIOReady(TCPConnection connection, boolean readable, boolean writable) throws IOException,
+ NetException {
+ if (readable) {
+ driveReaderStateMachine();
+ }
+ if (writable) {
+ driveWriterStateMachine();
+ }
+ }
+ /** Allocates a channel and marks its open request as pending; returns without waiting for it to be written. */
+ public ChannelControlBlock openChannel() throws NetException, InterruptedException {
+ ChannelControlBlock channel = cSet.allocateChannel();
+ int channelId = channel.getChannelId();
+ cSet.initiateChannelSyn(channelId);
+ return channel;
+ }
+ /** Holds the serialized command and the optional payload slice that are currently being written to the socket. */
+ class WriterState {
+ private final ByteBuffer writeBuffer;
+ // Command that reset() serializes into writeBuffer.
+ final MuxDemuxCommand command;
+ // Payload buffer to send after the command, or null for a command without payload.
+ private ByteBuffer pendingBuffer;
+ // Payload bytes of pendingBuffer that still have to be written.
+ private int pendingWriteSize;
+ // Channel whose writeComplete() is called when the write is done; may be null.
+ private ChannelControlBlock ccb;
+ /** Allocates a direct buffer of COMMAND_SIZE bytes and flips it, so that nothing is pending initially. */
+ public WriterState() {
+ writeBuffer = ByteBuffer.allocateDirect(MuxDemuxCommand.COMMAND_SIZE);
+ writeBuffer.flip();
+ command = new MuxDemuxCommand();
+ ccb = null;
+ }
+ /** Returns true while command bytes or payload bytes remain to be written. */
+ boolean writePending() {
+ return writeBuffer.remaining() > 0 || (pendingBuffer != null && pendingWriteSize > 0);
+ }
+ /** Serializes the current command and records the payload slice and the channel to notify. */
+ void reset(ByteBuffer pendingBuffer, int pendingWriteSize, ChannelControlBlock ccb) {
+ writeBuffer.clear();
+ command.write(writeBuffer);
+ writeBuffer.flip();
+ this.pendingBuffer = pendingBuffer;
+ this.pendingWriteSize = pendingWriteSize;
+ this.ccb = ccb;
+ }
+ /** Writes command, then payload, as far as {@code sc} accepts; returns true once everything has been written. */
+ boolean performPendingWrite(SocketChannel sc) throws IOException {
+ int len = writeBuffer.remaining();
+ if (len > 0) {
+ int written = sc.write(writeBuffer);
+ muxDemux.getPerformanceCounters().addSignalingBytesWritten(written);
+ if (written < len) {
+ return false;
+ }
+ }
+ if (pendingBuffer != null) {
+ if (pendingWriteSize > 0) {
+ assert pendingWriteSize <= pendingBuffer.remaining();
+ int oldLimit = pendingBuffer.limit(); // restored in finally; the limit is narrowed to the payload slice meanwhile
+ try {
+ pendingBuffer.limit(pendingWriteSize + pendingBuffer.position());
+ int written = sc.write(pendingBuffer);
+ muxDemux.getPerformanceCounters().addPayloadBytesWritten(written);
+ pendingWriteSize -= written;
+ } finally {
+ pendingBuffer.limit(oldLimit);
+ }
+ }
+ if (pendingWriteSize > 0) {
+ return false;
+ }
+ pendingBuffer = null;
+ pendingWriteSize = 0;
+ }
+ if (ccb != null) {
+ ccb.writeComplete();
+ ccb = null;
+ }
+ return true;
+ }
+ }
+
+ void driveWriterStateMachine() throws IOException, NetException { // sends pending control commands and channel data until a write cannot complete
+ SocketChannel sc = tcpConnection.getSocketChannel();
+ if (writerState.writePending()) { // finish whatever an earlier call left partially written
+ if (!writerState.performPendingWrite(sc)) {
+ return;
+ }
+ pendingWriteEventsCounter.decrement();
+ }
+ int numCycles;
+
+ synchronized (MultiplexedConnection.this) {
+ numCycles = cSet.getOpenChannelCount(); // snapshot that bounds the number of loop iterations below
+ }
+
+ for (int i = 0; i < numCycles; ++i) {
+ ChannelControlBlock writeCCB = null;
+ synchronized (MultiplexedConnection.this) {
+ BitSet pendingChannelSynBitmap = cSet.getPendingChannelSynBitmap();
+ for (int j = pendingChannelSynBitmap.nextSetBit(0); j >= 0; j = pendingChannelSynBitmap.nextSetBit(j)) { // one OPEN_CHANNEL per set bit
+ pendingChannelSynBitmap.clear(j);
+ writerState.command.setChannelId(j);
+ writerState.command.setCommandType(MuxDemuxCommand.CommandType.OPEN_CHANNEL);
+ writerState.command.setData(0);
+ writerState.reset(null, 0, null);
+ if (!writerState.performPendingWrite(sc)) {
+ return;
+ }
+ pendingWriteEventsCounter.decrement();
+ }
+ BitSet pendingChannelCreditsBitmap = cSet.getPendingChannelCreditsBitmap();
+ for (int j = pendingChannelCreditsBitmap.nextSetBit(0); j >= 0; j = pendingChannelCreditsBitmap
+ .nextSetBit(j)) { // a bit that stays set is found again, so the same channel is served until its credits are sent
+ writerState.command.setChannelId(j);
+ writerState.command.setCommandType(MuxDemuxCommand.CommandType.ADD_CREDITS);
+ ChannelControlBlock ccb = cSet.getCCB(j);
+ int credits = ccb.getReadCredits();
+ int effectiveCredits;
+ if (credits <= MuxDemuxCommand.MAX_DATA_VALUE) { // everything fits into one command
+ effectiveCredits = credits;
+ ccb.setReadCredits(0);
+ pendingChannelCreditsBitmap.clear(j);
+ } else { // send the maximum now and keep the remainder for the next command
+ effectiveCredits = MuxDemuxCommand.MAX_DATA_VALUE;
+ ccb.setReadCredits(credits - effectiveCredits);
+ }
+ writerState.command.setData(effectiveCredits);
+ writerState.reset(null, 0, null);
+ if (!writerState.performPendingWrite(sc)) {
+ return;
+ }
+ if (credits == effectiveCredits) { // only the command that carries the last credits decrements the counter
+ pendingWriteEventsCounter.decrement();
+ }
+ }
+ BitSet pendingEOSAckBitmap = cSet.getPendingEOSAckBitmap();
+ for (int j = pendingEOSAckBitmap.nextSetBit(0); j >= 0; j = pendingEOSAckBitmap.nextSetBit(j)) { // one CLOSE_CHANNEL_ACK per set bit
+ pendingEOSAckBitmap.clear(j);
+ writerState.command.setChannelId(j);
+ writerState.command.setCommandType(MuxDemuxCommand.CommandType.CLOSE_CHANNEL_ACK);
+ writerState.command.setData(0);
+ writerState.reset(null, 0, null);
+ if (!writerState.performPendingWrite(sc)) {
+ return;
+ }
+ pendingWriteEventsCounter.decrement();
+ }
+ BitSet pendingChannelWriteBitmap = cSet.getPendingChannelWriteBitmap();
+ lastChannelWritten = pendingChannelWriteBitmap.nextSetBit(lastChannelWritten + 1); // continue after the channel served last
+ if (lastChannelWritten < 0) { // none beyond it: wrap around to the lowest pending channel
+ lastChannelWritten = pendingChannelWriteBitmap.nextSetBit(0);
+ if (lastChannelWritten < 0) { // no channel has a write pending
+ return;
+ }
+ }
+ writeCCB = cSet.getCCB(lastChannelWritten);
+ }
+ writeCCB.write(writerState); // called outside the lock; presumably loads writerState with the channel's next command - confirm in ChannelControlBlock
+ if (writerState.writePending()) {
+ pendingWriteEventsCounter.increment();
+ if (!writerState.performPendingWrite(sc)) {
+ return;
+ }
+ pendingWriteEventsCounter.decrement();
+ }
+ }
+ }
+
+ /** Reader-side parsing state: the command header being read plus bookkeeping for its payload. */
+ class ReaderState {
+     /** Receives the fixed-size command header from the socket. */
+     private final ByteBuffer readBuffer = ByteBuffer.allocateDirect(MuxDemuxCommand.COMMAND_SIZE);
+
+     /** Command decoded from readBuffer by the reader state machine. */
+     final MuxDemuxCommand command = new MuxDemuxCommand();
+
+     /** Payload still outstanding for a DATA command: remaining byte count and receiving channel. */
+     private int pendingReadSize;
+     private ChannelControlBlock ccb;
+
+     /** Empties the header buffer and clears the payload bookkeeping. */
+     void reset() {
+         ccb = null;
+         pendingReadSize = 0;
+         readBuffer.clear();
+     }
+
+     /** Looks up the channel addressed by the current command while holding the connection lock. */
+     private ChannelControlBlock getCCBInCommand() {
+         synchronized (MultiplexedConnection.this) {
+             return cSet.getCCB(command.getChannelId());
+         }
+     }
+ }
+
+ void driveReaderStateMachine() throws IOException, NetException { // reads command headers and DATA payloads; returns early while a header or payload is incomplete
+ SocketChannel sc = tcpConnection.getSocketChannel();
+ int chunksRead = 0;
+ while (chunksRead < MAX_CHUNKS_READ_PER_CYCLE) { // bounds the number of payload reads per call
+ if (readerState.readBuffer.remaining() > 0) { // command header not complete yet
+ int read = sc.read(readerState.readBuffer);
+ if (read < 0) { // end of stream
+ throw new NetException("Socket Closed");
+ }
+ muxDemux.getPerformanceCounters().addSignalingBytesRead(read);
+ if (readerState.readBuffer.remaining() > 0) { // header still incomplete; resume on the next call
+ return;
+ }
+ readerState.readBuffer.flip();
+ readerState.command.read(readerState.readBuffer); // decode the complete header
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Received command: " + readerState.command);
+ }
+ ChannelControlBlock ccb = null;
+ switch (readerState.command.getCommandType()) {
+ case ADD_CREDITS: {
+ ccb = readerState.getCCBInCommand();
+ ccb.addWriteCredits(readerState.command.getData());
+ break;
+ }
+ case CLOSE_CHANNEL: {
+ ccb = readerState.getCCBInCommand();
+ ccb.reportRemoteEOS();
+ int channelId = ccb.getChannelId();
+ cSet.markEOSAck(channelId); // schedule the CLOSE_CHANNEL_ACK
+ cSet.unmarkPendingCredits(channelId);
+ break;
+ }
+ case CLOSE_CHANNEL_ACK: {
+ ccb = readerState.getCCBInCommand();
+ ccb.reportLocalEOSAck();
+ break;
+ }
+ case DATA: { // getData() bytes of payload follow the header; they are read below
+ ccb = readerState.getCCBInCommand();
+ readerState.pendingReadSize = readerState.command.getData();
+ readerState.ccb = ccb;
+ break;
+ }
+ case ERROR: {
+ ccb = readerState.getCCBInCommand();
+ ccb.reportRemoteError(readerState.command.getData());
+ int channelId = ccb.getChannelId();
+ cSet.markEOSAck(channelId);
+ cSet.unmarkPendingCredits(channelId);
+ break;
+ }
+ case OPEN_CHANNEL: { // last case, so the missing break has no effect
+ int channelId = readerState.command.getChannelId();
+ ccb = cSet.registerChannel(channelId);
+ muxDemux.getChannelOpenListener().channelOpened(ccb);
+ }
+ }
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Applied command: " + readerState.command + " on " + ccb);
+ }
+ }
+ if (readerState.pendingReadSize > 0) { // payload of a DATA command is outstanding
+ ++chunksRead;
+ int newPendingReadSize = readerState.ccb.read(sc, readerState.pendingReadSize); // result is stored as the bytes still outstanding
+ muxDemux.getPerformanceCounters().addPayloadBytesRead(readerState.pendingReadSize - newPendingReadSize);
+ readerState.pendingReadSize = newPendingReadSize;
+ if (readerState.pendingReadSize > 0) { // payload incomplete; resume on the next call
+ return;
+ }
+ }
+ readerState.reset(); // command fully handled; prepare for the next header
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
new file mode 100644
index 0000000..8548bb8
--- /dev/null
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.muxdemux;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.net.exceptions.NetException;
+import edu.uci.ics.hyracks.net.protocols.tcp.ITCPConnectionListener;
+import edu.uci.ics.hyracks.net.protocols.tcp.TCPConnection;
+import edu.uci.ics.hyracks.net.protocols.tcp.TCPEndpoint;
+
+public class MuxDemux {
+ private final InetSocketAddress localAddress;
+
+ private final IChannelOpenListener channelOpenListener;
+
+ private final Map<InetSocketAddress, MultiplexedConnection> connectionMap;
+
+ private final TCPEndpoint tcpEndpoint;
+
+ private final MuxDemuxPerformanceCounters perfCounters;
+
+ public MuxDemux(InetSocketAddress localAddress, IChannelOpenListener listener, int nThreads) {
+ this.localAddress = localAddress;
+ this.channelOpenListener = listener;
+ connectionMap = new HashMap<InetSocketAddress, MultiplexedConnection>();
+ this.tcpEndpoint = new TCPEndpoint(new ITCPConnectionListener() {
+ @Override
+ public void connectionEstablished(TCPConnection connection) {
+ MultiplexedConnection mConn;
+ synchronized (MuxDemux.this) {
+ mConn = connectionMap.get(connection.getRemoteAddress());
+ }
+ assert mConn != null;
+ mConn.setTCPConnection(connection);
+ connection.setEventListener(mConn);
+ connection.setAttachment(mConn);
+ }
+
+ @Override
+ public void acceptedConnection(TCPConnection connection) {
+ MultiplexedConnection mConn = new MultiplexedConnection(MuxDemux.this);
+ mConn.setTCPConnection(connection);
+ connection.setEventListener(mConn);
+ connection.setAttachment(mConn);
+ }
+
+ @Override
+ public void connectionFailure(InetSocketAddress remoteAddress) {
+ MultiplexedConnection mConn;
+ synchronized (MuxDemux.this) {
+ mConn = connectionMap.get(remoteAddress);
+ assert mConn != null;
+ int nConnectionAttempts = mConn.getConnectionAttempts();
+ if (nConnectionAttempts > 5) {
+ connectionMap.remove(remoteAddress);
+ mConn.setConnectionFailure();
+ } else {
+ mConn.setConnectionAttempts(nConnectionAttempts + 1);
+ tcpEndpoint.initiateConnection(remoteAddress);
+ }
+ }
+ }
+ }, nThreads);
+ perfCounters = new MuxDemuxPerformanceCounters();
+ }
+
+ public void start() throws IOException {
+ tcpEndpoint.start(localAddress);
+ }
+
+ public MultiplexedConnection connect(InetSocketAddress remoteAddress) throws InterruptedException, NetException {
+ MultiplexedConnection mConn = null;
+ synchronized (this) {
+ mConn = connectionMap.get(remoteAddress);
+ if (mConn == null) {
+ mConn = new MultiplexedConnection(this);
+ connectionMap.put(remoteAddress, mConn);
+ tcpEndpoint.initiateConnection(remoteAddress);
+ }
+ }
+ mConn.waitUntilConnected();
+ return mConn;
+ }
+
+ IChannelOpenListener getChannelOpenListener() {
+ return channelOpenListener;
+ }
+
+ public InetSocketAddress getLocalAddress() {
+ return tcpEndpoint.getLocalAddress();
+ }
+
+ public MuxDemuxPerformanceCounters getPerformanceCounters() {
+ return perfCounters;
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxCommand.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxCommand.java
new file mode 100644
index 0000000..2e2636b
--- /dev/null
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxCommand.java
@@ -0,0 +1,93 @@
+package edu.uci.ics.hyracks.net.protocols.muxdemux;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.net.exceptions.NetException;
+
+/** One 32-bit multiplexing command: a 10-bit channel id, a 3-bit command type and 19 bits of data. */
+class MuxDemuxCommand {
+    static final int MAX_CHANNEL_ID = 0x3ff;
+
+    static final int COMMAND_SIZE = 4;
+
+    static final int MAX_DATA_VALUE = 0x7ffff;
+
+    /** The ordinal is the 3-bit code written to the wire, so constants must not be reordered. */
+    enum CommandType {
+        OPEN_CHANNEL,
+        CLOSE_CHANNEL,
+        CLOSE_CHANNEL_ACK,
+        ERROR,
+        ADD_CREDITS,
+        DATA,
+    }
+
+    /** Cached so that decoding does not clone the values() array for every command. */
+    private static final CommandType[] COMMAND_TYPES = CommandType.values();
+
+    private int channelId;
+
+    private CommandType type;
+
+    private int data;
+
+    public int getChannelId() {
+        return channelId;
+    }
+
+    /** @throws NetException if channelId is negative or greater than MAX_CHANNEL_ID */
+    public void setChannelId(int channelId) throws NetException {
+        if (channelId < 0 || channelId > MAX_CHANNEL_ID) {
+            throw new NetException("channelId " + channelId + " is outside [0, " + MAX_CHANNEL_ID + "]");
+        }
+        this.channelId = channelId;
+    }
+
+    public CommandType getCommandType() {
+        return type;
+    }
+
+    public void setCommandType(CommandType type) {
+        this.type = type;
+    }
+
+    public int getData() {
+        return data;
+    }
+
+    /** @throws NetException if data exceeds MAX_DATA_VALUE; negative values are not rejected here */
+    public void setData(int data) throws NetException {
+        if (data > MAX_DATA_VALUE) {
+            throw new NetException("data " + data + " exceeds " + MAX_DATA_VALUE);
+        }
+        this.data = data;
+    }
+
+    /** Appends the command to buffer as one int: channel id << 22 | type ordinal << 19 | data (low 19 bits). */
+    public void write(ByteBuffer buffer) {
+        int cmd = (channelId << 22) | (type.ordinal() << 19) | (data & MAX_DATA_VALUE);
+        buffer.putInt(cmd);
+    }
+
+    /**
+     * Reads one int from buffer and decodes it into this command.
+     *
+     * @throws IllegalArgumentException if the type code maps to no CommandType; fields are left unchanged
+     */
+    public void read(ByteBuffer buffer) {
+        int cmd = buffer.getInt();
+        int typeCode = (cmd >> 19) & 0x7;
+        if (typeCode >= COMMAND_TYPES.length) {
+            throw new IllegalArgumentException("Unknown command type code " + typeCode + " in command 0x"
+                    + Integer.toHexString(cmd));
+        }
+        channelId = (cmd >> 22) & MAX_CHANNEL_ID;
+        type = COMMAND_TYPES[typeCode];
+        data = cmd & MAX_DATA_VALUE;
+    }
+
+    @Override
+    public String toString() {
+        return channelId + ":" + type + ":" + data;
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxPerformanceCounters.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxPerformanceCounters.java
new file mode 100644
index 0000000..6bf21aa
--- /dev/null
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxPerformanceCounters.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.muxdemux;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/** Running byte totals for payload and signaling traffic, each kept in an {@link AtomicLong}. */
+public class MuxDemuxPerformanceCounters {
+    private final AtomicLong payloadBytesRead = new AtomicLong();
+
+    private final AtomicLong payloadBytesWritten = new AtomicLong();
+
+    private final AtomicLong signalingBytesRead = new AtomicLong();
+
+    private final AtomicLong signalingBytesWritten = new AtomicLong();
+
+    /** Adds {@code delta} to the payload bytes read. */
+    public void addPayloadBytesRead(long delta) {
+        payloadBytesRead.getAndAdd(delta);
+    }
+
+    public long getPayloadBytesRead() {
+        return payloadBytesRead.longValue();
+    }
+
+    /** Adds {@code delta} to the payload bytes written. */
+    public void addPayloadBytesWritten(long delta) {
+        payloadBytesWritten.getAndAdd(delta);
+    }
+
+    public long getPayloadBytesWritten() {
+        return payloadBytesWritten.longValue();
+    }
+
+    /** Adds {@code delta} to the signaling bytes read. */
+    public void addSignalingBytesRead(long delta) {
+        signalingBytesRead.getAndAdd(delta);
+    }
+
+    public long getSignalingBytesRead() {
+        return signalingBytesRead.longValue();
+    }
+
+    /** Adds {@code delta} to the signaling bytes written. */
+    public void addSignalingBytesWritten(long delta) {
+        signalingBytesWritten.getAndAdd(delta);
+    }
+
+    public long getSignalingBytesWritten() {
+        return signalingBytesWritten.longValue();
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/ITCPConnectionEventListener.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/ITCPConnectionEventListener.java
new file mode 100644
index 0000000..607bf31
--- /dev/null
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/ITCPConnectionEventListener.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.tcp;
+
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.net.exceptions.NetException;
+
+/** Callback through which IO readiness of a {@link TCPConnection} is reported. */
+public interface ITCPConnectionEventListener {
+    /**
+     * Reports that the connection is ready for reading and/or writing.
+     *
+     * @param connection connection the readiness applies to
+     * @param readable whether the connection was found readable
+     * @param writable whether the connection was found writable
+     */
+    void notifyIOReady(TCPConnection connection, boolean readable, boolean writable) throws IOException, NetException;
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/ITCPConnectionListener.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/ITCPConnectionListener.java
new file mode 100644
index 0000000..ead2a1f
--- /dev/null
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/ITCPConnectionListener.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.tcp;
+
+import java.net.InetSocketAddress;
+
+/** Receives connection lifecycle events from a {@link TCPEndpoint}. */
+public interface ITCPConnectionListener {
+    /** Called for a connection accepted on the endpoint's server socket. */
+    void acceptedConnection(TCPConnection connection);
+
+    /** Called once a connection initiated by the endpoint has been established. */
+    void connectionEstablished(TCPConnection connection);
+
+    /** Called when connecting to {@code remoteAddress} failed. */
+    void connectionFailure(InetSocketAddress remoteAddress);
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPConnection.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPConnection.java
new file mode 100644
index 0000000..210508b
--- /dev/null
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPConnection.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.tcp;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+
+/** A socket channel together with its selection key, selector and owning {@link TCPEndpoint}. */
+public class TCPConnection {
+    private final TCPEndpoint endpoint;
+
+    private final SocketChannel channel;
+
+    private final SelectionKey key;
+
+    private final Selector selector;
+
+    private ITCPConnectionEventListener eventListener;
+
+    private Object attachment;
+
+    public TCPConnection(TCPEndpoint endpoint, SocketChannel channel, SelectionKey key, Selector selector) {
+        this.endpoint = endpoint;
+        this.channel = channel;
+        this.key = key;
+        this.selector = selector;
+    }
+
+    public TCPEndpoint getEndpoint() {
+        return endpoint;
+    }
+
+    public SocketChannel getSocketChannel() {
+        return channel;
+    }
+
+    /** Returns the local socket address of the underlying channel. */
+    public InetSocketAddress getLocalAddress() {
+        return (InetSocketAddress) channel.socket().getLocalSocketAddress();
+    }
+
+    /** Returns the remote socket address of the underlying channel. */
+    public InetSocketAddress getRemoteAddress() {
+        return (InetSocketAddress) channel.socket().getRemoteSocketAddress();
+    }
+
+    /** Adds {@code ops} to the key's interest set and wakes up the selector. */
+    public void enable(int ops) {
+        final int current = key.interestOps();
+        key.interestOps(current | ops);
+        selector.wakeup();
+    }
+
+    /** Removes {@code ops} from the key's interest set and wakes up the selector. */
+    public void disable(int ops) {
+        final int current = key.interestOps();
+        key.interestOps(current & ~ops);
+        selector.wakeup();
+    }
+
+    public ITCPConnectionEventListener getEventListener() {
+        return eventListener;
+    }
+
+    public void setEventListener(ITCPConnectionEventListener eventListener) {
+        this.eventListener = eventListener;
+    }
+
+    public Object getAttachment() {
+        return attachment;
+    }
+
+    public void setAttachment(Object attachment) {
+        this.attachment = attachment;
+    }
+
+    /** Cancels the selection key, then closes the channel; a close failure is only printed. */
+    public void close() {
+        key.cancel();
+        try {
+            channel.close();
+        } catch (IOException closeFailure) {
+            closeFailure.printStackTrace();
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
new file mode 100644
index 0000000..1a73bdc
--- /dev/null
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
@@ -0,0 +1,291 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.tcp;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Non-blocking TCP endpoint: accepts connections on one server socket, initiates outgoing
+ * connections, and assigns both kinds round-robin to a fixed set of selector threads.
+ */
+public class TCPEndpoint {
+    private final ITCPConnectionListener connectionListener;
+
+    private final int nThreads;
+
+    private ServerSocketChannel serverSocketChannel;
+
+    private InetSocketAddress localAddress;
+
+    private IOThread[] ioThreads;
+
+    private int nextThread;
+
+    public TCPEndpoint(ITCPConnectionListener connectionListener, int nThreads) {
+        this.connectionListener = connectionListener;
+        this.nThreads = nThreads;
+    }
+
+    /**
+     * Binds the server socket, creates the IO threads, registers the server socket with the first
+     * thread and starts them all.
+     *
+     * @param localAddress address to bind the server socket to
+     * @throws IOException if the server socket or a selector cannot be set up
+     */
+    public void start(InetSocketAddress localAddress) throws IOException {
+        serverSocketChannel = ServerSocketChannel.open();
+        ServerSocket serverSocket = serverSocketChannel.socket();
+        serverSocket.bind(localAddress);
+        this.localAddress = (InetSocketAddress) serverSocket.getLocalSocketAddress();
+        ioThreads = new IOThread[nThreads];
+        for (int i = 0; i < ioThreads.length; ++i) {
+            ioThreads[i] = new IOThread();
+        }
+        ioThreads[0].registerServerSocket(serverSocketChannel);
+        for (int i = 0; i < ioThreads.length; ++i) {
+            ioThreads[i].start();
+        }
+    }
+
+    /** Returns the index of the next IO thread, cycling through all of them. */
+    private synchronized int getNextThread() {
+        int result = nextThread;
+        nextThread = (nextThread + 1) % nThreads;
+        return result;
+    }
+
+    /** Queues an outgoing connection to remoteAddress on the next IO thread. */
+    public void initiateConnection(InetSocketAddress remoteAddress) {
+        int targetThread = getNextThread();
+        ioThreads[targetThread].initiateConnection(remoteAddress);
+    }
+
+    private void distributeIncomingConnection(SocketChannel channel) {
+        int targetThread = getNextThread();
+        ioThreads[targetThread].addIncomingConnection(channel);
+    }
+
+    public InetSocketAddress getLocalAddress() {
+        return localAddress;
+    }
+
+    /** Selector loop that serves the channels registered with its own selector. */
+    private class IOThread extends Thread {
+        /** Addresses queued by initiateConnection(); moved to the working list by collectOutstandingWork(). */
+        private final List<InetSocketAddress> pendingConnections;
+
+        private final List<InetSocketAddress> workingPendingConnections;
+
+        /** Channels queued by addIncomingConnection(); moved to the working list by collectOutstandingWork(). */
+        private final List<SocketChannel> incomingConnections;
+
+        private final List<SocketChannel> workingIncomingConnections;
+
+        private Selector selector;
+
+        public IOThread() throws IOException {
+            super("TCPEndpoint IO Thread");
+            setPriority(MAX_PRIORITY);
+            this.pendingConnections = new ArrayList<InetSocketAddress>();
+            this.workingPendingConnections = new ArrayList<InetSocketAddress>();
+            this.incomingConnections = new ArrayList<SocketChannel>();
+            this.workingIncomingConnections = new ArrayList<SocketChannel>();
+            selector = Selector.open();
+        }
+
+        @Override
+        public void run() {
+            while (true) {
+                try {
+                    int n = selector.select();
+                    collectOutstandingWork();
+                    startPendingConnections();
+                    registerIncomingConnections();
+                    if (n > 0) {
+                        Iterator<SelectionKey> i = selector.selectedKeys().iterator();
+                        while (i.hasNext()) {
+                            SelectionKey key = i.next();
+                            i.remove();
+                            boolean readable = key.isReadable();
+                            boolean writable = key.isWritable();
+
+                            if (readable || writable) {
+                                TCPConnection connection = (TCPConnection) key.attachment();
+                                connection.getEventListener().notifyIOReady(connection, readable, writable);
+                            }
+                            if (key.isAcceptable()) {
+                                acceptConnection(key);
+                            } else if (key.isConnectable()) {
+                                finishConnection(key);
+                            }
+                        }
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+
+        /**
+         * Starts every queued outgoing connection. Each address is removed from the working list before
+         * it is handled, so an unexpected exception cannot cause handled addresses to be retried.
+         */
+        private void startPendingConnections() {
+            Iterator<InetSocketAddress> it = workingPendingConnections.iterator();
+            while (it.hasNext()) {
+                InetSocketAddress address = it.next();
+                it.remove();
+                startConnecting(address);
+            }
+        }
+
+        /**
+         * Opens a non-blocking channel to address. An immediate connect is reported right away; otherwise
+         * the channel waits for OP_CONNECT with the address attached to its key. On an IOException the
+         * listener is told about the failure, and a channel that did not get registered is closed.
+         */
+        private void startConnecting(InetSocketAddress address) {
+            SocketChannel channel = null;
+            boolean registered = false;
+            try {
+                channel = SocketChannel.open();
+                channel.configureBlocking(false);
+                if (channel.connect(address)) {
+                    SelectionKey key = channel.register(selector, 0);
+                    registered = true;
+                    createConnection(key, channel);
+                } else {
+                    channel.register(selector, SelectionKey.OP_CONNECT, address);
+                    registered = true;
+                }
+            } catch (IOException e) {
+                connectionListener.connectionFailure(address);
+            } finally {
+                if (!registered) {
+                    closeQuietly(channel);
+                }
+            }
+        }
+
+        /**
+         * Registers every accepted channel with this thread's selector and announces it to the listener.
+         * A channel that cannot be registered is closed and skipped.
+         */
+        private void registerIncomingConnections() {
+            Iterator<SocketChannel> it = workingIncomingConnections.iterator();
+            while (it.hasNext()) {
+                SocketChannel channel = it.next();
+                it.remove();
+                try {
+                    channel.configureBlocking(false);
+                    SelectionKey sKey = channel.register(selector, 0);
+                    TCPConnection connection = new TCPConnection(TCPEndpoint.this, channel, sKey, selector);
+                    sKey.attach(connection);
+                    synchronized (connectionListener) {
+                        connectionListener.acceptedConnection(connection);
+                    }
+                } catch (IOException e) {
+                    e.printStackTrace();
+                    closeQuietly(channel);
+                }
+            }
+        }
+
+        /** Accepts one connection from the server socket and hands it to the next IO thread. */
+        private void acceptConnection(SelectionKey key) throws IOException {
+            SelectableChannel sc = key.channel();
+            assert sc == serverSocketChannel;
+            SocketChannel channel = serverSocketChannel.accept();
+            // A non-blocking accept() returns null when no connection is available.
+            if (channel != null) {
+                distributeIncomingConnection(channel);
+            }
+        }
+
+        /** Completes a pending connect; on failure cancels the key, closes the channel and reports it. */
+        private void finishConnection(SelectionKey key) {
+            SocketChannel channel = (SocketChannel) key.channel();
+            boolean finishConnect = false;
+            try {
+                finishConnect = channel.finishConnect();
+            } catch (Exception e) {
+                e.printStackTrace();
+                key.cancel();
+                closeQuietly(channel);
+                connectionListener.connectionFailure((InetSocketAddress) key.attachment());
+            }
+            if (finishConnect) {
+                createConnection(key, channel);
+            }
+        }
+
+        /** Closes channel if it is non-null; a failure to close is only printed. */
+        private void closeQuietly(SocketChannel channel) {
+            if (channel == null) {
+                return;
+            }
+            try {
+                channel.close();
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+
+        private void createConnection(SelectionKey key, SocketChannel channel) {
+            TCPConnection connection = new TCPConnection(TCPEndpoint.this, channel, key, selector);
+            key.attach(connection);
+            key.interestOps(0);
+            connectionListener.connectionEstablished(connection);
+        }
+
+        synchronized void initiateConnection(InetSocketAddress remoteAddress) {
+            pendingConnections.add(remoteAddress);
+            selector.wakeup();
+        }
+
+        synchronized void addIncomingConnection(SocketChannel channel) {
+            incomingConnections.add(channel);
+            selector.wakeup();
+        }
+
+        void registerServerSocket(ServerSocketChannel serverSocketChannel) throws IOException {
+            serverSocketChannel.configureBlocking(false);
+            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
+        }
+
+        /** Moves the work queued by other threads onto the working lists, under this thread's monitor. */
+        private synchronized void collectOutstandingWork() {
+            if (!pendingConnections.isEmpty()) {
+                workingPendingConnections.addAll(pendingConnections);
+                pendingConnections.clear();
+            }
+            if (!incomingConnections.isEmpty()) {
+                workingIncomingConnections.addAll(incomingConnections);
+                incomingConnections.clear();
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-net/src/test/java/edu/uci/ics/hyracks/net/tests/NetTest.java b/hyracks/hyracks-net/src/test/java/edu/uci/ics/hyracks/net/tests/NetTest.java
new file mode 100644
index 0000000..31bc2df
--- /dev/null
+++ b/hyracks/hyracks-net/src/test/java/edu/uci/ics/hyracks/net/tests/NetTest.java
@@ -0,0 +1,213 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.tests;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import junit.framework.Assert;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
+import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.IChannelOpenListener;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.MultiplexedConnection;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemux;
+
+public class NetTest {
+ @Test
+ public void test() throws Exception {
+ AtomicBoolean failFlag = new AtomicBoolean();
+
+ MuxDemux md1 = createMuxDemux("md1", failFlag);
+ md1.start();
+ MuxDemux md2 = createMuxDemux("md2", failFlag);
+ md2.start();
+ InetSocketAddress md2Address = md2.getLocalAddress();
+
+ MultiplexedConnection md1md2 = md1.connect(md2Address);
+
+ Thread t1 = createThread(md1md2, 1);
+ Thread t2 = createThread(md1md2, -1);
+ t1.start();
+ t2.start();
+
+ t1.join();
+ t2.join();
+
+ Assert.assertFalse("Failure flag was set to true", failFlag.get());
+ }
+
+ private Thread createThread(final MultiplexedConnection md1md2, final int factor) {
+ return new Thread() {
+ @Override
+ public void run() {
+ try {
+ ChannelControlBlock md1md2c1 = md1md2.openChannel();
+
+ final Semaphore sem = new Semaphore(1);
+ sem.acquire();
+ md1md2c1.getWriteInterface().setEmptyBufferAcceptor(new IBufferAcceptor() {
+ @Override
+ public void accept(ByteBuffer buffer) {
+ }
+ });
+
+ md1md2c1.getReadInterface().setFullBufferAcceptor(new ICloseableBufferAcceptor() {
+ @Override
+ public void accept(ByteBuffer buffer) {
+ }
+
+ @Override
+ public void error(int ecode) {
+ }
+
+ @Override
+ public void close() {
+ sem.release();
+ }
+ });
+
+ ICloseableBufferAcceptor fba = md1md2c1.getWriteInterface().getFullBufferAcceptor();
+ for (int i = 0; i < 10000; ++i) {
+ ByteBuffer buffer = ByteBuffer.allocate(1024);
+ for (int j = 0; j < 256; ++j) {
+ buffer.putInt(factor * (i + j));
+ }
+ buffer.flip();
+ fba.accept(buffer);
+ }
+ fba.close();
+ sem.acquire();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ }
+
+ private MuxDemux createMuxDemux(final String label, final AtomicBoolean failFlag) {
+ IChannelOpenListener md1OpenListener = new IChannelOpenListener() {
+ @Override
+ public void channelOpened(final ChannelControlBlock channel) {
+ final ChannelIO cio = new ChannelIO(label, channel);
+ channel.getReadInterface().setFullBufferAcceptor(cio.rifba);
+ channel.getWriteInterface().setEmptyBufferAcceptor(cio.wieba);
+
+ final IBufferAcceptor rieba = channel.getReadInterface().getEmptyBufferAcceptor();
+ for (int i = 0; i < 50; ++i) {
+ rieba.accept(ByteBuffer.allocate(1024));
+ }
+ new Thread() {
+ private int prevTotal = 0;
+
+ @Override
+ public void run() {
+ while (true) {
+ ByteBuffer fbuf = null;
+ synchronized (channel) {
+ while (!cio.eos && cio.ecode == 0 && cio.rifq.isEmpty()) {
+ try {
+ channel.wait();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ if (!cio.rifq.isEmpty()) {
+ fbuf = cio.rifq.poll();
+ } else if (cio.ecode != 0) {
+ throw new RuntimeException("Error: " + cio.ecode);
+ } else if (cio.eos) {
+ channel.getWriteInterface().getFullBufferAcceptor().close();
+ return;
+ }
+ }
+ int counter = 0;
+ while (fbuf.remaining() > 0) {
+ counter += fbuf.getInt();
+ }
+ if (prevTotal != 0) {
+ if (Math.abs(counter - prevTotal) != 256) {
+ failFlag.set(true);
+ }
+ }
+ prevTotal = counter;
+ fbuf.compact();
+ rieba.accept(fbuf);
+ }
+ }
+ }.start();
+ }
+ };
+ return new MuxDemux(new InetSocketAddress("127.0.0.1", 0), md1OpenListener, 1);
+ }
+
+ private class ChannelIO {
+ private ChannelControlBlock channel;
+
+ private Queue<ByteBuffer> rifq;
+
+ private Queue<ByteBuffer> wieq;
+
+ private boolean eos;
+
+ private int ecode;
+
+ private ICloseableBufferAcceptor rifba;
+
+ private IBufferAcceptor wieba;
+
+ public ChannelIO(final String label, ChannelControlBlock channel) {
+ this.channel = channel;
+ this.rifq = new LinkedList<ByteBuffer>();
+ this.wieq = new LinkedList<ByteBuffer>();
+
+ rifba = new ICloseableBufferAcceptor() {
+ @Override
+ public void accept(ByteBuffer buffer) {
+ rifq.add(buffer);
+ ChannelIO.this.channel.notifyAll();
+ }
+
+ @Override
+ public void error(int ecode) {
+ ChannelIO.this.ecode = ecode;
+ ChannelIO.this.channel.notifyAll();
+ }
+
+ @Override
+ public void close() {
+ eos = true;
+ ChannelIO.this.channel.notifyAll();
+ }
+ };
+
+ wieba = new IBufferAcceptor() {
+ @Override
+ public void accept(ByteBuffer buffer) {
+ wieq.add(buffer);
+ ChannelIO.this.channel.notifyAll();
+ }
+ };
+ }
+ }
+}
\ No newline at end of file