Added new IPC mechanism to Hyracks. Migrated all remote communications to use new IPC layer

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@935 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-algebricks/hyracks-algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java b/hyracks-algebricks/hyracks-algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
index d6c2165..06b76ef 100644
--- a/hyracks-algebricks/hyracks-algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
+++ b/hyracks-algebricks/hyracks-algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
@@ -17,7 +17,7 @@
 import java.util.EnumSet;
 
 import edu.uci.ics.hyracks.algebricks.core.config.AlgebricksConfig;
-import edu.uci.ics.hyracks.api.client.HyracksLocalConnection;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
@@ -32,9 +32,8 @@
     public static final String NC1_ID = "nc1";
     public static final String NC2_ID = "nc2";
 
-    public static final int DEFAULT_HYRACKS_CC_PORT = 1099;
-
-    public static final int TEST_HYRACKS_CC_PORT = 4322;
+    public static final int TEST_HYRACKS_CC_CLUSTER_NET_PORT = 4322;
+    public static final int TEST_HYRACKS_CC_CLIENT_NET_PORT = 4321;
 
     private static ClusterControllerService cc;
     private static NodeControllerService nc1;
@@ -43,14 +42,18 @@
 
     public static void init() throws Exception {
         CCConfig ccConfig = new CCConfig();
-        ccConfig.port = TEST_HYRACKS_CC_PORT;
+        ccConfig.clientNetIpAddress = "127.0.0.1";
+        ccConfig.clientNetPort = TEST_HYRACKS_CC_CLIENT_NET_PORT;
+        ccConfig.clusterNetIpAddress = "127.0.0.1";
+        ccConfig.clusterNetPort = TEST_HYRACKS_CC_CLUSTER_NET_PORT;
         // ccConfig.useJOL = true;
         cc = new ClusterControllerService(ccConfig);
         cc.start();
 
         NCConfig ncConfig1 = new NCConfig();
         ncConfig1.ccHost = "localhost";
-        ncConfig1.ccPort = TEST_HYRACKS_CC_PORT;
+        ncConfig1.ccPort = TEST_HYRACKS_CC_CLUSTER_NET_PORT;
+        ncConfig1.clusterNetIPAddress = "127.0.0.1";
         ncConfig1.dataIPAddress = "127.0.0.1";
         ncConfig1.nodeId = NC1_ID;
         nc1 = new NodeControllerService(ncConfig1);
@@ -58,13 +61,14 @@
 
         NCConfig ncConfig2 = new NCConfig();
         ncConfig2.ccHost = "localhost";
-        ncConfig2.ccPort = TEST_HYRACKS_CC_PORT;
+        ncConfig2.ccPort = TEST_HYRACKS_CC_CLUSTER_NET_PORT;
+        ncConfig2.clusterNetIPAddress = "127.0.0.1";
         ncConfig2.dataIPAddress = "127.0.0.1";
         ncConfig2.nodeId = NC2_ID;
         nc2 = new NodeControllerService(ncConfig2);
         nc2.start();
 
-        hcc = new HyracksLocalConnection(cc);
+        hcc = new HyracksConnection(ccConfig.clientNetIpAddress, ccConfig.clientNetPort);
         hcc.createApplication(AlgebricksConfig.HYRACKS_APP_NAME, null);
     }
 
@@ -77,9 +81,9 @@
     public static void runJob(JobSpecification spec) throws Exception {
         JobId jobId = hcc.createJob(AlgebricksConfig.HYRACKS_APP_NAME, spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
         AlgebricksConfig.ALGEBRICKS_LOGGER.info(spec.toJSON().toString());
-        cc.start(jobId);
+        cc.startJob(jobId);
         AlgebricksConfig.ALGEBRICKS_LOGGER.info(jobId.toString());
         cc.waitForCompletion(jobId);
     }
 
-}
+}
\ No newline at end of file
diff --git a/hyracks-api/pom.xml b/hyracks-api/pom.xml
index f73c27a..ad101d5 100644
--- a/hyracks-api/pom.xml
+++ b/hyracks-api/pom.xml
@@ -1,9 +1,6 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
   <modelVersion>4.0.0</modelVersion>
-  <groupId>edu.uci.ics.hyracks</groupId>
   <artifactId>hyracks-api</artifactId>
-  <version>0.2.0-SNAPSHOT</version>
-
   <parent>
     <groupId>edu.uci.ics.hyracks</groupId>
     <artifactId>hyracks</artifactId>
