Added a new IPC mechanism to Hyracks. Migrated all remote communications to use the new IPC layer.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@935 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/pom.xml b/hyracks-api/pom.xml
index f73c27a..ad101d5 100644
--- a/hyracks-api/pom.xml
+++ b/hyracks-api/pom.xml
@@ -1,9 +1,6 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
   <modelVersion>4.0.0</modelVersion>
-  <groupId>edu.uci.ics.hyracks</groupId>
   <artifactId>hyracks-api</artifactId>
-  <version>0.2.0-SNAPSHOT</version>
-
   <parent>
     <groupId>edu.uci.ics.hyracks</groupId>
     <artifactId>hyracks</artifactId>
@@ -45,5 +42,10 @@
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks</groupId>
+  		<artifactId>hyracks-ipc</artifactId>
+  		<version>0.2.0-SNAPSHOT</version>
+  	</dependency>
   </dependencies>
 </project>
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
new file mode 100644
index 0000000..a7f324d
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -0,0 +1,204 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.client;
+
+import java.io.Serializable;
+import java.util.EnumSet;
+
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+
+public class HyracksClientInterfaceFunctions {
+    public enum FunctionId {
+        GET_CLUSTER_CONTROLLER_INFO,
+        CREATE_APPLICATION,
+        START_APPLICATION,
+        DESTROY_APPLICATION,
+        CREATE_JOB,
+        GET_JOB_STATUS,
+        START_JOB,
+        WAIT_FOR_COMPLETION,
+        GET_NODE_CONTROLLERS_INFO
+    }
+
+    public abstract static class Function implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        public abstract FunctionId getFunctionId();
+    }
+
+    public static class GetClusterControllerInfoFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.GET_CLUSTER_CONTROLLER_INFO;
+        }
+    }
+
+    public static class CreateApplicationFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final String appName;
+
+        public CreateApplicationFunction(String appName) {
+            this.appName = appName;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.CREATE_APPLICATION;
+        }
+
+        public String getAppName() {
+            return appName;
+        }
+    }
+
+    public static class StartApplicationFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final String appName;
+
+        public StartApplicationFunction(String appName) {
+            this.appName = appName;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.START_APPLICATION;
+        }
+
+        public String getAppName() {
+            return appName;
+        }
+    }
+
+    public static class DestroyApplicationFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final String appName;
+
+        public DestroyApplicationFunction(String appName) {
+            this.appName = appName;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.DESTROY_APPLICATION;
+        }
+
+        public String getAppName() {
+            return appName;
+        }
+    }
+
+    public static class CreateJobFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final String appName;
+        private final byte[] jobSpec;
+        private final EnumSet<JobFlag> jobFlags;
+
+        public CreateJobFunction(String appName, byte[] jobSpec, EnumSet<JobFlag> jobFlags) {
+            this.appName = appName;
+            this.jobSpec = jobSpec;
+            this.jobFlags = jobFlags;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.CREATE_JOB;
+        }
+
+        public String getAppName() {
+            return appName;
+        }
+
+        public byte[] getJobSpec() {
+            return jobSpec;
+        }
+
+        public EnumSet<JobFlag> getJobFlags() {
+            return jobFlags;
+        }
+    }
+
+    public static class GetJobStatusFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+
+        public GetJobStatusFunction(JobId jobId) {
+            this.jobId = jobId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.GET_JOB_STATUS;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+    }
+
+    public static class StartJobFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+
+        public StartJobFunction(JobId jobId) {
+            this.jobId = jobId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.START_JOB;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+    }
+
+    public static class WaitForCompletionFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+
+        public WaitForCompletionFunction(JobId jobId) {
+            this.jobId = jobId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.WAIT_FOR_COMPLETION;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+    }
+
+    public static class GetNodeControllersInfoFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.GET_NODE_CONTROLLERS_INFO;
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
new file mode 100644
index 0000000..f74d06e
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.client;
+
+import java.util.EnumSet;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.SyncRMI;
+
+public class HyracksClientInterfaceRemoteProxy implements IHyracksClientInterface {
+    private final IIPCHandle ipcHandle;
+
+    public HyracksClientInterfaceRemoteProxy(IIPCHandle ipcHandle) {
+        this.ipcHandle = ipcHandle;
+    }
+
+    @Override
+    public ClusterControllerInfo getClusterControllerInfo() throws Exception {
+        SyncRMI sync = new SyncRMI();
+        HyracksClientInterfaceFunctions.GetClusterControllerInfoFunction gccif = new HyracksClientInterfaceFunctions.GetClusterControllerInfoFunction();
+        return (ClusterControllerInfo) sync.call(ipcHandle, gccif);
+    }
+
+    @Override
+    public void createApplication(String appName) throws Exception {
+        SyncRMI sync = new SyncRMI();
+        HyracksClientInterfaceFunctions.CreateApplicationFunction caf = new HyracksClientInterfaceFunctions.CreateApplicationFunction(
+                appName);
+        sync.call(ipcHandle, caf);
+    }
+
+    @Override
+    public void startApplication(String appName) throws Exception {
+        SyncRMI sync = new SyncRMI();
+        HyracksClientInterfaceFunctions.StartApplicationFunction saf = new HyracksClientInterfaceFunctions.StartApplicationFunction(
+                appName);
+        sync.call(ipcHandle, saf);
+    }
+
+    @Override
+    public void destroyApplication(String appName) throws Exception {
+        SyncRMI sync = new SyncRMI();
+        HyracksClientInterfaceFunctions.DestroyApplicationFunction daf = new HyracksClientInterfaceFunctions.DestroyApplicationFunction(
+                appName);
+        sync.call(ipcHandle, daf);
+    }
+
+    @Override
+    public JobId createJob(String appName, byte[] jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
+        SyncRMI sync = new SyncRMI();
+        HyracksClientInterfaceFunctions.CreateJobFunction cjf = new HyracksClientInterfaceFunctions.CreateJobFunction(
+                appName, jobSpec, jobFlags);
+        return (JobId) sync.call(ipcHandle, cjf);
+    }
+
+    @Override
+    public JobStatus getJobStatus(JobId jobId) throws Exception {
+        SyncRMI sync = new SyncRMI();
+        HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf = new HyracksClientInterfaceFunctions.GetJobStatusFunction(
+                jobId);
+        return (JobStatus) sync.call(ipcHandle, gjsf);
+    }
+
+    @Override
+    public void startJob(JobId jobId) throws Exception {
+        SyncRMI sync = new SyncRMI();
+        HyracksClientInterfaceFunctions.StartJobFunction sjf = new HyracksClientInterfaceFunctions.StartJobFunction(
+                jobId);
+        sync.call(ipcHandle, sjf);
+    }
+
+    @Override
+    public void waitForCompletion(JobId jobId) throws Exception {
+        SyncRMI sync = new SyncRMI();
+        HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf = new HyracksClientInterfaceFunctions.WaitForCompletionFunction(
+                jobId);
+        sync.call(ipcHandle, wfcf);
+    }
+
+    @Override
+    public Map<String, NodeControllerInfo> getNodeControllersInfo() throws Exception {
+        SyncRMI sync = new SyncRMI();
+        HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction gncif = new HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction();
+        return (Map<String, NodeControllerInfo>) sync.call(ipcHandle, gncif);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/AbstractHyracksConnection.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
similarity index 75%
rename from hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/AbstractHyracksConnection.java
rename to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
index 90bbfb1..1b38cb9 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/AbstractHyracksConnection.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
@@ -15,6 +15,7 @@
 package edu.uci.ics.hyracks.api.client;
 
 import java.io.File;
+import java.net.InetSocketAddress;
 import java.util.EnumSet;
 import java.util.Map;
 
@@ -30,17 +31,42 @@
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
 
-abstract class AbstractHyracksConnection implements IHyracksClientConnection {
+/**
+ * Connection Class used by a Hyracks Client to interact with a Hyracks Cluster
+ * Controller.
+ * 
+ * @author vinayakb
+ * 
+ */
+public final class HyracksConnection implements IHyracksClientConnection {
     private final String ccHost;
 
+    private final IPCSystem ipc;
+
     private final IHyracksClientInterface hci;
 
     private final ClusterControllerInfo ccInfo;
 
-    public AbstractHyracksConnection(String ccHost, IHyracksClientInterface hci) throws Exception {
+    /**
+     * Constructor to create a connection to the Hyracks Cluster Controller.
+     * 
+     * @param ccHost
+     *            Host name (or IP Address) where the Cluster Controller can be
+     *            reached.
+     * @param ccPort
+     *            Port to reach the Hyracks Cluster Controller at the specified
+     *            host name.
+     * @throws Exception if setting up the IPC connection or fetching the Cluster Controller info fails.
+     */
+    public HyracksConnection(String ccHost, int ccPort) throws Exception {
         this.ccHost = ccHost;
-        this.hci = hci;
+        ipc = new IPCSystem(new InetSocketAddress(0));
+        ipc.start();
+        IIPCHandle ccIpchandle = ipc.getHandle(new InetSocketAddress(ccHost, ccPort));
+        this.hci = new HyracksClientInterfaceRemoteProxy(ccIpchandle);
         ccInfo = hci.getClusterControllerInfo();
     }
 
@@ -82,7 +108,7 @@
 
     @Override
     public void start(JobId jobId) throws Exception {
-        hci.start(jobId);
+        hci.startJob(jobId);
     }
 
     @Override
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksLocalConnection.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksLocalConnection.java
deleted file mode 100644
index ee0b276..0000000
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksLocalConnection.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.api.client;
-
-/**
- * Connection Class used by a Hyracks Client that is colocated in the same VM
- * with the Cluster Controller. Usually, clients must not use this class. This
- * is used internally for testing purposes.
- * 
- * @author vinayakb
- * 
- */
-public final class HyracksLocalConnection extends AbstractHyracksConnection {
-    public HyracksLocalConnection(IHyracksClientInterface hci) throws Exception {
-        super("localhost", hci);
-    }
-}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksRMIConnection.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksRMIConnection.java
deleted file mode 100644
index 65d82a8..0000000
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksRMIConnection.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.api.client;
-
-import java.rmi.NotBoundException;
-import java.rmi.RemoteException;
-import java.rmi.registry.LocateRegistry;
-import java.rmi.registry.Registry;
-
-/**
- * Connection Class used by a Hyracks Client to interact with a Hyracks Cluster
- * Controller using RMI. Usually, such a connection would be used when the CC
- * runs in a separate JVM from the client (The most common case).
- * 
- * @author vinayakb
- * 
- */
-public final class HyracksRMIConnection extends AbstractHyracksConnection {
-    /**
-     * Constructor to create a connection to the Hyracks Cluster Controller.
-     * 
-     * @param host
-     *            Host name (or IP Address) where the Cluster Controller can be
-     *            reached.
-     * @param port
-     *            Port to reach the Hyracks Cluster Controller at the specified
-     *            host name.
-     * @throws Exception
-     */
-    public HyracksRMIConnection(String host, int port) throws Exception {
-        super(host, lookupHCI(host, port));
-    }
-
-    private static IHyracksClientInterface lookupHCI(String host, int port) throws RemoteException, NotBoundException {
-        Registry registry = LocateRegistry.getRegistry(host, port);
-        return (IHyracksClientInterface) registry.lookup(IHyracksClientInterface.class.getName());
-    }
-}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
index 6e1eedc..866d307 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
@@ -14,7 +14,6 @@
  */
 package edu.uci.ics.hyracks.api.client;
 
-import java.rmi.Remote;
 import java.util.EnumSet;
 import java.util.Map;
 
@@ -22,7 +21,7 @@
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 
-public interface IHyracksClientInterface extends Remote {
+public interface IHyracksClientInterface {
     public ClusterControllerInfo getClusterControllerInfo() throws Exception;
 
     public void createApplication(String appName) throws Exception;
@@ -35,7 +34,7 @@
 
     public JobStatus getJobStatus(JobId jobId) throws Exception;
 
-    public void start(JobId jobId) throws Exception;
+    public void startJob(JobId jobId) throws Exception;
 
     public void waitForCompletion(JobId jobId) throws Exception;