Added new networking layer.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@984 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-nc/pom.xml b/hyracks-control-nc/pom.xml
index 64a409c..8c56958 100644
--- a/hyracks-control-nc/pom.xml
+++ b/hyracks-control-nc/pom.xml
@@ -1,9 +1,6 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
- <groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-control-nc</artifactId>
- <version>0.2.0-SNAPSHOT</version>
-
<parent>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks</artifactId>
@@ -37,6 +34,11 @@
<type>jar</type>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-net</artifactId>
+ <version>0.2.0-SNAPSHOT</version>
+ </dependency>
</dependencies>
<reporting>
<plugins>
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index d869515..d5fc27e 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -65,7 +65,7 @@
import edu.uci.ics.hyracks.control.common.work.WorkQueue;
import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
import edu.uci.ics.hyracks.control.nc.io.IOManager;
-import edu.uci.ics.hyracks.control.nc.net.ConnectionManager;
+import edu.uci.ics.hyracks.control.nc.net.NetworkManager;
import edu.uci.ics.hyracks.control.nc.partitions.PartitionManager;
import edu.uci.ics.hyracks.control.nc.runtime.RootHyracksContext;
import edu.uci.ics.hyracks.control.nc.work.AbortTasksWork;
@@ -91,7 +91,7 @@
private final PartitionManager partitionManager;
- private final ConnectionManager connectionManager;
+ private final NetworkManager netManager;
private final WorkQueue queue;
@@ -131,9 +131,8 @@
if (id == null) {
throw new Exception("id not set");
}
- connectionManager = new ConnectionManager(ctx, getIpAddress(ncConfig));
partitionManager = new PartitionManager(this);
- connectionManager.setPartitionRequestListener(partitionManager);
+ netManager = new NetworkManager(ctx, getIpAddress(ncConfig), partitionManager);
queue = new WorkQueue();
jobletMap = new Hashtable<JobId, Joblet>();
@@ -166,7 +165,7 @@
public void start() throws Exception {
LOGGER.log(Level.INFO, "Starting NodeControllerService");
ipc.start();
- connectionManager.start();
+ netManager.start();
IIPCHandle ccIPCHandle = ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort));
this.ccs = new ClusterControllerRemoteProxy(ccIPCHandle);
HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
@@ -174,9 +173,9 @@
gcInfos[i] = new HeartbeatSchema.GarbageCollectorInfo(gcMXBeans.get(i).getName());
}
HeartbeatSchema hbSchema = new HeartbeatSchema(gcInfos);
- this.nodeParameters = ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig,
- connectionManager.getNetworkAddress(), osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(),
- osMXBean.getAvailableProcessors(), hbSchema));
+ this.nodeParameters = ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netManager
+ .getNetworkAddress(), osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean
+ .getAvailableProcessors(), hbSchema));
queue.start();
heartbeatTask = new HeartbeatTask(ccs);
@@ -197,7 +196,7 @@
LOGGER.log(Level.INFO, "Stopping NodeControllerService");
partitionManager.close();
heartbeatTask.cancel();
- connectionManager.stop();
+ netManager.stop();
queue.stop();
LOGGER.log(Level.INFO, "Stopped NodeControllerService");
}
@@ -218,8 +217,8 @@
return jobletMap;
}
- public ConnectionManager getConnectionManager() {
- return connectionManager;
+ public NetworkManager getNetworkManager() {
+ return netManager;
}
public PartitionManager getPartitionManager() {
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/ConnectionManager.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/ConnectionManager.java
deleted file mode 100644
index 6e38ef7..0000000
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/ConnectionManager.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.nc.net;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.nio.ByteBuffer;
-import java.nio.channels.AsynchronousCloseException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
-import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
-import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.partitions.PartitionId;
-import edu.uci.ics.hyracks.control.nc.partitions.IPartitionRequestListener;
-
-public class ConnectionManager {
- private static final Logger LOGGER = Logger.getLogger(ConnectionManager.class.getName());
-
- static final int INITIAL_MESSAGE_SIZE = 20;
-
- private final IHyracksRootContext ctx;
-
- private IPartitionRequestListener partitionRequestListener;
-
- private final ServerSocketChannel serverChannel;
-
- private volatile boolean stopped;
-
- private final ConnectionListenerThread connectionListener;
-
- private final DataListenerThread dataListener;
-
- private final NetworkAddress networkAddress;
-
- public ConnectionManager(IHyracksRootContext ctx, InetAddress inetAddress) throws IOException {
- this.ctx = ctx;
- serverChannel = ServerSocketChannel.open();
- ServerSocket serverSocket = serverChannel.socket();
- serverSocket.bind(new InetSocketAddress(inetAddress, 0), 0);
- serverSocket.setReuseAddress(true);
- stopped = false;
- connectionListener = new ConnectionListenerThread();
- dataListener = new DataListenerThread();
- networkAddress = new NetworkAddress(serverSocket.getInetAddress(), serverSocket.getLocalPort());
-
- }
-
- public void setPartitionRequestListener(IPartitionRequestListener partitionRequestListener) {
- this.partitionRequestListener = partitionRequestListener;
- }
-
- public void start() {
- connectionListener.start();
- dataListener.start();
- }
-
- public void stop() {
- try {
- stopped = true;
- serverChannel.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- public void connect(INetworkChannel channel) throws IOException {
- dataListener.addOutgoingConnection(channel);
- }
-
- private final class ConnectionListenerThread extends Thread {
- public ConnectionListenerThread() {
- super("Hyracks NC Connection Listener");
- setDaemon(true);
- setPriority(MAX_PRIORITY);
- }
-
- @Override
- public void run() {
- while (!stopped) {
- try {
- SocketChannel sc = serverChannel.accept();
- dataListener.addIncomingConnection(sc);
- } catch (AsynchronousCloseException e) {
- // do nothing
- if (!stopped) {
- e.printStackTrace();
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- }
-
- private final class DataListenerThread extends Thread {
- private Selector selector;
-
- private final List<SocketChannel> pendingIncomingConnections;
- private final Set<SocketChannel> pendingNegotiations;
- private final List<INetworkChannel> pendingOutgoingConnections;
-
- public DataListenerThread() {
- super("Hyracks Data Listener Thread");
- setDaemon(true);
- try {
- selector = Selector.open();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- pendingIncomingConnections = new ArrayList<SocketChannel>();
- pendingNegotiations = new HashSet<SocketChannel>();
- pendingOutgoingConnections = new ArrayList<INetworkChannel>();
- }
-
- synchronized void addIncomingConnection(SocketChannel sc) throws IOException {
- pendingIncomingConnections.add(sc);
- selector.wakeup();
- }
-
- synchronized void addOutgoingConnection(INetworkChannel channel) throws IOException {
- pendingOutgoingConnections.add(channel);
- selector.wakeup();
- }
-
- @Override
- public void run() {
- while (!stopped) {
- try {
- if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.fine("Starting Select");
- }
- int n = selector.select();
- synchronized (this) {
- if (!pendingIncomingConnections.isEmpty()) {
- for (SocketChannel sc : pendingIncomingConnections) {
- sc.configureBlocking(false);
- sc.socket().setReuseAddress(true);
- SelectionKey scKey = sc.register(selector, SelectionKey.OP_READ);
- ByteBuffer buffer = ByteBuffer.allocate(INITIAL_MESSAGE_SIZE);
- scKey.attach(buffer);
- pendingNegotiations.add(sc);
- }
- pendingIncomingConnections.clear();
- }
- if (!pendingOutgoingConnections.isEmpty()) {
- for (INetworkChannel nc : pendingOutgoingConnections) {
- SocketChannel sc = SocketChannel.open();
- sc.configureBlocking(false);
- sc.socket().setReuseAddress(true);
- SelectionKey scKey = sc.register(selector, 0);
- scKey.attach(nc);
- nc.setSelectionKey(scKey);
- nc.notifyConnectionManagerRegistration();
- }
- pendingOutgoingConnections.clear();
- }
- if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.fine("Selector: " + n);
- }
- if (n > 0) {
- for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();) {
- SelectionKey key = i.next();
- i.remove();
- SocketChannel sc = (SocketChannel) key.channel();
- if (pendingNegotiations.contains(sc)) {
- if (key.isReadable()) {
- ByteBuffer buffer = (ByteBuffer) key.attachment();
- sc.read(buffer);
- buffer.flip();
- if (buffer.remaining() >= INITIAL_MESSAGE_SIZE) {
- PartitionId pid = readInitialMessage(buffer);
- pendingNegotiations.remove(sc);
- key.interestOps(0);
- NetworkOutputChannel channel = new NetworkOutputChannel(ctx, 5);
- channel.setSelectionKey(key);
- key.attach(channel);
- try {
- partitionRequestListener.registerPartitionRequest(pid, channel);
- } catch (HyracksException e) {
- key.cancel();
- sc.close();
- channel.abort();
- }
- } else {
- buffer.compact();
- }
- }
- } else {
- INetworkChannel channel = (INetworkChannel) key.attachment();
- boolean close = false;
- boolean error = false;
- try {
- close = channel.dispatchNetworkEvent();
- } catch (IOException e) {
- e.printStackTrace();
- error = true;
- }
- if (close || error) {
- key.cancel();
- sc.close();
- if (error) {
- channel.abort();
- }
- }
- }
- }
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
-
- private PartitionId readInitialMessage(ByteBuffer buffer) {
- JobId jobId = new JobId(buffer.getLong());
- ConnectorDescriptorId cdid = new ConnectorDescriptorId(buffer.getInt());
- int senderIndex = buffer.getInt();
- int receiverIndex = buffer.getInt();
- return new PartitionId(jobId, cdid, senderIndex, receiverIndex);
- }
- }
-
- public NetworkAddress getNetworkAddress() {
- return networkAddress;
- }
-}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/INetworkChannel.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/INetworkChannel.java
index 61cd91f..1f4f070 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/INetworkChannel.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/INetworkChannel.java
@@ -1,7 +1,6 @@
package edu.uci.ics.hyracks.control.nc.net;
import java.io.IOException;
-import java.net.SocketAddress;
import java.nio.channels.SelectionKey;
public interface INetworkChannel {
@@ -9,10 +8,6 @@
public void setSelectionKey(SelectionKey key);
- public SelectionKey getSelectionKey();
-
- public SocketAddress getRemoteAddress();
-
public void abort();
public void notifyConnectionManagerRegistration() throws IOException;
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
index 23cf514..6400288 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
@@ -14,65 +14,48 @@
*/
package edu.uci.ics.hyracks.control.nc.net;
-import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
import java.util.ArrayDeque;
import java.util.Queue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.channels.IInputChannel;
import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
-import edu.uci.ics.hyracks.api.comm.FrameHelper;
import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
+import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
-public class NetworkInputChannel implements IInputChannel, INetworkChannel {
- private static final Logger LOGGER = Logger.getLogger(NetworkInputChannel.class.getName());
+public class NetworkInputChannel implements IInputChannel {
+ private IHyracksRootContext ctx;
- private final ConnectionManager connectionManager;
+ private final NetworkManager netManager;
private final SocketAddress remoteAddress;
private final PartitionId partitionId;
- private final Queue<ByteBuffer> emptyQueue;
-
private final Queue<ByteBuffer> fullQueue;
- private SocketChannel socketChannel;
+ private final int nBuffers;
- private SelectionKey key;
-
- private ByteBuffer currentBuffer;
-
- private boolean eos;
-
- private boolean aborted;
+ private ChannelControlBlock ccb;
private IInputChannelMonitor monitor;
private Object attachment;
- private ByteBuffer writeBuffer;
-
- public NetworkInputChannel(IHyracksRootContext ctx, ConnectionManager connectionManager,
- SocketAddress remoteAddress, PartitionId partitionId, int nBuffers) {
- this.connectionManager = connectionManager;
+ public NetworkInputChannel(IHyracksRootContext ctx, NetworkManager netManager, SocketAddress remoteAddress,
+ PartitionId partitionId, int nBuffers) {
+ this.ctx = ctx;
+ this.netManager = netManager;
this.remoteAddress = remoteAddress;
this.partitionId = partitionId;
- this.emptyQueue = new ArrayDeque<ByteBuffer>(nBuffers);
- for (int i = 0; i < nBuffers; ++i) {
- emptyQueue.add(ctx.allocateFrame());
- }
fullQueue = new ArrayDeque<ByteBuffer>(nBuffers);
- aborted = false;
- eos = false;
+ this.nBuffers = nBuffers;
}
@Override
@@ -96,29 +79,31 @@
}
@Override
- public synchronized void recycleBuffer(ByteBuffer buffer) {
+ public void recycleBuffer(ByteBuffer buffer) {
buffer.clear();
- emptyQueue.add(buffer);
- if (!eos && !aborted) {
- int ops = key.interestOps();
- if ((ops & SelectionKey.OP_READ) == 0) {
- key.interestOps(ops | SelectionKey.OP_READ);
- key.selector().wakeup();
- if (currentBuffer == null) {
- currentBuffer = emptyQueue.poll();
- }
- }
- }
+ ccb.getReadInterface().getEmptyBufferAcceptor().accept(buffer);
}
@Override
public void open() throws HyracksDataException {
- currentBuffer = emptyQueue.poll();
try {
- connectionManager.connect(this);
- } catch (IOException e) {
+ ccb = netManager.connect(remoteAddress);
+ } catch (Exception e) {
throw new HyracksDataException(e);
}
+ ccb.getReadInterface().setFullBufferAcceptor(new ReadFullBufferAcceptor());
+ ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor());
+ for (int i = 0; i < nBuffers; ++i) {
+ ccb.getReadInterface().getEmptyBufferAcceptor().accept(ctx.allocateFrame());
+ }
+ ByteBuffer writeBuffer = ByteBuffer.allocate(NetworkManager.INITIAL_MESSAGE_SIZE);
+ writeBuffer.putLong(partitionId.getJobId().getId());
+ writeBuffer.putInt(partitionId.getConnectorDescriptorId().getId());
+ writeBuffer.putInt(partitionId.getSenderIndex());
+ writeBuffer.putInt(partitionId.getReceiverIndex());
+ writeBuffer.flip();
+ ccb.getWriteInterface().getFullBufferAcceptor().accept(writeBuffer);
+ ccb.getWriteInterface().getFullBufferAcceptor().close();
}
@Override
@@ -126,110 +111,28 @@
}
- @Override
- public synchronized boolean dispatchNetworkEvent() throws IOException {
- if (aborted) {
- eos = true;
- monitor.notifyFailure(this);
- return true;
+ private class ReadFullBufferAcceptor implements ICloseableBufferAcceptor {
+ @Override
+ public void accept(ByteBuffer buffer) {
+ fullQueue.add(buffer);
+ monitor.notifyDataAvailability(NetworkInputChannel.this, 1);
}
- if (key.isConnectable()) {
- if (socketChannel.finishConnect()) {
- key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
- prepareForWrite();
- }
- } else if (key.isWritable()) {
- socketChannel.write(writeBuffer);
- if (writeBuffer.remaining() == 0) {
- key.interestOps(SelectionKey.OP_READ);
- }
- } else if (key.isReadable()) {
- if (LOGGER.isLoggable(Level.FINER)) {
- LOGGER.finer("Before read: " + currentBuffer.position() + " " + currentBuffer.limit());
- }
- int bytesRead = socketChannel.read(currentBuffer);
- if (bytesRead < 0) {
- eos = true;
- monitor.notifyEndOfStream(this);
- return true;
- }
- if (LOGGER.isLoggable(Level.FINER)) {
- LOGGER.finer("After read: " + currentBuffer.position() + " " + currentBuffer.limit());
- }
- currentBuffer.flip();
- int dataLen = currentBuffer.remaining();
- if (dataLen >= currentBuffer.capacity() || aborted()) {
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest("NetworkInputChannel: frame received: sender = " + partitionId.getSenderIndex());
- }
- if (currentBuffer.getInt(FrameHelper.getTupleCountOffset(currentBuffer.capacity())) == 0) {
- eos = true;
- monitor.notifyEndOfStream(this);
- return true;
- }
- fullQueue.add(currentBuffer);
- currentBuffer = emptyQueue.poll();
- if (currentBuffer == null && key.isValid()) {
- int ops = key.interestOps();
- key.interestOps(ops & ~SelectionKey.OP_READ);
- }
- monitor.notifyDataAvailability(this, 1);
- return false;
- }
- currentBuffer.compact();
+
+ @Override
+ public void close() {
+ monitor.notifyEndOfStream(NetworkInputChannel.this);
}
- return false;
+
+ @Override
+ public void error(int ecode) {
+ monitor.notifyFailure(NetworkInputChannel.this);
+ }
}
- private void prepareForConnect() {
- key.interestOps(SelectionKey.OP_CONNECT);
- }
-
- private void prepareForWrite() {
- writeBuffer = ByteBuffer.allocate(ConnectionManager.INITIAL_MESSAGE_SIZE);
- writeBuffer.putLong(partitionId.getJobId().getId());
- writeBuffer.putInt(partitionId.getConnectorDescriptorId().getId());
- writeBuffer.putInt(partitionId.getSenderIndex());
- writeBuffer.putInt(partitionId.getReceiverIndex());
- writeBuffer.flip();
-
- key.interestOps(SelectionKey.OP_WRITE);
- }
-
- @Override
- public void setSelectionKey(SelectionKey key) {
- this.key = key;
- socketChannel = (SocketChannel) key.channel();
- }
-
- @Override
- public SocketAddress getRemoteAddress() {
- return remoteAddress;
- }
-
- @Override
- public SelectionKey getSelectionKey() {
- return key;
- }
-
- public PartitionId getPartitionId() {
- return partitionId;
- }
-
- public void abort() {
- aborted = true;
- }
-
- public boolean aborted() {
- return aborted;
- }
-
- @Override
- public void notifyConnectionManagerRegistration() throws IOException {
- if (socketChannel.connect(remoteAddress)) {
- prepareForWrite();
- } else {
- prepareForConnect();
+ private class WriteEmptyBufferAcceptor implements IBufferAcceptor {
+ @Override
+ public void accept(ByteBuffer buffer) {
+ // do nothing
}
}
}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
new file mode 100644
index 0000000..c47db54
--- /dev/null
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.net;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.nc.partitions.IPartitionRequestListener;
+import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
+import edu.uci.ics.hyracks.net.exceptions.NetException;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.IChannelOpenListener;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.MultiplexedConnection;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemux;
+
+public class NetworkManager {
+ static final int INITIAL_MESSAGE_SIZE = 20;
+
+ private final IHyracksRootContext ctx;
+
+ private final IPartitionRequestListener partitionRequestListener;
+
+ private final MuxDemux md;
+
+ private NetworkAddress networkAddress;
+
+ public NetworkManager(IHyracksRootContext ctx, InetAddress inetAddress,
+ IPartitionRequestListener partitionRequestListener) throws IOException {
+ this.ctx = ctx;
+ this.partitionRequestListener = partitionRequestListener;
+ md = new MuxDemux(new InetSocketAddress(inetAddress, 0), new ChannelOpenListener());
+ }
+
+ public void start() throws IOException {
+ md.start();
+ InetSocketAddress sockAddr = md.getLocalAddress();
+ networkAddress = new NetworkAddress(sockAddr.getAddress(), sockAddr.getPort());
+ }
+
+ public NetworkAddress getNetworkAddress() {
+ return networkAddress;
+ }
+
+ public void stop() {
+
+ }
+
+ public ChannelControlBlock connect(SocketAddress remoteAddress) throws InterruptedException, NetException {
+ MultiplexedConnection mConn = md.connect((InetSocketAddress) remoteAddress);
+ return mConn.openChannel();
+ }
+
+ private class ChannelOpenListener implements IChannelOpenListener {
+ @Override
+ public void channelOpened(ChannelControlBlock channel) {
+ channel.getReadInterface().setFullBufferAcceptor(new InitialBufferAcceptor(channel));
+ channel.getReadInterface().getEmptyBufferAcceptor().accept(ByteBuffer.allocate(INITIAL_MESSAGE_SIZE));
+ }
+ }
+
+ private class InitialBufferAcceptor implements ICloseableBufferAcceptor {
+ private final ChannelControlBlock ccb;
+
+ private NetworkOutputChannel noc;
+
+ public InitialBufferAcceptor(ChannelControlBlock ccb) {
+ this.ccb = ccb;
+ }
+
+ @Override
+ public void accept(ByteBuffer buffer) {
+ PartitionId pid = readInitialMessage(buffer);
+ noc = new NetworkOutputChannel(ctx, ccb, 5);
+ try {
+ partitionRequestListener.registerPartitionRequest(pid, noc);
+ } catch (HyracksException e) {
+ noc.abort();
+ }
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public void error(int ecode) {
+ if (noc != null) {
+ noc.abort();
+ }
+ }
+ }
+
+ private static PartitionId readInitialMessage(ByteBuffer buffer) {
+ JobId jobId = new JobId(buffer.getLong());
+ ConnectorDescriptorId cdid = new ConnectorDescriptorId(buffer.getInt());
+ int senderIndex = buffer.getInt();
+ int receiverIndex = buffer.getInt();
+ return new PartitionId(jobId, cdid, senderIndex, receiverIndex);
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
index 31ce924..1abcdf6 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
@@ -14,124 +14,44 @@
*/
package edu.uci.ics.hyracks.control.nc.net;
-import java.io.IOException;
-import java.net.SocketAddress;
import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
import java.util.ArrayDeque;
import java.util.Queue;
-import edu.uci.ics.hyracks.api.comm.FrameHelper;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
-public class NetworkOutputChannel implements INetworkChannel, IFrameWriter {
- private final IHyracksRootContext ctx;
+public class NetworkOutputChannel implements IFrameWriter {
+ private final ChannelControlBlock ccb;
private final Queue<ByteBuffer> emptyQueue;
- private final Queue<ByteBuffer> fullQueue;
-
- private SelectionKey key;
-
private boolean aborted;
- private boolean eos;
-
- private boolean eosSent;
-
- private boolean failed;
-
- private ByteBuffer currentBuffer;
-
- public NetworkOutputChannel(IHyracksRootContext ctx, int nBuffers) {
- this.ctx = ctx;
+ public NetworkOutputChannel(IHyracksRootContext ctx, ChannelControlBlock ccb, int nBuffers) {
+ this.ccb = ccb;
emptyQueue = new ArrayDeque<ByteBuffer>(nBuffers);
for (int i = 0; i < nBuffers; ++i) {
emptyQueue.add(ctx.allocateFrame());
}
- fullQueue = new ArrayDeque<ByteBuffer>(nBuffers);
- }
-
- @Override
- public synchronized boolean dispatchNetworkEvent() throws IOException {
- if (failed || aborted) {
- eos = true;
- return true;
- } else if (key.isWritable()) {
- while (true) {
- if (currentBuffer == null) {
- if (eosSent) {
- return true;
- }
- currentBuffer = fullQueue.poll();
- if (currentBuffer == null) {
- if (eos) {
- currentBuffer = emptyQueue.poll();
- currentBuffer.clear();
- currentBuffer.putInt(FrameHelper.getTupleCountOffset(ctx.getFrameSize()), 0);
- eosSent = true;
- } else {
- key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
- return false;
- }
- }
- }
- int bytesWritten = ((SocketChannel) key.channel()).write(currentBuffer);
- if (bytesWritten < 0) {
- eos = true;
- return true;
- }
- if (currentBuffer.remaining() == 0) {
- emptyQueue.add(currentBuffer);
- notifyAll();
- currentBuffer = null;
- if (eosSent) {
- return true;
- }
- } else {
- return false;
- }
- }
- }
- return false;
- }
-
- @Override
- public void setSelectionKey(SelectionKey key) {
- this.key = key;
- }
-
- @Override
- public SelectionKey getSelectionKey() {
- return key;
- }
-
- @Override
- public SocketAddress getRemoteAddress() {
- return ((SocketChannel) key.channel()).socket().getRemoteSocketAddress();
- }
-
- @Override
- public synchronized void abort() {
- aborted = true;
+ ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor());
}
@Override
public void open() throws HyracksDataException {
- currentBuffer = null;
}
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
ByteBuffer destBuffer = null;
synchronized (this) {
- if (aborted) {
- throw new HyracksDataException("Connection has been aborted");
- }
while (true) {
+ if (aborted) {
+ throw new HyracksDataException("Connection has been aborted");
+ }
destBuffer = emptyQueue.poll();
if (destBuffer != null) {
break;
@@ -148,26 +68,34 @@
destBuffer.clear();
destBuffer.put(buffer);
destBuffer.flip();
- synchronized (this) {
- fullQueue.add(destBuffer);
- }
- key.interestOps(SelectionKey.OP_WRITE);
- key.selector().wakeup();
+ ccb.getWriteInterface().getFullBufferAcceptor().accept(destBuffer);
}
@Override
public void fail() throws HyracksDataException {
- failed = true;
+ ccb.getWriteInterface().getFullBufferAcceptor().error(1);
}
@Override
- public synchronized void close() throws HyracksDataException {
- eos = true;
- key.interestOps(SelectionKey.OP_WRITE);
- key.selector().wakeup();
+ public void close() throws HyracksDataException {
+ ccb.getWriteInterface().getFullBufferAcceptor().close();
}
- @Override
- public void notifyConnectionManagerRegistration() throws IOException {
+ void abort() {
+ ccb.getWriteInterface().getFullBufferAcceptor().error(1);
+ synchronized (NetworkOutputChannel.this) {
+ aborted = true;
+ NetworkOutputChannel.this.notifyAll();
+ }
+ }
+
+ private class WriteEmptyBufferAcceptor implements IBufferAcceptor {
+ @Override
+ public void accept(ByteBuffer buffer) {
+ synchronized (NetworkOutputChannel.this) {
+ emptyQueue.add(buffer);
+ NetworkOutputChannel.this.notifyAll();
+ }
+ }
}
}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
index bfa21c9..bd40fea 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
@@ -48,7 +48,7 @@
Joblet ji = jobletMap.get(pid.getJobId());
if (ji != null) {
PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(ncs.getRootContext(),
- ncs.getConnectionManager(), new InetSocketAddress(networkAddress.getIpAddress(),
+ ncs.getNetworkManager(), new InetSocketAddress(networkAddress.getIpAddress(),
networkAddress.getPort()), pid, 1));
ji.reportPartitionAvailability(channel);
}
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
index cbbe718..3fb4d60 100644
--- a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
@@ -59,6 +59,7 @@
this.system = system;
this.networkThread = new NetworkThread();
this.serverSocketChannel = ServerSocketChannel.open();
+ serverSocketChannel.socket().setReuseAddress(true);
serverSocketChannel.configureBlocking(false);
ServerSocket socket = serverSocketChannel.socket();
socket.bind(socketAddress);
diff --git a/hyracks-ipc/src/test/java/edu/uci/ics/ipc/tests/IPCTest.java b/hyracks-ipc/src/test/java/edu/uci/ics/hyracks/ipc/tests/IPCTest.java
similarity index 98%
rename from hyracks-ipc/src/test/java/edu/uci/ics/ipc/tests/IPCTest.java
rename to hyracks-ipc/src/test/java/edu/uci/ics/hyracks/ipc/tests/IPCTest.java
index 1fc1f6f..24cea1d 100644
--- a/hyracks-ipc/src/test/java/edu/uci/ics/ipc/tests/IPCTest.java
+++ b/hyracks-ipc/src/test/java/edu/uci/ics/hyracks/ipc/tests/IPCTest.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.ipc.tests;
+package edu.uci.ics.hyracks.ipc.tests;
import java.io.IOException;
import java.net.InetSocketAddress;
diff --git a/hyracks-net/pom.xml b/hyracks-net/pom.xml
new file mode 100644
index 0000000..12004f7
--- /dev/null
+++ b/hyracks-net/pom.xml
@@ -0,0 +1,31 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>hyracks-net</artifactId>
+ <parent>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks</artifactId>
+ <version>0.2.0-SNAPSHOT</version>
+ </parent>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.8.1</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/buffers/IBufferAcceptor.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/buffers/IBufferAcceptor.java
new file mode 100644
index 0000000..2f27bf0
--- /dev/null
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/buffers/IBufferAcceptor.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.buffers;
+
+import java.nio.ByteBuffer;
+
+public interface IBufferAcceptor {
+ public void accept(ByteBuffer buffer);
+}
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/buffers/ICloseableBufferAcceptor.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/buffers/ICloseableBufferAcceptor.java
new file mode 100644
index 0000000..c395ac9
--- /dev/null
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/buffers/ICloseableBufferAcceptor.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.buffers;
+
+public interface ICloseableBufferAcceptor extends IBufferAcceptor {
+ public void close();
+
+ public void error(int ecode);
+}
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/exceptions/NetException.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/exceptions/NetException.java
new file mode 100644
index 0000000..ecd0373
--- /dev/null
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/exceptions/NetException.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.exceptions;
+
+public class NetException extends Exception {
+ private static final long serialVersionUID = 1L;
+
+ public NetException() {
+ }
+
+ public NetException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public NetException(String message) {
+ super(message);
+ }
+
+ public NetException(Throwable cause) {
+ super(cause);
+ }
+}
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
new file mode 100644
index 0000000..9a7a38b
--- /dev/null
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
@@ -0,0 +1,303 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.muxdemux;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
+import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
+
+public class ChannelControlBlock {
+ private final ChannelSet cSet;
+
+ private final int channelId;
+
+ private final ReadInterface ri;
+
+ private final WriteInterface wi;
+
+ private final AtomicBoolean localClose;
+
+ private final AtomicBoolean remoteClose;
+
+ ChannelControlBlock(ChannelSet cSet, int channelId) {
+ this.cSet = cSet;
+ this.channelId = channelId;
+ this.ri = new ReadInterface();
+ this.wi = new WriteInterface();
+ localClose = new AtomicBoolean();
+ remoteClose = new AtomicBoolean();
+ }
+
+ int getChannelId() {
+ return channelId;
+ }
+
+ public IChannelReadInterface getReadInterface() {
+ return ri;
+ }
+
+ public IChannelWriteInterface getWriteInterface() {
+ return wi;
+ }
+
+ private final class ReadInterface implements IChannelReadInterface {
+ private final Queue<ByteBuffer> riEmptyQueue;
+
+ private final IBufferAcceptor eba = new IBufferAcceptor() {
+ @Override
+ public void accept(ByteBuffer buffer) {
+ int delta;
+ synchronized (ChannelControlBlock.this) {
+ riEmptyQueue.add(buffer);
+ delta = buffer.remaining();
+ }
+ credits.addAndGet(delta);
+ if (delta != 0) {
+ cSet.markPendingCredits(channelId);
+ }
+ }
+ };
+
+ private ICloseableBufferAcceptor fba;
+
+ private final AtomicInteger credits;
+
+ private ByteBuffer currentReadBuffer;
+
+ ReadInterface() {
+ riEmptyQueue = new LinkedList<ByteBuffer>();
+ credits = new AtomicInteger();
+ }
+
+ @Override
+ public void setFullBufferAcceptor(ICloseableBufferAcceptor fullBufferAcceptor) {
+ fba = fullBufferAcceptor;
+ }
+
+ @Override
+ public IBufferAcceptor getEmptyBufferAcceptor() {
+ return eba;
+ }
+
+ int read(SocketChannel sc, int size) throws IOException {
+ while (true) {
+ if (size <= 0) {
+ return size;
+ }
+ if (ri.currentReadBuffer == null) {
+ ri.currentReadBuffer = ri.riEmptyQueue.poll();
+ assert ri.currentReadBuffer != null;
+ }
+ int rSize = Math.min(size, ri.currentReadBuffer.remaining());
+ if (rSize > 0) {
+ ri.currentReadBuffer.limit(ri.currentReadBuffer.position() + rSize);
+ int len;
+ try {
+ len = sc.read(ri.currentReadBuffer);
+ } finally {
+ ri.currentReadBuffer.limit(ri.currentReadBuffer.capacity());
+ }
+ size -= len;
+ if (len < rSize) {
+ return size;
+ }
+ } else {
+ return size;
+ }
+ if (ri.currentReadBuffer.remaining() <= 0) {
+ flush();
+ }
+ }
+ }
+
+ void flush() {
+ if (currentReadBuffer != null) {
+ currentReadBuffer.flip();
+ fba.accept(ri.currentReadBuffer);
+ currentReadBuffer = null;
+ }
+ }
+ }
+
+ private final class WriteInterface implements IChannelWriteInterface {
+ private final Queue<ByteBuffer> wiFullQueue;
+
+ private int channelWriteEventCount;
+
+ private final ICloseableBufferAcceptor fba = new ICloseableBufferAcceptor() {
+ @Override
+ public void accept(ByteBuffer buffer) {
+ synchronized (ChannelControlBlock.this) {
+ wiFullQueue.add(buffer);
+ incrementLocalWriteEventCount();
+ }
+ }
+
+ @Override
+ public void close() {
+ synchronized (ChannelControlBlock.this) {
+ if (eos) {
+ return;
+ }
+ eos = true;
+ incrementLocalWriteEventCount();
+ }
+ }
+
+ @Override
+ public void error(int ecode) {
+ synchronized (ChannelControlBlock.this) {
+ WriteInterface.this.ecode = ecode;
+ incrementLocalWriteEventCount();
+ }
+ }
+ };
+
+ private IBufferAcceptor eba;
+
+ private final AtomicInteger credits;
+
+ private boolean eos;
+
+ private boolean eosSent;
+
+ private int ecode;
+
+ private boolean ecodeSent;
+
+ private ByteBuffer currentWriteBuffer;
+
+ WriteInterface() {
+ wiFullQueue = new LinkedList<ByteBuffer>();
+ credits = new AtomicInteger();
+ eos = false;
+ eosSent = false;
+ ecode = -1;
+ ecodeSent = false;
+ }
+
+ @Override
+ public void setEmptyBufferAcceptor(IBufferAcceptor emptyBufferAcceptor) {
+ eba = emptyBufferAcceptor;
+ }
+
+ @Override
+ public ICloseableBufferAcceptor getFullBufferAcceptor() {
+ return fba;
+ }
+
+ void write(MultiplexedConnection.WriterState writerState) {
+ if (currentWriteBuffer == null) {
+ currentWriteBuffer = wiFullQueue.poll();
+ }
+ if (currentWriteBuffer != null) {
+ int size = Math.min(currentWriteBuffer.remaining(), credits.get());
+ if (size > 0) {
+ credits.addAndGet(-size);
+ writerState.command.setChannelId(channelId);
+ writerState.command.setCommandType(MuxDemuxCommand.CommandType.DATA);
+ writerState.command.setData(size);
+ writerState.reset(currentWriteBuffer, size, ChannelControlBlock.this);
+ }
+ } else if (ecode >= 0 && !ecodeSent) {
+ decrementLocalWriteEventCount();
+ writerState.command.setChannelId(channelId);
+ writerState.command.setCommandType(MuxDemuxCommand.CommandType.ERROR);
+ writerState.command.setData(ecode);
+ writerState.reset(null, 0, null);
+ ecodeSent = true;
+ localClose.set(true);
+ } else if (wi.eos && !wi.eosSent) {
+ decrementLocalWriteEventCount();
+ writerState.command.setChannelId(channelId);
+ writerState.command.setCommandType(MuxDemuxCommand.CommandType.CLOSE_CHANNEL);
+ writerState.command.setData(0);
+ writerState.reset(null, 0, null);
+ eosSent = true;
+ localClose.set(true);
+ }
+ }
+
+ void writeComplete() {
+ if (currentWriteBuffer.remaining() <= 0) {
+ currentWriteBuffer.clear();
+ eba.accept(currentWriteBuffer);
+ decrementLocalWriteEventCount();
+ currentWriteBuffer = null;
+ }
+ }
+
+ void incrementLocalWriteEventCount() {
+ ++channelWriteEventCount;
+ if (channelWriteEventCount == 1) {
+ cSet.markPendingWrite(channelId);
+ }
+ }
+
+ void decrementLocalWriteEventCount() {
+ --channelWriteEventCount;
+ if (channelWriteEventCount == 0) {
+ cSet.unmarkPendingWrite(channelId);
+ }
+ }
+ }
+
+ synchronized void write(MultiplexedConnection.WriterState writerState) {
+ wi.write(writerState);
+ }
+
+ synchronized void writeComplete() {
+ wi.writeComplete();
+ }
+
+ synchronized int read(SocketChannel sc, int size) throws IOException {
+ return ri.read(sc, size);
+ }
+
+ void addReadCredits(int delta) {
+ ri.credits.addAndGet(delta);
+ }
+
+ int getAndResetReadCredits() {
+ return ri.credits.getAndSet(0);
+ }
+
+ void addWriteCredits(int delta) {
+ wi.credits.addAndGet(delta);
+ }
+
+ synchronized void reportRemoteEOS() {
+ ri.flush();
+ ri.fba.close();
+ remoteClose.set(true);
+ }
+
+ synchronized void reportRemoteError(int ecode) {
+ ri.flush();
+ ri.fba.error(ecode);
+ remoteClose.set(true);
+ }
+
+ boolean completelyClosed() {
+ return localClose.get() && remoteClose.get();
+ }
+}
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
new file mode 100644
index 0000000..48f008b
--- /dev/null
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
@@ -0,0 +1,157 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.muxdemux;
+
+import java.util.Arrays;
+import java.util.BitSet;
+
+public class ChannelSet {
+ private static final int INITIAL_SIZE = 16;
+
+ private final MultiplexedConnection mConn;
+
+ private ChannelControlBlock[] ccbArray;
+
+ private final BitSet allocationBitmap;
+
+ private final BitSet pendingChannelWriteBitmap;
+
+ private final BitSet pendingChannelCreditsBitmap;
+
+ private final BitSet pendingChannelSynBitmap;
+
+ private int openChannelCount;
+
+ private final IEventCounter pendingWriteEventsCounter;
+
+ ChannelSet(MultiplexedConnection mConn, IEventCounter pendingWriteEventsCounter) {
+ this.mConn = mConn;
+ ccbArray = new ChannelControlBlock[INITIAL_SIZE];
+ allocationBitmap = new BitSet();
+ pendingChannelWriteBitmap = new BitSet();
+ pendingChannelCreditsBitmap = new BitSet();
+ pendingChannelSynBitmap = new BitSet();
+ this.pendingWriteEventsCounter = pendingWriteEventsCounter;
+ openChannelCount = 0;
+ }
+
+ ChannelControlBlock allocateChannel() {
+ synchronized (mConn) {
+ int idx = allocationBitmap.nextClearBit(0);
+ if (idx < 0) {
+ cleanupClosedChannels();
+ idx = allocationBitmap.nextClearBit(0);
+ if (idx < 0) {
+ idx = ccbArray.length;
+ }
+ }
+ return createChannel(idx);
+ }
+ }
+
+ private void cleanupClosedChannels() {
+ for (int i = 0; i < ccbArray.length; ++i) {
+ ChannelControlBlock ccb = ccbArray[i];
+ if (ccb != null) {
+ if (ccb.completelyClosed()) {
+ freeChannel(ccb);
+ }
+ }
+ }
+ }
+
+ ChannelControlBlock registerChannel(int channelId) {
+ return createChannel(channelId);
+ }
+
+ private void freeChannel(ChannelControlBlock channel) {
+ int idx = channel.getChannelId();
+ ccbArray[idx] = null;
+ allocationBitmap.clear(idx);
+ --openChannelCount;
+ }
+
+ ChannelControlBlock getCCB(int channelId) {
+ return ccbArray[channelId];
+ }
+
+ BitSet getPendingChannelWriteBitmap() {
+ return pendingChannelWriteBitmap;
+ }
+
+ BitSet getPendingChannelCreditsBitmap() {
+ return pendingChannelCreditsBitmap;
+ }
+
+ BitSet getPendingChannelSynBitmap() {
+ return pendingChannelSynBitmap;
+ }
+
+ int getOpenChannelCount() {
+ return openChannelCount;
+ }
+
+ void initiateChannelSyn(int channelId) {
+ synchronized (mConn) {
+ assert !pendingChannelSynBitmap.get(channelId);
+ pendingChannelSynBitmap.set(channelId);
+ pendingWriteEventsCounter.increment();
+ }
+ }
+
+ void markPendingCredits(int channelId) {
+ synchronized (mConn) {
+ if (!pendingChannelCreditsBitmap.get(channelId)) {
+ pendingChannelCreditsBitmap.set(channelId);
+ pendingWriteEventsCounter.increment();
+ }
+ }
+ }
+
+ void markPendingWrite(int channelId) {
+ synchronized (mConn) {
+ assert !pendingChannelWriteBitmap.get(channelId);
+ pendingChannelWriteBitmap.set(channelId);
+ pendingWriteEventsCounter.increment();
+ }
+ }
+
+ public void unmarkPendingWrite(int channelId) {
+ synchronized (mConn) {
+ assert pendingChannelWriteBitmap.get(channelId);
+ pendingChannelWriteBitmap.clear(channelId);
+ pendingWriteEventsCounter.decrement();
+ }
+ }
+
+ private ChannelControlBlock createChannel(int idx) {
+ if (idx >= ccbArray.length) {
+ expand(idx);
+ }
+ assert idx < ccbArray.length;
+ assert !allocationBitmap.get(idx);
+ ChannelControlBlock channel = new ChannelControlBlock(this, idx);
+ ccbArray[idx] = channel;
+ allocationBitmap.set(idx);
+ ++openChannelCount;
+ return channel;
+ }
+
+ private void expand(int idx) {
+ while (idx >= ccbArray.length) {
+ ccbArray = Arrays.copyOf(ccbArray, ccbArray.length * 2);
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelOpenListener.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelOpenListener.java
new file mode 100644
index 0000000..0fc9b2a
--- /dev/null
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelOpenListener.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.muxdemux;
+
+public interface IChannelOpenListener {
+ public void channelOpened(ChannelControlBlock channel);
+}
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelReadInterface.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelReadInterface.java
new file mode 100644
index 0000000..468a617
--- /dev/null
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelReadInterface.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.muxdemux;
+
+import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
+import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
+
+public interface IChannelReadInterface {
+ public void setFullBufferAcceptor(ICloseableBufferAcceptor fullBufferAcceptor);
+
+ public IBufferAcceptor getEmptyBufferAcceptor();
+}
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelWriteInterface.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelWriteInterface.java
new file mode 100644
index 0000000..1e53d71
--- /dev/null
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelWriteInterface.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.muxdemux;
+
+import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
+import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
+
+public interface IChannelWriteInterface {
+ public void setEmptyBufferAcceptor(IBufferAcceptor emptyBufferAcceptor);
+
+ public ICloseableBufferAcceptor getFullBufferAcceptor();
+}
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IEventCounter.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IEventCounter.java
new file mode 100644
index 0000000..148078c
--- /dev/null
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IEventCounter.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.muxdemux;
+
+public interface IEventCounter {
+ public void increment();
+
+ public void decrement();
+}
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
new file mode 100644
index 0000000..a99cb6a
--- /dev/null
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
@@ -0,0 +1,310 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.muxdemux;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.BitSet;
+
+import edu.uci.ics.hyracks.net.exceptions.NetException;
+import edu.uci.ics.hyracks.net.protocols.tcp.ITCPConnectionEventListener;
+import edu.uci.ics.hyracks.net.protocols.tcp.TCPConnection;
+
+public class MultiplexedConnection implements ITCPConnectionEventListener {
+ private final MuxDemux muxDemux;
+
+ private final IEventCounter pendingWriteEventsCounter;
+
+ private final ChannelSet cSet;
+
+ private final ReaderState readerState;
+
+ private final WriterState writerState;
+
+ private TCPConnection tcpConnection;
+
+ private int lastChannelWritten;
+
+ public MultiplexedConnection(MuxDemux muxDemux) {
+ this.muxDemux = muxDemux;
+ pendingWriteEventsCounter = new IEventCounter() {
+ private int counter;
+
+ @Override
+ public synchronized void increment() {
+ ++counter;
+ if (counter == 1) {
+ tcpConnection.enable(SelectionKey.OP_WRITE);
+ }
+ }
+
+ @Override
+ public synchronized void decrement() {
+ --counter;
+ if (counter == 0) {
+ tcpConnection.disable(SelectionKey.OP_WRITE);
+ }
+ if (counter < 0) {
+ throw new IllegalStateException();
+ }
+ }
+ };
+ cSet = new ChannelSet(this, pendingWriteEventsCounter);
+ readerState = new ReaderState();
+ writerState = new WriterState();
+ lastChannelWritten = -1;
+ }
+
+ synchronized void setTCPConnection(TCPConnection tcpConnection) {
+ this.tcpConnection = tcpConnection;
+ tcpConnection.enable(SelectionKey.OP_READ);
+ notifyAll();
+ }
+
+ synchronized void waitUntilConnected() throws InterruptedException {
+ while (tcpConnection == null) {
+ wait();
+ }
+ }
+
+ @Override
+ public void notifyIOReady(TCPConnection connection, boolean readable, boolean writable) throws IOException {
+ if (readable) {
+ driveReaderStateMachine();
+ }
+ if (writable) {
+ driveWriterStateMachine();
+ }
+ }
+
+ public ChannelControlBlock openChannel() throws NetException, InterruptedException {
+ ChannelControlBlock channel = cSet.allocateChannel();
+ int channelId = channel.getChannelId();
+ cSet.initiateChannelSyn(channelId);
+ return channel;
+ }
+
+ class WriterState {
+ private final ByteBuffer writeBuffer;
+
+ final MuxDemuxCommand command;
+
+ private ByteBuffer pendingBuffer;
+
+ private int pendingWriteSize;
+
+ private ChannelControlBlock ccb;
+
+ public WriterState() {
+ writeBuffer = ByteBuffer.allocate(MuxDemuxCommand.COMMAND_SIZE);
+ writeBuffer.flip();
+ command = new MuxDemuxCommand();
+ ccb = null;
+ }
+
+ boolean writePending() {
+ return writeBuffer.remaining() > 0 || (pendingBuffer != null && pendingWriteSize > 0);
+ }
+
+ void reset(ByteBuffer pendingBuffer, int pendingWriteSize, ChannelControlBlock ccb) {
+ writeBuffer.clear();
+ command.write(writeBuffer);
+ writeBuffer.flip();
+ this.pendingBuffer = pendingBuffer;
+ this.pendingWriteSize = pendingWriteSize;
+ this.ccb = ccb;
+ }
+
+ boolean performPendingWrite(SocketChannel sc) throws IOException {
+ int len = writeBuffer.remaining();
+ if (len > 0) {
+ int written = sc.write(writeBuffer);
+ if (written < len) {
+ return false;
+ }
+ }
+ if (pendingBuffer != null) {
+ if (pendingWriteSize > 0) {
+ assert pendingWriteSize <= pendingBuffer.remaining();
+ int oldLimit = pendingBuffer.limit();
+ try {
+ pendingBuffer.limit(pendingWriteSize);
+ int written = sc.write(pendingBuffer);
+ pendingWriteSize -= written;
+ } finally {
+ pendingBuffer.limit(oldLimit);
+ }
+ }
+ if (pendingWriteSize > 0) {
+ return false;
+ }
+ pendingBuffer = null;
+ pendingWriteSize = 0;
+ }
+ if (ccb != null) {
+ ccb.writeComplete();
+ ccb = null;
+ }
+ return true;
+ }
+ }
+
+ void driveWriterStateMachine() throws IOException {
+ SocketChannel sc = tcpConnection.getSocketChannel();
+ if (writerState.writePending()) {
+ if (!writerState.performPendingWrite(sc)) {
+ return;
+ }
+ }
+ int numCycles;
+
+ synchronized (MultiplexedConnection.this) {
+ numCycles = cSet.getOpenChannelCount();
+ }
+
+ for (int i = 0; i < numCycles; ++i) {
+ ChannelControlBlock writeCCB = null;
+ synchronized (MultiplexedConnection.this) {
+ BitSet pendingChannelSynBitmap = cSet.getPendingChannelSynBitmap();
+ for (int j = pendingChannelSynBitmap.nextSetBit(0); j >= 0; j = pendingChannelSynBitmap.nextSetBit(j)) {
+ pendingChannelSynBitmap.clear(j);
+ pendingWriteEventsCounter.decrement();
+ writerState.command.setChannelId(j);
+ writerState.command.setCommandType(MuxDemuxCommand.CommandType.OPEN_CHANNEL);
+ writerState.command.setData(0);
+ writerState.reset(null, 0, null);
+ if (!writerState.performPendingWrite(sc)) {
+ return;
+ }
+ }
+ BitSet pendingChannelCreditsBitmap = cSet.getPendingChannelCreditsBitmap();
+ for (int j = pendingChannelCreditsBitmap.nextSetBit(0); j >= 0; j = pendingChannelCreditsBitmap
+ .nextSetBit(j)) {
+ pendingChannelCreditsBitmap.clear(j);
+ pendingWriteEventsCounter.decrement();
+ writerState.command.setChannelId(j);
+ writerState.command.setCommandType(MuxDemuxCommand.CommandType.ADD_CREDITS);
+ ChannelControlBlock ccb = cSet.getCCB(j);
+ int credits = ccb.getAndResetReadCredits();
+ writerState.command.setData(credits);
+ writerState.reset(null, 0, null);
+ if (!writerState.performPendingWrite(sc)) {
+ return;
+ }
+ }
+ BitSet pendingChannelWriteBitmap = cSet.getPendingChannelWriteBitmap();
+ lastChannelWritten = pendingChannelWriteBitmap.nextSetBit(lastChannelWritten + 1);
+ if (lastChannelWritten < 0) {
+ lastChannelWritten = pendingChannelWriteBitmap.nextSetBit(0);
+ if (lastChannelWritten < 0) {
+ return;
+ }
+ }
+ writeCCB = cSet.getCCB(lastChannelWritten);
+ }
+ writeCCB.write(writerState);
+ if (writerState.writePending()) {
+ if (!writerState.performPendingWrite(sc)) {
+ return;
+ }
+ }
+ }
+ }
+
+ class ReaderState {
+ private final ByteBuffer readBuffer;
+
+ final MuxDemuxCommand command;
+
+ private int pendingReadSize;
+
+ private ChannelControlBlock ccb;
+
+ ReaderState() {
+ readBuffer = ByteBuffer.allocate(MuxDemuxCommand.COMMAND_SIZE);
+ command = new MuxDemuxCommand();
+ }
+
+ void reset() {
+ readBuffer.clear();
+ pendingReadSize = 0;
+ ccb = null;
+ }
+ }
+
+ void driveReaderStateMachine() throws IOException {
+ SocketChannel sc = tcpConnection.getSocketChannel();
+ if (readerState.readBuffer.remaining() > 0) {
+ sc.read(readerState.readBuffer);
+ if (readerState.readBuffer.remaining() > 0) {
+ return;
+ }
+ readerState.readBuffer.flip();
+ readerState.command.read(readerState.readBuffer);
+ switch (readerState.command.getCommandType()) {
+ case ADD_CREDITS: {
+ ChannelControlBlock ccb;
+ synchronized (MultiplexedConnection.this) {
+ ccb = cSet.getCCB(readerState.command.getChannelId());
+ }
+ ccb.addWriteCredits(readerState.command.getData());
+ break;
+ }
+ case CLOSE_CHANNEL: {
+ ChannelControlBlock ccb;
+ synchronized (MultiplexedConnection.this) {
+ ccb = cSet.getCCB(readerState.command.getChannelId());
+ }
+ ccb.reportRemoteEOS();
+ break;
+ }
+ case DATA: {
+ ChannelControlBlock ccb;
+ synchronized (MultiplexedConnection.this) {
+ ccb = cSet.getCCB(readerState.command.getChannelId());
+ }
+ readerState.pendingReadSize = readerState.command.getData();
+ readerState.ccb = ccb;
+ break;
+ }
+ case ERROR: {
+ ChannelControlBlock ccb;
+ synchronized (MultiplexedConnection.this) {
+ ccb = cSet.getCCB(readerState.command.getChannelId());
+ }
+ ccb.reportRemoteError(readerState.command.getData());
+ break;
+ }
+ case OPEN_CHANNEL: {
+ int channelId = readerState.command.getChannelId();
+ ChannelControlBlock ccb;
+ synchronized (MultiplexedConnection.this) {
+ ccb = cSet.registerChannel(channelId);
+ }
+ muxDemux.getChannelOpenListener().channelOpened(ccb);
+ }
+ }
+ }
+ if (readerState.pendingReadSize > 0) {
+ readerState.pendingReadSize = readerState.ccb.read(sc, readerState.pendingReadSize);
+ if (readerState.pendingReadSize > 0) {
+ return;
+ }
+ }
+ readerState.reset();
+ }
+}
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
new file mode 100644
index 0000000..74f1be2
--- /dev/null
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.muxdemux;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.net.protocols.tcp.ITCPConnectionListener;
+import edu.uci.ics.hyracks.net.protocols.tcp.TCPConnection;
+import edu.uci.ics.hyracks.net.protocols.tcp.TCPEndpoint;
+
+public class MuxDemux {
+ private final InetSocketAddress localAddress;
+
+ private final IChannelOpenListener channelOpenListener;
+
+ private final Map<InetSocketAddress, MultiplexedConnection> connectionMap;
+
+ private final TCPEndpoint tcpEndpoint;
+
+ public MuxDemux(InetSocketAddress localAddress, IChannelOpenListener listener) {
+ this.localAddress = localAddress;
+ this.channelOpenListener = listener;
+ connectionMap = new HashMap<InetSocketAddress, MultiplexedConnection>();
+ this.tcpEndpoint = new TCPEndpoint(new ITCPConnectionListener() {
+ @Override
+ public void connectionEstablished(TCPConnection connection) {
+ MultiplexedConnection mConn;
+ synchronized (MuxDemux.this) {
+ mConn = connectionMap.get(connection.getRemoteAddress());
+ }
+ assert mConn != null;
+ mConn.setTCPConnection(connection);
+ connection.setEventListener(mConn);
+ connection.setAttachment(mConn);
+ }
+
+ @Override
+ public void acceptedConnection(TCPConnection connection) {
+ MultiplexedConnection mConn = new MultiplexedConnection(MuxDemux.this);
+ mConn.setTCPConnection(connection);
+ connection.setEventListener(mConn);
+ connection.setAttachment(mConn);
+ }
+ });
+ }
+
+ public void start() throws IOException {
+ tcpEndpoint.start(localAddress);
+ }
+
+ public MultiplexedConnection connect(InetSocketAddress remoteAddress) throws InterruptedException {
+ MultiplexedConnection mConn = null;
+ synchronized (this) {
+ mConn = connectionMap.get(remoteAddress);
+ if (mConn == null) {
+ mConn = new MultiplexedConnection(this);
+ connectionMap.put(remoteAddress, mConn);
+ tcpEndpoint.initiateConnection(remoteAddress);
+ }
+ }
+ mConn.waitUntilConnected();
+ return mConn;
+ }
+
+ IChannelOpenListener getChannelOpenListener() {
+ return channelOpenListener;
+ }
+
+ public InetSocketAddress getLocalAddress() {
+ return tcpEndpoint.getLocalAddress();
+ }
+}
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxCommand.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxCommand.java
new file mode 100644
index 0000000..c32214c
--- /dev/null
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxCommand.java
@@ -0,0 +1,57 @@
+package edu.uci.ics.hyracks.net.protocols.muxdemux;
+
+import java.nio.ByteBuffer;
+
+class MuxDemuxCommand {
+ static final int COMMAND_SIZE = 4;
+
+ enum CommandType {
+ OPEN_CHANNEL,
+ CLOSE_CHANNEL,
+ ERROR,
+ ADD_CREDITS,
+ DATA,
+ }
+
+ private int channelId;
+
+ private CommandType type;
+
+ private int data;
+
+ public int getChannelId() {
+ return channelId;
+ }
+
+ public void setChannelId(int channelId) {
+ this.channelId = channelId;
+ }
+
+ public CommandType getCommandType() {
+ return type;
+ }
+
+ public void setCommandType(CommandType type) {
+ this.type = type;
+ }
+
+ public int getData() {
+ return data;
+ }
+
+ public void setData(int data) {
+ this.data = data;
+ }
+
+ public void write(ByteBuffer buffer) {
+ int cmd = (channelId << 22) | (type.ordinal() << 19) | (data & 0x7ffff);
+ buffer.putInt(cmd);
+ }
+
+ public void read(ByteBuffer buffer) {
+ int cmd = buffer.getInt();
+ channelId = (cmd >> 22) & 0x3ff;
+ type = CommandType.values()[(cmd >> 19) & 0x7];
+ data = cmd & 0x7ffff;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/ITCPConnectionEventListener.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/ITCPConnectionEventListener.java
new file mode 100644
index 0000000..4d894bd
--- /dev/null
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/ITCPConnectionEventListener.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.tcp;
+
+import java.io.IOException;
+
+public interface ITCPConnectionEventListener {
+ public void notifyIOReady(TCPConnection connection, boolean readable, boolean writable) throws IOException;
+}
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/ITCPConnectionListener.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/ITCPConnectionListener.java
new file mode 100644
index 0000000..cdaabf4
--- /dev/null
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/ITCPConnectionListener.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.tcp;
+
+public interface ITCPConnectionListener {
+ public void acceptedConnection(TCPConnection connection);
+
+ public void connectionEstablished(TCPConnection connection);
+}
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPConnection.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPConnection.java
new file mode 100644
index 0000000..210508b
--- /dev/null
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPConnection.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.tcp;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+
+public class TCPConnection {
+ private final TCPEndpoint endpoint;
+
+ private final SocketChannel channel;
+
+ private final SelectionKey key;
+
+ private final Selector selector;
+
+ private ITCPConnectionEventListener eventListener;
+
+ private Object attachment;
+
+ public TCPConnection(TCPEndpoint endpoint, SocketChannel channel, SelectionKey key, Selector selector) {
+ this.endpoint = endpoint;
+ this.channel = channel;
+ this.key = key;
+ this.selector = selector;
+ }
+
+ public TCPEndpoint getEndpoint() {
+ return endpoint;
+ }
+
+ public SocketChannel getSocketChannel() {
+ return channel;
+ }
+
+ public InetSocketAddress getLocalAddress() {
+ return (InetSocketAddress) channel.socket().getLocalSocketAddress();
+ }
+
+ public InetSocketAddress getRemoteAddress() {
+ return (InetSocketAddress) channel.socket().getRemoteSocketAddress();
+ }
+
+ public void enable(int ops) {
+ key.interestOps(key.interestOps() | ops);
+ selector.wakeup();
+ }
+
+ public void disable(int ops) {
+ key.interestOps(key.interestOps() & ~(ops));
+ selector.wakeup();
+ }
+
+ public ITCPConnectionEventListener getEventListener() {
+ return eventListener;
+ }
+
+ public void setEventListener(ITCPConnectionEventListener eventListener) {
+ this.eventListener = eventListener;
+ }
+
+ public Object getAttachment() {
+ return attachment;
+ }
+
+ public void setAttachment(Object attachment) {
+ this.attachment = attachment;
+ }
+
+ public void close() {
+ key.cancel();
+ try {
+ channel.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
new file mode 100644
index 0000000..5f92a11
--- /dev/null
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
@@ -0,0 +1,146 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.tcp;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class TCPEndpoint {
+ private final ITCPConnectionListener connectionListener;
+
+ private final List<InetSocketAddress>[] pendingConnections;
+
+ private ServerSocketChannel serverSocketChannel;
+
+ private InetSocketAddress localAddress;
+
+ private Selector selector;
+
+ private IOThread ioThread;
+
+ private int writerIndex;
+
+ private int readerIndex;
+
+ public TCPEndpoint(ITCPConnectionListener connectionListener) {
+ this.connectionListener = connectionListener;
+ this.pendingConnections = new List[] { new ArrayList<InetSocketAddress>(), new ArrayList<InetSocketAddress>() };
+ writerIndex = 0;
+ readerIndex = 1;
+ }
+
+ public void start(InetSocketAddress localAddress) throws IOException {
+ serverSocketChannel = ServerSocketChannel.open();
+ ServerSocket serverSocket = serverSocketChannel.socket();
+ serverSocket.bind(localAddress);
+ this.localAddress = (InetSocketAddress) serverSocket.getLocalSocketAddress();
+ serverSocketChannel.configureBlocking(false);
+ selector = Selector.open();
+ serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
+ ioThread = new IOThread();
+ ioThread.start();
+ }
+
+ public synchronized void initiateConnection(InetSocketAddress remoteAddress) {
+ pendingConnections[writerIndex].add(remoteAddress);
+ selector.wakeup();
+ }
+
+ private synchronized void swapReadersAndWriters() {
+ int temp = readerIndex;
+ readerIndex = writerIndex;
+ writerIndex = temp;
+ }
+
+ public InetSocketAddress getLocalAddress() {
+ return localAddress;
+ }
+
+ private class IOThread extends Thread {
+ public IOThread() {
+ super("TCPEndpoint IO Thread");
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ int n = selector.select();
+ swapReadersAndWriters();
+ if (!pendingConnections[readerIndex].isEmpty()) {
+ for (InetSocketAddress address : pendingConnections[readerIndex]) {
+ SocketChannel channel = SocketChannel.open();
+ channel.configureBlocking(false);
+ if (!channel.connect(address)) {
+ channel.register(selector, SelectionKey.OP_CONNECT);
+ } else {
+ SelectionKey key = channel.register(selector, 0);
+ createConnection(key, channel);
+ }
+ }
+ pendingConnections[readerIndex].clear();
+ }
+ if (n > 0) {
+ Iterator<SelectionKey> i = selector.selectedKeys().iterator();
+ while (i.hasNext()) {
+ SelectionKey key = i.next();
+ i.remove();
+ SelectableChannel sc = key.channel();
+ boolean readable = key.isReadable();
+ boolean writable = key.isWritable();
+
+ if (readable || writable) {
+ TCPConnection connection = (TCPConnection) key.attachment();
+ connection.getEventListener().notifyIOReady(connection, readable, writable);
+ }
+ if (key.isAcceptable()) {
+ assert sc == serverSocketChannel;
+ SocketChannel channel = serverSocketChannel.accept();
+ channel.configureBlocking(false);
+ SelectionKey sKey = channel.register(selector, 0);
+ TCPConnection connection = new TCPConnection(TCPEndpoint.this, channel, sKey, selector);
+ sKey.attach(connection);
+ connectionListener.acceptedConnection(connection);
+ } else if (key.isConnectable()) {
+ SocketChannel channel = (SocketChannel) sc;
+ if (channel.finishConnect()) {
+ createConnection(key, channel);
+ }
+ }
+ }
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private void createConnection(SelectionKey key, SocketChannel channel) {
+ TCPConnection connection = new TCPConnection(TCPEndpoint.this, channel, key, selector);
+ key.attach(connection);
+ key.interestOps(0);
+ connectionListener.connectionEstablished(connection);
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-net/src/test/java/edu/uci/ics/hyracks/net/tests/NetTest.java b/hyracks-net/src/test/java/edu/uci/ics/hyracks/net/tests/NetTest.java
new file mode 100644
index 0000000..6052443
--- /dev/null
+++ b/hyracks-net/src/test/java/edu/uci/ics/hyracks/net/tests/NetTest.java
@@ -0,0 +1,207 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.tests;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.Semaphore;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
+import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.IChannelOpenListener;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.MultiplexedConnection;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemux;
+
+public class NetTest {
+ @Test
+ public void test() throws Exception {
+ MuxDemux md1 = createMuxDemux("md1");
+ md1.start();
+ InetSocketAddress md1Address = md1.getLocalAddress();
+ System.err.println("md1Address: " + md1Address);
+
+ MuxDemux md2 = createMuxDemux("md2");
+ md2.start();
+ InetSocketAddress md2Address = md2.getLocalAddress();
+ System.err.println("md2Address: " + md2Address);
+
+ System.err.println("Started");
+
+ MultiplexedConnection md1md2 = md1.connect(md2Address);
+
+ Thread t1 = createThread(md1md2, 1);
+ Thread t2 = createThread(md1md2, -1);
+ t1.start();
+ t2.start();
+
+ t1.join();
+ t2.join();
+ }
+
+ private Thread createThread(final MultiplexedConnection md1md2, final int factor) {
+ return new Thread() {
+ @Override
+ public void run() {
+ try {
+ ChannelControlBlock md1md2c1 = md1md2.openChannel();
+
+ final Semaphore sem = new Semaphore(1);
+ sem.acquire();
+ md1md2c1.getWriteInterface().setEmptyBufferAcceptor(new IBufferAcceptor() {
+ @Override
+ public void accept(ByteBuffer buffer) {
+ }
+ });
+
+ md1md2c1.getReadInterface().setFullBufferAcceptor(new ICloseableBufferAcceptor() {
+ @Override
+ public void accept(ByteBuffer buffer) {
+ }
+
+ @Override
+ public void error(int ecode) {
+ }
+
+ @Override
+ public void close() {
+ sem.release();
+ }
+ });
+
+ ICloseableBufferAcceptor fba = md1md2c1.getWriteInterface().getFullBufferAcceptor();
+ for (int i = 0; i < 10000; ++i) {
+ ByteBuffer buffer = ByteBuffer.allocate(1024);
+ for (int j = 0; j < 256; ++j) {
+ buffer.putInt(factor * (i + j));
+ }
+ buffer.flip();
+ fba.accept(buffer);
+ }
+ fba.close();
+ sem.acquire();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ }
+
+ private MuxDemux createMuxDemux(final String label) {
+ IChannelOpenListener md1OpenListener = new IChannelOpenListener() {
+ @Override
+ public void channelOpened(final ChannelControlBlock channel) {
+ final ChannelIO cio = new ChannelIO(label, channel);
+ System.err.println(label + ": Channel Opened");
+ channel.getReadInterface().setFullBufferAcceptor(cio.rifba);
+ channel.getWriteInterface().setEmptyBufferAcceptor(cio.wieba);
+
+ final IBufferAcceptor rieba = channel.getReadInterface().getEmptyBufferAcceptor();
+ for (int i = 0; i < 50; ++i) {
+ rieba.accept(ByteBuffer.allocate(1024));
+ }
+ new Thread() {
+ @Override
+ public void run() {
+ while (true) {
+ ByteBuffer fbuf = null;
+ synchronized (channel) {
+ while (!cio.eos && cio.ecode == 0 && cio.rifq.isEmpty()) {
+ try {
+ channel.wait();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ if (!cio.rifq.isEmpty()) {
+ fbuf = cio.rifq.poll();
+ } else if (cio.ecode != 0) {
+ throw new RuntimeException("Error: " + cio.ecode);
+ } else if (cio.eos) {
+ channel.getWriteInterface().getFullBufferAcceptor().close();
+ System.err.println("Channel Closed");
+ return;
+ }
+ }
+ int counter = 0;
+ while (fbuf.remaining() > 0) {
+ counter += fbuf.getInt();
+ }
+ fbuf.compact();
+ rieba.accept(fbuf);
+ System.err.println("Received: total: " + counter);
+ }
+ }
+ }.start();
+ }
+ };
+ return new MuxDemux(new InetSocketAddress("127.0.0.1", 0), md1OpenListener);
+ }
+
+ private class ChannelIO {
+ private ChannelControlBlock channel;
+
+ private Queue<ByteBuffer> rifq;
+
+ private Queue<ByteBuffer> wieq;
+
+ private boolean eos;
+
+ private int ecode;
+
+ private ICloseableBufferAcceptor rifba;
+
+ private IBufferAcceptor wieba;
+
+ public ChannelIO(final String label, ChannelControlBlock channel) {
+ this.channel = channel;
+ this.rifq = new LinkedList<ByteBuffer>();
+ this.wieq = new LinkedList<ByteBuffer>();
+
+ rifba = new ICloseableBufferAcceptor() {
+ @Override
+ public void accept(ByteBuffer buffer) {
+ rifq.add(buffer);
+ ChannelIO.this.channel.notifyAll();
+ }
+
+ @Override
+ public void error(int ecode) {
+ ChannelIO.this.ecode = ecode;
+ ChannelIO.this.channel.notifyAll();
+ }
+
+ @Override
+ public void close() {
+ eos = true;
+ ChannelIO.this.channel.notifyAll();
+ }
+ };
+
+ wieba = new IBufferAcceptor() {
+ @Override
+ public void accept(ByteBuffer buffer) {
+ wieq.add(buffer);
+ ChannelIO.this.channel.notifyAll();
+ }
+ };
+ }
+ }
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 890f42d..a68885b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -86,6 +86,7 @@
<module>hyracks-dataflow-hadoop</module>
<module>hyracks-control-common</module>
<module>hyracks-control-cc</module>
+ <module>hyracks-net</module>
<module>hyracks-control-nc</module>
<module>hyracks-data</module>
<module>hyracks-cli</module>