Added new IPC mechanism to Hyracks. Migrated all remote communications to use new IPC layer

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@935 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-ipc/pom.xml b/hyracks-ipc/pom.xml
new file mode 100644
index 0000000..49c4323
--- /dev/null
+++ b/hyracks-ipc/pom.xml
@@ -0,0 +1,31 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>hyracks-ipc</artifactId>
+  <parent>
+    <groupId>edu.uci.ics.hyracks</groupId>
+    <artifactId>hyracks</artifactId>
+    <version>0.2.0-SNAPSHOT</version>
+  </parent>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>2.0.2</version>
+        <configuration>
+          <source>1.6</source>
+          <target>1.6</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+  <dependencies>
+  <dependency>
+  	<groupId>junit</groupId>
+  	<artifactId>junit</artifactId>
+  	<version>4.8.1</version>
+  	<scope>test</scope>
+  </dependency>
+  </dependencies>
+</project>
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCHandle.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCHandle.java
new file mode 100644
index 0000000..a4adf23
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCHandle.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.api;
+
+import java.net.InetSocketAddress;
+
+import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
+
+public interface IIPCHandle {
+    public InetSocketAddress getRemoteAddress();
+
+    public void send(Object request, IResponseCallback callback) throws IPCException;
+
+    public void setAttachment(Object attachment);
+
+    public Object getAttachment();
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCI.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCI.java
new file mode 100644
index 0000000..ba9f343
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCI.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.api;
+
+public interface IIPCI {
+    public Object call(IIPCHandle caller, Object req) throws Exception;
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IResponseCallback.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IResponseCallback.java
new file mode 100644
index 0000000..7d25f88
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IResponseCallback.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.api;
+
+public interface IResponseCallback {
+    public void callback(IIPCHandle handle, Object response, Exception exception);
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/SyncRMI.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/SyncRMI.java
new file mode 100644
index 0000000..180b1dd
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/SyncRMI.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.api;
+
+public final class SyncRMI implements IResponseCallback {
+    private boolean pending;
+
+    private Object response;
+
+    private Exception exception;
+
+    public SyncRMI() {
+    }
+
+    @Override
+    public synchronized void callback(IIPCHandle handle, Object response, Exception exception) {
+        pending = false;
+        this.response = response;
+        this.exception = exception;
+        notifyAll();
+    }
+
+    public synchronized Object call(IIPCHandle handle, Object request) throws Exception {
+        pending = true;
+        response = null;
+        exception = null;
+        handle.send(request, this);
+        while (pending) {
+            wait();
+        }
+        if (exception != null) {
+            throw exception;
+        }
+        return response;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/exceptions/IPCException.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/exceptions/IPCException.java
new file mode 100644
index 0000000..9ecf015
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/exceptions/IPCException.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.exceptions;
+
+public class IPCException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    public IPCException() {
+        super();
+    }
+
+    public IPCException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public IPCException(String message) {
+        super(message);
+    }
+
+    public IPCException(Throwable cause) {
+        super(cause);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/HandleState.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/HandleState.java
new file mode 100644
index 0000000..47c3d4a
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/HandleState.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.impl;
+
+enum HandleState {
+    INITIAL,
+    CONNECT_SENT,
+    CONNECT_RECEIVED,
+    CONNECTED,
+    CLOSED,
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
new file mode 100644
index 0000000..cbbe718
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
@@ -0,0 +1,270 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class IPCConnectionManager {
+    private static final Logger LOGGER = Logger.getLogger(IPCConnectionManager.class.getName());
+
+    private final IPCSystem system;
+
+    private final NetworkThread networkThread;
+
+    private final ServerSocketChannel serverSocketChannel;
+
+    private final Map<InetSocketAddress, IPCHandle> ipcHandleMap;
+
+    private final List<IPCHandle>[] pendingConnections;
+
+    private final List<Message>[] sendList;
+
+    private int writerIndex;
+
+    private int readerIndex;
+
+    private final InetSocketAddress address;
+
+    private volatile boolean stopped;
+
+    IPCConnectionManager(IPCSystem system, InetSocketAddress socketAddress) throws IOException {
+        this.system = system;
+        this.networkThread = new NetworkThread();
+        this.serverSocketChannel = ServerSocketChannel.open();
+        serverSocketChannel.configureBlocking(false);
+        ServerSocket socket = serverSocketChannel.socket();
+        socket.bind(socketAddress);
+        address = new InetSocketAddress(socket.getInetAddress(), socket.getLocalPort());
+        ipcHandleMap = new HashMap<InetSocketAddress, IPCHandle>();
+        pendingConnections = new ArrayList[] { new ArrayList<IPCHandle>(), new ArrayList<IPCHandle>() };
+        sendList = new ArrayList[] { new ArrayList<Message>(), new ArrayList<Message>() };
+        writerIndex = 0;
+        readerIndex = 1;
+    }
+
+    InetSocketAddress getAddress() {
+        return address;
+    }
+
+    void start() {
+        stopped = false;
+        networkThread.start();
+    }
+
+    void stop() throws IOException {
+        stopped = true;
+        serverSocketChannel.close();
+    }
+
+    IPCHandle getIPCHandle(InetSocketAddress remoteAddress) throws IOException, InterruptedException {
+        IPCHandle handle;
+        synchronized (this) {
+            handle = ipcHandleMap.get(remoteAddress);
+            if (handle == null) {
+                handle = new IPCHandle(system, remoteAddress);
+                pendingConnections[writerIndex].add(handle);
+                networkThread.selector.wakeup();
+            }
+        }
+        handle.waitTillConnected();
+        return handle;
+    }
+
+    synchronized void registerHandle(IPCHandle handle) {
+        ipcHandleMap.put(handle.getRemoteAddress(), handle);
+    }
+
+    synchronized void write(Message msg) {
+        sendList[writerIndex].add(msg);
+        networkThread.selector.wakeup();
+    }
+
+    private synchronized void swapReadersAndWriters() {
+        int temp = readerIndex;
+        readerIndex = writerIndex;
+        writerIndex = temp;
+    }
+
+    private Message createInitialReqMessage(IPCHandle handle) {
+        Message msg = new Message(handle);
+        msg.setMessageId(system.createMessageId());
+        msg.setRequestMessageId(-1);
+        msg.setFlag(Message.INITIAL_REQ);
+        msg.setPayload(address);
+        return msg;
+    }
+
+    private Message createInitialAckMessage(IPCHandle handle, Message req) {
+        Message msg = new Message(handle);
+        msg.setMessageId(system.createMessageId());
+        msg.setRequestMessageId(req.getMessageId());
+        msg.setFlag(Message.INITIAL_ACK);
+        msg.setPayload(null);
+        return msg;
+    }
+
+    void ack(IPCHandle handle, Message req) {
+        write(createInitialAckMessage(handle, req));
+    }
+
+    private class NetworkThread extends Thread {
+        private final Selector selector;
+
+        public NetworkThread() {
+            super("IPC Network Listener Thread");
+            setDaemon(true);
+            try {
+                selector = Selector.open();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public void run() {
+            try {
+                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
+            } catch (ClosedChannelException e) {
+                throw new RuntimeException(e);
+            }
+            while (!stopped) {
+                try {
+                    if (LOGGER.isLoggable(Level.FINE)) {
+                        LOGGER.fine("Starting Select");
+                    }
+                    int n = selector.select();
+                    swapReadersAndWriters();
+                    if (!pendingConnections[readerIndex].isEmpty()) {
+                        for (IPCHandle handle : pendingConnections[readerIndex]) {
+                            SocketChannel channel = SocketChannel.open();
+                            channel.configureBlocking(false);
+                            SelectionKey cKey = null;
+                            if (channel.connect(handle.getRemoteAddress())) {
+                                cKey = channel.register(selector, SelectionKey.OP_READ);
+                                handle.setState(HandleState.CONNECT_SENT);
+                                write(createInitialReqMessage(handle));
+                            } else {
+                                cKey = channel.register(selector, SelectionKey.OP_CONNECT);
+                            }
+                            handle.setKey(cKey);
+                            cKey.attach(handle);
+                        }
+                        pendingConnections[readerIndex].clear();
+                    }
+                    if (!sendList[readerIndex].isEmpty()) {
+                        for (Iterator<Message> i = sendList[readerIndex].iterator(); i.hasNext();) {
+                            Message msg = i.next();
+                            IPCHandle handle = msg.getIPCHandle();
+                            if (handle.getState() == HandleState.CLOSED) {
+                                i.remove();
+                            } else if (!handle.full()) {
+                                while (true) {
+                                    ByteBuffer buffer = handle.getOutBuffer();
+                                    buffer.compact();
+                                    boolean success = msg.write(buffer);
+                                    buffer.flip();
+                                    if (success) {
+                                        i.remove();
+                                        SelectionKey key = handle.getKey();
+                                        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+                                    } else {
+                                        if (buffer.position() == 0) {
+                                            handle.resizeOutBuffer();
+                                            continue;
+                                        }
+                                        handle.markFull();
+                                    }
+                                    break;
+                                }
+                            }
+                        }
+                    }
+                    if (n > 0) {
+                        for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();) {
+                            SelectionKey key = i.next();
+                            i.remove();
+                            SelectableChannel sc = key.channel();
+                            if (key.isReadable()) {
+                                SocketChannel channel = (SocketChannel) sc;
+                                IPCHandle handle = (IPCHandle) key.attachment();
+                                ByteBuffer readBuffer = handle.getInBuffer();
+                                int len = channel.read(readBuffer);
+                                if (len < 0) {
+                                    key.cancel();
+                                    channel.close();
+                                    handle.close();
+                                } else {
+                                    handle.processIncomingMessages();
+                                    if (!readBuffer.hasRemaining()) {
+                                        handle.resizeInBuffer();
+                                    }
+                                }
+                            } else if (key.isWritable()) {
+                                SocketChannel channel = (SocketChannel) sc;
+                                IPCHandle handle = (IPCHandle) key.attachment();
+                                ByteBuffer writeBuffer = handle.getOutBuffer();
+                                int len = channel.write(writeBuffer);
+                                if (len < 0) {
+                                    key.cancel();
+                                    channel.close();
+                                    handle.close();
+                                } else if (!writeBuffer.hasRemaining()) {
+                                    key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
+                                }
+                                handle.clearFull();
+                            } else if (key.isAcceptable()) {
+                                assert sc == serverSocketChannel;
+                                SocketChannel channel = serverSocketChannel.accept();
+                                channel.configureBlocking(false);
+                                IPCHandle handle = new IPCHandle(system, null);
+                                SelectionKey cKey = channel.register(selector, SelectionKey.OP_READ);
+                                handle.setKey(cKey);
+                                cKey.attach(handle);
+                                handle.setState(HandleState.CONNECT_RECEIVED);
+                            } else if (key.isConnectable()) {
+                                SocketChannel channel = (SocketChannel) sc;
+                                if (channel.finishConnect()) {
+                                    IPCHandle handle = (IPCHandle) key.attachment();
+                                    handle.setState(HandleState.CONNECT_SENT);
+                                    registerHandle(handle);
+                                    key.interestOps(SelectionKey.OP_READ);
+                                    write(createInitialReqMessage(handle));
+                                }
+                            }
+                        }
+                    }
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
new file mode 100644
index 0000000..481a0b0
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
@@ -0,0 +1,204 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.impl;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.util.HashMap;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.IResponseCallback;
+import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
+
+final class IPCHandle implements IIPCHandle {
+    private static final int DEFAULT_BUFFER_SIZE = 1024 * 1024;
+
+    private final IPCSystem system;
+
+    private InetSocketAddress remoteAddress;
+
+    private final Map<Long, IResponseCallback> pendingRequestMap;
+
+    private HandleState state;
+
+    private SelectionKey key;
+
+    private Object attachment;
+
+    private ByteBuffer inBuffer;
+
+    private ByteBuffer outBuffer;
+
+    private boolean full;
+
+    IPCHandle(IPCSystem system, InetSocketAddress remoteAddress) {
+        this.system = system;
+        this.remoteAddress = remoteAddress;
+        pendingRequestMap = new HashMap<Long, IResponseCallback>();
+        inBuffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
+        outBuffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
+        outBuffer.flip();
+        state = HandleState.INITIAL;
+    }
+
+    @Override
+    public InetSocketAddress getRemoteAddress() {
+        return remoteAddress;
+    }
+
+    void setRemoteAddress(InetSocketAddress remoteAddress) {
+        this.remoteAddress = remoteAddress;
+    }
+
+    @Override
+    public synchronized void send(Object req, IResponseCallback callback) throws IPCException {
+        if (state != HandleState.CONNECTED) {
+            throw new IPCException("Handle is not in Connected state");
+        }
+        Message msg = new Message(this);
+        long mid = system.createMessageId();
+        msg.setMessageId(mid);
+        msg.setRequestMessageId(-1);
+        msg.setPayload(req);
+        if (callback != null) {
+            pendingRequestMap.put(mid, callback);
+        }
+        system.getConnectionManager().write(msg);
+    }
+
+    @Override
+    public void setAttachment(Object attachment) {
+        this.attachment = attachment;
+    }
+
+    @Override
+    public Object getAttachment() {
+        return attachment;
+    }
+
+    SelectionKey getKey() {
+        return key;
+    }
+
+    void setKey(SelectionKey key) {
+        this.key = key;
+    }
+
+    public synchronized boolean isConnected() {
+        return state == HandleState.CONNECTED;
+    }
+
+    synchronized HandleState getState() {
+        return state;
+    }
+
+    synchronized void setState(HandleState state) {
+        this.state = state;
+        notifyAll();
+    }
+    
+    synchronized void waitTillConnected() throws InterruptedException {
+        while (!isConnected()) {
+            wait();
+        }
+    }
+
+    ByteBuffer getInBuffer() {
+        return inBuffer;
+    }
+
+    ByteBuffer getOutBuffer() {
+        return outBuffer;
+    }
+
+    synchronized void close() {
+        setState(HandleState.CLOSED);
+        for (IResponseCallback cb : pendingRequestMap.values()) {
+            cb.callback(this, null, new IPCException("IPC Handle Closed"));
+        }
+    }
+
+    synchronized void processIncomingMessages() {
+        inBuffer.flip();
+        while (Message.hasMessage(inBuffer)) {
+            Message message = new Message(this);
+            try {
+                message.read(inBuffer);
+            } catch (Exception e) {
+                message.setFlag(Message.ERROR);
+                message.setPayload(e);
+            }
+
+            if (state == HandleState.CONNECT_RECEIVED) {
+                remoteAddress = (InetSocketAddress) message.getPayload();
+                system.getConnectionManager().registerHandle(this);
+                setState(HandleState.CONNECTED);
+                system.getConnectionManager().ack(this, message);
+                continue;
+            } else if (state == HandleState.CONNECT_SENT) {
+                if (message.getFlag() == Message.INITIAL_ACK) {
+                    setState(HandleState.CONNECTED);
+                } else {
+                    throw new IllegalStateException();
+                }
+                continue;
+            }
+            long requestMessageId = message.getRequestMessageId();
+            if (requestMessageId < 0) {
+                system.deliverIncomingMessage(message);
+            } else {
+                Long rid = Long.valueOf(requestMessageId);
+                IResponseCallback cb = pendingRequestMap.remove(rid);
+                if (cb != null) {
+                    byte flag = message.getFlag();
+                    Object payload = flag == Message.ERROR ? null : message.getPayload();
+                    Exception exception = (Exception) (flag == Message.ERROR ? message.getPayload() : null);
+                    cb.callback(this, payload, exception);
+                }
+            }
+        }
+        inBuffer.compact();
+    }
+
+    void resizeInBuffer() {
+        inBuffer.flip();
+        ByteBuffer readBuffer = ByteBuffer.allocate(inBuffer.capacity() * 2);
+        readBuffer.put(inBuffer);
+        readBuffer.compact();
+        inBuffer = readBuffer;
+    }
+
+    void resizeOutBuffer() {
+        ByteBuffer writeBuffer = ByteBuffer.allocate(outBuffer.capacity() * 2);
+        writeBuffer.put(outBuffer);
+        writeBuffer.compact();
+        writeBuffer.flip();
+        outBuffer = writeBuffer;
+    }
+
+    void markFull() {
+        full = true;
+    }
+
+    void clearFull() {
+        full = false;
+    }
+
+    boolean full() {
+        return full;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
new file mode 100644
index 0000000..9eef8ff
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicLong;
+
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.IIPCI;
+import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
+
+public class IPCSystem {
+    private final IPCConnectionManager cMgr;
+
+    private final IIPCI ipci;
+
+    private final Executor executor;
+
+    private final AtomicLong midFactory;
+
+    public IPCSystem(InetSocketAddress socketAddress) throws IOException {
+        this(socketAddress, null, null);
+    }
+
+    public IPCSystem(InetSocketAddress socketAddress, IIPCI ipci, Executor executor) throws IOException {
+        cMgr = new IPCConnectionManager(this, socketAddress);
+        this.ipci = ipci;
+        this.executor = executor;
+        midFactory = new AtomicLong();
+    }
+
+    public InetSocketAddress getSocketAddress() {
+        return cMgr.getAddress();
+    }
+
+    public void start() {
+        cMgr.start();
+    }
+
+    public IIPCHandle getHandle(InetSocketAddress remoteAddress) throws IPCException {
+        try {
+            return cMgr.getIPCHandle(remoteAddress);
+        } catch (IOException e) {
+            throw new IPCException(e);
+        } catch (InterruptedException e) {
+            throw new IPCException(e);
+        }
+    }
+
+    long createMessageId() {
+        return midFactory.incrementAndGet();
+    }
+
+    void deliverIncomingMessage(final Message message) {
+        assert message.getFlag() == Message.NORMAL;
+        executor.execute(new Runnable() {
+            @Override
+            public void run() {
+                IPCHandle handle = message.getIPCHandle();
+                Message response = new Message(handle);
+                response.setMessageId(createMessageId());
+                response.setRequestMessageId(message.getMessageId());
+                response.setFlag(Message.NORMAL);
+                try {
+                    Object result = ipci.call(handle, message.getPayload());
+                    response.setPayload(result);
+                } catch (Exception e) {
+                    response.setFlag(Message.ERROR);
+                    response.setPayload(e);
+                }
+                cMgr.write(response);
+            }
+        });
+    }
+
+    IPCConnectionManager getConnectionManager() {
+        return cMgr;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/Message.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/Message.java
new file mode 100644
index 0000000..ab3428e
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/Message.java
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
+
+class Message {
+    private static final int MSG_SIZE_SIZE = 4;
+
+    private static final int HEADER_SIZE = 17;
+
+    static final byte INITIAL_REQ = 1;
+
+    static final byte INITIAL_ACK = 2;
+
+    static final byte ERROR = 3;
+
+    static final byte NORMAL = 0;
+
+    private IPCHandle ipcHandle;
+
+    private long messageId;
+
+    private long requestMessageId;
+
+    private byte flag;
+
+    private Object payload;
+
+    Message(IPCHandle ipcHandle) {
+        this.ipcHandle = ipcHandle;
+    }
+
+    IPCHandle getIPCHandle() {
+        return ipcHandle;
+    }
+
+    void setMessageId(long messageId) {
+        this.messageId = messageId;
+    }
+
+    long getMessageId() {
+        return messageId;
+    }
+
+    void setRequestMessageId(long requestMessageId) {
+        this.requestMessageId = requestMessageId;
+    }
+
+    long getRequestMessageId() {
+        return requestMessageId;
+    }
+
+    void setFlag(byte flag) {
+        this.flag = flag;
+    }
+
+    byte getFlag() {
+        return flag;
+    }
+
+    void setPayload(Object payload) {
+        this.payload = payload;
+    }
+
+    Object getPayload() {
+        return payload;
+    }
+
+    static boolean hasMessage(ByteBuffer buffer) {
+        if (buffer.remaining() < MSG_SIZE_SIZE) {
+            return false;
+        }
+        int msgSize = buffer.getInt(buffer.position());
+        return buffer.remaining() >= msgSize + MSG_SIZE_SIZE;
+    }
+
+    void read(ByteBuffer buffer) throws IOException, ClassNotFoundException {
+        assert hasMessage(buffer);
+        int msgSize = buffer.getInt();
+        messageId = buffer.getLong();
+        requestMessageId = buffer.getLong();
+        flag = buffer.get();
+        int finalPosition = buffer.position() + msgSize - HEADER_SIZE;
+        try {
+            ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(buffer.array(), buffer.position(),
+                    msgSize - HEADER_SIZE));
+            payload = ois.readObject();
+            ois.close();
+        } finally {
+            buffer.position(finalPosition);
+        }
+    }
+
+    boolean write(ByteBuffer buffer) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutputStream oos = new ObjectOutputStream(baos);
+        oos.writeObject(payload);
+        oos.close();
+        byte[] bytes = baos.toByteArray();
+        if (buffer.remaining() >= MSG_SIZE_SIZE + HEADER_SIZE + bytes.length) {
+            buffer.putInt(HEADER_SIZE + bytes.length);
+            buffer.putLong(messageId);
+            buffer.putLong(requestMessageId);
+            buffer.put(flag);
+            buffer.put(bytes);
+            return true;
+        }
+        return false;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/test/java/edu/uci/ics/ipc/tests/IPCTest.java b/hyracks-ipc/src/test/java/edu/uci/ics/ipc/tests/IPCTest.java
new file mode 100644
index 0000000..1fc1f6f
--- /dev/null
+++ b/hyracks-ipc/src/test/java/edu/uci/ics/ipc/tests/IPCTest.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.ipc.tests;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import junit.framework.Assert;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.IIPCI;
+import edu.uci.ics.hyracks.ipc.api.SyncRMI;
+import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
+
+public class IPCTest {
+    @Test
+    public void test() throws Exception {
+        IPCSystem server = createServerIPCSystem();
+        server.start();
+        InetSocketAddress serverAddr = server.getSocketAddress();
+
+        IPCSystem client = createClientIPCSystem();
+        client.start();
+
+        IIPCHandle handle = client.getHandle(serverAddr);
+
+        SyncRMI rmi = new SyncRMI();
+        for (int i = 0; i < 100; ++i) {
+            Assert.assertEquals(rmi.call(handle, Integer.valueOf(i)), Integer.valueOf(2 * i));
+        }
+
+        IIPCHandle rHandle = server.getHandle(client.getSocketAddress());
+
+        try {
+            rmi.call(rHandle, "Foo");
+            Assert.assertTrue(false);
+        } catch (Exception e) {
+            Assert.assertTrue(true);
+        }
+    }
+
+    private IPCSystem createServerIPCSystem() throws IOException {
+        Executor executor = Executors.newCachedThreadPool();
+        IIPCI ipci = new IIPCI() {
+            @Override
+            public Object call(IIPCHandle caller, Object req) throws Exception {
+                Integer i = (Integer) req;
+                return i.intValue() * 2;
+            }
+        };
+        return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), ipci, executor);
+    }
+
+    private IPCSystem createClientIPCSystem() throws IOException {
+        Executor executor = Executors.newCachedThreadPool();
+        IIPCI ipci = new IIPCI() {
+            @Override
+            public Object call(IIPCHandle caller, Object req) throws Exception {
+                throw new IllegalStateException();
+            }
+        };
+        return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), ipci, executor);
+    }
+}
\ No newline at end of file