Added new IPC mechanism to Hyracks. Migrated all remote communications to use new IPC layer
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@935 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
new file mode 100644
index 0000000..a7f324d
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -0,0 +1,204 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.client;
+
+import java.io.Serializable;
+import java.util.EnumSet;
+
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+
+public class HyracksClientInterfaceFunctions {
+ public enum FunctionId {
+ GET_CLUSTER_CONTROLLER_INFO,
+ CREATE_APPLICATION,
+ START_APPLICATION,
+ DESTROY_APPLICATION,
+ CREATE_JOB,
+ GET_JOB_STATUS,
+ START_JOB,
+ WAIT_FOR_COMPLETION,
+ GET_NODE_CONTROLLERS_INFO
+ }
+
+ public abstract static class Function implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ public abstract FunctionId getFunctionId();
+ }
+
+ public static class GetClusterControllerInfoFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.GET_CLUSTER_CONTROLLER_INFO;
+ }
+ }
+
+ public static class CreateApplicationFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String appName;
+
+ public CreateApplicationFunction(String appName) {
+ this.appName = appName;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.CREATE_APPLICATION;
+ }
+
+ public String getAppName() {
+ return appName;
+ }
+ }
+
+ public static class StartApplicationFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String appName;
+
+ public StartApplicationFunction(String appName) {
+ this.appName = appName;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.START_APPLICATION;
+ }
+
+ public String getAppName() {
+ return appName;
+ }
+ }
+
+ public static class DestroyApplicationFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String appName;
+
+ public DestroyApplicationFunction(String appName) {
+ this.appName = appName;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.DESTROY_APPLICATION;
+ }
+
+ public String getAppName() {
+ return appName;
+ }
+ }
+
+ public static class CreateJobFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String appName;
+ private final byte[] jobSpec;
+ private final EnumSet<JobFlag> jobFlags;
+
+ public CreateJobFunction(String appName, byte[] jobSpec, EnumSet<JobFlag> jobFlags) {
+ this.appName = appName;
+ this.jobSpec = jobSpec;
+ this.jobFlags = jobFlags;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.CREATE_JOB;
+ }
+
+ public String getAppName() {
+ return appName;
+ }
+
+ public byte[] getJobSpec() {
+ return jobSpec;
+ }
+
+ public EnumSet<JobFlag> getJobFlags() {
+ return jobFlags;
+ }
+ }
+
+ public static class GetJobStatusFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+
+ public GetJobStatusFunction(JobId jobId) {
+ this.jobId = jobId;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.GET_JOB_STATUS;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+ }
+
+ public static class StartJobFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+
+ public StartJobFunction(JobId jobId) {
+ this.jobId = jobId;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.START_JOB;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+ }
+
+ public static class WaitForCompletionFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+
+ public WaitForCompletionFunction(JobId jobId) {
+ this.jobId = jobId;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.WAIT_FOR_COMPLETION;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+ }
+
+ public static class GetNodeControllersInfoFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.GET_NODE_CONTROLLERS_INFO;
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
new file mode 100644
index 0000000..f74d06e
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.client;
+
+import java.util.EnumSet;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.SyncRMI;
+
+public class HyracksClientInterfaceRemoteProxy implements IHyracksClientInterface {
+ private final IIPCHandle ipcHandle;
+
+ public HyracksClientInterfaceRemoteProxy(IIPCHandle ipcHandle) {
+ this.ipcHandle = ipcHandle;
+ }
+
+ @Override
+ public ClusterControllerInfo getClusterControllerInfo() throws Exception {
+ SyncRMI sync = new SyncRMI();
+ HyracksClientInterfaceFunctions.GetClusterControllerInfoFunction gccif = new HyracksClientInterfaceFunctions.GetClusterControllerInfoFunction();
+ return (ClusterControllerInfo) sync.call(ipcHandle, gccif);
+ }
+
+ @Override
+ public void createApplication(String appName) throws Exception {
+ SyncRMI sync = new SyncRMI();
+ HyracksClientInterfaceFunctions.CreateApplicationFunction caf = new HyracksClientInterfaceFunctions.CreateApplicationFunction(
+ appName);
+ sync.call(ipcHandle, caf);
+ }
+
+ @Override
+ public void startApplication(String appName) throws Exception {
+ SyncRMI sync = new SyncRMI();
+ HyracksClientInterfaceFunctions.StartApplicationFunction saf = new HyracksClientInterfaceFunctions.StartApplicationFunction(
+ appName);
+ sync.call(ipcHandle, saf);
+ }
+
+ @Override
+ public void destroyApplication(String appName) throws Exception {
+ SyncRMI sync = new SyncRMI();
+ HyracksClientInterfaceFunctions.DestroyApplicationFunction daf = new HyracksClientInterfaceFunctions.DestroyApplicationFunction(
+ appName);
+ sync.call(ipcHandle, daf);
+ }
+
+ @Override
+ public JobId createJob(String appName, byte[] jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
+ SyncRMI sync = new SyncRMI();
+ HyracksClientInterfaceFunctions.CreateJobFunction cjf = new HyracksClientInterfaceFunctions.CreateJobFunction(
+ appName, jobSpec, jobFlags);
+ return (JobId) sync.call(ipcHandle, cjf);
+ }
+
+ @Override
+ public JobStatus getJobStatus(JobId jobId) throws Exception {
+ SyncRMI sync = new SyncRMI();
+ HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf = new HyracksClientInterfaceFunctions.GetJobStatusFunction(
+ jobId);
+ return (JobStatus) sync.call(ipcHandle, gjsf);
+ }
+
+ @Override
+ public void startJob(JobId jobId) throws Exception {
+ SyncRMI sync = new SyncRMI();
+ HyracksClientInterfaceFunctions.StartJobFunction sjf = new HyracksClientInterfaceFunctions.StartJobFunction(
+ jobId);
+ sync.call(ipcHandle, sjf);
+ }
+
+ @Override
+ public void waitForCompletion(JobId jobId) throws Exception {
+ SyncRMI sync = new SyncRMI();
+ HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf = new HyracksClientInterfaceFunctions.WaitForCompletionFunction(
+ jobId);
+ sync.call(ipcHandle, wfcf);
+ }
+
+ @Override
+ public Map<String, NodeControllerInfo> getNodeControllersInfo() throws Exception {
+ SyncRMI sync = new SyncRMI();
+ HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction gncif = new HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction();
+ return (Map<String, NodeControllerInfo>) sync.call(ipcHandle, gncif);
+ }
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/AbstractHyracksConnection.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
similarity index 75%
rename from hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/AbstractHyracksConnection.java
rename to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
index 90bbfb1..1b38cb9 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/AbstractHyracksConnection.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
@@ -15,6 +15,7 @@
package edu.uci.ics.hyracks.api.client;
import java.io.File;
+import java.net.InetSocketAddress;
import java.util.EnumSet;
import java.util.Map;
@@ -30,17 +31,42 @@
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
-abstract class AbstractHyracksConnection implements IHyracksClientConnection {
+/**
+ * Connection Class used by a Hyracks Client to interact with a Hyracks Cluster
+ * Controller.
+ *
+ * @author vinayakb
+ *
+ */
+public final class HyracksConnection implements IHyracksClientConnection {
private final String ccHost;
+ private final IPCSystem ipc;
+
private final IHyracksClientInterface hci;
private final ClusterControllerInfo ccInfo;
- public AbstractHyracksConnection(String ccHost, IHyracksClientInterface hci) throws Exception {
+ /**
+ * Constructor to create a connection to the Hyracks Cluster Controller.
+ *
+ * @param ccHost
+ * Host name (or IP Address) where the Cluster Controller can be
+ * reached.
+ * @param ccPort
+ * Port to reach the Hyracks Cluster Controller at the specified
+ * host name.
+ * @throws Exception
+ */
+ public HyracksConnection(String ccHost, int ccPort) throws Exception {
this.ccHost = ccHost;
- this.hci = hci;
+ ipc = new IPCSystem(new InetSocketAddress(0));
+ ipc.start();
+ IIPCHandle ccIpchandle = ipc.getHandle(new InetSocketAddress(ccHost, ccPort));
+ this.hci = new HyracksClientInterfaceRemoteProxy(ccIpchandle);
ccInfo = hci.getClusterControllerInfo();
}
@@ -82,7 +108,7 @@
@Override
public void start(JobId jobId) throws Exception {
- hci.start(jobId);
+ hci.startJob(jobId);
}
@Override
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksLocalConnection.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksLocalConnection.java
deleted file mode 100644
index ee0b276..0000000
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksLocalConnection.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.api.client;
-
-/**
- * Connection Class used by a Hyracks Client that is colocated in the same VM
- * with the Cluster Controller. Usually, clients must not use this class. This
- * is used internally for testing purposes.
- *
- * @author vinayakb
- *
- */
-public final class HyracksLocalConnection extends AbstractHyracksConnection {
- public HyracksLocalConnection(IHyracksClientInterface hci) throws Exception {
- super("localhost", hci);
- }
-}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksRMIConnection.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksRMIConnection.java
deleted file mode 100644
index 65d82a8..0000000
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksRMIConnection.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.api.client;
-
-import java.rmi.NotBoundException;
-import java.rmi.RemoteException;
-import java.rmi.registry.LocateRegistry;
-import java.rmi.registry.Registry;
-
-/**
- * Connection Class used by a Hyracks Client to interact with a Hyracks Cluster
- * Controller using RMI. Usually, such a connection would be used when the CC
- * runs in a separate JVM from the client (The most common case).
- *
- * @author vinayakb
- *
- */
-public final class HyracksRMIConnection extends AbstractHyracksConnection {
- /**
- * Constructor to create a connection to the Hyracks Cluster Controller.
- *
- * @param host
- * Host name (or IP Address) where the Cluster Controller can be
- * reached.
- * @param port
- * Port to reach the Hyracks Cluster Controller at the specified
- * host name.
- * @throws Exception
- */
- public HyracksRMIConnection(String host, int port) throws Exception {
- super(host, lookupHCI(host, port));
- }
-
- private static IHyracksClientInterface lookupHCI(String host, int port) throws RemoteException, NotBoundException {
- Registry registry = LocateRegistry.getRegistry(host, port);
- return (IHyracksClientInterface) registry.lookup(IHyracksClientInterface.class.getName());
- }
-}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
index 6e1eedc..866d307 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
@@ -14,7 +14,6 @@
*/
package edu.uci.ics.hyracks.api.client;
-import java.rmi.Remote;
import java.util.EnumSet;
import java.util.Map;
@@ -22,7 +21,7 @@
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
-public interface IHyracksClientInterface extends Remote {
+public interface IHyracksClientInterface {
public ClusterControllerInfo getClusterControllerInfo() throws Exception;
public void createApplication(String appName) throws Exception;
@@ -35,7 +34,7 @@
public JobStatus getJobStatus(JobId jobId) throws Exception;
- public void start(JobId jobId) throws Exception;
+ public void startJob(JobId jobId) throws Exception;
public void waitForCompletion(JobId jobId) throws Exception;