@@ -45,5 +42,10 @@
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks</groupId>
+  		<artifactId>hyracks-ipc</artifactId>
+  		<version>0.2.0-SNAPSHOT</version>
+  	</dependency>
   </dependencies>
 </project>
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
new file mode 100644
index 0000000..a7f324d
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -0,0 +1,204 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.client;
+
+import java.io.Serializable;
+import java.util.EnumSet;
+
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+
+public class HyracksClientInterfaceFunctions {
+    public enum FunctionId {
+        GET_CLUSTER_CONTROLLER_INFO,
+        CREATE_APPLICATION,
+        START_APPLICATION,
+        DESTROY_APPLICATION,
+        CREATE_JOB,
+        GET_JOB_STATUS,
+        START_JOB,
+        WAIT_FOR_COMPLETION,
+        GET_NODE_CONTROLLERS_INFO
+    }
+
+    public abstract static class Function implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        public abstract FunctionId getFunctionId();
+    }
+
+    public static class GetClusterControllerInfoFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.GET_CLUSTER_CONTROLLER_INFO;
+        }
+    }
+
+    public static class CreateApplicationFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final String appName;
+
+        public CreateApplicationFunction(String appName) {
+            this.appName = appName;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.CREATE_APPLICATION;
+        }
+
+        public String getAppName() {
+            return appName;
+        }
+    }
+
+    public static class StartApplicationFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final String appName;
+
+        public StartApplicationFunction(String appName) {
+            this.appName = appName;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.START_APPLICATION;
+        }
+
+        public String getAppName() {
+            return appName;
+        }
+    }
+
+    public static class DestroyApplicationFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final String appName;
+
+        public DestroyApplicationFunction(String appName) {
+            this.appName = appName;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.DESTROY_APPLICATION;
+        }
+
+        public String getAppName() {
+            return appName;
+        }
+    }
+
+    public static class CreateJobFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final String appName;
+        private final byte[] jobSpec;
+        private final EnumSet<JobFlag> jobFlags;
+
+        public CreateJobFunction(String appName, byte[] jobSpec, EnumSet<JobFlag> jobFlags) {
+            this.appName = appName;
+            this.jobSpec = jobSpec;
+            this.jobFlags = jobFlags;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.CREATE_JOB;
+        }
+
+        public String getAppName() {
+            return appName;
+        }
+
+        public byte[] getJobSpec() {
+            return jobSpec;
+        }
+
+        public EnumSet<JobFlag> getJobFlags() {
+            return jobFlags;
+        }
+    }
+
+    public static class GetJobStatusFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+
+        public GetJobStatusFunction(JobId jobId) {
+            this.jobId = jobId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.GET_JOB_STATUS;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+    }
+
+    public static class StartJobFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+
+        public StartJobFunction(JobId jobId) {
+            this.jobId = jobId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.START_JOB;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+    }
+
+    public static class WaitForCompletionFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+
+        public WaitForCompletionFunction(JobId jobId) {
+            this.jobId = jobId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.WAIT_FOR_COMPLETION;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+    }
+
+    public static class GetNodeControllersInfoFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.GET_NODE_CONTROLLERS_INFO;
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
new file mode 100644
index 0000000..f74d06e
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.client;
+
+import java.util.EnumSet;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.SyncRMI;
+
+public class HyracksClientInterfaceRemoteProxy implements IHyracksClientInterface {
+    private final IIPCHandle ipcHandle;
+
+    public HyracksClientInterfaceRemoteProxy(IIPCHandle ipcHandle) {
+        this.ipcHandle = ipcHandle;
+    }
+
+    @Override
+    public ClusterControllerInfo getClusterControllerInfo() throws Exception {
+        SyncRMI sync = new SyncRMI();
+        HyracksClientInterfaceFunctions.GetClusterControllerInfoFunction gccif = new HyracksClientInterfaceFunctions.GetClusterControllerInfoFunction();
+        return (ClusterControllerInfo) sync.call(ipcHandle, gccif);
+    }
+
+    @Override
+    public void createApplication(String appName) throws Exception {
+        SyncRMI sync = new SyncRMI();
+        HyracksClientInterfaceFunctions.CreateApplicationFunction caf = new HyracksClientInterfaceFunctions.CreateApplicationFunction(
+                appName);
+        sync.call(ipcHandle, caf);
+    }
+
+    @Override
+    public void startApplication(String appName) throws Exception {
+        SyncRMI sync = new SyncRMI();
+        HyracksClientInterfaceFunctions.StartApplicationFunction saf = new HyracksClientInterfaceFunctions.StartApplicationFunction(
+                appName);
+        sync.call(ipcHandle, saf);
+    }
+
+    @Override
+    public void destroyApplication(String appName) throws Exception {
+        SyncRMI sync = new SyncRMI();
+        HyracksClientInterfaceFunctions.DestroyApplicationFunction daf = new HyracksClientInterfaceFunctions.DestroyApplicationFunction(
+                appName);
+        sync.call(ipcHandle, daf);
+    }
+
+    @Override
+    public JobId createJob(String appName, byte[] jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
+        SyncRMI sync = new SyncRMI();
+        HyracksClientInterfaceFunctions.CreateJobFunction cjf = new HyracksClientInterfaceFunctions.CreateJobFunction(
+                appName, jobSpec, jobFlags);
+        return (JobId) sync.call(ipcHandle, cjf);
+    }
+
+    @Override
+    public JobStatus getJobStatus(JobId jobId) throws Exception {
+        SyncRMI sync = new SyncRMI();
+        HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf = new HyracksClientInterfaceFunctions.GetJobStatusFunction(
+                jobId);
+        return (JobStatus) sync.call(ipcHandle, gjsf);
+    }
+
+    @Override
+    public void startJob(JobId jobId) throws Exception {
+        SyncRMI sync = new SyncRMI();
+        HyracksClientInterfaceFunctions.StartJobFunction sjf = new HyracksClientInterfaceFunctions.StartJobFunction(
+                jobId);
+        sync.call(ipcHandle, sjf);
+    }
+
+    @Override
+    public void waitForCompletion(JobId jobId) throws Exception {
+        SyncRMI sync = new SyncRMI();
+        HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf = new HyracksClientInterfaceFunctions.WaitForCompletionFunction(
+                jobId);
+        sync.call(ipcHandle, wfcf);
+    }
+
+    @Override
+    public Map<String, NodeControllerInfo> getNodeControllersInfo() throws Exception {
+        SyncRMI sync = new SyncRMI();
+        HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction gncif = new HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction();
+        return (Map<String, NodeControllerInfo>) sync.call(ipcHandle, gncif);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/AbstractHyracksConnection.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
similarity index 75%
rename from hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/AbstractHyracksConnection.java
rename to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
index 90bbfb1..1b38cb9 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/AbstractHyracksConnection.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
@@ -15,6 +15,7 @@
 package edu.uci.ics.hyracks.api.client;
 
 import java.io.File;
+import java.net.InetSocketAddress;
 import java.util.EnumSet;
 import java.util.Map;
 
@@ -30,17 +31,42 @@
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
 
-abstract class AbstractHyracksConnection implements IHyracksClientConnection {
+/**
+ * Connection Class used by a Hyracks Client to interact with a Hyracks Cluster
+ * Controller.
+ * 
+ * @author vinayakb
+ * 
+ */
+public final class HyracksConnection implements IHyracksClientConnection {
     private final String ccHost;
 
+    private final IPCSystem ipc;
+
     private final IHyracksClientInterface hci;
 
     private final ClusterControllerInfo ccInfo;
 
-    public AbstractHyracksConnection(String ccHost, IHyracksClientInterface hci) throws Exception {
+    /**
+     * Constructor to create a connection to the Hyracks Cluster Controller.
+     * 
+     * @param ccHost
+     *            Host name (or IP Address) where the Cluster Controller can be
+     *            reached.
+     * @param ccPort
+     *            Port to reach the Hyracks Cluster Controller at the specified
+     *            host name.
+     * @throws Exception
+     */
+    public HyracksConnection(String ccHost, int ccPort) throws Exception {
         this.ccHost = ccHost;
-        this.hci = hci;
+        ipc = new IPCSystem(new InetSocketAddress(0));
+        ipc.start();
+        IIPCHandle ccIpchandle = ipc.getHandle(new InetSocketAddress(ccHost, ccPort));
+        this.hci = new HyracksClientInterfaceRemoteProxy(ccIpchandle);
         ccInfo = hci.getClusterControllerInfo();
     }
 
@@ -82,7 +108,7 @@
 
     @Override
     public void start(JobId jobId) throws Exception {
-        hci.start(jobId);
+        hci.startJob(jobId);
     }
 
     @Override
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksLocalConnection.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksLocalConnection.java
deleted file mode 100644
index ee0b276..0000000
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksLocalConnection.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.api.client;
-
-/**
- * Connection Class used by a Hyracks Client that is colocated in the same VM
- * with the Cluster Controller. Usually, clients must not use this class. This
- * is used internally for testing purposes.
- * 
- * @author vinayakb
- * 
- */
-public final class HyracksLocalConnection extends AbstractHyracksConnection {
-    public HyracksLocalConnection(IHyracksClientInterface hci) throws Exception {
-        super("localhost", hci);
-    }
-}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksRMIConnection.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksRMIConnection.java
deleted file mode 100644
index 65d82a8..0000000
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksRMIConnection.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.api.client;
-
-import java.rmi.NotBoundException;
-import java.rmi.RemoteException;
-import java.rmi.registry.LocateRegistry;
-import java.rmi.registry.Registry;
-
-/**
- * Connection Class used by a Hyracks Client to interact with a Hyracks Cluster
- * Controller using RMI. Usually, such a connection would be used when the CC
- * runs in a separate JVM from the client (The most common case).
- * 
- * @author vinayakb
- * 
- */
-public final class HyracksRMIConnection extends AbstractHyracksConnection {
-    /**
-     * Constructor to create a connection to the Hyracks Cluster Controller.
-     * 
-     * @param host
-     *            Host name (or IP Address) where the Cluster Controller can be
-     *            reached.
-     * @param port
-     *            Port to reach the Hyracks Cluster Controller at the specified
-     *            host name.
-     * @throws Exception
-     */
-    public HyracksRMIConnection(String host, int port) throws Exception {
-        super(host, lookupHCI(host, port));
-    }
-
-    private static IHyracksClientInterface lookupHCI(String host, int port) throws RemoteException, NotBoundException {
-        Registry registry = LocateRegistry.getRegistry(host, port);
-        return (IHyracksClientInterface) registry.lookup(IHyracksClientInterface.class.getName());
-    }
-}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
index 6e1eedc..866d307 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
@@ -14,7 +14,6 @@
  */
 package edu.uci.ics.hyracks.api.client;
 
-import java.rmi.Remote;
 import java.util.EnumSet;
 import java.util.Map;
 
@@ -22,7 +21,7 @@
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 
-public interface IHyracksClientInterface extends Remote {
+public interface IHyracksClientInterface {
     public ClusterControllerInfo getClusterControllerInfo() throws Exception;
 
     public void createApplication(String appName) throws Exception;
@@ -35,7 +34,7 @@
 
     public JobStatus getJobStatus(JobId jobId) throws Exception;
 
-    public void start(JobId jobId) throws Exception;
+    public void startJob(JobId jobId) throws Exception;
 
     public void waitForCompletion(JobId jobId) throws Exception;
 
diff --git a/hyracks-cli/src/main/java/edu/uci/ics/hyracks/cli/commands/ConnectCommand.java b/hyracks-cli/src/main/java/edu/uci/ics/hyracks/cli/commands/ConnectCommand.java
index e1a1fd2..09dbc8c 100644
--- a/hyracks-cli/src/main/java/edu/uci/ics/hyracks/cli/commands/ConnectCommand.java
+++ b/hyracks-cli/src/main/java/edu/uci/ics/hyracks/cli/commands/ConnectCommand.java
@@ -1,6 +1,6 @@
 package edu.uci.ics.hyracks.cli.commands;
 
-import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.cli.Session;
 
@@ -22,7 +22,7 @@
     @Override
     public void run(Session session) throws Exception {
         System.err.println("Connecting to host: " + host + ", port: " + port);
-        IHyracksClientConnection conn = new HyracksRMIConnection(host, port);
+        IHyracksClientConnection conn = new HyracksConnection(host, port);
         session.setConnection(conn);
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/CCClientInterface.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/CCClientInterface.java
deleted file mode 100644
index 70e85a1..0000000
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/CCClientInterface.java
+++ /dev/null
@@ -1,68 +0,0 @@
-package edu.uci.ics.hyracks.control.cc;
-
-import java.rmi.RemoteException;
-import java.rmi.server.UnicastRemoteObject;
-import java.util.EnumSet;
-import java.util.Map;
-
-import edu.uci.ics.hyracks.api.client.ClusterControllerInfo;
-import edu.uci.ics.hyracks.api.client.IHyracksClientInterface;
-import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
-import edu.uci.ics.hyracks.api.job.JobFlag;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.job.JobStatus;
-
-public class CCClientInterface extends UnicastRemoteObject implements IHyracksClientInterface {
-    private static final long serialVersionUID = 1L;
-
-    private final ClusterControllerService ccs;
-
-    public CCClientInterface(ClusterControllerService ccs) throws RemoteException {
-        this.ccs = ccs;
-    }
-
-    @Override
-    public ClusterControllerInfo getClusterControllerInfo() throws Exception {
-        return ccs.getClusterControllerInfo();
-    }
-
-    @Override
-    public void createApplication(String appName) throws Exception {
-        ccs.createApplication(appName);
-    }
-
-    @Override
-    public void startApplication(String appName) throws Exception {
-        ccs.startApplication(appName);
-    }
-
-    @Override
-    public void destroyApplication(String appName) throws Exception {
-        ccs.destroyApplication(appName);
-    }
-
-    @Override
-    public JobId createJob(String appName, byte[] jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
-        return ccs.createJob(appName, jobSpec, jobFlags);
-    }
-
-    @Override
-    public JobStatus getJobStatus(JobId jobId) throws Exception {
-        return ccs.getJobStatus(jobId);
-    }
-
-    @Override
-    public void start(JobId jobId) throws Exception {
-        ccs.start(jobId);
-    }
-
-    @Override
-    public void waitForCompletion(JobId jobId) throws Exception {
-        ccs.waitForCompletion(jobId);
-    }
-
-    @Override
-    public Map<String, NodeControllerInfo> getNodeControllersInfo() throws Exception {
-        return ccs.getNodeControllersInfo();
-    }
-}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 935ce88..4575adc 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -15,8 +15,7 @@
 package edu.uci.ics.hyracks.control.cc;
 
 import java.io.File;
-import java.rmi.registry.LocateRegistry;
-import java.rmi.registry.Registry;
+import java.net.InetSocketAddress;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Hashtable;
@@ -40,6 +39,7 @@
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
+import edu.uci.ics.hyracks.control.cc.ipc.HyracksClientInterfaceDelegateIPCI;
 import edu.uci.ics.hyracks.control.cc.job.IJobStatusConditionVariable;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
 import edu.uci.ics.hyracks.control.cc.web.WebServer;
@@ -69,6 +69,8 @@
 import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
 import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
 import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
+import edu.uci.ics.hyracks.control.common.ipc.ClusterControllerDelegateIPCI;
+import edu.uci.ics.hyracks.control.common.ipc.NodeControllerRemoteProxy;
 import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
 import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
@@ -76,14 +78,19 @@
 import edu.uci.ics.hyracks.control.common.logs.LogFile;
 import edu.uci.ics.hyracks.control.common.work.FutureValue;
 import edu.uci.ics.hyracks.control.common.work.WorkQueue;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.IIPCI;
+import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
 
 public class ClusterControllerService extends AbstractRemoteService implements IClusterController,
         IHyracksClientInterface {
-    private static final long serialVersionUID = 1L;
+    private static Logger LOGGER = Logger.getLogger(ClusterControllerService.class.getName());
 
     private final CCConfig ccConfig;
 
-    private static Logger LOGGER = Logger.getLogger(ClusterControllerService.class.getName());
+    private IPCSystem clusterIPC;
+
+    private IPCSystem clientIPC;
 
     private final LogFile jobLog;
 
@@ -105,12 +112,10 @@
 
     private final WorkQueue workQueue;
 
-    private final Executor taskExecutor;
+    private final Executor executor;
 
     private final Timer timer;
 
-    private final CCClientInterface ccci;
-
     private final ICCContext ccContext;
 
     private final DeadNodeSweeper sweeper;
@@ -125,7 +130,12 @@
         ipAddressNodeNameMap = new HashMap<String, Set<String>>();
         applications = new Hashtable<String, CCApplicationContext>();
         serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(ccConfig.ccRoot));
-        taskExecutor = Executors.newCachedThreadPool();
+        executor = Executors.newCachedThreadPool();
+        IIPCI ccIPCI = new ClusterControllerDelegateIPCI(this);
+        clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.clusterNetPort), ccIPCI, executor);
+        IIPCI ciIPCI = new HyracksClientInterfaceDelegateIPCI(this);
+        clientIPC = new IPCSystem(new InetSocketAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort), ciIPCI,
+                executor);
         webServer = new WebServer(this);
         activeRunMap = new HashMap<JobId, JobRun>();
         runMapArchive = new LinkedHashMap<JobId, JobRun>() {
@@ -137,7 +147,6 @@
         };
         workQueue = new WorkQueue();
         this.timer = new Timer(true);
-        ccci = new CCClientInterface(this);
         ccContext = new ICCContext() {
             @Override
             public Map<String, Set<String>> getIPAddressNodeMap() {
@@ -151,9 +160,8 @@
     @Override
     public void start() throws Exception {
         LOGGER.log(Level.INFO, "Starting ClusterControllerService: " + this);
-        Registry registry = LocateRegistry.createRegistry(ccConfig.port);
-        registry.rebind(IHyracksClientInterface.class.getName(), ccci);
-        registry.rebind(IClusterController.class.getName(), this);
+        clusterIPC.start();
+        clientIPC.start();
         webServer.setPort(ccConfig.httpPort);
         webServer.start();
         workQueue.start();
@@ -203,7 +211,7 @@
     }
 
     public Executor getExecutor() {
-        return taskExecutor;
+        return executor;
     }
 
     public Map<String, NodeControllerState> getNodeMap() {
@@ -229,13 +237,16 @@
 
     @Override
     public NodeParameters registerNode(NodeRegistration reg) throws Exception {
-        INodeController nodeController = reg.getNodeController();
+        InetSocketAddress ncAddress = reg.getNodeControllerAddress();
         String id = reg.getNodeId();
+
+        IIPCHandle ncIPCHandle = clusterIPC.getHandle(reg.getNodeControllerAddress());
+        INodeController nodeController = new NodeControllerRemoteProxy(ncIPCHandle);
+
         NodeControllerState state = new NodeControllerState(nodeController, reg);
         workQueue.scheduleAndSync(new RegisterNodeWork(this, id, state));
         LOGGER.log(Level.INFO, "Registered INodeController: id = " + id);
         NodeParameters params = new NodeParameters();
-        params.setClusterController(this);
         params.setClusterControllerInfo(info);
         params.setHeartbeatPeriod(ccConfig.heartbeatPeriod);
         params.setProfileDumpPeriod(ccConfig.profileDumpPeriod);
@@ -243,9 +254,8 @@
     }
 
     @Override
-    public void unregisterNode(INodeController nodeController) throws Exception {
-        String id = nodeController.getId();
-        workQueue.schedule(new UnregisterNodeWork(this, id));
+    public void unregisterNode(String nodeId) throws Exception {
+        workQueue.schedule(new UnregisterNodeWork(this, nodeId));
     }
 
     @Override
@@ -275,7 +285,7 @@
     }
 
     @Override
-    public void start(JobId jobId) throws Exception {
+    public void startJob(JobId jobId) throws Exception {
         JobStartWork jse = new JobStartWork(this, jobId);
         workQueue.schedule(jse);
     }
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ipc/HyracksClientInterfaceDelegateIPCI.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ipc/HyracksClientInterfaceDelegateIPCI.java
new file mode 100644
index 0000000..b3aa406
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ipc/HyracksClientInterfaceDelegateIPCI.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.ipc;
+
+import edu.uci.ics.hyracks.api.client.HyracksClientInterfaceFunctions;
+import edu.uci.ics.hyracks.api.client.IHyracksClientInterface;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.IIPCI;
+
+public class HyracksClientInterfaceDelegateIPCI implements IIPCI {
+    private final IHyracksClientInterface hci;
+
+    public HyracksClientInterfaceDelegateIPCI(IHyracksClientInterface hci) {
+        this.hci = hci;
+    }
+
+    @Override
+    public Object call(IIPCHandle caller, Object req) throws Exception {
+        HyracksClientInterfaceFunctions.Function fn = (HyracksClientInterfaceFunctions.Function) req;
+        switch (fn.getFunctionId()) {
+            case GET_CLUSTER_CONTROLLER_INFO: {
+                return hci.getClusterControllerInfo();
+            }
+
+            case CREATE_APPLICATION: {
+                HyracksClientInterfaceFunctions.CreateApplicationFunction caf = (HyracksClientInterfaceFunctions.CreateApplicationFunction) fn;
+                hci.createApplication(caf.getAppName());
+                return null;
+            }
+
+            case START_APPLICATION: {
+                HyracksClientInterfaceFunctions.StartApplicationFunction saf = (HyracksClientInterfaceFunctions.StartApplicationFunction) fn;
+                hci.startApplication(saf.getAppName());
+                return null;
+            }
+
+            case DESTROY_APPLICATION: {
+                HyracksClientInterfaceFunctions.DestroyApplicationFunction daf = (HyracksClientInterfaceFunctions.DestroyApplicationFunction) fn;
+                hci.destroyApplication(daf.getAppName());
+                return null;
+            }
+
+            case CREATE_JOB: {
+                HyracksClientInterfaceFunctions.CreateJobFunction cjf = (HyracksClientInterfaceFunctions.CreateJobFunction) fn;
+                return hci.createJob(cjf.getAppName(), cjf.getJobSpec(), cjf.getJobFlags());
+            }
+
+            case GET_JOB_STATUS: {
+                HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf = (HyracksClientInterfaceFunctions.GetJobStatusFunction) fn;
+                return hci.getJobStatus(gjsf.getJobId());
+            }
+
+            case START_JOB: {
+                HyracksClientInterfaceFunctions.StartJobFunction sjf = (HyracksClientInterfaceFunctions.StartJobFunction) fn;
+                hci.startJob(sjf.getJobId());
+                return null;
+            }
+
+            case WAIT_FOR_COMPLETION: {
+                HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf = (HyracksClientInterfaceFunctions.WaitForCompletionFunction) fn;
+                hci.waitForCompletion(wfcf.getJobId());
+                return null;
+            }
+
+            case GET_NODE_CONTROLLERS_INFO: {
+                return hci.getNodeControllersInfo();
+            }
+        }
+        throw new IllegalArgumentException("Unknown function " + fn.getFunctionId());
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index 24bee8c..e2c9ca3 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -448,7 +448,7 @@
                     public void run() {
                         try {
                             node.getNodeController().startTasks(appName, jobId, JavaSerializationUtils.serialize(jag),
-                                    taskDescriptors, connectorPolicies, null);
+                                    taskDescriptors, connectorPolicies);
                         } catch (IOException e) {
                             e.printStackTrace();
                         } catch (Exception e) {
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/AbstractRemoteService.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/AbstractRemoteService.java
index cf64554..42d0ecb 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/AbstractRemoteService.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/AbstractRemoteService.java
@@ -14,14 +14,9 @@
  */
 package edu.uci.ics.hyracks.control.common;
 
-import java.rmi.RemoteException;
-import java.rmi.server.UnicastRemoteObject;
-
 import edu.uci.ics.hyracks.control.common.service.IService;
 
-public abstract class AbstractRemoteService extends UnicastRemoteObject implements IService {
-    private static final long serialVersionUID = 1L;
-
-    public AbstractRemoteService() throws RemoteException {
+public abstract class AbstractRemoteService implements IService {
+    public AbstractRemoteService() {
     }
 }
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
index 53003d0..40e9347 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -14,7 +14,6 @@
  */
 package edu.uci.ics.hyracks.control.common.base;
 
-import java.rmi.Remote;
 import java.util.List;
 
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
@@ -27,10 +26,10 @@
 import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
 
-public interface IClusterController extends Remote {
+public interface IClusterController {
     public NodeParameters registerNode(NodeRegistration reg) throws Exception;
 
-    public void unregisterNode(INodeController nodeController) throws Exception;
+    public void unregisterNode(String nodeId) throws Exception;
 
     public void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics)
             throws Exception;
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
index cca9d81..47f4ceb 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
@@ -14,7 +14,6 @@
  */
 package edu.uci.ics.hyracks.control.common.base;
 
-import java.rmi.Remote;
 import java.util.List;
 import java.util.Map;
 
@@ -25,16 +24,11 @@
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
-import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
 import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
 
-public interface INodeController extends Remote {
-    public String getId() throws Exception;
-
-    public NCConfig getConfiguration() throws Exception;
-
+public interface INodeController {
     public void startTasks(String appName, JobId jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors,
-            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, byte[] ctxVarBytes) throws Exception;
+            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) throws Exception;
 
     public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception;
 
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java
index 5091d12..4536fc4 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java
@@ -19,8 +19,17 @@
 import org.kohsuke.args4j.Option;
 
 public class CCConfig {
-    @Option(name = "-port", usage = "Sets the port to listen for connections from node controllers (default 1099)")
-    public int port = 1099;
+    @Option(name = "-client-net-ip-address", usage = "Sets the IP Address to listen for connections from clients", required = true)
+    public String clientNetIpAddress;
+
+    @Option(name = "-client-net-port", usage = "Sets the port to listen for connections from clients (default 1098)")
+    public int clientNetPort = 1098;
+
+    @Option(name = "-cluster-net-ip-address", usage = "Sets the IP Address to listen for connections from ", required = true)
+    public String clusterNetIpAddress;
+
+    @Option(name = "-cluster-net-port", usage = "Sets the port to listen for connections from node controllers (default 1099)")
+    public int clusterNetPort = 1099;
 
     @Option(name = "-http-port", usage = "Sets the http port for the Cluster Controller (default: 19001)")
     public int httpPort = 19001;
@@ -44,8 +53,14 @@
     public String ccRoot = "ClusterControllerService";
 
     public void toCommandLine(List<String> cList) {
-        cList.add("-port");
-        cList.add(String.valueOf(port));
+        cList.add("-client-net-ip-address");
+        cList.add(clientNetIpAddress);
+        cList.add("-client-net-port");
+        cList.add(String.valueOf(clientNetPort));
+        cList.add("-cluster-net-ip-address");
+        cList.add(clusterNetIpAddress);
+        cList.add("-cluster-net-port");
+        cList.add(String.valueOf(clusterNetPort));
         cList.add("-http-port");
         cList.add(String.valueOf(httpPort));
         cList.add("-heartbeat-period");
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
index 02ce0f8..c55f34a 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
@@ -22,16 +22,19 @@
 public class NCConfig implements Serializable {
     private static final long serialVersionUID = 1L;
 
-    @Option(name = "-cc-host", usage = "Cluster Controller host name")
+    @Option(name = "-cc-host", usage = "Cluster Controller host name", required = true)
     public String ccHost;
 
     @Option(name = "-cc-port", usage = "Cluster Controller port (default: 1099)")
     public int ccPort = 1099;
 
-    @Option(name = "-node-id", usage = "Logical name of node controller unique within the cluster")
+    @Option(name = "-cluster-net-ip-address", usage = "IP Address to bind cluster listener", required = true)
+    public String clusterNetIPAddress;
+
+    @Option(name = "-node-id", usage = "Logical name of node controller unique within the cluster", required = true)
     public String nodeId;
 
-    @Option(name = "-data-ip-address", usage = "IP Address to bind data listener")
+    @Option(name = "-data-ip-address", usage = "IP Address to bind data listener", required = true)
     public String dataIPAddress;
 
     @Option(name = "-frame-size", usage = "Frame Size to use for data communication (default: 32768)")
@@ -54,6 +57,8 @@
         cList.add(ccHost);
         cList.add("-cc-port");
         cList.add(String.valueOf(ccPort));
+        cList.add("-cluster-net-ip-address");
+        cList.add(clusterNetIPAddress);
         cList.add("-node-id");
         cList.add(nodeId);
         cList.add("-data-ip-address");
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeParameters.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeParameters.java
index d67b777..0161f96 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeParameters.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeParameters.java
@@ -17,27 +17,16 @@
 import java.io.Serializable;
 
 import edu.uci.ics.hyracks.api.client.ClusterControllerInfo;
-import edu.uci.ics.hyracks.control.common.base.IClusterController;
 
 public class NodeParameters implements Serializable {
     private static final long serialVersionUID = 1L;
 
-    private IClusterController cc;
-
     private ClusterControllerInfo ccInfo;
 
     private int heartbeatPeriod;
 
     private int profileDumpPeriod;
 
-    public IClusterController getClusterController() {
-        return cc;
-    }
-
-    public void setClusterController(IClusterController cc) {
-        this.cc = cc;
-    }
-
     public ClusterControllerInfo getClusterControllerInfo() {
         return ccInfo;
     }
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeRegistration.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeRegistration.java
index ff92c2d..f6dde10 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeRegistration.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeRegistration.java
@@ -15,15 +15,15 @@
 package edu.uci.ics.hyracks.control.common.controllers;
 
 import java.io.Serializable;
+import java.net.InetSocketAddress;
 
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
-import edu.uci.ics.hyracks.control.common.base.INodeController;
 import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatSchema;
 
 public final class NodeRegistration implements Serializable {
     private static final long serialVersionUID = 1L;
 
-    private final INodeController nc;
+    private final InetSocketAddress ncAddress;
 
     private final String nodeId;
 
@@ -41,9 +41,9 @@
 
     private final HeartbeatSchema hbSchema;
 
-    public NodeRegistration(INodeController nc, String nodeId, NCConfig ncConfig, NetworkAddress dataPort,
+    public NodeRegistration(InetSocketAddress ncAddress, String nodeId, NCConfig ncConfig, NetworkAddress dataPort,
             String osName, String arch, String osVersion, int nProcessors, HeartbeatSchema hbSchema) {
-        this.nc = nc;
+        this.ncAddress = ncAddress;
         this.nodeId = nodeId;
         this.ncConfig = ncConfig;
         this.dataPort = dataPort;
@@ -54,8 +54,8 @@
         this.hbSchema = hbSchema;
     }
 
-    public INodeController getNodeController() {
-        return nc;
+    public InetSocketAddress getNodeControllerAddress() {
+        return ncAddress;
     }
 
     public String getNodeId() {
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerDelegateIPCI.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerDelegateIPCI.java
new file mode 100644
index 0000000..e35f962
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerDelegateIPCI.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.ipc;
+
+import edu.uci.ics.hyracks.control.common.base.IClusterController;
+import edu.uci.ics.hyracks.control.common.ipc.ClusterControllerFunctions.Function;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.IIPCI;
+
+public class ClusterControllerDelegateIPCI implements IIPCI {
+    private final IClusterController cc;
+
+    public ClusterControllerDelegateIPCI(IClusterController cc) {
+        this.cc = cc;
+    }
+
+    @Override
+    public Object call(IIPCHandle caller, Object req) throws Exception {
+        ClusterControllerFunctions.Function fn = (Function) req;
+        switch (fn.getFunctionId()) {
+            case REGISTER_NODE: {
+                ClusterControllerFunctions.RegisterNodeFunction rnf = (ClusterControllerFunctions.RegisterNodeFunction) fn;
+                return cc.registerNode(rnf.getNodeRegistration());
+            }
+
+            case UNREGISTER_NODE: {
+                ClusterControllerFunctions.UnregisterNodeFunction unf = (ClusterControllerFunctions.UnregisterNodeFunction) fn;
+                cc.unregisterNode(unf.getNodeId());
+                return null;
+            }
+
+            case NODE_HEARTBEAT: {
+                ClusterControllerFunctions.NodeHeartbeatFunction nhf = (ClusterControllerFunctions.NodeHeartbeatFunction) fn;
+                cc.nodeHeartbeat(nhf.getNodeId(), nhf.getHeartbeatData());
+                return null;
+            }
+
+            case NOTIFY_JOBLET_CLEANUP: {
+                ClusterControllerFunctions.NotifyJobletCleanupFunction njcf = (ClusterControllerFunctions.NotifyJobletCleanupFunction) fn;
+                cc.notifyJobletCleanup(njcf.getJobId(), njcf.getNodeId());
+                return null;
+            }
+
+            case REPORT_PROFILE: {
+                ClusterControllerFunctions.ReportProfileFunction rpf = (ClusterControllerFunctions.ReportProfileFunction) fn;
+                cc.reportProfile(rpf.getNodeId(), rpf.getProfiles());
+                return null;
+            }
+
+            case NOTIFY_TASK_COMPLETE: {
+                ClusterControllerFunctions.NotifyTaskCompleteFunction ntcf = (ClusterControllerFunctions.NotifyTaskCompleteFunction) fn;
+                cc.notifyTaskComplete(ntcf.getJobId(), ntcf.getTaskId(), ntcf.getNodeId(), ntcf.getStatistics());
+                return null;
+            }
+            case NOTIFY_TASK_FAILURE: {
+                ClusterControllerFunctions.NotifyTaskFailureFunction ntff = (ClusterControllerFunctions.NotifyTaskFailureFunction) fn;
+                cc.notifyTaskFailure(ntff.getJobId(), ntff.getTaskId(), ntff.getDetails(), ntff.getDetails());
+                return null;
+            }
+
+            case REGISTER_PARTITION_PROVIDER: {
+                ClusterControllerFunctions.RegisterPartitionProviderFunction rppf = (ClusterControllerFunctions.RegisterPartitionProviderFunction) fn;
+                cc.registerPartitionProvider(rppf.getPartitionDescriptor());
+                return null;
+            }
+
+            case REGISTER_PARTITION_REQUEST: {
+                ClusterControllerFunctions.RegisterPartitionRequestFunction rprf = (ClusterControllerFunctions.RegisterPartitionRequestFunction) fn;
+                cc.registerPartitionRequest(rprf.getPartitionRequest());
+                return null;
+            }
+        }
+        throw new IllegalArgumentException("Unknown function: " + fn.getFunctionId());
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerFunctions.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerFunctions.java
new file mode 100644
index 0000000..4c76357
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerFunctions.java
@@ -0,0 +1,272 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.ipc;
+
+import java.io.Serializable;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
+import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
+import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
+import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
+
+public class ClusterControllerFunctions {
+    public enum FunctionId {
+        REGISTER_NODE,
+        UNREGISTER_NODE,
+        NOTIFY_TASK_COMPLETE,
+        NOTIFY_TASK_FAILURE,
+        NOTIFY_JOBLET_CLEANUP,
+        NODE_HEARTBEAT,
+        REPORT_PROFILE,
+        REGISTER_PARTITION_PROVIDER,
+        REGISTER_PARTITION_REQUEST,
+    }
+
+    public static abstract class Function implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        public abstract FunctionId getFunctionId();
+    }
+
+    public static class RegisterNodeFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final NodeRegistration reg;
+
+        public RegisterNodeFunction(NodeRegistration reg) {
+            this.reg = reg;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.REGISTER_NODE;
+        }
+
+        public NodeRegistration getNodeRegistration() {
+            return reg;
+        }
+    }
+
+    public static class UnregisterNodeFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final String nodeId;
+
+        public UnregisterNodeFunction(String nodeId) {
+            this.nodeId = nodeId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.UNREGISTER_NODE;
+        }
+
+        public String getNodeId() {
+            return nodeId;
+        }
+    }
+
+    public static class NotifyTaskCompleteFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+        private final TaskAttemptId taskId;
+        private final String nodeId;
+        private final TaskProfile statistics;
+
+        public NotifyTaskCompleteFunction(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics) {
+            this.jobId = jobId;
+            this.taskId = taskId;
+            this.nodeId = nodeId;
+            this.statistics = statistics;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.NOTIFY_TASK_COMPLETE;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+
+        public TaskAttemptId getTaskId() {
+            return taskId;
+        }
+
+        public String getNodeId() {
+            return nodeId;
+        }
+
+        public TaskProfile getStatistics() {
+            return statistics;
+        }
+    }
+
+    public static class NotifyTaskFailureFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+        private final TaskAttemptId taskId;
+        private final String nodeId;
+        private final String details;
+
+        public NotifyTaskFailureFunction(JobId jobId, TaskAttemptId taskId, String nodeId, String details) {
+            this.jobId = jobId;
+            this.taskId = taskId;
+            this.nodeId = nodeId;
+            this.details = details;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.NOTIFY_TASK_FAILURE;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+
+        public TaskAttemptId getTaskId() {
+            return taskId;
+        }
+
+        public String getNodeId() {
+            return nodeId;
+        }
+
+        public String getDetails() {
+            return details;
+        }
+    }
+
+    public static class NotifyJobletCleanupFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+        private final String nodeId;
+
+        public NotifyJobletCleanupFunction(JobId jobId, String nodeId) {
+            this.jobId = jobId;
+            this.nodeId = nodeId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.NOTIFY_JOBLET_CLEANUP;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+
+        public String getNodeId() {
+            return nodeId;
+        }
+    }
+
+    public static class NodeHeartbeatFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final String nodeId;
+        private final HeartbeatData hbData;
+
+        public NodeHeartbeatFunction(String nodeId, HeartbeatData hbData) {
+            this.nodeId = nodeId;
+            this.hbData = hbData;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.NODE_HEARTBEAT;
+        }
+
+        public String getNodeId() {
+            return nodeId;
+        }
+
+        public HeartbeatData getHeartbeatData() {
+            return hbData;
+        }
+    }
+
+    public static class ReportProfileFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final String nodeId;
+        private final List<JobProfile> profiles;
+
+        public ReportProfileFunction(String nodeId, List<JobProfile> profiles) {
+            this.nodeId = nodeId;
+            this.profiles = profiles;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.REPORT_PROFILE;
+        }
+
+        public String getNodeId() {
+            return nodeId;
+        }
+
+        public List<JobProfile> getProfiles() {
+            return profiles;
+        }
+    }
+
+    public static class RegisterPartitionProviderFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final PartitionDescriptor partitionDescriptor;
+
+        public RegisterPartitionProviderFunction(PartitionDescriptor partitionDescriptor) {
+            this.partitionDescriptor = partitionDescriptor;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.REGISTER_PARTITION_PROVIDER;
+        }
+
+        public PartitionDescriptor getPartitionDescriptor() {
+            return partitionDescriptor;
+        }
+    }
+
+    public static class RegisterPartitionRequestFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final PartitionRequest partitionRequest;
+
+        public RegisterPartitionRequestFunction(PartitionRequest partitionRequest) {
+            this.partitionRequest = partitionRequest;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.REGISTER_PARTITION_REQUEST;
+        }
+
+        public PartitionRequest getPartitionRequest() {
+            return partitionRequest;
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
new file mode 100644
index 0000000..0aeab72
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.ipc;
+
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.common.base.IClusterController;
+import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
+import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
+import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
+import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
+import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.SyncRMI;
+
+public class ClusterControllerRemoteProxy implements IClusterController {
+    private final IIPCHandle ipcHandle;
+
+    public ClusterControllerRemoteProxy(IIPCHandle ipcHandle) {
+        this.ipcHandle = ipcHandle;
+    }
+
+    @Override
+    public NodeParameters registerNode(NodeRegistration reg) throws Exception {
+        SyncRMI sync = new SyncRMI();
+        ClusterControllerFunctions.RegisterNodeFunction fn = new ClusterControllerFunctions.RegisterNodeFunction(reg);
+        NodeParameters result = (NodeParameters) sync.call(ipcHandle, fn);
+        return result;
+    }
+
+    @Override
+    public void unregisterNode(String nodeId) throws Exception {
+        SyncRMI sync = new SyncRMI();
+        ClusterControllerFunctions.UnregisterNodeFunction fn = new ClusterControllerFunctions.UnregisterNodeFunction(
+                nodeId);
+        sync.call(ipcHandle, fn);
+    }
+
+    @Override
+    public void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics)
+            throws Exception {
+        SyncRMI sync = new SyncRMI();
+        ClusterControllerFunctions.NotifyTaskCompleteFunction fn = new ClusterControllerFunctions.NotifyTaskCompleteFunction(
+                jobId, taskId, nodeId, statistics);
+        sync.call(ipcHandle, fn);
+    }
+
+    @Override
+    public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, String details) throws Exception {
+        SyncRMI sync = new SyncRMI();
+        ClusterControllerFunctions.NotifyTaskFailureFunction fn = new ClusterControllerFunctions.NotifyTaskFailureFunction(
+                jobId, taskId, nodeId, details);
+        sync.call(ipcHandle, fn);
+    }
+
+    @Override
+    public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception {
+        SyncRMI sync = new SyncRMI();
+        ClusterControllerFunctions.NotifyJobletCleanupFunction fn = new ClusterControllerFunctions.NotifyJobletCleanupFunction(
+                jobId, nodeId);
+        sync.call(ipcHandle, fn);
+    }
+
+    @Override
+    public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception {
+        SyncRMI sync = new SyncRMI();
+        ClusterControllerFunctions.NodeHeartbeatFunction fn = new ClusterControllerFunctions.NodeHeartbeatFunction(id,
+                hbData);
+        sync.call(ipcHandle, fn);
+    }
+
+    @Override
+    public void reportProfile(String id, List<JobProfile> profiles) throws Exception {
+        SyncRMI sync = new SyncRMI();
+        ClusterControllerFunctions.ReportProfileFunction fn = new ClusterControllerFunctions.ReportProfileFunction(id,
+                profiles);
+        sync.call(ipcHandle, fn);
+    }
+
+    @Override
+    public void registerPartitionProvider(PartitionDescriptor partitionDescriptor) throws Exception {
+        SyncRMI sync = new SyncRMI();
+        ClusterControllerFunctions.RegisterPartitionProviderFunction fn = new ClusterControllerFunctions.RegisterPartitionProviderFunction(
+                partitionDescriptor);
+        sync.call(ipcHandle, fn);
+    }
+
+    @Override
+    public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception {
+        SyncRMI sync = new SyncRMI();
+        ClusterControllerFunctions.RegisterPartitionRequestFunction fn = new ClusterControllerFunctions.RegisterPartitionRequestFunction(
+                partitionRequest);
+        sync.call(ipcHandle, fn);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerDelegateIPCI.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerDelegateIPCI.java
new file mode 100644
index 0000000..f3e51ec
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerDelegateIPCI.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.ipc;
+
+import edu.uci.ics.hyracks.control.common.base.INodeController;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.IIPCI;
+
+public class NodeControllerDelegateIPCI implements IIPCI {
+    private final INodeController nc;
+
+    public NodeControllerDelegateIPCI(INodeController nc) {
+        this.nc = nc;
+    }
+
+    @Override
+    public Object call(IIPCHandle caller, Object req) throws Exception {
+        NodeControllerFunctions.Function fn = (NodeControllerFunctions.Function) req;
+        switch (fn.getFunctionId()) {
+            case START_TASKS: {
+                NodeControllerFunctions.StartTasksFunction stf = (NodeControllerFunctions.StartTasksFunction) fn;
+                nc.startTasks(stf.getAppName(), stf.getJobId(), stf.getPlanBytes(), stf.getTaskDescriptors(),
+                        stf.getConnectorPolicies());
+                return null;
+            }
+
+            case ABORT_TASKS: {
+                NodeControllerFunctions.AbortTasksFunction atf = (NodeControllerFunctions.AbortTasksFunction) fn;
+                nc.abortTasks(atf.getJobId(), atf.getTasks());
+                return null;
+            }
+
+            case CLEANUP_JOBLET: {
+                NodeControllerFunctions.CleanupJobletFunction cjf = (NodeControllerFunctions.CleanupJobletFunction) fn;
+                nc.cleanUpJoblet(cjf.getJobId(), cjf.getStatus());
+                return null;
+            }
+
+            case CREATE_APPLICATION: {
+                NodeControllerFunctions.CreateApplicationFunction caf = (NodeControllerFunctions.CreateApplicationFunction) fn;
+                nc.createApplication(caf.getAppName(), caf.isDeployHar(), caf.getSerializedDistributedState());
+                return null;
+            }
+
+            case DESTROY_APPLICATION: {
+                NodeControllerFunctions.DestroyApplicationFunction daf = (NodeControllerFunctions.DestroyApplicationFunction) fn;
+                nc.destroyApplication(daf.getAppName());
+                return null;
+            }
+
+            case REPORT_PARTITION_AVAILABILITY: {
+                NodeControllerFunctions.ReportPartitionAvailabilityFunction rpaf = (NodeControllerFunctions.ReportPartitionAvailabilityFunction) fn;
+                nc.reportPartitionAvailability(rpaf.getPartitionId(), rpaf.getNetworkAddress());
+                return null;
+            }
+        }
+        throw new IllegalArgumentException("Unknown function: " + fn.getFunctionId());
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerFunctions.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerFunctions.java
new file mode 100644
index 0000000..0d39c1b
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerFunctions.java
@@ -0,0 +1,215 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.ipc;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
+
+public class NodeControllerFunctions {
+    public enum FunctionId {
+        START_TASKS,
+        ABORT_TASKS,
+        CLEANUP_JOBLET,
+        CREATE_APPLICATION,
+        DESTROY_APPLICATION,
+        REPORT_PARTITION_AVAILABILITY
+    }
+
+    public static abstract class Function implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        public abstract FunctionId getFunctionId();
+    }
+
+    public static class StartTasksFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final String appName;
+        private final JobId jobId;
+        private final byte[] planBytes;
+        private final List<TaskAttemptDescriptor> taskDescriptors;
+        private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies;
+
+        public StartTasksFunction(String appName, JobId jobId, byte[] planBytes,
+                List<TaskAttemptDescriptor> taskDescriptors,
+                Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) {
+            this.appName = appName;
+            this.jobId = jobId;
+            this.planBytes = planBytes;
+            this.taskDescriptors = taskDescriptors;
+            this.connectorPolicies = connectorPolicies;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.START_TASKS;
+        }
+
+        public String getAppName() {
+            return appName;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+
+        public byte[] getPlanBytes() {
+            return planBytes;
+        }
+
+        public List<TaskAttemptDescriptor> getTaskDescriptors() {
+            return taskDescriptors;
+        }
+
+        public Map<ConnectorDescriptorId, IConnectorPolicy> getConnectorPolicies() {
+            return connectorPolicies;
+        }
+    }
+
+    public static class AbortTasksFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+        private final List<TaskAttemptId> tasks;
+
+        public AbortTasksFunction(JobId jobId, List<TaskAttemptId> tasks) {
+            this.jobId = jobId;
+            this.tasks = tasks;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.ABORT_TASKS;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+
+        public List<TaskAttemptId> getTasks() {
+            return tasks;
+        }
+    }
+
+    public static class CleanupJobletFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+        private final JobStatus status;
+
+        public CleanupJobletFunction(JobId jobId, JobStatus status) {
+            this.jobId = jobId;
+            this.status = status;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.CLEANUP_JOBLET;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+
+        public JobStatus getStatus() {
+            return status;
+        }
+    }
+
+    public static class CreateApplicationFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final String appName;
+        private final boolean deployHar;
+        private final byte[] serializedDistributedState;
+
+        public CreateApplicationFunction(String appName, boolean deployHar, byte[] serializedDistributedState) {
+            this.appName = appName;
+            this.deployHar = deployHar;
+            this.serializedDistributedState = serializedDistributedState;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.CREATE_APPLICATION;
+        }
+
+        public String getAppName() {
+            return appName;
+        }
+
+        public boolean isDeployHar() {
+            return deployHar;
+        }
+
+        public byte[] getSerializedDistributedState() {
+            return serializedDistributedState;
+        }
+    }
+
+    public static class DestroyApplicationFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final String appName;
+
+        public DestroyApplicationFunction(String appName) {
+            this.appName = appName;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.DESTROY_APPLICATION;
+        }
+
+        public String getAppName() {
+            return appName;
+        }
+    }
+
+    public static class ReportPartitionAvailabilityFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final PartitionId pid;
+        private final NetworkAddress networkAddress;
+
+        public ReportPartitionAvailabilityFunction(PartitionId pid, NetworkAddress networkAddress) {
+            this.pid = pid;
+            this.networkAddress = networkAddress;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.REPORT_PARTITION_AVAILABILITY;
+        }
+
+        public PartitionId getPartitionId() {
+            return pid;
+        }
+
+        public NetworkAddress getNetworkAddress() {
+            return networkAddress;
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
new file mode 100644
index 0000000..eaa2f27
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.ipc;
+
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.base.INodeController;
+import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.SyncRMI;
+
+public class NodeControllerRemoteProxy implements INodeController {
+    private final IIPCHandle ipcHandle;
+
+    public NodeControllerRemoteProxy(IIPCHandle ipcHandle) {
+        this.ipcHandle = ipcHandle;
+    }
+
+    @Override
+    public void startTasks(String appName, JobId jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors,
+            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) throws Exception {
+        SyncRMI sync = new SyncRMI();
+        NodeControllerFunctions.StartTasksFunction stf = new NodeControllerFunctions.StartTasksFunction(appName, jobId,
+                planBytes, taskDescriptors, connectorPolicies);
+        sync.call(ipcHandle, stf);
+    }
+
+    @Override
+    public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception {
+        SyncRMI sync = new SyncRMI();
+        NodeControllerFunctions.AbortTasksFunction atf = new NodeControllerFunctions.AbortTasksFunction(jobId, tasks);
+        sync.call(ipcHandle, atf);
+    }
+
+    @Override
+    public void cleanUpJoblet(JobId jobId, JobStatus status) throws Exception {
+        SyncRMI sync = new SyncRMI();
+        NodeControllerFunctions.CleanupJobletFunction cjf = new NodeControllerFunctions.CleanupJobletFunction(jobId,
+                status);
+        sync.call(ipcHandle, cjf);
+    }
+
+    @Override
+    public void createApplication(String appName, boolean deployHar, byte[] serializedDistributedState)
+            throws Exception {
+        SyncRMI sync = new SyncRMI();
+        NodeControllerFunctions.CreateApplicationFunction caf = new NodeControllerFunctions.CreateApplicationFunction(
+                appName, deployHar, serializedDistributedState);
+        sync.call(ipcHandle, caf);
+    }
+
+    @Override
+    public void destroyApplication(String appName) throws Exception {
+        SyncRMI sync = new SyncRMI();
+        NodeControllerFunctions.DestroyApplicationFunction daf = new NodeControllerFunctions.DestroyApplicationFunction(
+                appName);
+        sync.call(ipcHandle, daf);
+    }
+
+    @Override
+    public void reportPartitionAvailability(PartitionId pid, NetworkAddress networkAddress) throws Exception {
+        SyncRMI sync = new SyncRMI();
+        NodeControllerFunctions.ReportPartitionAvailabilityFunction rpaf = new NodeControllerFunctions.ReportPartitionAvailabilityFunction(
+                pid, networkAddress);
+        sync.call(ipcHandle, rpaf);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 8027af5..d869515 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -23,8 +23,7 @@
 import java.lang.management.RuntimeMXBean;
 import java.lang.management.ThreadMXBean;
 import java.net.InetAddress;
-import java.rmi.registry.LocateRegistry;
-import java.rmi.registry.Registry;
+import java.net.InetSocketAddress;
 import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.Hashtable;
@@ -58,6 +57,8 @@
 import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
 import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
 import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatSchema;
+import edu.uci.ics.hyracks.control.common.ipc.ClusterControllerRemoteProxy;
+import edu.uci.ics.hyracks.control.common.ipc.NodeControllerDelegateIPCI;
 import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
 import edu.uci.ics.hyracks.control.common.work.FutureValue;
@@ -74,18 +75,20 @@
 import edu.uci.ics.hyracks.control.nc.work.DestroyApplicationWork;
 import edu.uci.ics.hyracks.control.nc.work.ReportPartitionAvailabilityWork;
 import edu.uci.ics.hyracks.control.nc.work.StartTasksWork;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
 
 public class NodeControllerService extends AbstractRemoteService implements INodeController {
     private static Logger LOGGER = Logger.getLogger(NodeControllerService.class.getName());
 
-    private static final long serialVersionUID = 1L;
-
     private NCConfig ncConfig;
 
     private final String id;
 
     private final IHyracksRootContext ctx;
 
+    private final IPCSystem ipc;
+
     private final PartitionManager partitionManager;
 
     private final ConnectionManager connectionManager;
@@ -122,6 +125,8 @@
         this.ncConfig = ncConfig;
         id = ncConfig.nodeId;
         executor = Executors.newCachedThreadPool();
+        NodeControllerDelegateIPCI ipci = new NodeControllerDelegateIPCI(this);
+        ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, 0), ipci, executor);
         this.ctx = new RootHyracksContext(ncConfig.frameSize, new IOManager(getDevices(ncConfig.ioDevices), executor));
         if (id == null) {
             throw new Exception("id not set");
@@ -160,28 +165,28 @@
     @Override
     public void start() throws Exception {
         LOGGER.log(Level.INFO, "Starting NodeControllerService");
+        ipc.start();
         connectionManager.start();
-        Registry registry = LocateRegistry.getRegistry(ncConfig.ccHost, ncConfig.ccPort);
-        IClusterController cc = (IClusterController) registry.lookup(IClusterController.class.getName());
+        IIPCHandle ccIPCHandle = ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort));
+        this.ccs = new ClusterControllerRemoteProxy(ccIPCHandle);
         HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
         for (int i = 0; i < gcInfos.length; ++i) {
             gcInfos[i] = new HeartbeatSchema.GarbageCollectorInfo(gcMXBeans.get(i).getName());
         }
         HeartbeatSchema hbSchema = new HeartbeatSchema(gcInfos);
-        this.nodeParameters = cc.registerNode(new NodeRegistration(this, id, ncConfig, connectionManager
-                .getNetworkAddress(), osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean
-                .getAvailableProcessors(), hbSchema));
-        this.ccs = nodeParameters.getClusterController();
+        this.nodeParameters = ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig,
+                connectionManager.getNetworkAddress(), osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(),
+                osMXBean.getAvailableProcessors(), hbSchema));
         queue.start();
 
-        heartbeatTask = new HeartbeatTask(cc);
+        heartbeatTask = new HeartbeatTask(ccs);
 
         // Schedule heartbeat generator.
         timer.schedule(heartbeatTask, 0, nodeParameters.getHeartbeatPeriod());
 
         if (nodeParameters.getProfileDumpPeriod() > 0) {
             // Schedule profile dump generator.
-            timer.schedule(new ProfileDumpTask(cc), 0, nodeParameters.getProfileDumpPeriod());
+            timer.schedule(new ProfileDumpTask(ccs), 0, nodeParameters.getProfileDumpPeriod());
         }
 
         LOGGER.log(Level.INFO, "Started NodeControllerService");
@@ -197,7 +202,6 @@
         LOGGER.log(Level.INFO, "Stopped NodeControllerService");
     }
 
-    @Override
     public String getId() {
         return id;
     }
@@ -237,7 +241,7 @@
     @Override
     public void startTasks(String appName, final JobId jobId, byte[] jagBytes,
             List<TaskAttemptDescriptor> taskDescriptors,
-            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap, byte[] ctxVarBytes) throws Exception {
+            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap) throws Exception {
         StartTasksWork stw = new StartTasksWork(this, appName, jobId, jagBytes, taskDescriptors, connectorPoliciesMap);
         queue.schedule(stw);
     }
@@ -248,7 +252,6 @@
         queue.schedule(cjw);
     }
 
-    @Override
     public NCConfig getConfiguration() throws Exception {
         return ncConfig;
     }
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/ConnectionManager.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/ConnectionManager.java
index b9eaf1f..6e38ef7 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/ConnectionManager.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/ConnectionManager.java
@@ -63,7 +63,8 @@
         this.ctx = ctx;
         serverChannel = ServerSocketChannel.open();
         ServerSocket serverSocket = serverChannel.socket();
-        serverSocket.bind(new InetSocketAddress(inetAddress, 0));
+        serverSocket.bind(new InetSocketAddress(inetAddress, 0), 0);
+        serverSocket.setReuseAddress(true);
         stopped = false;
         connectionListener = new ConnectionListenerThread();
         dataListener = new DataListenerThread();
@@ -97,6 +98,7 @@
         public ConnectionListenerThread() {
             super("Hyracks NC Connection Listener");
             setDaemon(true);
+            setPriority(MAX_PRIORITY);
         }
 
         @Override
@@ -159,6 +161,7 @@
                         if (!pendingIncomingConnections.isEmpty()) {
                             for (SocketChannel sc : pendingIncomingConnections) {
                                 sc.configureBlocking(false);
+                                sc.socket().setReuseAddress(true);
                                 SelectionKey scKey = sc.register(selector, SelectionKey.OP_READ);
                                 ByteBuffer buffer = ByteBuffer.allocate(INITIAL_MESSAGE_SIZE);
                                 scKey.attach(buffer);
@@ -170,6 +173,7 @@
                             for (INetworkChannel nc : pendingOutgoingConnections) {
                                 SocketChannel sc = SocketChannel.open();
                                 sc.configureBlocking(false);
+                                sc.socket().setReuseAddress(true);
                                 SelectionKey scKey = sc.register(selector, 0);
                                 scKey.attach(nc);
                                 nc.setSelectionKey(scKey);
@@ -202,6 +206,7 @@
                                             } catch (HyracksException e) {
                                                 key.cancel();
                                                 sc.close();
+                                                channel.abort();
                                             }
                                         } else {
                                             buffer.compact();
@@ -210,15 +215,19 @@
                                 } else {
                                     INetworkChannel channel = (INetworkChannel) key.attachment();
                                     boolean close = false;
+                                    boolean error = false;
                                     try {
                                         close = channel.dispatchNetworkEvent();
                                     } catch (IOException e) {
                                         e.printStackTrace();
-                                        close = true;
+                                        error = true;
                                     }
-                                    if (close) {
+                                    if (close || error) {
                                         key.cancel();
                                         sc.close();
+                                        if (error) {
+                                            channel.abort();
+                                        }
                                     }
                                 }
                             }
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
index 58f7088..23cf514 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
@@ -130,7 +130,7 @@
     public synchronized boolean dispatchNetworkEvent() throws IOException {
         if (aborted) {
             eos = true;
-            monitor.notifyEndOfStream(this);
+            monitor.notifyFailure(this);
             return true;
         }
         if (key.isConnectable()) {
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionCollector.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionCollector.java
index 33819a8..2b37c7b 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionCollector.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionCollector.java
@@ -172,6 +172,7 @@
                 PartitionId pid = (PartitionId) channel.getAttachment();
                 int senderIndex = pid.getSenderIndex();
                 failSenders.set(senderIndex);
+                eosSenders.set(senderIndex);
                 NonDeterministicPartitionCollector.this.notifyAll();
             }
         }
diff --git a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java
index 8255c58..626bb98 100644
--- a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java
+++ b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java
@@ -18,7 +18,7 @@
 import org.kohsuke.args4j.CmdLineParser;
 import org.kohsuke.args4j.Option;
 
-import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
@@ -86,7 +86,7 @@
         CmdLineParser parser = new CmdLineParser(options);
         parser.parseArgument(args);
 
-        IHyracksClientConnection hcc = new HyracksRMIConnection(options.host, options.port);
+        IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
 
         JobSpecification job = createJob(options);
 
diff --git a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
index ae988cd..5a0c90d 100644
--- a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
+++ b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
@@ -17,7 +17,7 @@
 import org.kohsuke.args4j.CmdLineParser;
 import org.kohsuke.args4j.Option;
 
-import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
@@ -84,7 +84,7 @@
         CmdLineParser parser = new CmdLineParser(options);
         parser.parseArgument(args);
 
-        IHyracksClientConnection hcc = new HyracksRMIConnection(options.host, options.port);
+        IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
 
         JobSpecification job = createJob(options);
 
diff --git a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java
index dbc0bca..ec62b1f 100644
--- a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java
+++ b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java
@@ -19,7 +19,7 @@
 import org.kohsuke.args4j.CmdLineParser;
 import org.kohsuke.args4j.Option;
 
-import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
@@ -77,7 +77,7 @@
         CmdLineParser parser = new CmdLineParser(options);
         parser.parseArgument(args);
 
-        IHyracksClientConnection hcc = new HyracksRMIConnection(options.host, options.port);
+        IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
 
         JobSpecification job = createJob(options);
 
diff --git a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
index bf2c2e1..8075f89 100644
--- a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
+++ b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
@@ -17,7 +17,7 @@
 import org.kohsuke.args4j.CmdLineParser;
 import org.kohsuke.args4j.Option;
 
-import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
@@ -79,7 +79,7 @@
         CmdLineParser parser = new CmdLineParser(options);
         parser.parseArgument(args);
 
-        IHyracksClientConnection hcc = new HyracksRMIConnection(options.host, options.port);
+        IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
 
         JobSpecification job = createJob(options);
 
diff --git a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
index 29f8257..573c078 100644
--- a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
+++ b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
@@ -19,7 +19,7 @@
 import org.kohsuke.args4j.CmdLineParser;
 import org.kohsuke.args4j.Option;
 
-import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
@@ -80,7 +80,7 @@
         CmdLineParser parser = new CmdLineParser(options);
         parser.parseArgument(args);
 
-        IHyracksClientConnection hcc = new HyracksRMIConnection(options.host, options.port);
+        IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
 
         JobSpecification job = createJob(options);
 
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
index abedf4a..fac141c 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -28,7 +28,7 @@
 import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
 
-import edu.uci.ics.hyracks.api.client.HyracksLocalConnection;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
@@ -61,7 +61,10 @@
     @BeforeClass
     public static void init() throws Exception {
         CCConfig ccConfig = new CCConfig();
-        ccConfig.port = 39001;
+        ccConfig.clientNetIpAddress = "127.0.0.1";
+        ccConfig.clientNetPort = 39000;
+        ccConfig.clusterNetIpAddress = "127.0.0.1";
+        ccConfig.clusterNetPort = 39001;
         ccConfig.profileDumpPeriod = 10000;
         File outDir = new File("target/ClusterController");
         outDir.mkdirs();
@@ -75,6 +78,7 @@
         NCConfig ncConfig1 = new NCConfig();
         ncConfig1.ccHost = "localhost";
         ncConfig1.ccPort = 39001;
+        ncConfig1.clusterNetIPAddress = "127.0.0.1";
         ncConfig1.dataIPAddress = "127.0.0.1";
         ncConfig1.nodeId = NC1_ID;
         nc1 = new NodeControllerService(ncConfig1);
@@ -83,12 +87,13 @@
         NCConfig ncConfig2 = new NCConfig();
         ncConfig2.ccHost = "localhost";
         ncConfig2.ccPort = 39001;
+        ncConfig2.clusterNetIPAddress = "127.0.0.1";
         ncConfig2.dataIPAddress = "127.0.0.1";
         ncConfig2.nodeId = NC2_ID;
         nc2 = new NodeControllerService(ncConfig2);
         nc2.start();
 
-        hcc = new HyracksLocalConnection(cc);
+        hcc = new HyracksConnection(ccConfig.clientNetIpAddress, ccConfig.clientNetPort);
         hcc.createApplication("test", null);
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Starting CC in " + ccRoot.getAbsolutePath());
diff --git a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
index a1a5179..f594d7e 100644
--- a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
+++ b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
@@ -19,7 +19,7 @@
 import org.kohsuke.args4j.CmdLineParser;
 import org.kohsuke.args4j.Option;
 
-import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
@@ -112,7 +112,7 @@
         CmdLineParser parser = new CmdLineParser(options);
         parser.parseArgument(args);
 
-        IHyracksClientConnection hcc = new HyracksRMIConnection(options.host, options.port);
+        IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
 
         JobSpecification job;
 
diff --git a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java
index 17acc105..4f78928 100644
--- a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java
+++ b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java
@@ -20,7 +20,7 @@
 import org.kohsuke.args4j.CmdLineParser;
 import org.kohsuke.args4j.Option;
 
-import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
@@ -95,7 +95,7 @@
         CmdLineParser parser = new CmdLineParser(options);
         parser.parseArgument(args);
 
-        IHyracksClientConnection hcc = new HyracksRMIConnection(options.host, options.port);
+        IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
 
         JobSpecification job = createJob(parseFileSplits(options.inFileSplits), parseFileSplits(options.outFileSplits),
                 options.algo, options.htSize, options.sbSize, options.format);
diff --git a/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java b/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
index cffa0f4..8e63e0a 100644
--- a/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
+++ b/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
@@ -20,7 +20,7 @@
 import org.kohsuke.args4j.CmdLineParser;
 import org.kohsuke.args4j.Option;
 
-import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
@@ -118,7 +118,7 @@
         CmdLineParser parser = new CmdLineParser(options);
         parser.parseArgument(args);
 
-        IHyracksClientConnection hcc = new HyracksRMIConnection(options.host, options.port);
+        IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
 
         JobSpecification job = createJob(parseFileSplits(options.inFileCustomerSplits),
                 parseFileSplits(options.inFileOrderSplits), parseFileSplits(options.outFileSplits),
diff --git a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java
index 9e76a3e..81e9f84 100644
--- a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java
+++ b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java
@@ -5,7 +5,7 @@
 import java.util.Properties;
 import java.util.Set;
 
-import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
@@ -15,7 +15,7 @@
 
 public class HyracksClient {
 
-    private static HyracksRMIConnection connection;
+    private static HyracksConnection connection;
     private static final String jobProfilingKey = "jobProfilingKey";
     Set<String> systemLibs;
 
@@ -25,7 +25,7 @@
 
     private void initialize(Properties properties) throws Exception {
         String clusterController = (String) properties.get(ConfigurationConstants.clusterControllerHost);
-        connection = new HyracksRMIConnection(clusterController, 1099);
+        connection = new HyracksConnection(clusterController, 1099);
         systemLibs = new HashSet<String>();
         for (String systemLib : ConfigurationConstants.systemLibs) {
             String systemLibPath = properties.getProperty(systemLib);
diff --git a/hyracks-ipc/pom.xml b/hyracks-ipc/pom.xml
new file mode 100644
index 0000000..49c4323
--- /dev/null
+++ b/hyracks-ipc/pom.xml
@@ -0,0 +1,31 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>hyracks-ipc</artifactId>
+  <parent>
+    <groupId>edu.uci.ics.hyracks</groupId>
+    <artifactId>hyracks</artifactId>
+    <version>0.2.0-SNAPSHOT</version>
+  </parent>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>2.0.2</version>
+        <configuration>
+          <source>1.6</source>
+          <target>1.6</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+  <dependencies>
+  <dependency>
+  	<groupId>junit</groupId>
+  	<artifactId>junit</artifactId>
+  	<version>4.8.1</version>
+  	<scope>test</scope>
+  </dependency>
+  </dependencies>
+</project>
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCHandle.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCHandle.java
new file mode 100644
index 0000000..a4adf23
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCHandle.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.api;
+
+import java.net.InetSocketAddress;
+
+import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
+
+public interface IIPCHandle {
+    public InetSocketAddress getRemoteAddress();
+
+    public void send(Object request, IResponseCallback callback) throws IPCException;
+
+    public void setAttachment(Object attachment);
+
+    public Object getAttachment();
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCI.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCI.java
new file mode 100644
index 0000000..ba9f343
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCI.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.api;
+
+public interface IIPCI {
+    public Object call(IIPCHandle caller, Object req) throws Exception;
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IResponseCallback.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IResponseCallback.java
new file mode 100644
index 0000000..7d25f88
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IResponseCallback.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.api;
+
+public interface IResponseCallback {
+    public void callback(IIPCHandle handle, Object response, Exception exception);
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/SyncRMI.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/SyncRMI.java
new file mode 100644
index 0000000..180b1dd
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/SyncRMI.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.api;
+
+public final class SyncRMI implements IResponseCallback {
+    private boolean pending;
+
+    private Object response;
+
+    private Exception exception;
+
+    public SyncRMI() {
+    }
+
+    @Override
+    public synchronized void callback(IIPCHandle handle, Object response, Exception exception) {
+        pending = false;
+        this.response = response;
+        this.exception = exception;
+        notifyAll();
+    }
+
+    public synchronized Object call(IIPCHandle handle, Object request) throws Exception {
+        pending = true;
+        response = null;
+        exception = null;
+        handle.send(request, this);
+        while (pending) {
+            wait();
+        }
+        if (exception != null) {
+            throw exception;
+        }
+        return response;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/exceptions/IPCException.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/exceptions/IPCException.java
new file mode 100644
index 0000000..9ecf015
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/exceptions/IPCException.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.exceptions;
+
+public class IPCException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    public IPCException() {
+        super();
+    }
+
+    public IPCException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public IPCException(String message) {
+        super(message);
+    }
+
+    public IPCException(Throwable cause) {
+        super(cause);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/HandleState.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/HandleState.java
new file mode 100644
index 0000000..47c3d4a
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/HandleState.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.impl;
+
+enum HandleState {
+    INITIAL,
+    CONNECT_SENT,
+    CONNECT_RECEIVED,
+    CONNECTED,
+    CLOSED,
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
new file mode 100644
index 0000000..cbbe718
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
@@ -0,0 +1,270 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class IPCConnectionManager {
+    private static final Logger LOGGER = Logger.getLogger(IPCConnectionManager.class.getName());
+
+    private final IPCSystem system;
+
+    private final NetworkThread networkThread;
+
+    private final ServerSocketChannel serverSocketChannel;
+
+    private final Map<InetSocketAddress, IPCHandle> ipcHandleMap;
+
+    private final List<IPCHandle>[] pendingConnections;
+
+    private final List<Message>[] sendList;
+
+    private int writerIndex;
+
+    private int readerIndex;
+
+    private final InetSocketAddress address;
+
+    private volatile boolean stopped;
+
+    IPCConnectionManager(IPCSystem system, InetSocketAddress socketAddress) throws IOException {
+        this.system = system;
+        this.networkThread = new NetworkThread();
+        this.serverSocketChannel = ServerSocketChannel.open();
+        serverSocketChannel.configureBlocking(false);
+        ServerSocket socket = serverSocketChannel.socket();
+        socket.bind(socketAddress);
+        address = new InetSocketAddress(socket.getInetAddress(), socket.getLocalPort());
+        ipcHandleMap = new HashMap<InetSocketAddress, IPCHandle>();
+        pendingConnections = new ArrayList[] { new ArrayList<IPCHandle>(), new ArrayList<IPCHandle>() };
+        sendList = new ArrayList[] { new ArrayList<Message>(), new ArrayList<Message>() };
+        writerIndex = 0;
+        readerIndex = 1;
+    }
+
+    InetSocketAddress getAddress() {
+        return address;
+    }
+
+    void start() {
+        stopped = false;
+        networkThread.start();
+    }
+
+    void stop() throws IOException {
+        stopped = true;
+        serverSocketChannel.close();
+    }
+
+    IPCHandle getIPCHandle(InetSocketAddress remoteAddress) throws IOException, InterruptedException {
+        IPCHandle handle;
+        synchronized (this) {
+            handle = ipcHandleMap.get(remoteAddress);
+            if (handle == null) {
+                handle = new IPCHandle(system, remoteAddress);
+                pendingConnections[writerIndex].add(handle);
+                networkThread.selector.wakeup();
+            }
+        }
+        handle.waitTillConnected();
+        return handle;
+    }
+
+    synchronized void registerHandle(IPCHandle handle) {
+        ipcHandleMap.put(handle.getRemoteAddress(), handle);
+    }
+
+    synchronized void write(Message msg) {
+        sendList[writerIndex].add(msg);
+        networkThread.selector.wakeup();
+    }
+
+    private synchronized void swapReadersAndWriters() {
+        int temp = readerIndex;
+        readerIndex = writerIndex;
+        writerIndex = temp;
+    }
+
+    private Message createInitialReqMessage(IPCHandle handle) {
+        Message msg = new Message(handle);
+        msg.setMessageId(system.createMessageId());
+        msg.setRequestMessageId(-1);
+        msg.setFlag(Message.INITIAL_REQ);
+        msg.setPayload(address);
+        return msg;
+    }
+
+    private Message createInitialAckMessage(IPCHandle handle, Message req) {
+        Message msg = new Message(handle);
+        msg.setMessageId(system.createMessageId());
+        msg.setRequestMessageId(req.getMessageId());
+        msg.setFlag(Message.INITIAL_ACK);
+        msg.setPayload(null);
+        return msg;
+    }
+
+    void ack(IPCHandle handle, Message req) {
+        write(createInitialAckMessage(handle, req));
+    }
+
+    private class NetworkThread extends Thread {
+        private final Selector selector;
+
+        public NetworkThread() {
+            super("IPC Network Listener Thread");
+            setDaemon(true);
+            try {
+                selector = Selector.open();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public void run() {
+            try {
+                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
+            } catch (ClosedChannelException e) {
+                throw new RuntimeException(e);
+            }
+            while (!stopped) {
+                try {
+                    if (LOGGER.isLoggable(Level.FINE)) {
+                        LOGGER.fine("Starting Select");
+                    }
+                    int n = selector.select();
+                    swapReadersAndWriters();
+                    if (!pendingConnections[readerIndex].isEmpty()) {
+                        for (IPCHandle handle : pendingConnections[readerIndex]) {
+                            SocketChannel channel = SocketChannel.open();
+                            channel.configureBlocking(false);
+                            SelectionKey cKey = null;
+                            if (channel.connect(handle.getRemoteAddress())) {
+                                cKey = channel.register(selector, SelectionKey.OP_READ);
+                                handle.setState(HandleState.CONNECT_SENT);
+                                write(createInitialReqMessage(handle));
+                            } else {
+                                cKey = channel.register(selector, SelectionKey.OP_CONNECT);
+                            }
+                            handle.setKey(cKey);
+                            cKey.attach(handle);
+                        }
+                        pendingConnections[readerIndex].clear();
+                    }
+                    if (!sendList[readerIndex].isEmpty()) {
+                        for (Iterator<Message> i = sendList[readerIndex].iterator(); i.hasNext();) {
+                            Message msg = i.next();
+                            IPCHandle handle = msg.getIPCHandle();
+                            if (handle.getState() == HandleState.CLOSED) {
+                                i.remove();
+                            } else if (!handle.full()) {
+                                while (true) {
+                                    ByteBuffer buffer = handle.getOutBuffer();
+                                    buffer.compact();
+                                    boolean success = msg.write(buffer);
+                                    buffer.flip();
+                                    if (success) {
+                                        i.remove();
+                                        SelectionKey key = handle.getKey();
+                                        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+                                    } else {
+                                        if (buffer.position() == 0) {
+                                            handle.resizeOutBuffer();
+                                            continue;
+                                        }
+                                        handle.markFull();
+                                    }
+                                    break;
+                                }
+                            }
+                        }
+                    }
+                    if (n > 0) {
+                        for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();) {
+                            SelectionKey key = i.next();
+                            i.remove();
+                            SelectableChannel sc = key.channel();
+                            if (key.isReadable()) {
+                                SocketChannel channel = (SocketChannel) sc;
+                                IPCHandle handle = (IPCHandle) key.attachment();
+                                ByteBuffer readBuffer = handle.getInBuffer();
+                                int len = channel.read(readBuffer);
+                                if (len < 0) {
+                                    key.cancel();
+                                    channel.close();
+                                    handle.close();
+                                } else {
+                                    handle.processIncomingMessages();
+                                    if (!readBuffer.hasRemaining()) {
+                                        handle.resizeInBuffer();
+                                    }
+                                }
+                            } else if (key.isWritable()) {
+                                SocketChannel channel = (SocketChannel) sc;
+                                IPCHandle handle = (IPCHandle) key.attachment();
+                                ByteBuffer writeBuffer = handle.getOutBuffer();
+                                int len = channel.write(writeBuffer);
+                                if (len < 0) {
+                                    key.cancel();
+                                    channel.close();
+                                    handle.close();
+                                } else if (!writeBuffer.hasRemaining()) {
+                                    key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
+                                }
+                                handle.clearFull();
+                            } else if (key.isAcceptable()) {
+                                assert sc == serverSocketChannel;
+                                SocketChannel channel = serverSocketChannel.accept();
+                                channel.configureBlocking(false);
+                                IPCHandle handle = new IPCHandle(system, null);
+                                SelectionKey cKey = channel.register(selector, SelectionKey.OP_READ);
+                                handle.setKey(cKey);
+                                cKey.attach(handle);
+                                handle.setState(HandleState.CONNECT_RECEIVED);
+                            } else if (key.isConnectable()) {
+                                SocketChannel channel = (SocketChannel) sc;
+                                if (channel.finishConnect()) {
+                                    IPCHandle handle = (IPCHandle) key.attachment();
+                                    handle.setState(HandleState.CONNECT_SENT);
+                                    registerHandle(handle);
+                                    key.interestOps(SelectionKey.OP_READ);
+                                    write(createInitialReqMessage(handle));
+                                }
+                            }
+                        }
+                    }
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
new file mode 100644
index 0000000..481a0b0
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
@@ -0,0 +1,204 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.impl;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.util.HashMap;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.IResponseCallback;
+import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
+
+final class IPCHandle implements IIPCHandle {
+    private static final int DEFAULT_BUFFER_SIZE = 1024 * 1024;
+
+    private final IPCSystem system;
+
+    private InetSocketAddress remoteAddress;
+
+    private final Map<Long, IResponseCallback> pendingRequestMap;
+
+    private HandleState state;
+
+    private SelectionKey key;
+
+    private Object attachment;
+
+    private ByteBuffer inBuffer;
+
+    private ByteBuffer outBuffer;
+
+    private boolean full;
+
+    IPCHandle(IPCSystem system, InetSocketAddress remoteAddress) {
+        this.system = system;
+        this.remoteAddress = remoteAddress;
+        pendingRequestMap = new HashMap<Long, IResponseCallback>();
+        inBuffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
+        outBuffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
+        outBuffer.flip();
+        state = HandleState.INITIAL;
+    }
+
+    @Override
+    public InetSocketAddress getRemoteAddress() {
+        return remoteAddress;
+    }
+
+    void setRemoteAddress(InetSocketAddress remoteAddress) {
+        this.remoteAddress = remoteAddress;
+    }
+
+    @Override
+    public synchronized void send(Object req, IResponseCallback callback) throws IPCException {
+        if (state != HandleState.CONNECTED) {
+            throw new IPCException("Handle is not in Connected state");
+        }
+        Message msg = new Message(this);
+        long mid = system.createMessageId();
+        msg.setMessageId(mid);
+        msg.setRequestMessageId(-1);
+        msg.setPayload(req);
+        if (callback != null) {
+            pendingRequestMap.put(mid, callback);
+        }
+        system.getConnectionManager().write(msg);
+    }
+
+    @Override
+    public void setAttachment(Object attachment) {
+        this.attachment = attachment;
+    }
+
+    @Override
+    public Object getAttachment() {
+        return attachment;
+    }
+
+    SelectionKey getKey() {
+        return key;
+    }
+
+    void setKey(SelectionKey key) {
+        this.key = key;
+    }
+
+    public synchronized boolean isConnected() {
+        return state == HandleState.CONNECTED;
+    }
+
+    synchronized HandleState getState() {
+        return state;
+    }
+
+    synchronized void setState(HandleState state) {
+        this.state = state;
+        notifyAll();
+    }
+    
+    synchronized void waitTillConnected() throws InterruptedException {
+        while (!isConnected()) {
+            wait();
+        }
+    }
+
+    ByteBuffer getInBuffer() {
+        return inBuffer;
+    }
+
+    ByteBuffer getOutBuffer() {
+        return outBuffer;
+    }
+
+    synchronized void close() {
+        setState(HandleState.CLOSED);
+        for (IResponseCallback cb : pendingRequestMap.values()) {
+            cb.callback(this, null, new IPCException("IPC Handle Closed"));
+        }
+    }
+
+    synchronized void processIncomingMessages() {
+        inBuffer.flip();
+        while (Message.hasMessage(inBuffer)) {
+            Message message = new Message(this);
+            try {
+                message.read(inBuffer);
+            } catch (Exception e) {
+                message.setFlag(Message.ERROR);
+                message.setPayload(e);
+            }
+
+            if (state == HandleState.CONNECT_RECEIVED) {
+                remoteAddress = (InetSocketAddress) message.getPayload();
+                system.getConnectionManager().registerHandle(this);
+                setState(HandleState.CONNECTED);
+                system.getConnectionManager().ack(this, message);
+                continue;
+            } else if (state == HandleState.CONNECT_SENT) {
+                if (message.getFlag() == Message.INITIAL_ACK) {
+                    setState(HandleState.CONNECTED);
+                } else {
+                    throw new IllegalStateException();
+                }
+                continue;
+            }
+            long requestMessageId = message.getRequestMessageId();
+            if (requestMessageId < 0) {
+                system.deliverIncomingMessage(message);
+            } else {
+                Long rid = Long.valueOf(requestMessageId);
+                IResponseCallback cb = pendingRequestMap.remove(rid);
+                if (cb != null) {
+                    byte flag = message.getFlag();
+                    Object payload = flag == Message.ERROR ? null : message.getPayload();
+                    Exception exception = (Exception) (flag == Message.ERROR ? message.getPayload() : null);
+                    cb.callback(this, payload, exception);
+                }
+            }
+        }
+        inBuffer.compact();
+    }
+
+    void resizeInBuffer() {
+        inBuffer.flip();
+        ByteBuffer readBuffer = ByteBuffer.allocate(inBuffer.capacity() * 2);
+        readBuffer.put(inBuffer);
+        readBuffer.compact();
+        inBuffer = readBuffer;
+    }
+
+    void resizeOutBuffer() {
+        ByteBuffer writeBuffer = ByteBuffer.allocate(outBuffer.capacity() * 2);
+        writeBuffer.put(outBuffer);
+        writeBuffer.compact();
+        writeBuffer.flip();
+        outBuffer = writeBuffer;
+    }
+
+    void markFull() {
+        full = true;
+    }
+
+    void clearFull() {
+        full = false;
+    }
+
+    boolean full() {
+        return full;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
new file mode 100644
index 0000000..9eef8ff
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicLong;
+
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.IIPCI;
+import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
+
+public class IPCSystem {
+    private final IPCConnectionManager cMgr;
+
+    private final IIPCI ipci;
+
+    private final Executor executor;
+
+    private final AtomicLong midFactory;
+
+    public IPCSystem(InetSocketAddress socketAddress) throws IOException {
+        this(socketAddress, null, null);
+    }
+
+    public IPCSystem(InetSocketAddress socketAddress, IIPCI ipci, Executor executor) throws IOException {
+        cMgr = new IPCConnectionManager(this, socketAddress);
+        this.ipci = ipci;
+        this.executor = executor;
+        midFactory = new AtomicLong();
+    }
+
+    public InetSocketAddress getSocketAddress() {
+        return cMgr.getAddress();
+    }
+
+    public void start() {
+        cMgr.start();
+    }
+
+    public IIPCHandle getHandle(InetSocketAddress remoteAddress) throws IPCException {
+        try {
+            return cMgr.getIPCHandle(remoteAddress);
+        } catch (IOException e) {
+            throw new IPCException(e);
+        } catch (InterruptedException e) {
+            throw new IPCException(e);
+        }
+    }
+
+    long createMessageId() {
+        return midFactory.incrementAndGet();
+    }
+
+    void deliverIncomingMessage(final Message message) {
+        assert message.getFlag() == Message.NORMAL;
+        executor.execute(new Runnable() {
+            @Override
+            public void run() {
+                IPCHandle handle = message.getIPCHandle();
+                Message response = new Message(handle);
+                response.setMessageId(createMessageId());
+                response.setRequestMessageId(message.getMessageId());
+                response.setFlag(Message.NORMAL);
+                try {
+                    Object result = ipci.call(handle, message.getPayload());
+                    response.setPayload(result);
+                } catch (Exception e) {
+                    response.setFlag(Message.ERROR);
+                    response.setPayload(e);
+                }
+                cMgr.write(response);
+            }
+        });
+    }
+
+    IPCConnectionManager getConnectionManager() {
+        return cMgr;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/Message.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/Message.java
new file mode 100644
index 0000000..ab3428e
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/Message.java
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
+
+class Message {
+    private static final int MSG_SIZE_SIZE = 4;
+
+    private static final int HEADER_SIZE = 17;
+
+    static final byte INITIAL_REQ = 1;
+
+    static final byte INITIAL_ACK = 2;
+
+    static final byte ERROR = 3;
+
+    static final byte NORMAL = 0;
+
+    private IPCHandle ipcHandle;
+
+    private long messageId;
+
+    private long requestMessageId;
+
+    private byte flag;
+
+    private Object payload;
+
+    Message(IPCHandle ipcHandle) {
+        this.ipcHandle = ipcHandle;
+    }
+
+    IPCHandle getIPCHandle() {
+        return ipcHandle;
+    }
+
+    void setMessageId(long messageId) {
+        this.messageId = messageId;
+    }
+
+    long getMessageId() {
+        return messageId;
+    }
+
+    void setRequestMessageId(long requestMessageId) {
+        this.requestMessageId = requestMessageId;
+    }
+
+    long getRequestMessageId() {
+        return requestMessageId;
+    }
+
+    void setFlag(byte flag) {
+        this.flag = flag;
+    }
+
+    byte getFlag() {
+        return flag;
+    }
+
+    void setPayload(Object payload) {
+        this.payload = payload;
+    }
+
+    Object getPayload() {
+        return payload;
+    }
+
+    static boolean hasMessage(ByteBuffer buffer) {
+        if (buffer.remaining() < MSG_SIZE_SIZE) {
+            return false;
+        }
+        int msgSize = buffer.getInt(buffer.position());
+        return buffer.remaining() >= msgSize + MSG_SIZE_SIZE;
+    }
+
+    void read(ByteBuffer buffer) throws IOException, ClassNotFoundException {
+        assert hasMessage(buffer);
+        int msgSize = buffer.getInt();
+        messageId = buffer.getLong();
+        requestMessageId = buffer.getLong();
+        flag = buffer.get();
+        int finalPosition = buffer.position() + msgSize - HEADER_SIZE;
+        try {
+            ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(buffer.array(), buffer.position(),
+                    msgSize - HEADER_SIZE));
+            payload = ois.readObject();
+            ois.close();
+        } finally {
+            buffer.position(finalPosition);
+        }
+    }
+
+    boolean write(ByteBuffer buffer) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutputStream oos = new ObjectOutputStream(baos);
+        oos.writeObject(payload);
+        oos.close();
+        byte[] bytes = baos.toByteArray();
+        if (buffer.remaining() >= MSG_SIZE_SIZE + HEADER_SIZE + bytes.length) {
+            buffer.putInt(HEADER_SIZE + bytes.length);
+            buffer.putLong(messageId);
+            buffer.putLong(requestMessageId);
+            buffer.put(flag);
+            buffer.put(bytes);
+            return true;
+        }
+        return false;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/test/java/edu/uci/ics/ipc/tests/IPCTest.java b/hyracks-ipc/src/test/java/edu/uci/ics/ipc/tests/IPCTest.java
new file mode 100644
index 0000000..1fc1f6f
--- /dev/null
+++ b/hyracks-ipc/src/test/java/edu/uci/ics/ipc/tests/IPCTest.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.ipc.tests;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import junit.framework.Assert;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.IIPCI;
+import edu.uci.ics.hyracks.ipc.api.SyncRMI;
+import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
+
+public class IPCTest {
+    @Test
+    public void test() throws Exception {
+        IPCSystem server = createServerIPCSystem();
+        server.start();
+        InetSocketAddress serverAddr = server.getSocketAddress();
+
+        IPCSystem client = createClientIPCSystem();
+        client.start();
+
+        IIPCHandle handle = client.getHandle(serverAddr);
+
+        SyncRMI rmi = new SyncRMI();
+        for (int i = 0; i < 100; ++i) {
+            Assert.assertEquals(rmi.call(handle, Integer.valueOf(i)), Integer.valueOf(2 * i));
+        }
+
+        IIPCHandle rHandle = server.getHandle(client.getSocketAddress());
+
+        try {
+            rmi.call(rHandle, "Foo");
+            Assert.assertTrue(false);
+        } catch (Exception e) {
+            Assert.assertTrue(true);
+        }
+    }
+
+    private IPCSystem createServerIPCSystem() throws IOException {
+        Executor executor = Executors.newCachedThreadPool();
+        IIPCI ipci = new IIPCI() {
+            @Override
+            public Object call(IIPCHandle caller, Object req) throws Exception {
+                Integer i = (Integer) req;
+                return i.intValue() * 2;
+            }
+        };
+        return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), ipci, executor);
+    }
+
+    private IPCSystem createClientIPCSystem() throws IOException {
+        Executor executor = Executors.newCachedThreadPool();
+        IIPCI ipci = new IIPCI() {
+            @Override
+            public Object call(IIPCHandle caller, Object req) throws Exception {
+                throw new IllegalStateException();
+            }
+        };
+        return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), ipci, executor);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-server/src/main/java/edu/uci/ics/hyracks/server/drivers/VirtualClusterDriver.java b/hyracks-server/src/main/java/edu/uci/ics/hyracks/server/drivers/VirtualClusterDriver.java
index 8ed7bb7..4faa619 100644
--- a/hyracks-server/src/main/java/edu/uci/ics/hyracks/server/drivers/VirtualClusterDriver.java
+++ b/hyracks-server/src/main/java/edu/uci/ics/hyracks/server/drivers/VirtualClusterDriver.java
@@ -27,8 +27,11 @@
         @Option(name = "-n", required = false, usage = "Number of node controllers (default: 2)")
         public int n = 2;
 
-        @Option(name = "-cc-port", required = false, usage = "CC Port (default: 1099)")
-        public int ccPort = 1099;
+        @Option(name = "-cc-client-net-port", required = false, usage = "CC Port (default: 1098)")
+        public int ccClientNetPort = 1098;
+
+        @Option(name = "-cc-cluster-net-port", required = false, usage = "CC Port (default: 1099)")
+        public int ccClusterNetPort = 1099;
 
         @Option(name = "-cc-http-port", required = false, usage = "CC Port (default: 19001)")
         public int ccHttpPort = 19001;
@@ -46,7 +49,10 @@
         }
 
         CCConfig ccConfig = new CCConfig();
-        ccConfig.port = options.ccPort;
+        ccConfig.clusterNetIpAddress = "127.0.0.1";
+        ccConfig.clusterNetPort = options.ccClusterNetPort;
+        ccConfig.clientNetIpAddress = "127.0.0.1";
+        ccConfig.clientNetPort = options.ccClientNetPort;
         ccConfig.httpPort = options.ccHttpPort;
         HyracksCCProcess ccp = new HyracksCCProcess(ccConfig);
         ccp.start();
@@ -56,7 +62,9 @@
         HyracksNCProcess ncps[] = new HyracksNCProcess[options.n];
         for (int i = 0; i < options.n; ++i) {
             NCConfig ncConfig = new NCConfig();
-            ncConfig.ccHost = "localhost";
+            ncConfig.ccHost = "127.0.0.1";
+            ncConfig.ccPort = options.ccClusterNetPort;
+            ncConfig.clusterNetIPAddress = "127.0.0.1";
             ncConfig.nodeId = "nc" + i;
             ncConfig.dataIPAddress = "127.0.0.1";
             ncps[i] = new HyracksNCProcess(ncConfig);
diff --git a/pom.xml b/pom.xml
index 18c5cb0..890f42d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -79,6 +79,7 @@
   </pluginRepositories>
 
   <modules>
+    <module>hyracks-ipc</module>
     <module>hyracks-api</module>
     <module>hyracks-dataflow-common</module>
     <module>hyracks-dataflow-std</module>