Merged fullstack_staging branch into trunk

git-svn-id: https://hyracks.googlecode.com/svn/trunk@2372 123451ca-8445-de46-9d55-352943316053
diff --git a/fullstack/hyracks/hyracks-ipc/pom.xml b/fullstack/hyracks/hyracks-ipc/pom.xml
new file mode 100644
index 0000000..25bfd0b
--- /dev/null
+++ b/fullstack/hyracks/hyracks-ipc/pom.xml
@@ -0,0 +1,32 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>hyracks-ipc</artifactId>
+  <name>hyracks-ipc</name>
+  <parent> <!-- this module declares no groupId/version of its own; Maven inherits both from the parent -->
+    <groupId>edu.uci.ics.hyracks</groupId>
+    <artifactId>hyracks</artifactId>
+    <version>0.2.2-SNAPSHOT</version>
+  </parent>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>2.0.2</version>
+        <configuration> <!-- accept Java 6 source and emit Java 6 class files -->
+          <source>1.6</source>
+          <target>1.6</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+  <dependencies>
+  <dependency> <!-- JUnit is on the classpath for tests only (test scope) -->
+  	<groupId>junit</groupId>
+  	<artifactId>junit</artifactId>
+  	<version>4.8.1</version>
+  	<scope>test</scope>
+  </dependency>
+  </dependencies>
+</project>
diff --git a/fullstack/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCHandle.java b/fullstack/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCHandle.java
new file mode 100644
index 0000000..8a3630f
--- /dev/null
+++ b/fullstack/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCHandle.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.api;
+
+import java.net.InetSocketAddress;
+
+import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
+
+/**
+ * A connection endpoint through which messages are sent to one remote peer.
+ */
+public interface IIPCHandle {
+    /**
+     * @return the socket address of the remote peer
+     */
+    InetSocketAddress getRemoteAddress();
+
+    /**
+     * Sends a message to the remote peer.
+     *
+     * @param requestId
+     *            id of the message being answered; -1 is used for a message that starts a new exchange
+     * @param payload
+     *            object to transmit when {@code exception} is null
+     * @param exception
+     *            when non-null, transmitted as an error in place of {@code payload}
+     * @return the id assigned to the message that was sent
+     * @throws IPCException
+     *             if the message cannot be sent, for example because the handle is not connected
+     */
+    long send(long requestId, Object payload, Exception exception) throws IPCException;
+
+    /**
+     * Stores an arbitrary object alongside this handle for later retrieval.
+     */
+    void setAttachment(Object attachment);
+
+    /**
+     * @return the object last passed to {@link #setAttachment(Object)}, or null if none was set
+     */
+    Object getAttachment();
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCI.java b/fullstack/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCI.java
new file mode 100644
index 0000000..24ab943
--- /dev/null
+++ b/fullstack/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCI.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.api;
+
+/**
+ * Receiver of the messages that arrive over IPC connections.
+ */
+public interface IIPCI {
+    /**
+     * Called with a message received from a remote peer.
+     *
+     * @param handle
+     *            handle associated with the message (presumably the connection it arrived on)
+     * @param mid
+     *            id of the incoming message
+     * @param rmid
+     *            id of the earlier message this one responds to
+     * @param payload
+     *            message content; see {@code exception}
+     * @param exception
+     *            error carried by the message, or null if it carries a normal payload
+     */
+    void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception);
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IPCPerformanceCounters.java b/fullstack/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IPCPerformanceCounters.java
new file mode 100644
index 0000000..1873378
--- /dev/null
+++ b/fullstack/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IPCPerformanceCounters.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.api;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Running totals of the messages and message bytes sent and received, kept in atomic counters.
+ */
+public class IPCPerformanceCounters {
+    private final AtomicLong nMessagesSent = new AtomicLong();
+
+    private final AtomicLong nMessageBytesSent = new AtomicLong();
+
+    private final AtomicLong nMessagesReceived = new AtomicLong();
+
+    private final AtomicLong nMessageBytesReceived = new AtomicLong();
+
+    /**
+     * @return number of messages sent so far
+     */
+    public long getMessageSentCount() {
+        return nMessagesSent.get();
+    }
+
+    /**
+     * Adds {@code delta} to the number of messages sent.
+     */
+    public void addMessageSentCount(long delta) {
+        nMessagesSent.getAndAdd(delta);
+    }
+
+    /**
+     * @return number of message bytes sent so far
+     */
+    public long getMessageBytesSent() {
+        return nMessageBytesSent.get();
+    }
+
+    /**
+     * Adds {@code delta} to the number of message bytes sent.
+     */
+    public void addMessageBytesSent(long delta) {
+        nMessageBytesSent.getAndAdd(delta);
+    }
+
+    /**
+     * @return number of messages received so far
+     */
+    public long getMessageReceivedCount() {
+        return nMessagesReceived.get();
+    }
+
+    /**
+     * Adds {@code delta} to the number of messages received.
+     */
+    public void addMessageReceivedCount(long delta) {
+        nMessagesReceived.getAndAdd(delta);
+    }
+
+    /**
+     * @return number of message bytes received so far
+     */
+    public long getMessageBytesReceived() {
+        return nMessageBytesReceived.get();
+    }
+
+    /**
+     * Adds {@code delta} to the number of message bytes received.
+     */
+    public void addMessageBytesReceived(long delta) {
+        nMessageBytesReceived.getAndAdd(delta);
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IPayloadSerializerDeserializer.java b/fullstack/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IPayloadSerializerDeserializer.java
new file mode 100644
index 0000000..62648ff
--- /dev/null
+++ b/fullstack/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IPayloadSerializerDeserializer.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.api;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Converts message payloads and exceptions to and from their byte representation.
+ */
+public interface IPayloadSerializerDeserializer {
+    /**
+     * Reconstructs a payload object from {@code buffer}.
+     *
+     * @param length
+     *            presumably the number of bytes of {@code buffer} that hold the object -- confirm with implementations
+     */
+    Object deserializeObject(ByteBuffer buffer, int length) throws Exception;
+
+    /**
+     * Reconstructs an exception from {@code buffer}; {@code length} presumably has the same meaning as in
+     * {@link #deserializeObject(ByteBuffer, int)}.
+     */
+    Exception deserializeException(ByteBuffer buffer, int length) throws Exception;
+
+    /**
+     * @return the byte representation of {@code object}
+     */
+    byte[] serializeObject(Object object) throws Exception;
+
+    /**
+     * @return the byte representation of the exception {@code object}
+     */
+    byte[] serializeException(Exception object) throws Exception;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IResponseCallback.java b/fullstack/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IResponseCallback.java
new file mode 100644
index 0000000..7d25f88
--- /dev/null
+++ b/fullstack/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IResponseCallback.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.api;
+
+/**
+ * Callback through which the outcome of a request is reported.
+ */
+public interface IResponseCallback {
+    /**
+     * Reports a response.
+     *
+     * @param handle
+     *            handle the response is associated with
+     * @param response
+     *            response object; presumably only meaningful when {@code exception} is null
+     * @param exception
+     *            presumably the failure to report, or null on success -- no caller is visible here to confirm
+     */
+    void callback(IIPCHandle handle, Object response, Exception exception);
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/RPCInterface.java b/fullstack/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/RPCInterface.java
new file mode 100644
index 0000000..3340516
--- /dev/null
+++ b/fullstack/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/RPCInterface.java
@@ -0,0 +1,105 @@
+package edu.uci.ics.hyracks.ipc.api;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Logger;
+
+/**
+ * Blocking request/response layer on top of the message based IPC interface: {@link #call(IIPCHandle, Object)}
+ * sends a request and waits until {@link #deliverIncomingMessage} hands over the matching response.
+ */
+public class RPCInterface implements IIPCI {
+    private static final Logger LOGGER = Logger.getLogger(RPCInterface.class.getName());
+
+    // Requests awaiting a response, keyed by the id of the request message; guarded by this object's monitor.
+    private final Map<Long, Request> reqMap;
+
+    public RPCInterface() {
+        reqMap = new HashMap<Long, RPCInterface.Request>();
+    }
+
+    /**
+     * Sends {@code request} over {@code handle} and blocks until the response arrives.
+     *
+     * @return the payload of the response
+     * @throws Exception
+     *             the exception carried by the response, any failure to send the request, or
+     *             {@link InterruptedException} if the waiting thread is interrupted
+     */
+    public Object call(IIPCHandle handle, Object request) throws Exception {
+        Request req = new Request();
+        long mid;
+        // Sending and registering happen under one lock so that the response cannot be delivered before the
+        // request is in the map.
+        synchronized (this) {
+            mid = handle.send(-1, request, null);
+            reqMap.put(mid, req);
+        }
+        try {
+            return req.getResponse();
+        } finally {
+            // Normally the entry is already gone; this only matters when the wait was interrupted, in which case
+            // the abandoned request must not stay in the map forever.
+            synchronized (this) {
+                reqMap.remove(mid);
+            }
+        }
+    }
+
+    @Override
+    public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception) {
+        Request req;
+        synchronized (this) {
+            req = reqMap.remove(rmid);
+        }
+        if (req == null) {
+            // No caller is waiting for this message (unknown id, or the caller gave up). Drop it instead of
+            // failing with a NullPointerException on the delivering thread.
+            LOGGER.warning("Dropping message " + mid + ": no pending request with id " + rmid);
+            return;
+        }
+        if (exception != null) {
+            req.setException(exception);
+        } else {
+            req.setResult(payload);
+        }
+    }
+
+    // Single-use rendezvous between the calling thread and the thread that delivers the response.
+    private static class Request {
+        private boolean pending;
+
+        private Object result;
+
+        private Exception exception;
+
+        Request() {
+            pending = true;
+            result = null;
+            exception = null;
+        }
+
+        synchronized void setResult(Object result) {
+            this.pending = false;
+            this.result = result;
+            notifyAll();
+        }
+
+        synchronized void setException(Exception exception) {
+            this.pending = false;
+            this.exception = exception;
+            notifyAll();
+        }
+
+        // Blocks until a result or an exception has been set, then returns the former or throws the latter.
+        synchronized Object getResponse() throws Exception {
+            while (pending) {
+                wait();
+            }
+            if (exception != null) {
+                throw exception;
+            }
+            return result;
+        }
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/exceptions/IPCException.java b/fullstack/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/exceptions/IPCException.java
new file mode 100644
index 0000000..9ecf015
--- /dev/null
+++ b/fullstack/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/exceptions/IPCException.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.exceptions;
+
+/**
+ * Checked exception signalling a failure in the IPC layer.
+ */
+public class IPCException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    /** Creates an exception without message or cause. */
+    public IPCException() {
+    }
+
+    /** Creates an exception with the given message. */
+    public IPCException(String message) {
+        super(message);
+    }
+
+    /** Creates an exception wrapping {@code cause}. */
+    public IPCException(Throwable cause) {
+        super(cause);
+    }
+
+    /** Creates an exception with the given message and cause. */
+    public IPCException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/HandleState.java b/fullstack/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/HandleState.java
new file mode 100644
index 0000000..47c3d4a
--- /dev/null
+++ b/fullstack/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/HandleState.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.impl;
+
+/**
+ * Connection states of an {@code IPCHandle}.
+ */
+enum HandleState {
+    /** State of a freshly created handle. */
+    INITIAL,
+
+    /** The local side opened the connection; the handshake acknowledgement is still awaited. */
+    CONNECT_SENT,
+
+    /** The connection was accepted from a remote peer; its initial message is still awaited. */
+    CONNECT_RECEIVED,
+
+    /** Handshake finished; messages can be sent. */
+    CONNECTED,
+
+    /** The connection has been closed. */
+    CLOSED
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java b/fullstack/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
new file mode 100644
index 0000000..8e42d53
--- /dev/null
+++ b/fullstack/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
@@ -0,0 +1,414 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Manages the sockets of an {@link IPCSystem}: accepts incoming connections, opens outgoing ones on demand and
+ * moves message bytes between the buffers of {@link IPCHandle}s and their channels.
+ * <p>
+ * All channel I/O happens on a single daemon network thread driven by a {@link Selector}. Other threads only
+ * enqueue work (connection requests and messages) under this object's monitor and then wake the selector.
+ */
+public class IPCConnectionManager {
+    private static final Logger LOGGER = Logger.getLogger(IPCConnectionManager.class.getName());
+
+    private final IPCSystem system;
+
+    private final NetworkThread networkThread;
+
+    private final ServerSocketChannel serverSocketChannel;
+
+    // Registered handles by remote address; guarded by this object's monitor.
+    private final Map<InetSocketAddress, IPCHandle> ipcHandleMap;
+
+    // Connection requests queued by callers; guarded by this object's monitor.
+    private final List<IPCHandle> pendingConnections;
+
+    // Connection requests currently owned by the network thread.
+    private final List<IPCHandle> workingPendingConnections;
+
+    // Messages queued by callers; guarded by this object's monitor.
+    private final List<Message> sendList;
+
+    // Messages currently owned by the network thread, including those that did not fit into a buffer yet.
+    private final List<Message> workingSendList;
+
+    private final InetSocketAddress address;
+
+    private volatile boolean stopped;
+
+    /**
+     * Opens a non-blocking server socket bound to {@code socketAddress}. The network thread is created here but
+     * only runs after {@link #start()}.
+     */
+    IPCConnectionManager(IPCSystem system, InetSocketAddress socketAddress) throws IOException {
+        this.system = system;
+        this.networkThread = new NetworkThread();
+        this.serverSocketChannel = ServerSocketChannel.open();
+        serverSocketChannel.socket().setReuseAddress(true);
+        serverSocketChannel.configureBlocking(false);
+        ServerSocket socket = serverSocketChannel.socket();
+        socket.bind(socketAddress);
+        address = new InetSocketAddress(socket.getInetAddress(), socket.getLocalPort());
+        ipcHandleMap = new HashMap<InetSocketAddress, IPCHandle>();
+        pendingConnections = new ArrayList<IPCHandle>();
+        workingPendingConnections = new ArrayList<IPCHandle>();
+        sendList = new ArrayList<Message>();
+        workingSendList = new ArrayList<Message>();
+    }
+
+    /**
+     * @return the address the server socket is bound to
+     */
+    InetSocketAddress getAddress() {
+        return address;
+    }
+
+    void start() {
+        stopped = false;
+        networkThread.start();
+    }
+
+    /**
+     * Closes the server socket and asks the network thread to terminate.
+     */
+    void stop() throws IOException {
+        stopped = true;
+        try {
+            serverSocketChannel.close();
+        } finally {
+            // The network thread is usually blocked in select(); without a wakeup it would not see the flag.
+            networkThread.selector.wakeup();
+        }
+    }
+
+    /**
+     * Returns the handle for {@code remoteAddress}, requesting a new connection if none is registered or the
+     * registered one has been closed, and blocks until the handle is connected.
+     */
+    IPCHandle getIPCHandle(InetSocketAddress remoteAddress) throws IOException, InterruptedException {
+        IPCHandle handle;
+        synchronized (this) {
+            handle = ipcHandleMap.get(remoteAddress);
+            // No code here revives a closed handle, so waiting on one would block forever; reconnect instead.
+            if (handle == null || handle.getState() == HandleState.CLOSED) {
+                handle = new IPCHandle(system, remoteAddress);
+                pendingConnections.add(handle);
+                networkThread.selector.wakeup();
+            }
+        }
+        handle.waitTillConnected();
+        return handle;
+    }
+
+    synchronized void registerHandle(IPCHandle handle) {
+        ipcHandleMap.put(handle.getRemoteAddress(), handle);
+    }
+
+    /**
+     * Queues {@code msg} for transmission and wakes the network thread.
+     */
+    synchronized void write(Message msg) {
+        if (LOGGER.isLoggable(Level.FINE)) {
+            LOGGER.fine("Enqueued message: " + msg);
+        }
+        sendList.add(msg);
+        networkThread.selector.wakeup();
+    }
+
+    // Hands everything queued by other threads over to the lists owned by the network thread.
+    private synchronized void collectOutstandingWork() {
+        if (!pendingConnections.isEmpty()) {
+            moveAll(pendingConnections, workingPendingConnections);
+        }
+        if (!sendList.isEmpty()) {
+            moveAll(sendList, workingSendList);
+        }
+    }
+
+    // First message of the handshake; its payload is this side's listening address.
+    private Message createInitialReqMessage(IPCHandle handle) {
+        Message msg = new Message(handle);
+        msg.setMessageId(system.createMessageId());
+        msg.setRequestMessageId(-1);
+        msg.setFlag(Message.INITIAL_REQ);
+        msg.setPayload(address);
+        return msg;
+    }
+
+    // Handshake reply to the initial request req; carries no payload.
+    private Message createInitialAckMessage(IPCHandle handle, Message req) {
+        Message msg = new Message(handle);
+        msg.setMessageId(system.createMessageId());
+        msg.setRequestMessageId(req.getMessageId());
+        msg.setFlag(Message.INITIAL_ACK);
+        msg.setPayload(null);
+        return msg;
+    }
+
+    void ack(IPCHandle handle, Message req) {
+        write(createInitialAckMessage(handle, req));
+    }
+
+    private class NetworkThread extends Thread {
+        private final Selector selector;
+
+        public NetworkThread() {
+            super("IPC Network Listener Thread");
+            setDaemon(true);
+            try {
+                selector = Selector.open();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public void run() {
+            try {
+                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
+            } catch (ClosedChannelException e) {
+                throw new RuntimeException(e);
+            }
+            BitSet unsentMessagesBitmap = new BitSet();
+            List<Message> tempUnsentMessages = new ArrayList<Message>();
+            while (!stopped) {
+                try {
+                    if (LOGGER.isLoggable(Level.FINE)) {
+                        LOGGER.fine("Starting Select");
+                    }
+                    selector.select();
+                    collectOutstandingWork();
+                    openPendingConnections();
+                    flushSendList(unsentMessagesBitmap, tempUnsentMessages);
+                    // Iterate even when select() reported 0: keys left over from an iteration that failed part-way
+                    // are still in the selected-key set and must not be skipped.
+                    processSelectedKeys();
+                } catch (Exception e) {
+                    LOGGER.log(Level.WARNING, "Failure in IPC network thread", e);
+                }
+            }
+        }
+
+        // Starts a non-blocking connect for every requested handle. A failing connect only affects its own handle.
+        private void openPendingConnections() {
+            for (IPCHandle handle : workingPendingConnections) {
+                SocketChannel channel = null;
+                try {
+                    channel = SocketChannel.open();
+                    channel.configureBlocking(false);
+                    SelectionKey cKey;
+                    if (channel.connect(handle.getRemoteAddress())) {
+                        cKey = channel.register(selector, SelectionKey.OP_READ);
+                        handle.setState(HandleState.CONNECT_SENT);
+                        // Same registration as on the finishConnect() path, so later lookups find this handle.
+                        registerHandle(handle);
+                        write(createInitialReqMessage(handle));
+                    } else {
+                        cKey = channel.register(selector, SelectionKey.OP_CONNECT);
+                    }
+                    handle.setKey(cKey);
+                    cKey.attach(handle);
+                } catch (Exception e) {
+                    // connect() reports bad addresses with unchecked exceptions, hence the broad catch.
+                    LOGGER.log(Level.WARNING, "Unable to connect to " + handle.getRemoteAddress(), e);
+                    closeQuietly(channel);
+                    handle.close();
+                }
+            }
+            workingPendingConnections.clear();
+        }
+
+        // Copies queued messages into the out-buffers of their handles; messages that do not fit stay queued.
+        private void flushSendList(BitSet unsentMessagesBitmap, List<Message> tempUnsentMessages) throws Exception {
+            if (workingSendList.isEmpty()) {
+                return;
+            }
+            unsentMessagesBitmap.clear();
+            int len = workingSendList.size();
+            for (int i = 0; i < len; ++i) {
+                Message msg = workingSendList.get(i);
+                if (LOGGER.isLoggable(Level.FINE)) {
+                    LOGGER.fine("Processing send of message: " + msg);
+                }
+                IPCHandle handle = msg.getIPCHandle();
+                if (handle.getState() == HandleState.CLOSED) {
+                    // Messages for closed connections are dropped.
+                    continue;
+                }
+                if (handle.full()) {
+                    unsentMessagesBitmap.set(i);
+                    continue;
+                }
+                while (true) {
+                    ByteBuffer buffer = handle.getOutBuffer();
+                    buffer.compact();
+                    boolean success = msg.write(buffer);
+                    buffer.flip();
+                    if (success) {
+                        system.getPerformanceCounters().addMessageSentCount(1);
+                        SelectionKey key = handle.getKey();
+                        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+                    } else {
+                        if (!buffer.hasRemaining()) {
+                            // Even an empty buffer is too small for this message: grow it and retry.
+                            handle.resizeOutBuffer();
+                            continue;
+                        }
+                        handle.markFull();
+                        unsentMessagesBitmap.set(i);
+                    }
+                    break;
+                }
+            }
+            copyUnsentMessages(unsentMessagesBitmap, tempUnsentMessages);
+        }
+
+        // Handles every key in the selected-key set. An I/O failure closes only the connection it occurred on.
+        private void processSelectedKeys() {
+            for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();) {
+                SelectionKey key = i.next();
+                i.remove();
+                try {
+                    processKey(key);
+                } catch (IOException e) {
+                    if (key.channel() == serverSocketChannel) {
+                        LOGGER.log(Level.WARNING, "Failure accepting IPC connection", e);
+                    } else {
+                        LOGGER.log(Level.WARNING, "I/O failure on IPC connection; closing it", e);
+                        closeConnection(key);
+                    }
+                }
+            }
+        }
+
+        private void processKey(SelectionKey key) throws IOException {
+            SelectableChannel sc = key.channel();
+            if (key.isReadable()) {
+                SocketChannel channel = (SocketChannel) sc;
+                IPCHandle handle = (IPCHandle) key.attachment();
+                ByteBuffer readBuffer = handle.getInBuffer();
+                int len = channel.read(readBuffer);
+                if (len < 0) {
+                    // End of stream: the peer closed the connection.
+                    closeConnection(key);
+                } else {
+                    // Count only real byte counts; the end-of-stream marker (-1) must not reduce the total.
+                    system.getPerformanceCounters().addMessageBytesReceived(len);
+                    handle.processIncomingMessages();
+                    if (!readBuffer.hasRemaining()) {
+                        handle.resizeInBuffer();
+                    }
+                }
+            } else if (key.isWritable()) {
+                SocketChannel channel = (SocketChannel) sc;
+                IPCHandle handle = (IPCHandle) key.attachment();
+                ByteBuffer writeBuffer = handle.getOutBuffer();
+                int len = channel.write(writeBuffer);
+                if (len < 0) {
+                    closeConnection(key);
+                } else {
+                    system.getPerformanceCounters().addMessageBytesSent(len);
+                    if (!writeBuffer.hasRemaining()) {
+                        key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
+                    }
+                }
+                if (handle.full()) {
+                    // Buffer space may have been freed; wake up so the queued messages are retried.
+                    handle.clearFull();
+                    selector.wakeup();
+                }
+            } else if (key.isAcceptable()) {
+                assert sc == serverSocketChannel;
+                SocketChannel channel = serverSocketChannel.accept();
+                if (channel == null) {
+                    // In non-blocking mode accept() returns null when no connection is pending after all.
+                    return;
+                }
+                channel.configureBlocking(false);
+                IPCHandle handle = new IPCHandle(system, null);
+                SelectionKey cKey = channel.register(selector, SelectionKey.OP_READ);
+                handle.setKey(cKey);
+                cKey.attach(handle);
+                handle.setState(HandleState.CONNECT_RECEIVED);
+            } else if (key.isConnectable()) {
+                SocketChannel channel = (SocketChannel) sc;
+                if (channel.finishConnect()) {
+                    IPCHandle handle = (IPCHandle) key.attachment();
+                    handle.setState(HandleState.CONNECT_SENT);
+                    registerHandle(handle);
+                    key.interestOps(SelectionKey.OP_READ);
+                    write(createInitialReqMessage(handle));
+                }
+            }
+        }
+
+        // Cancels the key, closes its channel and marks the attached handle, if any, as closed.
+        private void closeConnection(SelectionKey key) {
+            key.cancel();
+            closeQuietly(key.channel());
+            Object attachment = key.attachment();
+            if (attachment instanceof IPCHandle) {
+                ((IPCHandle) attachment).close();
+            }
+        }
+
+        private void closeQuietly(SelectableChannel channel) {
+            if (channel == null) {
+                return;
+            }
+            try {
+                channel.close();
+            } catch (IOException e) {
+                LOGGER.log(Level.FINE, "Failure closing IPC channel", e);
+            }
+        }
+
+        // Replaces the working send list by those of its entries whose index is set in the bitmap.
+        private void copyUnsentMessages(BitSet unsentMessagesBitmap, List<Message> tempUnsentMessages) {
+            assert tempUnsentMessages.isEmpty();
+            for (int i = unsentMessagesBitmap.nextSetBit(0); i >= 0; i = unsentMessagesBitmap.nextSetBit(i + 1)) {
+                tempUnsentMessages.add(workingSendList.get(i));
+            }
+            workingSendList.clear();
+            moveAll(tempUnsentMessages, workingSendList);
+        }
+    }
+
+    // Appends all elements of source to target and empties source.
+    private <T> void moveAll(List<T> source, List<T> target) {
+        target.addAll(source);
+        source.clear();
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java b/fullstack/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
new file mode 100644
index 0000000..06b614d
--- /dev/null
+++ b/fullstack/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
@@ -0,0 +1,230 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.impl;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
+
+/**
+ * {@link IIPCHandle} implementation holding the connection state, selection key and I/O buffers of one peer
+ * connection.
+ * <p>
+ * The state is guarded by this object's monitor so that callers can block in {@link #waitTillConnected()}. The
+ * buffers and the "full" flag have no synchronization of their own.
+ */
+final class IPCHandle implements IIPCHandle {
+    private static final int DEFAULT_BUFFER_SIZE = 1024 * 1024;
+
+    private final IPCSystem system;
+
+    private InetSocketAddress remoteAddress;
+
+    private HandleState state;
+
+    private SelectionKey key;
+
+    private Object attachment;
+
+    // Left in write mode (ready to receive channel data) between calls.
+    private ByteBuffer inBuffer;
+
+    // Left in read mode (ready to be drained into the channel) between calls.
+    private ByteBuffer outBuffer;
+
+    private boolean full;
+
+    IPCHandle(IPCSystem system, InetSocketAddress remoteAddress) {
+        this.system = system;
+        this.remoteAddress = remoteAddress;
+        inBuffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
+        outBuffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
+        outBuffer.flip();
+        state = HandleState.INITIAL;
+    }
+
+    @Override
+    public InetSocketAddress getRemoteAddress() {
+        return remoteAddress;
+    }
+
+    IPCSystem getIPCSystem() {
+        return system;
+    }
+
+    void setRemoteAddress(InetSocketAddress remoteAddress) {
+        this.remoteAddress = remoteAddress;
+    }
+
+    /**
+     * Wraps the payload (or, if given, the exception) in a message with a fresh id and queues it for sending.
+     *
+     * @return the id of the queued message
+     * @throws IPCException
+     *             if this handle is not in the connected state
+     */
+    @Override
+    public long send(long requestId, Object req, Exception exception) throws IPCException {
+        if (!isConnected()) {
+            throw new IPCException("Handle is not in Connected state");
+        }
+        Message msg = new Message(this);
+        long mid = system.createMessageId();
+        msg.setMessageId(mid);
+        msg.setRequestMessageId(requestId);
+        if (exception != null) {
+            msg.setFlag(Message.ERROR);
+            msg.setPayload(exception);
+        } else {
+            msg.setFlag(Message.NORMAL);
+            msg.setPayload(req);
+        }
+        system.getConnectionManager().write(msg);
+        return mid;
+    }
+
+    @Override
+    public void setAttachment(Object attachment) {
+        this.attachment = attachment;
+    }
+
+    @Override
+    public Object getAttachment() {
+        return attachment;
+    }
+
+    SelectionKey getKey() {
+        return key;
+    }
+
+    void setKey(SelectionKey key) {
+        this.key = key;
+    }
+
+    public synchronized boolean isConnected() {
+        return state == HandleState.CONNECTED;
+    }
+
+    synchronized HandleState getState() {
+        return state;
+    }
+
+    // Changes the state and wakes every thread blocked in waitTillConnected().
+    synchronized void setState(HandleState state) {
+        this.state = state;
+        notifyAll();
+    }
+
+    /**
+     * Blocks until this handle is connected.
+     *
+     * @throws IllegalStateException
+     *             if the handle is closed before it becomes connected
+     */
+    synchronized void waitTillConnected() throws InterruptedException {
+        while (!isConnected()) {
+            if (state == HandleState.CLOSED) {
+                // No code in this class moves a handle out of CLOSED, so waiting on would never end.
+                throw new IllegalStateException("IPC handle to " + remoteAddress + " was closed before connecting");
+            }
+            wait();
+        }
+    }
+
+    ByteBuffer getInBuffer() {
+        return inBuffer;
+    }
+
+    ByteBuffer getOutBuffer() {
+        return outBuffer;
+    }
+
+    synchronized void close() {
+        setState(HandleState.CLOSED);
+    }
+
+    /**
+     * Decodes every complete message in the in-buffer. While the handshake is in progress the message advances the
+     * connection state; afterwards messages are passed to the IPC system for delivery.
+     */
+    void processIncomingMessages() {
+        inBuffer.flip();
+        try {
+            while (Message.hasMessage(inBuffer)) {
+                Message message = new Message(this);
+                try {
+                    message.read(inBuffer);
+                } catch (Exception e) {
+                    // A message that cannot be decoded is passed on as an error message carrying the failure.
+                    message.setFlag(Message.ERROR);
+                    message.setPayload(e);
+                }
+                system.getPerformanceCounters().addMessageReceivedCount(1);
+
+                if (state == HandleState.CONNECT_RECEIVED) {
+                    // Accepted side of the handshake: the first message carries the peer's listening address.
+                    remoteAddress = (InetSocketAddress) message.getPayload();
+                    system.getConnectionManager().registerHandle(this);
+                    setState(HandleState.CONNECTED);
+                    system.getConnectionManager().ack(this, message);
+                    continue;
+                } else if (state == HandleState.CONNECT_SENT) {
+                    if (message.getFlag() == Message.INITIAL_ACK) {
+                        setState(HandleState.CONNECTED);
+                    } else {
+                        throw new IllegalStateException();
+                    }
+                    continue;
+                }
+                system.deliverIncomingMessage(message);
+            }
+        } finally {
+            // Put the buffer back into write mode even when a message could not be handled; left in read mode,
+            // the next channel read would overwrite bytes that have not been processed yet.
+            inBuffer.compact();
+        }
+    }
+
+    // Doubles the capacity of the in-buffer, keeping its content; the new buffer is left in write mode.
+    void resizeInBuffer() {
+        inBuffer.flip();
+        ByteBuffer readBuffer = ByteBuffer.allocate(inBuffer.capacity() * 2);
+        readBuffer.put(inBuffer);
+        inBuffer = readBuffer;
+    }
+
+    // Doubles the capacity of the out-buffer, keeping its content; the new buffer is left in read mode.
+    void resizeOutBuffer() {
+        ByteBuffer writeBuffer = ByteBuffer.allocate(outBuffer.capacity() * 2);
+        writeBuffer.put(outBuffer);
+        writeBuffer.flip();
+        outBuffer = writeBuffer;
+    }
+
+    void markFull() {
+        full = true;
+    }
+
+    void clearFull() {
+        full = false;
+    }
+
+    boolean full() {
+        return full;
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java b/fullstack/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
new file mode 100644
index 0000000..d7e383d
--- /dev/null
+++ b/fullstack/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicLong;
+
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.IIPCI;
+import edu.uci.ics.hyracks.ipc.api.IPCPerformanceCounters;
+import edu.uci.ics.hyracks.ipc.api.IPayloadSerializerDeserializer;
+import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
+
+/**
+ * Entry point of the IPC layer. It owns the {@link IPCConnectionManager}, hands out message ids and forwards
+ * every incoming {@link Message} to the {@link IIPCI} supplied at construction time.
+ */
+public class IPCSystem {
+    private final IPCConnectionManager cMgr;
+
+    private final IIPCI ipci;
+
+    private final IPayloadSerializerDeserializer serde;
+
+    private final AtomicLong midFactory;
+
+    private final IPCPerformanceCounters perfCounters;
+
+    /**
+     * Creates the IPC system together with its connection manager.
+     *
+     * @param socketAddress
+     *            address passed to the connection manager
+     * @param ipci
+     *            receiver of incoming messages
+     * @param serde
+     *            payload serializer/deserializer exposed to the rest of the package
+     * @throws IOException
+     *             if creating the connection manager fails
+     */
+    public IPCSystem(InetSocketAddress socketAddress, IIPCI ipci, IPayloadSerializerDeserializer serde)
+            throws IOException {
+        // Initialize every other field first: "this" is handed to the connection manager below, so it must
+        // not be able to observe a half-constructed system.
+        this.ipci = ipci;
+        this.serde = serde;
+        midFactory = new AtomicLong();
+        perfCounters = new IPCPerformanceCounters();
+        cMgr = new IPCConnectionManager(this, socketAddress);
+    }
+
+    /**
+     * @return the address reported by the connection manager
+     */
+    public InetSocketAddress getSocketAddress() {
+        return cMgr.getAddress();
+    }
+
+    /**
+     * Starts the connection manager.
+     */
+    public void start() {
+        cMgr.start();
+    }
+
+    /**
+     * Obtains a handle for talking to {@code remoteAddress} from the connection manager.
+     *
+     * @param remoteAddress
+     *            address of the peer
+     * @return the handle supplied by the connection manager
+     * @throws IPCException
+     *             wrapping the {@link IOException} or {@link InterruptedException} raised while obtaining the
+     *             handle; in the latter case the calling thread's interrupt status is set again
+     */
+    public IIPCHandle getHandle(InetSocketAddress remoteAddress) throws IPCException {
+        try {
+            return cMgr.getIPCHandle(remoteAddress);
+        } catch (IOException e) {
+            throw new IPCException(e);
+        } catch (InterruptedException e) {
+            // Wrapping the exception would otherwise hide the interrupt request from callers further up.
+            Thread.currentThread().interrupt();
+            throw new IPCException(e);
+        }
+    }
+
+    /**
+     * @return the payload serializer/deserializer given to the constructor
+     */
+    IPayloadSerializerDeserializer getSerializerDeserializer() {
+        return serde;
+    }
+
+    /**
+     * @return the next value of an atomically incremented counter; the first id handed out is 1
+     */
+    long createMessageId() {
+        return midFactory.incrementAndGet();
+    }
+
+    /**
+     * Unpacks {@code message} and passes it to the {@link IIPCI}. A message flagged {@link Message#ERROR} is
+     * delivered with its payload as the exception argument and a {@code null} payload; any other message is
+     * delivered with its payload and a {@code null} exception.
+     *
+     * @param message
+     *            the received message
+     */
+    void deliverIncomingMessage(final Message message) {
+        long mid = message.getMessageId();
+        long rmid = message.getRequestMessageId();
+        Object payload = null;
+        Exception exception = null;
+        if (message.getFlag() == Message.ERROR) {
+            exception = (Exception) message.getPayload();
+        } else {
+            payload = message.getPayload();
+        }
+        ipci.deliverIncomingMessage(message.getIPCHandle(), mid, rmid, payload, exception);
+    }
+
+    /**
+     * @return the connection manager owned by this system
+     */
+    IPCConnectionManager getConnectionManager() {
+        return cMgr;
+    }
+
+    /**
+     * @return the performance counters created with this system
+     */
+    public IPCPerformanceCounters getPerformanceCounters() {
+        return perfCounters;
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/JavaSerializationBasedPayloadSerializerDeserializer.java b/fullstack/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/JavaSerializationBasedPayloadSerializerDeserializer.java
new file mode 100644
index 0000000..fdf8e92
--- /dev/null
+++ b/fullstack/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/JavaSerializationBasedPayloadSerializerDeserializer.java
@@ -0,0 +1,90 @@
+package edu.uci.ics.hyracks.ipc.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.ipc.api.IPayloadSerializerDeserializer;
+
+/**
+ * {@link IPayloadSerializerDeserializer} that encodes payloads and exceptions with standard Java object
+ * serialization.
+ * <p>
+ * NOTE(review): Java deserialization of bytes coming from an untrusted peer is unsafe; confirm that this class
+ * is only used between trusted endpoints.
+ */
+public class JavaSerializationBasedPayloadSerializerDeserializer implements IPayloadSerializerDeserializer {
+    @Override
+    public Object deserializeObject(ByteBuffer buffer, int length) throws Exception {
+        return deserialize(buffer, length);
+    }
+
+    @Override
+    public Exception deserializeException(ByteBuffer buffer, int length) throws Exception {
+        return (Exception) deserialize(buffer, length);
+    }
+
+    @Override
+    public byte[] serializeObject(Object object) throws Exception {
+        return serialize(object);
+    }
+
+    @Override
+    public byte[] serializeException(Exception exception) throws Exception {
+        return serialize(exception);
+    }
+
+    /**
+     * Writes {@code object} to {@code out} using a fresh {@link ObjectOutputStream} and flushes it. The
+     * stream {@code out} is left open.
+     *
+     * @param out
+     *            destination stream
+     * @param object
+     *            object to serialize
+     * @throws Exception
+     *             if serialization or writing fails
+     */
+    public static void serialize(OutputStream out, Object object) throws Exception {
+        ObjectOutputStream oos = new ObjectOutputStream(out);
+        oos.writeObject(object);
+        oos.flush();
+    }
+
+    /**
+     * Reads one serialized object from {@code length} bytes starting at the buffer's current position. The
+     * buffer's position and limit are not modified.
+     */
+    private Object deserialize(ByteBuffer buffer, int length) throws Exception {
+        final ByteArrayInputStream bytes;
+        if (buffer.hasArray()) {
+            // position() is relative to arrayOffset(), which is non-zero for sliced or offset-wrapped buffers.
+            bytes = new ByteArrayInputStream(buffer.array(), buffer.arrayOffset() + buffer.position(), length);
+        } else {
+            // Direct and read-only buffers expose no backing array; copy through a duplicate so that the
+            // caller's position stays untouched.
+            byte[] copy = new byte[length];
+            buffer.duplicate().get(copy);
+            bytes = new ByteArrayInputStream(copy);
+        }
+        ObjectInputStream ois = new ObjectInputStream(bytes);
+        try {
+            return ois.readObject();
+        } finally {
+            ois.close();
+        }
+    }
+
+    /**
+     * Serializes {@code object} into a newly allocated byte array.
+     */
+    private byte[] serialize(Object object) throws Exception {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        serialize(baos, object);
+        baos.close();
+        return baos.toByteArray();
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/Message.java b/fullstack/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/Message.java
new file mode 100644
index 0000000..6bb3156
--- /dev/null
+++ b/fullstack/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/Message.java
@@ -0,0 +1,169 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.impl;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.ipc.api.IPayloadSerializerDeserializer;
+
+/**
+ * A single IPC message together with its wire format: a 4-byte size followed by the message id (8 bytes), the
+ * request message id (8 bytes), a flag (1 byte) and the serialized payload. The size counts everything after
+ * the size field itself.
+ */
+class Message {
+    /** Number of bytes used by the size field that precedes every message. */
+    private static final int MSG_SIZE_SIZE = 4;
+
+    /** Bytes between the size field and the payload: two longs and the flag byte. */
+    private static final int HEADER_SIZE = 17;
+
+    static final byte INITIAL_REQ = 1;
+
+    static final byte INITIAL_ACK = 2;
+
+    /** Marks a message whose payload is an {@link Exception}. */
+    static final byte ERROR = 3;
+
+    static final byte NORMAL = 0;
+
+    private IPCHandle ipcHandle;
+
+    private long messageId;
+
+    private long requestMessageId;
+
+    private byte flag;
+
+    private Object payload;
+
+    Message(IPCHandle ipcHandle) {
+        this.ipcHandle = ipcHandle;
+    }
+
+    IPCHandle getIPCHandle() {
+        return ipcHandle;
+    }
+
+    void setMessageId(long messageId) {
+        this.messageId = messageId;
+    }
+
+    long getMessageId() {
+        return messageId;
+    }
+
+    void setRequestMessageId(long requestMessageId) {
+        this.requestMessageId = requestMessageId;
+    }
+
+    long getRequestMessageId() {
+        return requestMessageId;
+    }
+
+    void setFlag(byte flag) {
+        this.flag = flag;
+    }
+
+    byte getFlag() {
+        return flag;
+    }
+
+    void setPayload(Object payload) {
+        this.payload = payload;
+    }
+
+    Object getPayload() {
+        return payload;
+    }
+
+    /**
+     * Tells whether {@code buffer} holds a complete message starting at its current position. Nothing is
+     * consumed.
+     * <p>
+     * NOTE(review): a size smaller than {@code HEADER_SIZE} (negative values included) is still reported as
+     * available, and {@link #read(ByteBuffer)} does not validate it either.
+     *
+     * @param buffer
+     *            buffer positioned at the start of a size field
+     * @return {@code true} if the size field and all the bytes it announces are available
+     */
+    static boolean hasMessage(ByteBuffer buffer) {
+        int available = buffer.remaining() - MSG_SIZE_SIZE;
+        if (available < 0) {
+            return false;
+        }
+        int msgSize = buffer.getInt(buffer.position());
+        // Compare against the bytes that follow the size field instead of computing msgSize + MSG_SIZE_SIZE,
+        // which wraps to a negative number for sizes close to Integer.MAX_VALUE and made the check pass.
+        return msgSize <= available;
+    }
+
+    /**
+     * Decodes the message at the buffer's current position into this object. Once the header has been
+     * read, the position is moved to the end of the message even if deserializing the payload
+     * fails.
+     *
+     * @param buffer
+     *            buffer for which {@link #hasMessage(ByteBuffer)} holds
+     * @throws Exception
+     *             if the payload cannot be deserialized
+     */
+    void read(ByteBuffer buffer) throws Exception {
+        assert hasMessage(buffer);
+        int msgSize = buffer.getInt();
+        messageId = buffer.getLong();
+        requestMessageId = buffer.getLong();
+        flag = buffer.get();
+        int finalPosition = buffer.position() + msgSize - HEADER_SIZE;
+        int length = msgSize - HEADER_SIZE;
+        try {
+            IPayloadSerializerDeserializer serde = ipcHandle.getIPCSystem().getSerializerDeserializer();
+            payload = flag == ERROR ? serde.deserializeException(buffer, length) : serde.deserializeObject(buffer,
+                    length);
+        } finally {
+            buffer.position(finalPosition);
+        }
+    }
+
+    /**
+     * Serializes the payload and appends the whole message to {@code buffer} if it fits.
+     *
+     * @param buffer
+     *            destination buffer
+     * @return {@code true} if the message was written; {@code false} if the buffer lacks the space, in
+     *         which case it is left untouched
+     * @throws Exception
+     *             if the payload cannot be serialized
+     */
+    boolean write(ByteBuffer buffer) throws Exception {
+        IPayloadSerializerDeserializer serde = ipcHandle.getIPCSystem().getSerializerDeserializer();
+        byte[] bytes = flag == ERROR ? serde.serializeException((Exception) payload) : serde.serializeObject(payload);
+        if (buffer.remaining() >= MSG_SIZE_SIZE + HEADER_SIZE + bytes.length) {
+            buffer.putInt(HEADER_SIZE + bytes.length);
+            buffer.putLong(messageId);
+            buffer.putLong(requestMessageId);
+            buffer.put(flag);
+            buffer.put(bytes);
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return "MSG[" + messageId + ":" + requestMessageId + ":" + flag + ":" + payload + "]";
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-ipc/src/test/java/edu/uci/ics/hyracks/ipc/tests/IPCTest.java b/fullstack/hyracks/hyracks-ipc/src/test/java/edu/uci/ics/hyracks/ipc/tests/IPCTest.java
new file mode 100644
index 0000000..5b2f660
--- /dev/null
+++ b/fullstack/hyracks/hyracks-ipc/src/test/java/edu/uci/ics/hyracks/ipc/tests/IPCTest.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.tests;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import junit.framework.Assert;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.IIPCI;
+import edu.uci.ics.hyracks.ipc.api.RPCInterface;
+import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
+import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
+import edu.uci.ics.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
+
+/**
+ * Round-trip test: a client IPC system sends requests to a server IPC system and checks the
+ * replies.
+ */
+public class IPCTest {
+    /**
+     * Sends the integers 0..99 and expects each one doubled, then sends a String and expects the call to fail.
+     */
+    @Test
+    public void test() throws Exception {
+        IPCSystem server = createServerIPCSystem();
+        server.start();
+        InetSocketAddress serverAddr = server.getSocketAddress();
+
+        RPCInterface rpci = new RPCInterface();
+        IPCSystem client = createClientIPCSystem(rpci);
+        client.start();
+
+        IIPCHandle handle = client.getHandle(serverAddr);
+
+        for (int i = 0; i < 100; ++i) {
+            // assertEquals takes (expected, actual); the other order yields misleading failure messages.
+            Assert.assertEquals(Integer.valueOf(2 * i), rpci.call(handle, Integer.valueOf(i)));
+        }
+
+        try {
+            rpci.call(handle, "Foo");
+            Assert.fail("call with a non-Integer payload should have thrown");
+        } catch (Exception e) {
+            // Expected: the server answers a non-Integer payload with an exception (see createServerIPCSystem).
+        }
+    }
+
+    /**
+     * Builds the server side, created with address 127.0.0.1 and port 0. Each incoming payload is cast to
+     * Integer on an executor thread, doubled and sent back under the incoming message id; if that fails,
+     * the caught exception is sent back instead of a result.
+     */
+    private IPCSystem createServerIPCSystem() throws IOException {
+        final Executor executor = Executors.newCachedThreadPool();
+        IIPCI ipci = new IIPCI() {
+            @Override
+            public void deliverIncomingMessage(final IIPCHandle handle, final long mid, long rmid,
+                    final Object payload, Exception exception) {
+                executor.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        Object result = null;
+                        // Not called "exception": that would shadow the enclosing method's parameter.
+                        Exception failure = null;
+                        try {
+                            Integer i = (Integer) payload;
+                            result = i.intValue() * 2;
+                        } catch (Exception e) {
+                            failure = e;
+                        }
+                        try {
+                            handle.send(mid, result, failure);
+                        } catch (IPCException e) {
+                            e.printStackTrace();
+                        }
+                    }
+                });
+            }
+        };
+        return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), ipci,
+                new JavaSerializationBasedPayloadSerializerDeserializer());
+    }
+
+    /**
+     * Builds the client side, created with address 127.0.0.1 and port 0, delivering messages to {@code rpci}.
+     */
+    private IPCSystem createClientIPCSystem(RPCInterface rpci) throws IOException {
+        return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), rpci,
+                new JavaSerializationBasedPayloadSerializerDeserializer());
+    }
+}
\ No newline at end of file