Added new IPC mechanism to Hyracks. Migrated all remote communications to use new IPC layer
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@935 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-ipc/pom.xml b/hyracks-ipc/pom.xml
new file mode 100644
index 0000000..49c4323
--- /dev/null
+++ b/hyracks-ipc/pom.xml
@@ -0,0 +1,31 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>hyracks-ipc</artifactId>
+ <parent>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks</artifactId>
+ <version>0.2.0-SNAPSHOT</version>
+ </parent>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.8.1</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCHandle.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCHandle.java
new file mode 100644
index 0000000..a4adf23
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCHandle.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.api;
+
+import java.net.InetSocketAddress;
+
+import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
+
+public interface IIPCHandle {
+ public InetSocketAddress getRemoteAddress();
+
+ public void send(Object request, IResponseCallback callback) throws IPCException;
+
+ public void setAttachment(Object attachment);
+
+ public Object getAttachment();
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCI.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCI.java
new file mode 100644
index 0000000..ba9f343
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCI.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.api;
+
+public interface IIPCI {
+ public Object call(IIPCHandle caller, Object req) throws Exception;
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IResponseCallback.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IResponseCallback.java
new file mode 100644
index 0000000..7d25f88
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IResponseCallback.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.api;
+
+public interface IResponseCallback {
+ public void callback(IIPCHandle handle, Object response, Exception exception);
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/SyncRMI.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/SyncRMI.java
new file mode 100644
index 0000000..180b1dd
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/SyncRMI.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.api;
+
+public final class SyncRMI implements IResponseCallback {
+ private boolean pending;
+
+ private Object response;
+
+ private Exception exception;
+
+ public SyncRMI() {
+ }
+
+ @Override
+ public synchronized void callback(IIPCHandle handle, Object response, Exception exception) {
+ pending = false;
+ this.response = response;
+ this.exception = exception;
+ notifyAll();
+ }
+
+ public synchronized Object call(IIPCHandle handle, Object request) throws Exception {
+ pending = true;
+ response = null;
+ exception = null;
+ handle.send(request, this);
+ while (pending) {
+ wait();
+ }
+ if (exception != null) {
+ throw exception;
+ }
+ return response;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/exceptions/IPCException.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/exceptions/IPCException.java
new file mode 100644
index 0000000..9ecf015
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/exceptions/IPCException.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.exceptions;
+
+public class IPCException extends Exception {
+ private static final long serialVersionUID = 1L;
+
+ public IPCException() {
+ super();
+ }
+
+ public IPCException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public IPCException(String message) {
+ super(message);
+ }
+
+ public IPCException(Throwable cause) {
+ super(cause);
+ }
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/HandleState.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/HandleState.java
new file mode 100644
index 0000000..47c3d4a
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/HandleState.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.impl;
+
+enum HandleState {
+ INITIAL,
+ CONNECT_SENT,
+ CONNECT_RECEIVED,
+ CONNECTED,
+ CLOSED,
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
new file mode 100644
index 0000000..cbbe718
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
@@ -0,0 +1,270 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class IPCConnectionManager {
+ private static final Logger LOGGER = Logger.getLogger(IPCConnectionManager.class.getName());
+
+ private final IPCSystem system;
+
+ private final NetworkThread networkThread;
+
+ private final ServerSocketChannel serverSocketChannel;
+
+ private final Map<InetSocketAddress, IPCHandle> ipcHandleMap;
+
+ private final List<IPCHandle>[] pendingConnections;
+
+ private final List<Message>[] sendList;
+
+ private int writerIndex;
+
+ private int readerIndex;
+
+ private final InetSocketAddress address;
+
+ private volatile boolean stopped;
+
+ IPCConnectionManager(IPCSystem system, InetSocketAddress socketAddress) throws IOException {
+ this.system = system;
+ this.networkThread = new NetworkThread();
+ this.serverSocketChannel = ServerSocketChannel.open();
+ serverSocketChannel.configureBlocking(false);
+ ServerSocket socket = serverSocketChannel.socket();
+ socket.bind(socketAddress);
+ address = new InetSocketAddress(socket.getInetAddress(), socket.getLocalPort());
+ ipcHandleMap = new HashMap<InetSocketAddress, IPCHandle>();
+ pendingConnections = new ArrayList[] { new ArrayList<IPCHandle>(), new ArrayList<IPCHandle>() };
+ sendList = new ArrayList[] { new ArrayList<Message>(), new ArrayList<Message>() };
+ writerIndex = 0;
+ readerIndex = 1;
+ }
+
+ InetSocketAddress getAddress() {
+ return address;
+ }
+
+ void start() {
+ stopped = false;
+ networkThread.start();
+ }
+
+ void stop() throws IOException {
+ stopped = true;
+ serverSocketChannel.close();
+ }
+
+ IPCHandle getIPCHandle(InetSocketAddress remoteAddress) throws IOException, InterruptedException {
+ IPCHandle handle;
+ synchronized (this) {
+ handle = ipcHandleMap.get(remoteAddress);
+ if (handle == null) {
+ handle = new IPCHandle(system, remoteAddress);
+ pendingConnections[writerIndex].add(handle);
+ networkThread.selector.wakeup();
+ }
+ }
+ handle.waitTillConnected();
+ return handle;
+ }
+
+ synchronized void registerHandle(IPCHandle handle) {
+ ipcHandleMap.put(handle.getRemoteAddress(), handle);
+ }
+
+ synchronized void write(Message msg) {
+ sendList[writerIndex].add(msg);
+ networkThread.selector.wakeup();
+ }
+
+ private synchronized void swapReadersAndWriters() {
+ int temp = readerIndex;
+ readerIndex = writerIndex;
+ writerIndex = temp;
+ }
+
+ private Message createInitialReqMessage(IPCHandle handle) {
+ Message msg = new Message(handle);
+ msg.setMessageId(system.createMessageId());
+ msg.setRequestMessageId(-1);
+ msg.setFlag(Message.INITIAL_REQ);
+ msg.setPayload(address);
+ return msg;
+ }
+
+ private Message createInitialAckMessage(IPCHandle handle, Message req) {
+ Message msg = new Message(handle);
+ msg.setMessageId(system.createMessageId());
+ msg.setRequestMessageId(req.getMessageId());
+ msg.setFlag(Message.INITIAL_ACK);
+ msg.setPayload(null);
+ return msg;
+ }
+
+ void ack(IPCHandle handle, Message req) {
+ write(createInitialAckMessage(handle, req));
+ }
+
+ private class NetworkThread extends Thread {
+ private final Selector selector;
+
+ public NetworkThread() {
+ super("IPC Network Listener Thread");
+ setDaemon(true);
+ try {
+ selector = Selector.open();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void run() {
+ try {
+ serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
+ } catch (ClosedChannelException e) {
+ throw new RuntimeException(e);
+ }
+ while (!stopped) {
+ try {
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Starting Select");
+ }
+ int n = selector.select();
+ swapReadersAndWriters();
+ if (!pendingConnections[readerIndex].isEmpty()) {
+ for (IPCHandle handle : pendingConnections[readerIndex]) {
+ SocketChannel channel = SocketChannel.open();
+ channel.configureBlocking(false);
+ SelectionKey cKey = null;
+ if (channel.connect(handle.getRemoteAddress())) {
+ cKey = channel.register(selector, SelectionKey.OP_READ);
+ handle.setState(HandleState.CONNECT_SENT);
+ write(createInitialReqMessage(handle));
+ } else {
+ cKey = channel.register(selector, SelectionKey.OP_CONNECT);
+ }
+ handle.setKey(cKey);
+ cKey.attach(handle);
+ }
+ pendingConnections[readerIndex].clear();
+ }
+ if (!sendList[readerIndex].isEmpty()) {
+ for (Iterator<Message> i = sendList[readerIndex].iterator(); i.hasNext();) {
+ Message msg = i.next();
+ IPCHandle handle = msg.getIPCHandle();
+ if (handle.getState() == HandleState.CLOSED) {
+ i.remove();
+ } else if (!handle.full()) {
+ while (true) {
+ ByteBuffer buffer = handle.getOutBuffer();
+ buffer.compact();
+ boolean success = msg.write(buffer);
+ buffer.flip();
+ if (success) {
+ i.remove();
+ SelectionKey key = handle.getKey();
+ key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+ } else {
+ if (buffer.position() == 0) {
+ handle.resizeOutBuffer();
+ continue;
+ }
+ handle.markFull();
+ }
+ break;
+ }
+ }
+ }
+ }
+ if (n > 0) {
+ for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();) {
+ SelectionKey key = i.next();
+ i.remove();
+ SelectableChannel sc = key.channel();
+ if (key.isReadable()) {
+ SocketChannel channel = (SocketChannel) sc;
+ IPCHandle handle = (IPCHandle) key.attachment();
+ ByteBuffer readBuffer = handle.getInBuffer();
+ int len = channel.read(readBuffer);
+ if (len < 0) {
+ key.cancel();
+ channel.close();
+ handle.close();
+ } else {
+ handle.processIncomingMessages();
+ if (!readBuffer.hasRemaining()) {
+ handle.resizeInBuffer();
+ }
+ }
+ } else if (key.isWritable()) {
+ SocketChannel channel = (SocketChannel) sc;
+ IPCHandle handle = (IPCHandle) key.attachment();
+ ByteBuffer writeBuffer = handle.getOutBuffer();
+ int len = channel.write(writeBuffer);
+ if (len < 0) {
+ key.cancel();
+ channel.close();
+ handle.close();
+ } else if (!writeBuffer.hasRemaining()) {
+ key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
+ }
+ handle.clearFull();
+ } else if (key.isAcceptable()) {
+ assert sc == serverSocketChannel;
+ SocketChannel channel = serverSocketChannel.accept();
+ channel.configureBlocking(false);
+ IPCHandle handle = new IPCHandle(system, null);
+ SelectionKey cKey = channel.register(selector, SelectionKey.OP_READ);
+ handle.setKey(cKey);
+ cKey.attach(handle);
+ handle.setState(HandleState.CONNECT_RECEIVED);
+ } else if (key.isConnectable()) {
+ SocketChannel channel = (SocketChannel) sc;
+ if (channel.finishConnect()) {
+ IPCHandle handle = (IPCHandle) key.attachment();
+ handle.setState(HandleState.CONNECT_SENT);
+ registerHandle(handle);
+ key.interestOps(SelectionKey.OP_READ);
+ write(createInitialReqMessage(handle));
+ }
+ }
+ }
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
new file mode 100644
index 0000000..481a0b0
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
@@ -0,0 +1,204 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.impl;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.util.HashMap;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.IResponseCallback;
+import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
+
+final class IPCHandle implements IIPCHandle {
+ private static final int DEFAULT_BUFFER_SIZE = 1024 * 1024;
+
+ private final IPCSystem system;
+
+ private InetSocketAddress remoteAddress;
+
+ private final Map<Long, IResponseCallback> pendingRequestMap;
+
+ private HandleState state;
+
+ private SelectionKey key;
+
+ private Object attachment;
+
+ private ByteBuffer inBuffer;
+
+ private ByteBuffer outBuffer;
+
+ private boolean full;
+
+ IPCHandle(IPCSystem system, InetSocketAddress remoteAddress) {
+ this.system = system;
+ this.remoteAddress = remoteAddress;
+ pendingRequestMap = new HashMap<Long, IResponseCallback>();
+ inBuffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
+ outBuffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
+ outBuffer.flip();
+ state = HandleState.INITIAL;
+ }
+
+ @Override
+ public InetSocketAddress getRemoteAddress() {
+ return remoteAddress;
+ }
+
+ void setRemoteAddress(InetSocketAddress remoteAddress) {
+ this.remoteAddress = remoteAddress;
+ }
+
+ @Override
+ public synchronized void send(Object req, IResponseCallback callback) throws IPCException {
+ if (state != HandleState.CONNECTED) {
+ throw new IPCException("Handle is not in Connected state");
+ }
+ Message msg = new Message(this);
+ long mid = system.createMessageId();
+ msg.setMessageId(mid);
+ msg.setRequestMessageId(-1);
+ msg.setPayload(req);
+ if (callback != null) {
+ pendingRequestMap.put(mid, callback);
+ }
+ system.getConnectionManager().write(msg);
+ }
+
+ @Override
+ public void setAttachment(Object attachment) {
+ this.attachment = attachment;
+ }
+
+ @Override
+ public Object getAttachment() {
+ return attachment;
+ }
+
+ SelectionKey getKey() {
+ return key;
+ }
+
+ void setKey(SelectionKey key) {
+ this.key = key;
+ }
+
+ public synchronized boolean isConnected() {
+ return state == HandleState.CONNECTED;
+ }
+
+ synchronized HandleState getState() {
+ return state;
+ }
+
+ synchronized void setState(HandleState state) {
+ this.state = state;
+ notifyAll();
+ }
+
+ synchronized void waitTillConnected() throws InterruptedException {
+ while (!isConnected()) {
+ wait();
+ }
+ }
+
+ ByteBuffer getInBuffer() {
+ return inBuffer;
+ }
+
+ ByteBuffer getOutBuffer() {
+ return outBuffer;
+ }
+
+ synchronized void close() {
+ setState(HandleState.CLOSED);
+ for (IResponseCallback cb : pendingRequestMap.values()) {
+ cb.callback(this, null, new IPCException("IPC Handle Closed"));
+ }
+ }
+
+ synchronized void processIncomingMessages() {
+ inBuffer.flip();
+ while (Message.hasMessage(inBuffer)) {
+ Message message = new Message(this);
+ try {
+ message.read(inBuffer);
+ } catch (Exception e) {
+ message.setFlag(Message.ERROR);
+ message.setPayload(e);
+ }
+
+ if (state == HandleState.CONNECT_RECEIVED) {
+ remoteAddress = (InetSocketAddress) message.getPayload();
+ system.getConnectionManager().registerHandle(this);
+ setState(HandleState.CONNECTED);
+ system.getConnectionManager().ack(this, message);
+ continue;
+ } else if (state == HandleState.CONNECT_SENT) {
+ if (message.getFlag() == Message.INITIAL_ACK) {
+ setState(HandleState.CONNECTED);
+ } else {
+ throw new IllegalStateException();
+ }
+ continue;
+ }
+ long requestMessageId = message.getRequestMessageId();
+ if (requestMessageId < 0) {
+ system.deliverIncomingMessage(message);
+ } else {
+ Long rid = Long.valueOf(requestMessageId);
+ IResponseCallback cb = pendingRequestMap.remove(rid);
+ if (cb != null) {
+ byte flag = message.getFlag();
+ Object payload = flag == Message.ERROR ? null : message.getPayload();
+ Exception exception = (Exception) (flag == Message.ERROR ? message.getPayload() : null);
+ cb.callback(this, payload, exception);
+ }
+ }
+ }
+ inBuffer.compact();
+ }
+
+ void resizeInBuffer() {
+ inBuffer.flip();
+ ByteBuffer readBuffer = ByteBuffer.allocate(inBuffer.capacity() * 2);
+ readBuffer.put(inBuffer);
+ readBuffer.compact();
+ inBuffer = readBuffer;
+ }
+
+ void resizeOutBuffer() {
+ ByteBuffer writeBuffer = ByteBuffer.allocate(outBuffer.capacity() * 2);
+ writeBuffer.put(outBuffer);
+ writeBuffer.compact();
+ writeBuffer.flip();
+ outBuffer = writeBuffer;
+ }
+
+ void markFull() {
+ full = true;
+ }
+
+ void clearFull() {
+ full = false;
+ }
+
+ boolean full() {
+ return full;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
new file mode 100644
index 0000000..9eef8ff
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicLong;
+
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.IIPCI;
+import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
+
+public class IPCSystem {
+ private final IPCConnectionManager cMgr;
+
+ private final IIPCI ipci;
+
+ private final Executor executor;
+
+ private final AtomicLong midFactory;
+
+ public IPCSystem(InetSocketAddress socketAddress) throws IOException {
+ this(socketAddress, null, null);
+ }
+
+ public IPCSystem(InetSocketAddress socketAddress, IIPCI ipci, Executor executor) throws IOException {
+ cMgr = new IPCConnectionManager(this, socketAddress);
+ this.ipci = ipci;
+ this.executor = executor;
+ midFactory = new AtomicLong();
+ }
+
+ public InetSocketAddress getSocketAddress() {
+ return cMgr.getAddress();
+ }
+
+ public void start() {
+ cMgr.start();
+ }
+
+ public IIPCHandle getHandle(InetSocketAddress remoteAddress) throws IPCException {
+ try {
+ return cMgr.getIPCHandle(remoteAddress);
+ } catch (IOException e) {
+ throw new IPCException(e);
+ } catch (InterruptedException e) {
+ throw new IPCException(e);
+ }
+ }
+
+ long createMessageId() {
+ return midFactory.incrementAndGet();
+ }
+
+ void deliverIncomingMessage(final Message message) {
+ assert message.getFlag() == Message.NORMAL;
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ IPCHandle handle = message.getIPCHandle();
+ Message response = new Message(handle);
+ response.setMessageId(createMessageId());
+ response.setRequestMessageId(message.getMessageId());
+ response.setFlag(Message.NORMAL);
+ try {
+ Object result = ipci.call(handle, message.getPayload());
+ response.setPayload(result);
+ } catch (Exception e) {
+ response.setFlag(Message.ERROR);
+ response.setPayload(e);
+ }
+ cMgr.write(response);
+ }
+ });
+ }
+
+ IPCConnectionManager getConnectionManager() {
+ return cMgr;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/Message.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/Message.java
new file mode 100644
index 0000000..ab3428e
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/Message.java
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
+
+class Message {
+ private static final int MSG_SIZE_SIZE = 4;
+
+ private static final int HEADER_SIZE = 17;
+
+ static final byte INITIAL_REQ = 1;
+
+ static final byte INITIAL_ACK = 2;
+
+ static final byte ERROR = 3;
+
+ static final byte NORMAL = 0;
+
+ private IPCHandle ipcHandle;
+
+ private long messageId;
+
+ private long requestMessageId;
+
+ private byte flag;
+
+ private Object payload;
+
+ Message(IPCHandle ipcHandle) {
+ this.ipcHandle = ipcHandle;
+ }
+
+ IPCHandle getIPCHandle() {
+ return ipcHandle;
+ }
+
+ void setMessageId(long messageId) {
+ this.messageId = messageId;
+ }
+
+ long getMessageId() {
+ return messageId;
+ }
+
+ void setRequestMessageId(long requestMessageId) {
+ this.requestMessageId = requestMessageId;
+ }
+
+ long getRequestMessageId() {
+ return requestMessageId;
+ }
+
+ void setFlag(byte flag) {
+ this.flag = flag;
+ }
+
+ byte getFlag() {
+ return flag;
+ }
+
+ void setPayload(Object payload) {
+ this.payload = payload;
+ }
+
+ Object getPayload() {
+ return payload;
+ }
+
+ static boolean hasMessage(ByteBuffer buffer) {
+ if (buffer.remaining() < MSG_SIZE_SIZE) {
+ return false;
+ }
+ int msgSize = buffer.getInt(buffer.position());
+ return buffer.remaining() >= msgSize + MSG_SIZE_SIZE;
+ }
+
+ void read(ByteBuffer buffer) throws IOException, ClassNotFoundException {
+ assert hasMessage(buffer);
+ int msgSize = buffer.getInt();
+ messageId = buffer.getLong();
+ requestMessageId = buffer.getLong();
+ flag = buffer.get();
+ int finalPosition = buffer.position() + msgSize - HEADER_SIZE;
+ try {
+ ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(buffer.array(), buffer.position(),
+ msgSize - HEADER_SIZE));
+ payload = ois.readObject();
+ ois.close();
+ } finally {
+ buffer.position(finalPosition);
+ }
+ }
+
+ boolean write(ByteBuffer buffer) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeObject(payload);
+ oos.close();
+ byte[] bytes = baos.toByteArray();
+ if (buffer.remaining() >= MSG_SIZE_SIZE + HEADER_SIZE + bytes.length) {
+ buffer.putInt(HEADER_SIZE + bytes.length);
+ buffer.putLong(messageId);
+ buffer.putLong(requestMessageId);
+ buffer.put(flag);
+ buffer.put(bytes);
+ return true;
+ }
+ return false;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/test/java/edu/uci/ics/ipc/tests/IPCTest.java b/hyracks-ipc/src/test/java/edu/uci/ics/ipc/tests/IPCTest.java
new file mode 100644
index 0000000..1fc1f6f
--- /dev/null
+++ b/hyracks-ipc/src/test/java/edu/uci/ics/ipc/tests/IPCTest.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.ipc.tests;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import junit.framework.Assert;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.IIPCI;
+import edu.uci.ics.hyracks.ipc.api.SyncRMI;
+import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
+
+public class IPCTest {
+ @Test
+ public void test() throws Exception {
+ IPCSystem server = createServerIPCSystem();
+ server.start();
+ InetSocketAddress serverAddr = server.getSocketAddress();
+
+ IPCSystem client = createClientIPCSystem();
+ client.start();
+
+ IIPCHandle handle = client.getHandle(serverAddr);
+
+ SyncRMI rmi = new SyncRMI();
+ for (int i = 0; i < 100; ++i) {
+ Assert.assertEquals(rmi.call(handle, Integer.valueOf(i)), Integer.valueOf(2 * i));
+ }
+
+ IIPCHandle rHandle = server.getHandle(client.getSocketAddress());
+
+ try {
+ rmi.call(rHandle, "Foo");
+ Assert.assertTrue(false);
+ } catch (Exception e) {
+ Assert.assertTrue(true);
+ }
+ }
+
+ private IPCSystem createServerIPCSystem() throws IOException {
+ Executor executor = Executors.newCachedThreadPool();
+ IIPCI ipci = new IIPCI() {
+ @Override
+ public Object call(IIPCHandle caller, Object req) throws Exception {
+ Integer i = (Integer) req;
+ return i.intValue() * 2;
+ }
+ };
+ return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), ipci, executor);
+ }
+
+ private IPCSystem createClientIPCSystem() throws IOException {
+ Executor executor = Executors.newCachedThreadPool();
+ IIPCI ipci = new IIPCI() {
+ @Override
+ public Object call(IIPCHandle caller, Object req) throws Exception {
+ throw new IllegalStateException();
+ }
+ };
+ return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), ipci, executor);
+ }
+}
\ No newline at end of file