Added new networking layer.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@984 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-nc/pom.xml b/hyracks-control-nc/pom.xml
index 64a409c..8c56958 100644
--- a/hyracks-control-nc/pom.xml
+++ b/hyracks-control-nc/pom.xml
@@ -1,9 +1,6 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
- <groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-control-nc</artifactId>
- <version>0.2.0-SNAPSHOT</version>
-
<parent>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks</artifactId>
@@ -37,6 +34,11 @@
<type>jar</type>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-net</artifactId>
+ <version>0.2.0-SNAPSHOT</version>
+ </dependency>
</dependencies>
<reporting>
<plugins>
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index d869515..d5fc27e 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -65,7 +65,7 @@
import edu.uci.ics.hyracks.control.common.work.WorkQueue;
import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
import edu.uci.ics.hyracks.control.nc.io.IOManager;
-import edu.uci.ics.hyracks.control.nc.net.ConnectionManager;
+import edu.uci.ics.hyracks.control.nc.net.NetworkManager;
import edu.uci.ics.hyracks.control.nc.partitions.PartitionManager;
import edu.uci.ics.hyracks.control.nc.runtime.RootHyracksContext;
import edu.uci.ics.hyracks.control.nc.work.AbortTasksWork;
@@ -91,7 +91,7 @@
private final PartitionManager partitionManager;
- private final ConnectionManager connectionManager;
+ private final NetworkManager netManager;
private final WorkQueue queue;
@@ -131,9 +131,8 @@
if (id == null) {
throw new Exception("id not set");
}
- connectionManager = new ConnectionManager(ctx, getIpAddress(ncConfig));
partitionManager = new PartitionManager(this);
- connectionManager.setPartitionRequestListener(partitionManager);
+ netManager = new NetworkManager(ctx, getIpAddress(ncConfig), partitionManager);
queue = new WorkQueue();
jobletMap = new Hashtable<JobId, Joblet>();
@@ -166,7 +165,7 @@
public void start() throws Exception {
LOGGER.log(Level.INFO, "Starting NodeControllerService");
ipc.start();
- connectionManager.start();
+ netManager.start();
IIPCHandle ccIPCHandle = ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort));
this.ccs = new ClusterControllerRemoteProxy(ccIPCHandle);
HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
@@ -174,9 +173,9 @@
gcInfos[i] = new HeartbeatSchema.GarbageCollectorInfo(gcMXBeans.get(i).getName());
}
HeartbeatSchema hbSchema = new HeartbeatSchema(gcInfos);
- this.nodeParameters = ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig,
- connectionManager.getNetworkAddress(), osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(),
- osMXBean.getAvailableProcessors(), hbSchema));
+ this.nodeParameters = ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netManager
+ .getNetworkAddress(), osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean
+ .getAvailableProcessors(), hbSchema));
queue.start();
heartbeatTask = new HeartbeatTask(ccs);
@@ -197,7 +196,7 @@
LOGGER.log(Level.INFO, "Stopping NodeControllerService");
partitionManager.close();
heartbeatTask.cancel();
- connectionManager.stop();
+ netManager.stop();
queue.stop();
LOGGER.log(Level.INFO, "Stopped NodeControllerService");
}
@@ -218,8 +217,8 @@
return jobletMap;
}
- public ConnectionManager getConnectionManager() {
- return connectionManager;
+ public NetworkManager getNetworkManager() {
+ return netManager;
}
public PartitionManager getPartitionManager() {
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/ConnectionManager.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/ConnectionManager.java
deleted file mode 100644
index 6e38ef7..0000000
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/ConnectionManager.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.nc.net;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.nio.ByteBuffer;
-import java.nio.channels.AsynchronousCloseException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
-import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
-import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.partitions.PartitionId;
-import edu.uci.ics.hyracks.control.nc.partitions.IPartitionRequestListener;
-
-public class ConnectionManager {
- private static final Logger LOGGER = Logger.getLogger(ConnectionManager.class.getName());
-
- static final int INITIAL_MESSAGE_SIZE = 20;
-
- private final IHyracksRootContext ctx;
-
- private IPartitionRequestListener partitionRequestListener;
-
- private final ServerSocketChannel serverChannel;
-
- private volatile boolean stopped;
-
- private final ConnectionListenerThread connectionListener;
-
- private final DataListenerThread dataListener;
-
- private final NetworkAddress networkAddress;
-
- public ConnectionManager(IHyracksRootContext ctx, InetAddress inetAddress) throws IOException {
- this.ctx = ctx;
- serverChannel = ServerSocketChannel.open();
- ServerSocket serverSocket = serverChannel.socket();
- serverSocket.bind(new InetSocketAddress(inetAddress, 0), 0);
- serverSocket.setReuseAddress(true);
- stopped = false;
- connectionListener = new ConnectionListenerThread();
- dataListener = new DataListenerThread();
- networkAddress = new NetworkAddress(serverSocket.getInetAddress(), serverSocket.getLocalPort());
-
- }
-
- public void setPartitionRequestListener(IPartitionRequestListener partitionRequestListener) {
- this.partitionRequestListener = partitionRequestListener;
- }
-
- public void start() {
- connectionListener.start();
- dataListener.start();
- }
-
- public void stop() {
- try {
- stopped = true;
- serverChannel.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- public void connect(INetworkChannel channel) throws IOException {
- dataListener.addOutgoingConnection(channel);
- }
-
- private final class ConnectionListenerThread extends Thread {
- public ConnectionListenerThread() {
- super("Hyracks NC Connection Listener");
- setDaemon(true);
- setPriority(MAX_PRIORITY);
- }
-
- @Override
- public void run() {
- while (!stopped) {
- try {
- SocketChannel sc = serverChannel.accept();
- dataListener.addIncomingConnection(sc);
- } catch (AsynchronousCloseException e) {
- // do nothing
- if (!stopped) {
- e.printStackTrace();
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- }
-
- private final class DataListenerThread extends Thread {
- private Selector selector;
-
- private final List<SocketChannel> pendingIncomingConnections;
- private final Set<SocketChannel> pendingNegotiations;
- private final List<INetworkChannel> pendingOutgoingConnections;
-
- public DataListenerThread() {
- super("Hyracks Data Listener Thread");
- setDaemon(true);
- try {
- selector = Selector.open();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- pendingIncomingConnections = new ArrayList<SocketChannel>();
- pendingNegotiations = new HashSet<SocketChannel>();
- pendingOutgoingConnections = new ArrayList<INetworkChannel>();
- }
-
- synchronized void addIncomingConnection(SocketChannel sc) throws IOException {
- pendingIncomingConnections.add(sc);
- selector.wakeup();
- }
-
- synchronized void addOutgoingConnection(INetworkChannel channel) throws IOException {
- pendingOutgoingConnections.add(channel);
- selector.wakeup();
- }
-
- @Override
- public void run() {
- while (!stopped) {
- try {
- if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.fine("Starting Select");
- }
- int n = selector.select();
- synchronized (this) {
- if (!pendingIncomingConnections.isEmpty()) {
- for (SocketChannel sc : pendingIncomingConnections) {
- sc.configureBlocking(false);
- sc.socket().setReuseAddress(true);
- SelectionKey scKey = sc.register(selector, SelectionKey.OP_READ);
- ByteBuffer buffer = ByteBuffer.allocate(INITIAL_MESSAGE_SIZE);
- scKey.attach(buffer);
- pendingNegotiations.add(sc);
- }
- pendingIncomingConnections.clear();
- }
- if (!pendingOutgoingConnections.isEmpty()) {
- for (INetworkChannel nc : pendingOutgoingConnections) {
- SocketChannel sc = SocketChannel.open();
- sc.configureBlocking(false);
- sc.socket().setReuseAddress(true);
- SelectionKey scKey = sc.register(selector, 0);
- scKey.attach(nc);
- nc.setSelectionKey(scKey);
- nc.notifyConnectionManagerRegistration();
- }
- pendingOutgoingConnections.clear();
- }
- if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.fine("Selector: " + n);
- }
- if (n > 0) {
- for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();) {
- SelectionKey key = i.next();
- i.remove();
- SocketChannel sc = (SocketChannel) key.channel();
- if (pendingNegotiations.contains(sc)) {
- if (key.isReadable()) {
- ByteBuffer buffer = (ByteBuffer) key.attachment();
- sc.read(buffer);
- buffer.flip();
- if (buffer.remaining() >= INITIAL_MESSAGE_SIZE) {
- PartitionId pid = readInitialMessage(buffer);
- pendingNegotiations.remove(sc);
- key.interestOps(0);
- NetworkOutputChannel channel = new NetworkOutputChannel(ctx, 5);
- channel.setSelectionKey(key);
- key.attach(channel);
- try {
- partitionRequestListener.registerPartitionRequest(pid, channel);
- } catch (HyracksException e) {
- key.cancel();
- sc.close();
- channel.abort();
- }
- } else {
- buffer.compact();
- }
- }
- } else {
- INetworkChannel channel = (INetworkChannel) key.attachment();
- boolean close = false;
- boolean error = false;
- try {
- close = channel.dispatchNetworkEvent();
- } catch (IOException e) {
- e.printStackTrace();
- error = true;
- }
- if (close || error) {
- key.cancel();
- sc.close();
- if (error) {
- channel.abort();
- }
- }
- }
- }
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
-
- private PartitionId readInitialMessage(ByteBuffer buffer) {
- JobId jobId = new JobId(buffer.getLong());
- ConnectorDescriptorId cdid = new ConnectorDescriptorId(buffer.getInt());
- int senderIndex = buffer.getInt();
- int receiverIndex = buffer.getInt();
- return new PartitionId(jobId, cdid, senderIndex, receiverIndex);
- }
- }
-
- public NetworkAddress getNetworkAddress() {
- return networkAddress;
- }
-}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/INetworkChannel.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/INetworkChannel.java
index 61cd91f..1f4f070 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/INetworkChannel.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/INetworkChannel.java
@@ -1,7 +1,6 @@
package edu.uci.ics.hyracks.control.nc.net;
import java.io.IOException;
-import java.net.SocketAddress;
import java.nio.channels.SelectionKey;
public interface INetworkChannel {
@@ -9,10 +8,6 @@
public void setSelectionKey(SelectionKey key);
- public SelectionKey getSelectionKey();
-
- public SocketAddress getRemoteAddress();
-
public void abort();
public void notifyConnectionManagerRegistration() throws IOException;
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
index 23cf514..6400288 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
@@ -14,65 +14,48 @@
*/
package edu.uci.ics.hyracks.control.nc.net;
-import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
import java.util.ArrayDeque;
import java.util.Queue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.channels.IInputChannel;
import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
-import edu.uci.ics.hyracks.api.comm.FrameHelper;
import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
+import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
-public class NetworkInputChannel implements IInputChannel, INetworkChannel {
- private static final Logger LOGGER = Logger.getLogger(NetworkInputChannel.class.getName());
+public class NetworkInputChannel implements IInputChannel {
+ private IHyracksRootContext ctx;
- private final ConnectionManager connectionManager;
+ private final NetworkManager netManager;
private final SocketAddress remoteAddress;
private final PartitionId partitionId;
- private final Queue<ByteBuffer> emptyQueue;
-
private final Queue<ByteBuffer> fullQueue;
- private SocketChannel socketChannel;
+ private final int nBuffers;
- private SelectionKey key;
-
- private ByteBuffer currentBuffer;
-
- private boolean eos;
-
- private boolean aborted;
+ private ChannelControlBlock ccb;
private IInputChannelMonitor monitor;
private Object attachment;
- private ByteBuffer writeBuffer;
-
- public NetworkInputChannel(IHyracksRootContext ctx, ConnectionManager connectionManager,
- SocketAddress remoteAddress, PartitionId partitionId, int nBuffers) {
- this.connectionManager = connectionManager;
+ public NetworkInputChannel(IHyracksRootContext ctx, NetworkManager netManager, SocketAddress remoteAddress,
+ PartitionId partitionId, int nBuffers) {
+ this.ctx = ctx;
+ this.netManager = netManager;
this.remoteAddress = remoteAddress;
this.partitionId = partitionId;
- this.emptyQueue = new ArrayDeque<ByteBuffer>(nBuffers);
- for (int i = 0; i < nBuffers; ++i) {
- emptyQueue.add(ctx.allocateFrame());
- }
fullQueue = new ArrayDeque<ByteBuffer>(nBuffers);
- aborted = false;
- eos = false;
+ this.nBuffers = nBuffers;
}
@Override
@@ -96,29 +79,31 @@
}
@Override
- public synchronized void recycleBuffer(ByteBuffer buffer) {
+ public void recycleBuffer(ByteBuffer buffer) {
buffer.clear();
- emptyQueue.add(buffer);
- if (!eos && !aborted) {
- int ops = key.interestOps();
- if ((ops & SelectionKey.OP_READ) == 0) {
- key.interestOps(ops | SelectionKey.OP_READ);
- key.selector().wakeup();
- if (currentBuffer == null) {
- currentBuffer = emptyQueue.poll();
- }
- }
- }
+ ccb.getReadInterface().getEmptyBufferAcceptor().accept(buffer);
}
@Override
public void open() throws HyracksDataException {
- currentBuffer = emptyQueue.poll();
try {
- connectionManager.connect(this);
- } catch (IOException e) {
+ ccb = netManager.connect(remoteAddress);
+ } catch (Exception e) {
throw new HyracksDataException(e);
}
+ ccb.getReadInterface().setFullBufferAcceptor(new ReadFullBufferAcceptor());
+ ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor());
+ for (int i = 0; i < nBuffers; ++i) {
+ ccb.getReadInterface().getEmptyBufferAcceptor().accept(ctx.allocateFrame());
+ }
+ ByteBuffer writeBuffer = ByteBuffer.allocate(NetworkManager.INITIAL_MESSAGE_SIZE);
+ writeBuffer.putLong(partitionId.getJobId().getId());
+ writeBuffer.putInt(partitionId.getConnectorDescriptorId().getId());
+ writeBuffer.putInt(partitionId.getSenderIndex());
+ writeBuffer.putInt(partitionId.getReceiverIndex());
+ writeBuffer.flip();
+ ccb.getWriteInterface().getFullBufferAcceptor().accept(writeBuffer);
+ ccb.getWriteInterface().getFullBufferAcceptor().close();
}
@Override
@@ -126,110 +111,28 @@
}
- @Override
- public synchronized boolean dispatchNetworkEvent() throws IOException {
- if (aborted) {
- eos = true;
- monitor.notifyFailure(this);
- return true;
+ private class ReadFullBufferAcceptor implements ICloseableBufferAcceptor {
+ @Override
+ public void accept(ByteBuffer buffer) {
+ fullQueue.add(buffer);
+ monitor.notifyDataAvailability(NetworkInputChannel.this, 1);
}
- if (key.isConnectable()) {
- if (socketChannel.finishConnect()) {
- key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
- prepareForWrite();
- }
- } else if (key.isWritable()) {
- socketChannel.write(writeBuffer);
- if (writeBuffer.remaining() == 0) {
- key.interestOps(SelectionKey.OP_READ);
- }
- } else if (key.isReadable()) {
- if (LOGGER.isLoggable(Level.FINER)) {
- LOGGER.finer("Before read: " + currentBuffer.position() + " " + currentBuffer.limit());
- }
- int bytesRead = socketChannel.read(currentBuffer);
- if (bytesRead < 0) {
- eos = true;
- monitor.notifyEndOfStream(this);
- return true;
- }
- if (LOGGER.isLoggable(Level.FINER)) {
- LOGGER.finer("After read: " + currentBuffer.position() + " " + currentBuffer.limit());
- }
- currentBuffer.flip();
- int dataLen = currentBuffer.remaining();
- if (dataLen >= currentBuffer.capacity() || aborted()) {
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest("NetworkInputChannel: frame received: sender = " + partitionId.getSenderIndex());
- }
- if (currentBuffer.getInt(FrameHelper.getTupleCountOffset(currentBuffer.capacity())) == 0) {
- eos = true;
- monitor.notifyEndOfStream(this);
- return true;
- }
- fullQueue.add(currentBuffer);
- currentBuffer = emptyQueue.poll();
- if (currentBuffer == null && key.isValid()) {
- int ops = key.interestOps();
- key.interestOps(ops & ~SelectionKey.OP_READ);
- }
- monitor.notifyDataAvailability(this, 1);
- return false;
- }
- currentBuffer.compact();
+
+ @Override
+ public void close() {
+ monitor.notifyEndOfStream(NetworkInputChannel.this);
}
- return false;
+
+ @Override
+ public void error(int ecode) {
+ monitor.notifyFailure(NetworkInputChannel.this);
+ }
}
- private void prepareForConnect() {
- key.interestOps(SelectionKey.OP_CONNECT);
- }
-
- private void prepareForWrite() {
- writeBuffer = ByteBuffer.allocate(ConnectionManager.INITIAL_MESSAGE_SIZE);
- writeBuffer.putLong(partitionId.getJobId().getId());
- writeBuffer.putInt(partitionId.getConnectorDescriptorId().getId());
- writeBuffer.putInt(partitionId.getSenderIndex());
- writeBuffer.putInt(partitionId.getReceiverIndex());
- writeBuffer.flip();
-
- key.interestOps(SelectionKey.OP_WRITE);
- }
-
- @Override
- public void setSelectionKey(SelectionKey key) {
- this.key = key;
- socketChannel = (SocketChannel) key.channel();
- }
-
- @Override
- public SocketAddress getRemoteAddress() {
- return remoteAddress;
- }
-
- @Override
- public SelectionKey getSelectionKey() {
- return key;
- }
-
- public PartitionId getPartitionId() {
- return partitionId;
- }
-
- public void abort() {
- aborted = true;
- }
-
- public boolean aborted() {
- return aborted;
- }
-
- @Override
- public void notifyConnectionManagerRegistration() throws IOException {
- if (socketChannel.connect(remoteAddress)) {
- prepareForWrite();
- } else {
- prepareForConnect();
+ private class WriteEmptyBufferAcceptor implements IBufferAcceptor {
+ @Override
+ public void accept(ByteBuffer buffer) {
+ // do nothing
}
}
}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
new file mode 100644
index 0000000..c47db54
--- /dev/null
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.net;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.nc.partitions.IPartitionRequestListener;
+import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
+import edu.uci.ics.hyracks.net.exceptions.NetException;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.IChannelOpenListener;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.MultiplexedConnection;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemux;
+
+public class NetworkManager {
+ static final int INITIAL_MESSAGE_SIZE = 20;
+
+ private final IHyracksRootContext ctx;
+
+ private final IPartitionRequestListener partitionRequestListener;
+
+ private final MuxDemux md;
+
+ private NetworkAddress networkAddress;
+
+ public NetworkManager(IHyracksRootContext ctx, InetAddress inetAddress,
+ IPartitionRequestListener partitionRequestListener) throws IOException {
+ this.ctx = ctx;
+ this.partitionRequestListener = partitionRequestListener;
+ md = new MuxDemux(new InetSocketAddress(inetAddress, 0), new ChannelOpenListener());
+ }
+
+ public void start() throws IOException {
+ md.start();
+ InetSocketAddress sockAddr = md.getLocalAddress();
+ networkAddress = new NetworkAddress(sockAddr.getAddress(), sockAddr.getPort());
+ }
+
+ public NetworkAddress getNetworkAddress() {
+ return networkAddress;
+ }
+
+ public void stop() {
+
+ }
+
+ public ChannelControlBlock connect(SocketAddress remoteAddress) throws InterruptedException, NetException {
+ MultiplexedConnection mConn = md.connect((InetSocketAddress) remoteAddress);
+ return mConn.openChannel();
+ }
+
+ private class ChannelOpenListener implements IChannelOpenListener {
+ @Override
+ public void channelOpened(ChannelControlBlock channel) {
+ channel.getReadInterface().setFullBufferAcceptor(new InitialBufferAcceptor(channel));
+ channel.getReadInterface().getEmptyBufferAcceptor().accept(ByteBuffer.allocate(INITIAL_MESSAGE_SIZE));
+ }
+ }
+
+ private class InitialBufferAcceptor implements ICloseableBufferAcceptor {
+ private final ChannelControlBlock ccb;
+
+ private NetworkOutputChannel noc;
+
+ public InitialBufferAcceptor(ChannelControlBlock ccb) {
+ this.ccb = ccb;
+ }
+
+ @Override
+ public void accept(ByteBuffer buffer) {
+ PartitionId pid = readInitialMessage(buffer);
+ noc = new NetworkOutputChannel(ctx, ccb, 5);
+ try {
+ partitionRequestListener.registerPartitionRequest(pid, noc);
+ } catch (HyracksException e) {
+ noc.abort();
+ }
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public void error(int ecode) {
+ if (noc != null) {
+ noc.abort();
+ }
+ }
+ }
+
+ private static PartitionId readInitialMessage(ByteBuffer buffer) {
+ JobId jobId = new JobId(buffer.getLong());
+ ConnectorDescriptorId cdid = new ConnectorDescriptorId(buffer.getInt());
+ int senderIndex = buffer.getInt();
+ int receiverIndex = buffer.getInt();
+ return new PartitionId(jobId, cdid, senderIndex, receiverIndex);
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
index 31ce924..1abcdf6 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
@@ -14,124 +14,44 @@
*/
package edu.uci.ics.hyracks.control.nc.net;
-import java.io.IOException;
-import java.net.SocketAddress;
import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
import java.util.ArrayDeque;
import java.util.Queue;
-import edu.uci.ics.hyracks.api.comm.FrameHelper;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
-public class NetworkOutputChannel implements INetworkChannel, IFrameWriter {
- private final IHyracksRootContext ctx;
+public class NetworkOutputChannel implements IFrameWriter {
+ private final ChannelControlBlock ccb;
private final Queue<ByteBuffer> emptyQueue;
- private final Queue<ByteBuffer> fullQueue;
-
- private SelectionKey key;
-
private boolean aborted;
- private boolean eos;
-
- private boolean eosSent;
-
- private boolean failed;
-
- private ByteBuffer currentBuffer;
-
- public NetworkOutputChannel(IHyracksRootContext ctx, int nBuffers) {
- this.ctx = ctx;
+ public NetworkOutputChannel(IHyracksRootContext ctx, ChannelControlBlock ccb, int nBuffers) {
+ this.ccb = ccb;
emptyQueue = new ArrayDeque<ByteBuffer>(nBuffers);
for (int i = 0; i < nBuffers; ++i) {
emptyQueue.add(ctx.allocateFrame());
}
- fullQueue = new ArrayDeque<ByteBuffer>(nBuffers);
- }
-
- @Override
- public synchronized boolean dispatchNetworkEvent() throws IOException {
- if (failed || aborted) {
- eos = true;
- return true;
- } else if (key.isWritable()) {
- while (true) {
- if (currentBuffer == null) {
- if (eosSent) {
- return true;
- }
- currentBuffer = fullQueue.poll();
- if (currentBuffer == null) {
- if (eos) {
- currentBuffer = emptyQueue.poll();
- currentBuffer.clear();
- currentBuffer.putInt(FrameHelper.getTupleCountOffset(ctx.getFrameSize()), 0);
- eosSent = true;
- } else {
- key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
- return false;
- }
- }
- }
- int bytesWritten = ((SocketChannel) key.channel()).write(currentBuffer);
- if (bytesWritten < 0) {
- eos = true;
- return true;
- }
- if (currentBuffer.remaining() == 0) {
- emptyQueue.add(currentBuffer);
- notifyAll();
- currentBuffer = null;
- if (eosSent) {
- return true;
- }
- } else {
- return false;
- }
- }
- }
- return false;
- }
-
- @Override
- public void setSelectionKey(SelectionKey key) {
- this.key = key;
- }
-
- @Override
- public SelectionKey getSelectionKey() {
- return key;
- }
-
- @Override
- public SocketAddress getRemoteAddress() {
- return ((SocketChannel) key.channel()).socket().getRemoteSocketAddress();
- }
-
- @Override
- public synchronized void abort() {
- aborted = true;
+ ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor());
}
@Override
public void open() throws HyracksDataException {
- currentBuffer = null;
}
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
ByteBuffer destBuffer = null;
synchronized (this) {
- if (aborted) {
- throw new HyracksDataException("Connection has been aborted");
- }
while (true) {
+ if (aborted) {
+ throw new HyracksDataException("Connection has been aborted");
+ }
destBuffer = emptyQueue.poll();
if (destBuffer != null) {
break;
@@ -148,26 +68,34 @@
destBuffer.clear();
destBuffer.put(buffer);
destBuffer.flip();
- synchronized (this) {
- fullQueue.add(destBuffer);
- }
- key.interestOps(SelectionKey.OP_WRITE);
- key.selector().wakeup();
+ ccb.getWriteInterface().getFullBufferAcceptor().accept(destBuffer);
}
@Override
public void fail() throws HyracksDataException {
- failed = true;
+ ccb.getWriteInterface().getFullBufferAcceptor().error(1);
}
@Override
- public synchronized void close() throws HyracksDataException {
- eos = true;
- key.interestOps(SelectionKey.OP_WRITE);
- key.selector().wakeup();
+ public void close() throws HyracksDataException {
+ ccb.getWriteInterface().getFullBufferAcceptor().close();
}
- @Override
- public void notifyConnectionManagerRegistration() throws IOException {
+ void abort() {
+ ccb.getWriteInterface().getFullBufferAcceptor().error(1);
+ synchronized (NetworkOutputChannel.this) {
+ aborted = true;
+ NetworkOutputChannel.this.notifyAll();
+ }
+ }
+
+ private class WriteEmptyBufferAcceptor implements IBufferAcceptor {
+ @Override
+ public void accept(ByteBuffer buffer) {
+ synchronized (NetworkOutputChannel.this) {
+ emptyQueue.add(buffer);
+ NetworkOutputChannel.this.notifyAll();
+ }
+ }
}
}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
index bfa21c9..bd40fea 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
@@ -48,7 +48,7 @@
Joblet ji = jobletMap.get(pid.getJobId());
if (ji != null) {
PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(ncs.getRootContext(),
- ncs.getConnectionManager(), new InetSocketAddress(networkAddress.getIpAddress(),
+ ncs.getNetworkManager(), new InetSocketAddress(networkAddress.getIpAddress(),
networkAddress.getPort()), pid, 1));
ji.reportPartitionAvailability(channel);
}