Added new IPC mechanism to Hyracks. Migrated all remote communications to use new IPC layer
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@935 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-algebricks/hyracks-algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java b/hyracks-algebricks/hyracks-algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
index d6c2165..06b76ef 100644
--- a/hyracks-algebricks/hyracks-algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
+++ b/hyracks-algebricks/hyracks-algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
@@ -17,7 +17,7 @@
import java.util.EnumSet;
import edu.uci.ics.hyracks.algebricks.core.config.AlgebricksConfig;
-import edu.uci.ics.hyracks.api.client.HyracksLocalConnection;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
@@ -32,9 +32,8 @@
public static final String NC1_ID = "nc1";
public static final String NC2_ID = "nc2";
- public static final int DEFAULT_HYRACKS_CC_PORT = 1099;
-
- public static final int TEST_HYRACKS_CC_PORT = 4322;
+ public static final int TEST_HYRACKS_CC_CLUSTER_NET_PORT = 4322;
+ public static final int TEST_HYRACKS_CC_CLIENT_NET_PORT = 4321;
private static ClusterControllerService cc;
private static NodeControllerService nc1;
@@ -43,14 +42,18 @@
public static void init() throws Exception {
CCConfig ccConfig = new CCConfig();
- ccConfig.port = TEST_HYRACKS_CC_PORT;
+ ccConfig.clientNetIpAddress = "127.0.0.1";
+ ccConfig.clientNetPort = TEST_HYRACKS_CC_CLIENT_NET_PORT;
+ ccConfig.clusterNetIpAddress = "127.0.0.1";
+ ccConfig.clusterNetPort = TEST_HYRACKS_CC_CLUSTER_NET_PORT;
// ccConfig.useJOL = true;
cc = new ClusterControllerService(ccConfig);
cc.start();
NCConfig ncConfig1 = new NCConfig();
ncConfig1.ccHost = "localhost";
- ncConfig1.ccPort = TEST_HYRACKS_CC_PORT;
+ ncConfig1.ccPort = TEST_HYRACKS_CC_CLUSTER_NET_PORT;
+ ncConfig1.clusterNetIPAddress = "127.0.0.1";
ncConfig1.dataIPAddress = "127.0.0.1";
ncConfig1.nodeId = NC1_ID;
nc1 = new NodeControllerService(ncConfig1);
@@ -58,13 +61,14 @@
NCConfig ncConfig2 = new NCConfig();
ncConfig2.ccHost = "localhost";
- ncConfig2.ccPort = TEST_HYRACKS_CC_PORT;
+ ncConfig2.ccPort = TEST_HYRACKS_CC_CLUSTER_NET_PORT;
+ ncConfig2.clusterNetIPAddress = "127.0.0.1";
ncConfig2.dataIPAddress = "127.0.0.1";
ncConfig2.nodeId = NC2_ID;
nc2 = new NodeControllerService(ncConfig2);
nc2.start();
- hcc = new HyracksLocalConnection(cc);
+ hcc = new HyracksConnection(ccConfig.clientNetIpAddress, ccConfig.clientNetPort);
hcc.createApplication(AlgebricksConfig.HYRACKS_APP_NAME, null);
}
@@ -77,9 +81,9 @@
public static void runJob(JobSpecification spec) throws Exception {
JobId jobId = hcc.createJob(AlgebricksConfig.HYRACKS_APP_NAME, spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
AlgebricksConfig.ALGEBRICKS_LOGGER.info(spec.toJSON().toString());
- cc.start(jobId);
+ cc.startJob(jobId);
AlgebricksConfig.ALGEBRICKS_LOGGER.info(jobId.toString());
cc.waitForCompletion(jobId);
}
-}
+}
\ No newline at end of file
diff --git a/hyracks-api/pom.xml b/hyracks-api/pom.xml
index f73c27a..ad101d5 100644
--- a/hyracks-api/pom.xml
+++ b/hyracks-api/pom.xml
@@ -1,9 +1,6 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
- <groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-api</artifactId>
- <version>0.2.0-SNAPSHOT</version>
-
<parent>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks</artifactId>
@@ -45,5 +42,10 @@
<type>jar</type>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-ipc</artifactId>
+ <version>0.2.0-SNAPSHOT</version>
+ </dependency>
</dependencies>
</project>
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
new file mode 100644
index 0000000..a7f324d
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -0,0 +1,204 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.client;
+
+import java.io.Serializable;
+import java.util.EnumSet;
+
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+
+public class HyracksClientInterfaceFunctions {
+ public enum FunctionId {
+ GET_CLUSTER_CONTROLLER_INFO,
+ CREATE_APPLICATION,
+ START_APPLICATION,
+ DESTROY_APPLICATION,
+ CREATE_JOB,
+ GET_JOB_STATUS,
+ START_JOB,
+ WAIT_FOR_COMPLETION,
+ GET_NODE_CONTROLLERS_INFO
+ }
+
+ public abstract static class Function implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ public abstract FunctionId getFunctionId();
+ }
+
+ public static class GetClusterControllerInfoFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.GET_CLUSTER_CONTROLLER_INFO;
+ }
+ }
+
+ public static class CreateApplicationFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String appName;
+
+ public CreateApplicationFunction(String appName) {
+ this.appName = appName;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.CREATE_APPLICATION;
+ }
+
+ public String getAppName() {
+ return appName;
+ }
+ }
+
+ public static class StartApplicationFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String appName;
+
+ public StartApplicationFunction(String appName) {
+ this.appName = appName;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.START_APPLICATION;
+ }
+
+ public String getAppName() {
+ return appName;
+ }
+ }
+
+ public static class DestroyApplicationFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String appName;
+
+ public DestroyApplicationFunction(String appName) {
+ this.appName = appName;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.DESTROY_APPLICATION;
+ }
+
+ public String getAppName() {
+ return appName;
+ }
+ }
+
+ public static class CreateJobFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String appName;
+ private final byte[] jobSpec;
+ private final EnumSet<JobFlag> jobFlags;
+
+ public CreateJobFunction(String appName, byte[] jobSpec, EnumSet<JobFlag> jobFlags) {
+ this.appName = appName;
+ this.jobSpec = jobSpec;
+ this.jobFlags = jobFlags;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.CREATE_JOB;
+ }
+
+ public String getAppName() {
+ return appName;
+ }
+
+ public byte[] getJobSpec() {
+ return jobSpec;
+ }
+
+ public EnumSet<JobFlag> getJobFlags() {
+ return jobFlags;
+ }
+ }
+
+ public static class GetJobStatusFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+
+ public GetJobStatusFunction(JobId jobId) {
+ this.jobId = jobId;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.GET_JOB_STATUS;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+ }
+
+ public static class StartJobFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+
+ public StartJobFunction(JobId jobId) {
+ this.jobId = jobId;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.START_JOB;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+ }
+
+ public static class WaitForCompletionFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+
+ public WaitForCompletionFunction(JobId jobId) {
+ this.jobId = jobId;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.WAIT_FOR_COMPLETION;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+ }
+
+ public static class GetNodeControllersInfoFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.GET_NODE_CONTROLLERS_INFO;
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
new file mode 100644
index 0000000..f74d06e
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.client;
+
+import java.util.EnumSet;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.SyncRMI;
+
+public class HyracksClientInterfaceRemoteProxy implements IHyracksClientInterface {
+ private final IIPCHandle ipcHandle;
+
+ public HyracksClientInterfaceRemoteProxy(IIPCHandle ipcHandle) {
+ this.ipcHandle = ipcHandle;
+ }
+
+ @Override
+ public ClusterControllerInfo getClusterControllerInfo() throws Exception {
+ SyncRMI sync = new SyncRMI();
+ HyracksClientInterfaceFunctions.GetClusterControllerInfoFunction gccif = new HyracksClientInterfaceFunctions.GetClusterControllerInfoFunction();
+ return (ClusterControllerInfo) sync.call(ipcHandle, gccif);
+ }
+
+ @Override
+ public void createApplication(String appName) throws Exception {
+ SyncRMI sync = new SyncRMI();
+ HyracksClientInterfaceFunctions.CreateApplicationFunction caf = new HyracksClientInterfaceFunctions.CreateApplicationFunction(
+ appName);
+ sync.call(ipcHandle, caf);
+ }
+
+ @Override
+ public void startApplication(String appName) throws Exception {
+ SyncRMI sync = new SyncRMI();
+ HyracksClientInterfaceFunctions.StartApplicationFunction saf = new HyracksClientInterfaceFunctions.StartApplicationFunction(
+ appName);
+ sync.call(ipcHandle, saf);
+ }
+
+ @Override
+ public void destroyApplication(String appName) throws Exception {
+ SyncRMI sync = new SyncRMI();
+ HyracksClientInterfaceFunctions.DestroyApplicationFunction daf = new HyracksClientInterfaceFunctions.DestroyApplicationFunction(
+ appName);
+ sync.call(ipcHandle, daf);
+ }
+
+ @Override
+ public JobId createJob(String appName, byte[] jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
+ SyncRMI sync = new SyncRMI();
+ HyracksClientInterfaceFunctions.CreateJobFunction cjf = new HyracksClientInterfaceFunctions.CreateJobFunction(
+ appName, jobSpec, jobFlags);
+ return (JobId) sync.call(ipcHandle, cjf);
+ }
+
+ @Override
+ public JobStatus getJobStatus(JobId jobId) throws Exception {
+ SyncRMI sync = new SyncRMI();
+ HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf = new HyracksClientInterfaceFunctions.GetJobStatusFunction(
+ jobId);
+ return (JobStatus) sync.call(ipcHandle, gjsf);
+ }
+
+ @Override
+ public void startJob(JobId jobId) throws Exception {
+ SyncRMI sync = new SyncRMI();
+ HyracksClientInterfaceFunctions.StartJobFunction sjf = new HyracksClientInterfaceFunctions.StartJobFunction(
+ jobId);
+ sync.call(ipcHandle, sjf);
+ }
+
+ @Override
+ public void waitForCompletion(JobId jobId) throws Exception {
+ SyncRMI sync = new SyncRMI();
+ HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf = new HyracksClientInterfaceFunctions.WaitForCompletionFunction(
+ jobId);
+ sync.call(ipcHandle, wfcf);
+ }
+
+ @Override
+ public Map<String, NodeControllerInfo> getNodeControllersInfo() throws Exception {
+ SyncRMI sync = new SyncRMI();
+ HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction gncif = new HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction();
+ return (Map<String, NodeControllerInfo>) sync.call(ipcHandle, gncif);
+ }
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/AbstractHyracksConnection.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
similarity index 75%
rename from hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/AbstractHyracksConnection.java
rename to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
index 90bbfb1..1b38cb9 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/AbstractHyracksConnection.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
@@ -15,6 +15,7 @@
package edu.uci.ics.hyracks.api.client;
import java.io.File;
+import java.net.InetSocketAddress;
import java.util.EnumSet;
import java.util.Map;
@@ -30,17 +31,42 @@
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
-abstract class AbstractHyracksConnection implements IHyracksClientConnection {
+/**
+ * Connection Class used by a Hyracks Client to interact with a Hyracks Cluster
+ * Controller.
+ *
+ * @author vinayakb
+ *
+ */
+public final class HyracksConnection implements IHyracksClientConnection {
private final String ccHost;
+ private final IPCSystem ipc;
+
private final IHyracksClientInterface hci;
private final ClusterControllerInfo ccInfo;
- public AbstractHyracksConnection(String ccHost, IHyracksClientInterface hci) throws Exception {
+ /**
+ * Constructor to create a connection to the Hyracks Cluster Controller.
+ *
+ * @param ccHost
+ * Host name (or IP Address) where the Cluster Controller can be
+ * reached.
+ * @param ccPort
+ * Port to reach the Hyracks Cluster Controller at the specified
+ * host name.
+ * @throws Exception
+ */
+ public HyracksConnection(String ccHost, int ccPort) throws Exception {
this.ccHost = ccHost;
- this.hci = hci;
+ ipc = new IPCSystem(new InetSocketAddress(0));
+ ipc.start();
+ IIPCHandle ccIpchandle = ipc.getHandle(new InetSocketAddress(ccHost, ccPort));
+ this.hci = new HyracksClientInterfaceRemoteProxy(ccIpchandle);
ccInfo = hci.getClusterControllerInfo();
}
@@ -82,7 +108,7 @@
@Override
public void start(JobId jobId) throws Exception {
- hci.start(jobId);
+ hci.startJob(jobId);
}
@Override
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksLocalConnection.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksLocalConnection.java
deleted file mode 100644
index ee0b276..0000000
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksLocalConnection.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.api.client;
-
-/**
- * Connection Class used by a Hyracks Client that is colocated in the same VM
- * with the Cluster Controller. Usually, clients must not use this class. This
- * is used internally for testing purposes.
- *
- * @author vinayakb
- *
- */
-public final class HyracksLocalConnection extends AbstractHyracksConnection {
- public HyracksLocalConnection(IHyracksClientInterface hci) throws Exception {
- super("localhost", hci);
- }
-}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksRMIConnection.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksRMIConnection.java
deleted file mode 100644
index 65d82a8..0000000
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksRMIConnection.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.api.client;
-
-import java.rmi.NotBoundException;
-import java.rmi.RemoteException;
-import java.rmi.registry.LocateRegistry;
-import java.rmi.registry.Registry;
-
-/**
- * Connection Class used by a Hyracks Client to interact with a Hyracks Cluster
- * Controller using RMI. Usually, such a connection would be used when the CC
- * runs in a separate JVM from the client (The most common case).
- *
- * @author vinayakb
- *
- */
-public final class HyracksRMIConnection extends AbstractHyracksConnection {
- /**
- * Constructor to create a connection to the Hyracks Cluster Controller.
- *
- * @param host
- * Host name (or IP Address) where the Cluster Controller can be
- * reached.
- * @param port
- * Port to reach the Hyracks Cluster Controller at the specified
- * host name.
- * @throws Exception
- */
- public HyracksRMIConnection(String host, int port) throws Exception {
- super(host, lookupHCI(host, port));
- }
-
- private static IHyracksClientInterface lookupHCI(String host, int port) throws RemoteException, NotBoundException {
- Registry registry = LocateRegistry.getRegistry(host, port);
- return (IHyracksClientInterface) registry.lookup(IHyracksClientInterface.class.getName());
- }
-}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
index 6e1eedc..866d307 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
@@ -14,7 +14,6 @@
*/
package edu.uci.ics.hyracks.api.client;
-import java.rmi.Remote;
import java.util.EnumSet;
import java.util.Map;
@@ -22,7 +21,7 @@
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
-public interface IHyracksClientInterface extends Remote {
+public interface IHyracksClientInterface {
public ClusterControllerInfo getClusterControllerInfo() throws Exception;
public void createApplication(String appName) throws Exception;
@@ -35,7 +34,7 @@
public JobStatus getJobStatus(JobId jobId) throws Exception;
- public void start(JobId jobId) throws Exception;
+ public void startJob(JobId jobId) throws Exception;
public void waitForCompletion(JobId jobId) throws Exception;
diff --git a/hyracks-cli/src/main/java/edu/uci/ics/hyracks/cli/commands/ConnectCommand.java b/hyracks-cli/src/main/java/edu/uci/ics/hyracks/cli/commands/ConnectCommand.java
index e1a1fd2..09dbc8c 100644
--- a/hyracks-cli/src/main/java/edu/uci/ics/hyracks/cli/commands/ConnectCommand.java
+++ b/hyracks-cli/src/main/java/edu/uci/ics/hyracks/cli/commands/ConnectCommand.java
@@ -1,6 +1,6 @@
package edu.uci.ics.hyracks.cli.commands;
-import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
import edu.uci.ics.hyracks.cli.Session;
@@ -22,7 +22,7 @@
@Override
public void run(Session session) throws Exception {
System.err.println("Connecting to host: " + host + ", port: " + port);
- IHyracksClientConnection conn = new HyracksRMIConnection(host, port);
+ IHyracksClientConnection conn = new HyracksConnection(host, port);
session.setConnection(conn);
}
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/CCClientInterface.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/CCClientInterface.java
deleted file mode 100644
index 70e85a1..0000000
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/CCClientInterface.java
+++ /dev/null
@@ -1,68 +0,0 @@
-package edu.uci.ics.hyracks.control.cc;
-
-import java.rmi.RemoteException;
-import java.rmi.server.UnicastRemoteObject;
-import java.util.EnumSet;
-import java.util.Map;
-
-import edu.uci.ics.hyracks.api.client.ClusterControllerInfo;
-import edu.uci.ics.hyracks.api.client.IHyracksClientInterface;
-import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
-import edu.uci.ics.hyracks.api.job.JobFlag;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.job.JobStatus;
-
-public class CCClientInterface extends UnicastRemoteObject implements IHyracksClientInterface {
- private static final long serialVersionUID = 1L;
-
- private final ClusterControllerService ccs;
-
- public CCClientInterface(ClusterControllerService ccs) throws RemoteException {
- this.ccs = ccs;
- }
-
- @Override
- public ClusterControllerInfo getClusterControllerInfo() throws Exception {
- return ccs.getClusterControllerInfo();
- }
-
- @Override
- public void createApplication(String appName) throws Exception {
- ccs.createApplication(appName);
- }
-
- @Override
- public void startApplication(String appName) throws Exception {
- ccs.startApplication(appName);
- }
-
- @Override
- public void destroyApplication(String appName) throws Exception {
- ccs.destroyApplication(appName);
- }
-
- @Override
- public JobId createJob(String appName, byte[] jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
- return ccs.createJob(appName, jobSpec, jobFlags);
- }
-
- @Override
- public JobStatus getJobStatus(JobId jobId) throws Exception {
- return ccs.getJobStatus(jobId);
- }
-
- @Override
- public void start(JobId jobId) throws Exception {
- ccs.start(jobId);
- }
-
- @Override
- public void waitForCompletion(JobId jobId) throws Exception {
- ccs.waitForCompletion(jobId);
- }
-
- @Override
- public Map<String, NodeControllerInfo> getNodeControllersInfo() throws Exception {
- return ccs.getNodeControllersInfo();
- }
-}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 935ce88..4575adc 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -15,8 +15,7 @@
package edu.uci.ics.hyracks.control.cc;
import java.io.File;
-import java.rmi.registry.LocateRegistry;
-import java.rmi.registry.Registry;
+import java.net.InetSocketAddress;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Hashtable;
@@ -40,6 +39,7 @@
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
+import edu.uci.ics.hyracks.control.cc.ipc.HyracksClientInterfaceDelegateIPCI;
import edu.uci.ics.hyracks.control.cc.job.IJobStatusConditionVariable;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
import edu.uci.ics.hyracks.control.cc.web.WebServer;
@@ -69,6 +69,8 @@
import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
+import edu.uci.ics.hyracks.control.common.ipc.ClusterControllerDelegateIPCI;
+import edu.uci.ics.hyracks.control.common.ipc.NodeControllerRemoteProxy;
import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
@@ -76,14 +78,19 @@
import edu.uci.ics.hyracks.control.common.logs.LogFile;
import edu.uci.ics.hyracks.control.common.work.FutureValue;
import edu.uci.ics.hyracks.control.common.work.WorkQueue;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.IIPCI;
+import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
public class ClusterControllerService extends AbstractRemoteService implements IClusterController,
IHyracksClientInterface {
- private static final long serialVersionUID = 1L;
+ private static Logger LOGGER = Logger.getLogger(ClusterControllerService.class.getName());
private final CCConfig ccConfig;
- private static Logger LOGGER = Logger.getLogger(ClusterControllerService.class.getName());
+ private IPCSystem clusterIPC;
+
+ private IPCSystem clientIPC;
private final LogFile jobLog;
@@ -105,12 +112,10 @@
private final WorkQueue workQueue;
- private final Executor taskExecutor;
+ private final Executor executor;
private final Timer timer;
- private final CCClientInterface ccci;
-
private final ICCContext ccContext;
private final DeadNodeSweeper sweeper;
@@ -125,7 +130,12 @@
ipAddressNodeNameMap = new HashMap<String, Set<String>>();
applications = new Hashtable<String, CCApplicationContext>();
serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(ccConfig.ccRoot));
- taskExecutor = Executors.newCachedThreadPool();
+ executor = Executors.newCachedThreadPool();
+ IIPCI ccIPCI = new ClusterControllerDelegateIPCI(this);
+ clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.clusterNetPort), ccIPCI, executor);
+ IIPCI ciIPCI = new HyracksClientInterfaceDelegateIPCI(this);
+ clientIPC = new IPCSystem(new InetSocketAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort), ciIPCI,
+ executor);
webServer = new WebServer(this);
activeRunMap = new HashMap<JobId, JobRun>();
runMapArchive = new LinkedHashMap<JobId, JobRun>() {
@@ -137,7 +147,6 @@
};
workQueue = new WorkQueue();
this.timer = new Timer(true);
- ccci = new CCClientInterface(this);
ccContext = new ICCContext() {
@Override
public Map<String, Set<String>> getIPAddressNodeMap() {
@@ -151,9 +160,8 @@
@Override
public void start() throws Exception {
LOGGER.log(Level.INFO, "Starting ClusterControllerService: " + this);
- Registry registry = LocateRegistry.createRegistry(ccConfig.port);
- registry.rebind(IHyracksClientInterface.class.getName(), ccci);
- registry.rebind(IClusterController.class.getName(), this);
+ clusterIPC.start();
+ clientIPC.start();
webServer.setPort(ccConfig.httpPort);
webServer.start();
workQueue.start();
@@ -203,7 +211,7 @@
}
public Executor getExecutor() {
- return taskExecutor;
+ return executor;
}
public Map<String, NodeControllerState> getNodeMap() {
@@ -229,13 +237,16 @@
@Override
public NodeParameters registerNode(NodeRegistration reg) throws Exception {
- INodeController nodeController = reg.getNodeController();
+ InetSocketAddress ncAddress = reg.getNodeControllerAddress();
String id = reg.getNodeId();
+
+ IIPCHandle ncIPCHandle = clusterIPC.getHandle(reg.getNodeControllerAddress());
+ INodeController nodeController = new NodeControllerRemoteProxy(ncIPCHandle);
+
NodeControllerState state = new NodeControllerState(nodeController, reg);
workQueue.scheduleAndSync(new RegisterNodeWork(this, id, state));
LOGGER.log(Level.INFO, "Registered INodeController: id = " + id);
NodeParameters params = new NodeParameters();
- params.setClusterController(this);
params.setClusterControllerInfo(info);
params.setHeartbeatPeriod(ccConfig.heartbeatPeriod);
params.setProfileDumpPeriod(ccConfig.profileDumpPeriod);
@@ -243,9 +254,8 @@
}
@Override
- public void unregisterNode(INodeController nodeController) throws Exception {
- String id = nodeController.getId();
- workQueue.schedule(new UnregisterNodeWork(this, id));
+ public void unregisterNode(String nodeId) throws Exception {
+ workQueue.schedule(new UnregisterNodeWork(this, nodeId));
}
@Override
@@ -275,7 +285,7 @@
}
@Override
- public void start(JobId jobId) throws Exception {
+ public void startJob(JobId jobId) throws Exception {
JobStartWork jse = new JobStartWork(this, jobId);
workQueue.schedule(jse);
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ipc/HyracksClientInterfaceDelegateIPCI.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ipc/HyracksClientInterfaceDelegateIPCI.java
new file mode 100644
index 0000000..b3aa406
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ipc/HyracksClientInterfaceDelegateIPCI.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.ipc;
+
+import edu.uci.ics.hyracks.api.client.HyracksClientInterfaceFunctions;
+import edu.uci.ics.hyracks.api.client.IHyracksClientInterface;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.IIPCI;
+
+public class HyracksClientInterfaceDelegateIPCI implements IIPCI {
+ private final IHyracksClientInterface hci;
+
+ public HyracksClientInterfaceDelegateIPCI(IHyracksClientInterface hci) {
+ this.hci = hci;
+ }
+
+ @Override
+ public Object call(IIPCHandle caller, Object req) throws Exception {
+ HyracksClientInterfaceFunctions.Function fn = (HyracksClientInterfaceFunctions.Function) req;
+ switch (fn.getFunctionId()) {
+ case GET_CLUSTER_CONTROLLER_INFO: {
+ return hci.getClusterControllerInfo();
+ }
+
+ case CREATE_APPLICATION: {
+ HyracksClientInterfaceFunctions.CreateApplicationFunction caf = (HyracksClientInterfaceFunctions.CreateApplicationFunction) fn;
+ hci.createApplication(caf.getAppName());
+ return null;
+ }
+
+ case START_APPLICATION: {
+ HyracksClientInterfaceFunctions.StartApplicationFunction saf = (HyracksClientInterfaceFunctions.StartApplicationFunction) fn;
+ hci.startApplication(saf.getAppName());
+ return null;
+ }
+
+ case DESTROY_APPLICATION: {
+ HyracksClientInterfaceFunctions.DestroyApplicationFunction daf = (HyracksClientInterfaceFunctions.DestroyApplicationFunction) fn;
+ hci.destroyApplication(daf.getAppName());
+ return null;
+ }
+
+ case CREATE_JOB: {
+ HyracksClientInterfaceFunctions.CreateJobFunction cjf = (HyracksClientInterfaceFunctions.CreateJobFunction) fn;
+ return hci.createJob(cjf.getAppName(), cjf.getJobSpec(), cjf.getJobFlags());
+ }
+
+ case GET_JOB_STATUS: {
+ HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf = (HyracksClientInterfaceFunctions.GetJobStatusFunction) fn;
+ return hci.getJobStatus(gjsf.getJobId());
+ }
+
+ case START_JOB: {
+ HyracksClientInterfaceFunctions.StartJobFunction sjf = (HyracksClientInterfaceFunctions.StartJobFunction) fn;
+ hci.startJob(sjf.getJobId());
+ return null;
+ }
+
+ case WAIT_FOR_COMPLETION: {
+ HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf = (HyracksClientInterfaceFunctions.WaitForCompletionFunction) fn;
+ hci.waitForCompletion(wfcf.getJobId());
+ return null;
+ }
+
+ case GET_NODE_CONTROLLERS_INFO: {
+ return hci.getNodeControllersInfo();
+ }
+ }
+ throw new IllegalArgumentException("Unknown function " + fn.getFunctionId());
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index 24bee8c..e2c9ca3 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -448,7 +448,7 @@
public void run() {
try {
node.getNodeController().startTasks(appName, jobId, JavaSerializationUtils.serialize(jag),
- taskDescriptors, connectorPolicies, null);
+ taskDescriptors, connectorPolicies);
} catch (IOException e) {
e.printStackTrace();
} catch (Exception e) {
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/AbstractRemoteService.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/AbstractRemoteService.java
index cf64554..42d0ecb 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/AbstractRemoteService.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/AbstractRemoteService.java
@@ -14,14 +14,9 @@
*/
package edu.uci.ics.hyracks.control.common;
-import java.rmi.RemoteException;
-import java.rmi.server.UnicastRemoteObject;
-
import edu.uci.ics.hyracks.control.common.service.IService;
-public abstract class AbstractRemoteService extends UnicastRemoteObject implements IService {
- private static final long serialVersionUID = 1L;
-
- public AbstractRemoteService() throws RemoteException {
+public abstract class AbstractRemoteService implements IService {
+ public AbstractRemoteService() {
}
}
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
index 53003d0..40e9347 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -14,7 +14,6 @@
*/
package edu.uci.ics.hyracks.control.common.base;
-import java.rmi.Remote;
import java.util.List;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
@@ -27,10 +26,10 @@
import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
-public interface IClusterController extends Remote {
+public interface IClusterController {
public NodeParameters registerNode(NodeRegistration reg) throws Exception;
- public void unregisterNode(INodeController nodeController) throws Exception;
+ public void unregisterNode(String nodeId) throws Exception;
public void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics)
throws Exception;
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
index cca9d81..47f4ceb 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
@@ -14,7 +14,6 @@
*/
package edu.uci.ics.hyracks.control.common.base;
-import java.rmi.Remote;
import java.util.List;
import java.util.Map;
@@ -25,16 +24,11 @@
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
-import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
-public interface INodeController extends Remote {
- public String getId() throws Exception;
-
- public NCConfig getConfiguration() throws Exception;
-
+public interface INodeController {
public void startTasks(String appName, JobId jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors,
- Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, byte[] ctxVarBytes) throws Exception;
+ Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) throws Exception;
public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception;
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java
index 5091d12..4536fc4 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java
@@ -19,8 +19,17 @@
import org.kohsuke.args4j.Option;
public class CCConfig {
- @Option(name = "-port", usage = "Sets the port to listen for connections from node controllers (default 1099)")
- public int port = 1099;
+ @Option(name = "-client-net-ip-address", usage = "Sets the IP Address to listen for connections from clients", required = true)
+ public String clientNetIpAddress;
+
+ @Option(name = "-client-net-port", usage = "Sets the port to listen for connections from clients (default 1098)")
+ public int clientNetPort = 1098;
+
+ @Option(name = "-cluster-net-ip-address", usage = "Sets the IP Address to listen for connections from ", required = true)
+ public String clusterNetIpAddress;
+
+ @Option(name = "-cluster-net-port", usage = "Sets the port to listen for connections from node controllers (default 1099)")
+ public int clusterNetPort = 1099;
@Option(name = "-http-port", usage = "Sets the http port for the Cluster Controller (default: 19001)")
public int httpPort = 19001;
@@ -44,8 +53,14 @@
public String ccRoot = "ClusterControllerService";
public void toCommandLine(List<String> cList) {
- cList.add("-port");
- cList.add(String.valueOf(port));
+ cList.add("-client-net-ip-address");
+ cList.add(clientNetIpAddress);
+ cList.add("-client-net-port");
+ cList.add(String.valueOf(clientNetPort));
+ cList.add("-cluster-net-ip-address");
+ cList.add(clusterNetIpAddress);
+ cList.add("-cluster-net-port");
+ cList.add(String.valueOf(clusterNetPort));
cList.add("-http-port");
cList.add(String.valueOf(httpPort));
cList.add("-heartbeat-period");
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
index 02ce0f8..c55f34a 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
@@ -22,16 +22,19 @@
public class NCConfig implements Serializable {
private static final long serialVersionUID = 1L;
- @Option(name = "-cc-host", usage = "Cluster Controller host name")
+ @Option(name = "-cc-host", usage = "Cluster Controller host name", required = true)
public String ccHost;
@Option(name = "-cc-port", usage = "Cluster Controller port (default: 1099)")
public int ccPort = 1099;
- @Option(name = "-node-id", usage = "Logical name of node controller unique within the cluster")
+ @Option(name = "-cluster-net-ip-address", usage = "IP Address to bind cluster listener", required = true)
+ public String clusterNetIPAddress;
+
+ @Option(name = "-node-id", usage = "Logical name of node controller unique within the cluster", required = true)
public String nodeId;
- @Option(name = "-data-ip-address", usage = "IP Address to bind data listener")
+ @Option(name = "-data-ip-address", usage = "IP Address to bind data listener", required = true)
public String dataIPAddress;
@Option(name = "-frame-size", usage = "Frame Size to use for data communication (default: 32768)")
@@ -54,6 +57,8 @@
cList.add(ccHost);
cList.add("-cc-port");
cList.add(String.valueOf(ccPort));
+ cList.add("-cluster-net-ip-address");
+ cList.add(clusterNetIPAddress);
cList.add("-node-id");
cList.add(nodeId);
cList.add("-data-ip-address");
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeParameters.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeParameters.java
index d67b777..0161f96 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeParameters.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeParameters.java
@@ -17,27 +17,16 @@
import java.io.Serializable;
import edu.uci.ics.hyracks.api.client.ClusterControllerInfo;
-import edu.uci.ics.hyracks.control.common.base.IClusterController;
public class NodeParameters implements Serializable {
private static final long serialVersionUID = 1L;
- private IClusterController cc;
-
private ClusterControllerInfo ccInfo;
private int heartbeatPeriod;
private int profileDumpPeriod;
- public IClusterController getClusterController() {
- return cc;
- }
-
- public void setClusterController(IClusterController cc) {
- this.cc = cc;
- }
-
public ClusterControllerInfo getClusterControllerInfo() {
return ccInfo;
}
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeRegistration.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeRegistration.java
index ff92c2d..f6dde10 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeRegistration.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeRegistration.java
@@ -15,15 +15,15 @@
package edu.uci.ics.hyracks.control.common.controllers;
import java.io.Serializable;
+import java.net.InetSocketAddress;
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
-import edu.uci.ics.hyracks.control.common.base.INodeController;
import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatSchema;
public final class NodeRegistration implements Serializable {
private static final long serialVersionUID = 1L;
- private final INodeController nc;
+ private final InetSocketAddress ncAddress;
private final String nodeId;
@@ -41,9 +41,9 @@
private final HeartbeatSchema hbSchema;
- public NodeRegistration(INodeController nc, String nodeId, NCConfig ncConfig, NetworkAddress dataPort,
+ public NodeRegistration(InetSocketAddress ncAddress, String nodeId, NCConfig ncConfig, NetworkAddress dataPort,
String osName, String arch, String osVersion, int nProcessors, HeartbeatSchema hbSchema) {
- this.nc = nc;
+ this.ncAddress = ncAddress;
this.nodeId = nodeId;
this.ncConfig = ncConfig;
this.dataPort = dataPort;
@@ -54,8 +54,8 @@
this.hbSchema = hbSchema;
}
- public INodeController getNodeController() {
- return nc;
+ public InetSocketAddress getNodeControllerAddress() {
+ return ncAddress;
}
public String getNodeId() {
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerDelegateIPCI.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerDelegateIPCI.java
new file mode 100644
index 0000000..e35f962
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerDelegateIPCI.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.ipc;
+
+import edu.uci.ics.hyracks.control.common.base.IClusterController;
+import edu.uci.ics.hyracks.control.common.ipc.ClusterControllerFunctions.Function;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.IIPCI;
+
+public class ClusterControllerDelegateIPCI implements IIPCI {
+ private final IClusterController cc;
+
+ public ClusterControllerDelegateIPCI(IClusterController cc) {
+ this.cc = cc;
+ }
+
+ @Override
+ public Object call(IIPCHandle caller, Object req) throws Exception {
+ ClusterControllerFunctions.Function fn = (Function) req;
+ switch (fn.getFunctionId()) {
+ case REGISTER_NODE: {
+ ClusterControllerFunctions.RegisterNodeFunction rnf = (ClusterControllerFunctions.RegisterNodeFunction) fn;
+ return cc.registerNode(rnf.getNodeRegistration());
+ }
+
+ case UNREGISTER_NODE: {
+ ClusterControllerFunctions.UnregisterNodeFunction unf = (ClusterControllerFunctions.UnregisterNodeFunction) fn;
+ cc.unregisterNode(unf.getNodeId());
+ return null;
+ }
+
+ case NODE_HEARTBEAT: {
+ ClusterControllerFunctions.NodeHeartbeatFunction nhf = (ClusterControllerFunctions.NodeHeartbeatFunction) fn;
+ cc.nodeHeartbeat(nhf.getNodeId(), nhf.getHeartbeatData());
+ return null;
+ }
+
+ case NOTIFY_JOBLET_CLEANUP: {
+ ClusterControllerFunctions.NotifyJobletCleanupFunction njcf = (ClusterControllerFunctions.NotifyJobletCleanupFunction) fn;
+ cc.notifyJobletCleanup(njcf.getJobId(), njcf.getNodeId());
+ return null;
+ }
+
+ case REPORT_PROFILE: {
+ ClusterControllerFunctions.ReportProfileFunction rpf = (ClusterControllerFunctions.ReportProfileFunction) fn;
+ cc.reportProfile(rpf.getNodeId(), rpf.getProfiles());
+ return null;
+ }
+
+ case NOTIFY_TASK_COMPLETE: {
+ ClusterControllerFunctions.NotifyTaskCompleteFunction ntcf = (ClusterControllerFunctions.NotifyTaskCompleteFunction) fn;
+ cc.notifyTaskComplete(ntcf.getJobId(), ntcf.getTaskId(), ntcf.getNodeId(), ntcf.getStatistics());
+ return null;
+ }
+ case NOTIFY_TASK_FAILURE: {
+ ClusterControllerFunctions.NotifyTaskFailureFunction ntff = (ClusterControllerFunctions.NotifyTaskFailureFunction) fn;
+ cc.notifyTaskFailure(ntff.getJobId(), ntff.getTaskId(), ntff.getDetails(), ntff.getDetails());
+ return null;
+ }
+
+ case REGISTER_PARTITION_PROVIDER: {
+ ClusterControllerFunctions.RegisterPartitionProviderFunction rppf = (ClusterControllerFunctions.RegisterPartitionProviderFunction) fn;
+ cc.registerPartitionProvider(rppf.getPartitionDescriptor());
+ return null;
+ }
+
+ case REGISTER_PARTITION_REQUEST: {
+ ClusterControllerFunctions.RegisterPartitionRequestFunction rprf = (ClusterControllerFunctions.RegisterPartitionRequestFunction) fn;
+ cc.registerPartitionRequest(rprf.getPartitionRequest());
+ return null;
+ }
+ }
+ throw new IllegalArgumentException("Unknown function: " + fn.getFunctionId());
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerFunctions.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerFunctions.java
new file mode 100644
index 0000000..4c76357
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerFunctions.java
@@ -0,0 +1,272 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.ipc;
+
+import java.io.Serializable;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
+import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
+import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
+import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
+
+public class ClusterControllerFunctions {
+ public enum FunctionId {
+ REGISTER_NODE,
+ UNREGISTER_NODE,
+ NOTIFY_TASK_COMPLETE,
+ NOTIFY_TASK_FAILURE,
+ NOTIFY_JOBLET_CLEANUP,
+ NODE_HEARTBEAT,
+ REPORT_PROFILE,
+ REGISTER_PARTITION_PROVIDER,
+ REGISTER_PARTITION_REQUEST,
+ }
+
+ public static abstract class Function implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ public abstract FunctionId getFunctionId();
+ }
+
+ public static class RegisterNodeFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final NodeRegistration reg;
+
+ public RegisterNodeFunction(NodeRegistration reg) {
+ this.reg = reg;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.REGISTER_NODE;
+ }
+
+ public NodeRegistration getNodeRegistration() {
+ return reg;
+ }
+ }
+
+ public static class UnregisterNodeFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String nodeId;
+
+ public UnregisterNodeFunction(String nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.UNREGISTER_NODE;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+ }
+
+ public static class NotifyTaskCompleteFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+ private final TaskAttemptId taskId;
+ private final String nodeId;
+ private final TaskProfile statistics;
+
+ public NotifyTaskCompleteFunction(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics) {
+ this.jobId = jobId;
+ this.taskId = taskId;
+ this.nodeId = nodeId;
+ this.statistics = statistics;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.NOTIFY_TASK_COMPLETE;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public TaskAttemptId getTaskId() {
+ return taskId;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public TaskProfile getStatistics() {
+ return statistics;
+ }
+ }
+
+ public static class NotifyTaskFailureFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+ private final TaskAttemptId taskId;
+ private final String nodeId;
+ private final String details;
+
+ public NotifyTaskFailureFunction(JobId jobId, TaskAttemptId taskId, String nodeId, String details) {
+ this.jobId = jobId;
+ this.taskId = taskId;
+ this.nodeId = nodeId;
+ this.details = details;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.NOTIFY_TASK_FAILURE;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public TaskAttemptId getTaskId() {
+ return taskId;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public String getDetails() {
+ return details;
+ }
+ }
+
+ public static class NotifyJobletCleanupFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+ private final String nodeId;
+
+ public NotifyJobletCleanupFunction(JobId jobId, String nodeId) {
+ this.jobId = jobId;
+ this.nodeId = nodeId;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.NOTIFY_JOBLET_CLEANUP;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+ }
+
+ public static class NodeHeartbeatFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String nodeId;
+ private final HeartbeatData hbData;
+
+ public NodeHeartbeatFunction(String nodeId, HeartbeatData hbData) {
+ this.nodeId = nodeId;
+ this.hbData = hbData;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.NODE_HEARTBEAT;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public HeartbeatData getHeartbeatData() {
+ return hbData;
+ }
+ }
+
+ public static class ReportProfileFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String nodeId;
+ private final List<JobProfile> profiles;
+
+ public ReportProfileFunction(String nodeId, List<JobProfile> profiles) {
+ this.nodeId = nodeId;
+ this.profiles = profiles;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.REPORT_PROFILE;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public List<JobProfile> getProfiles() {
+ return profiles;
+ }
+ }
+
+ public static class RegisterPartitionProviderFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final PartitionDescriptor partitionDescriptor;
+
+ public RegisterPartitionProviderFunction(PartitionDescriptor partitionDescriptor) {
+ this.partitionDescriptor = partitionDescriptor;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.REGISTER_PARTITION_PROVIDER;
+ }
+
+ public PartitionDescriptor getPartitionDescriptor() {
+ return partitionDescriptor;
+ }
+ }
+
+ public static class RegisterPartitionRequestFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final PartitionRequest partitionRequest;
+
+ public RegisterPartitionRequestFunction(PartitionRequest partitionRequest) {
+ this.partitionRequest = partitionRequest;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.REGISTER_PARTITION_REQUEST;
+ }
+
+ public PartitionRequest getPartitionRequest() {
+ return partitionRequest;
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
new file mode 100644
index 0000000..0aeab72
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.ipc;
+
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.common.base.IClusterController;
+import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
+import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
+import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
+import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
+import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.SyncRMI;
+
+public class ClusterControllerRemoteProxy implements IClusterController {
+ private final IIPCHandle ipcHandle;
+
+ public ClusterControllerRemoteProxy(IIPCHandle ipcHandle) {
+ this.ipcHandle = ipcHandle;
+ }
+
+ @Override
+ public NodeParameters registerNode(NodeRegistration reg) throws Exception {
+ SyncRMI sync = new SyncRMI();
+ ClusterControllerFunctions.RegisterNodeFunction fn = new ClusterControllerFunctions.RegisterNodeFunction(reg);
+ NodeParameters result = (NodeParameters) sync.call(ipcHandle, fn);
+ return result;
+ }
+
+ @Override
+ public void unregisterNode(String nodeId) throws Exception {
+ SyncRMI sync = new SyncRMI();
+ ClusterControllerFunctions.UnregisterNodeFunction fn = new ClusterControllerFunctions.UnregisterNodeFunction(
+ nodeId);
+ sync.call(ipcHandle, fn);
+ }
+
+ @Override
+ public void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics)
+ throws Exception {
+ SyncRMI sync = new SyncRMI();
+ ClusterControllerFunctions.NotifyTaskCompleteFunction fn = new ClusterControllerFunctions.NotifyTaskCompleteFunction(
+ jobId, taskId, nodeId, statistics);
+ sync.call(ipcHandle, fn);
+ }
+
+ @Override
+ public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, String details) throws Exception {
+ SyncRMI sync = new SyncRMI();
+ ClusterControllerFunctions.NotifyTaskFailureFunction fn = new ClusterControllerFunctions.NotifyTaskFailureFunction(
+ jobId, taskId, nodeId, details);
+ sync.call(ipcHandle, fn);
+ }
+
+ @Override
+ public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception {
+ SyncRMI sync = new SyncRMI();
+ ClusterControllerFunctions.NotifyJobletCleanupFunction fn = new ClusterControllerFunctions.NotifyJobletCleanupFunction(
+ jobId, nodeId);
+ sync.call(ipcHandle, fn);
+ }
+
+ @Override
+ public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception {
+ SyncRMI sync = new SyncRMI();
+ ClusterControllerFunctions.NodeHeartbeatFunction fn = new ClusterControllerFunctions.NodeHeartbeatFunction(id,
+ hbData);
+ sync.call(ipcHandle, fn);
+ }
+
+ @Override
+ public void reportProfile(String id, List<JobProfile> profiles) throws Exception {
+ SyncRMI sync = new SyncRMI();
+ ClusterControllerFunctions.ReportProfileFunction fn = new ClusterControllerFunctions.ReportProfileFunction(id,
+ profiles);
+ sync.call(ipcHandle, fn);
+ }
+
+ @Override
+ public void registerPartitionProvider(PartitionDescriptor partitionDescriptor) throws Exception {
+ SyncRMI sync = new SyncRMI();
+ ClusterControllerFunctions.RegisterPartitionProviderFunction fn = new ClusterControllerFunctions.RegisterPartitionProviderFunction(
+ partitionDescriptor);
+ sync.call(ipcHandle, fn);
+ }
+
+ @Override
+ public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception {
+ SyncRMI sync = new SyncRMI();
+ ClusterControllerFunctions.RegisterPartitionRequestFunction fn = new ClusterControllerFunctions.RegisterPartitionRequestFunction(
+ partitionRequest);
+ sync.call(ipcHandle, fn);
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerDelegateIPCI.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerDelegateIPCI.java
new file mode 100644
index 0000000..f3e51ec
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerDelegateIPCI.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.ipc;
+
+import edu.uci.ics.hyracks.control.common.base.INodeController;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.IIPCI;
+
+public class NodeControllerDelegateIPCI implements IIPCI {
+ private final INodeController nc;
+
+ public NodeControllerDelegateIPCI(INodeController nc) {
+ this.nc = nc;
+ }
+
+ @Override
+ public Object call(IIPCHandle caller, Object req) throws Exception {
+ NodeControllerFunctions.Function fn = (NodeControllerFunctions.Function) req;
+ switch (fn.getFunctionId()) {
+ case START_TASKS: {
+ NodeControllerFunctions.StartTasksFunction stf = (NodeControllerFunctions.StartTasksFunction) fn;
+ nc.startTasks(stf.getAppName(), stf.getJobId(), stf.getPlanBytes(), stf.getTaskDescriptors(),
+ stf.getConnectorPolicies());
+ return null;
+ }
+
+ case ABORT_TASKS: {
+ NodeControllerFunctions.AbortTasksFunction atf = (NodeControllerFunctions.AbortTasksFunction) fn;
+ nc.abortTasks(atf.getJobId(), atf.getTasks());
+ return null;
+ }
+
+ case CLEANUP_JOBLET: {
+ NodeControllerFunctions.CleanupJobletFunction cjf = (NodeControllerFunctions.CleanupJobletFunction) fn;
+ nc.cleanUpJoblet(cjf.getJobId(), cjf.getStatus());
+ return null;
+ }
+
+ case CREATE_APPLICATION: {
+ NodeControllerFunctions.CreateApplicationFunction caf = (NodeControllerFunctions.CreateApplicationFunction) fn;
+ nc.createApplication(caf.getAppName(), caf.isDeployHar(), caf.getSerializedDistributedState());
+ return null;
+ }
+
+ case DESTROY_APPLICATION: {
+ NodeControllerFunctions.DestroyApplicationFunction daf = (NodeControllerFunctions.DestroyApplicationFunction) fn;
+ nc.destroyApplication(daf.getAppName());
+ return null;
+ }
+
+ case REPORT_PARTITION_AVAILABILITY: {
+ NodeControllerFunctions.ReportPartitionAvailabilityFunction rpaf = (NodeControllerFunctions.ReportPartitionAvailabilityFunction) fn;
+ nc.reportPartitionAvailability(rpaf.getPartitionId(), rpaf.getNetworkAddress());
+ return null;
+ }
+ }
+ throw new IllegalArgumentException("Unknown function: " + fn.getFunctionId());
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerFunctions.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerFunctions.java
new file mode 100644
index 0000000..0d39c1b
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerFunctions.java
@@ -0,0 +1,215 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.ipc;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
+
+public class NodeControllerFunctions {
+ public enum FunctionId {
+ START_TASKS,
+ ABORT_TASKS,
+ CLEANUP_JOBLET,
+ CREATE_APPLICATION,
+ DESTROY_APPLICATION,
+ REPORT_PARTITION_AVAILABILITY
+ }
+
+ public static abstract class Function implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ public abstract FunctionId getFunctionId();
+ }
+
+ public static class StartTasksFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String appName;
+ private final JobId jobId;
+ private final byte[] planBytes;
+ private final List<TaskAttemptDescriptor> taskDescriptors;
+ private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies;
+
+ public StartTasksFunction(String appName, JobId jobId, byte[] planBytes,
+ List<TaskAttemptDescriptor> taskDescriptors,
+ Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) {
+ this.appName = appName;
+ this.jobId = jobId;
+ this.planBytes = planBytes;
+ this.taskDescriptors = taskDescriptors;
+ this.connectorPolicies = connectorPolicies;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.START_TASKS;
+ }
+
+ public String getAppName() {
+ return appName;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public byte[] getPlanBytes() {
+ return planBytes;
+ }
+
+ public List<TaskAttemptDescriptor> getTaskDescriptors() {
+ return taskDescriptors;
+ }
+
+ public Map<ConnectorDescriptorId, IConnectorPolicy> getConnectorPolicies() {
+ return connectorPolicies;
+ }
+ }
+
+ public static class AbortTasksFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+ private final List<TaskAttemptId> tasks;
+
+ public AbortTasksFunction(JobId jobId, List<TaskAttemptId> tasks) {
+ this.jobId = jobId;
+ this.tasks = tasks;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.ABORT_TASKS;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public List<TaskAttemptId> getTasks() {
+ return tasks;
+ }
+ }
+
+ public static class CleanupJobletFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+ private final JobStatus status;
+
+ public CleanupJobletFunction(JobId jobId, JobStatus status) {
+ this.jobId = jobId;
+ this.status = status;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.CLEANUP_JOBLET;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public JobStatus getStatus() {
+ return status;
+ }
+ }
+
+ public static class CreateApplicationFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String appName;
+ private final boolean deployHar;
+ private final byte[] serializedDistributedState;
+
+ public CreateApplicationFunction(String appName, boolean deployHar, byte[] serializedDistributedState) {
+ this.appName = appName;
+ this.deployHar = deployHar;
+ this.serializedDistributedState = serializedDistributedState;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.CREATE_APPLICATION;
+ }
+
+ public String getAppName() {
+ return appName;
+ }
+
+ public boolean isDeployHar() {
+ return deployHar;
+ }
+
+ public byte[] getSerializedDistributedState() {
+ return serializedDistributedState;
+ }
+ }
+
+ public static class DestroyApplicationFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String appName;
+
+ public DestroyApplicationFunction(String appName) {
+ this.appName = appName;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.DESTROY_APPLICATION;
+ }
+
+ public String getAppName() {
+ return appName;
+ }
+ }
+
+ public static class ReportPartitionAvailabilityFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final PartitionId pid;
+ private final NetworkAddress networkAddress;
+
+ public ReportPartitionAvailabilityFunction(PartitionId pid, NetworkAddress networkAddress) {
+ this.pid = pid;
+ this.networkAddress = networkAddress;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.REPORT_PARTITION_AVAILABILITY;
+ }
+
+ public PartitionId getPartitionId() {
+ return pid;
+ }
+
+ public NetworkAddress getNetworkAddress() {
+ return networkAddress;
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
new file mode 100644
index 0000000..eaa2f27
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.ipc;
+
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.base.INodeController;
+import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.SyncRMI;
+
+public class NodeControllerRemoteProxy implements INodeController {
+ private final IIPCHandle ipcHandle;
+
+ public NodeControllerRemoteProxy(IIPCHandle ipcHandle) {
+ this.ipcHandle = ipcHandle;
+ }
+
+ @Override
+ public void startTasks(String appName, JobId jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors,
+ Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) throws Exception {
+ SyncRMI sync = new SyncRMI();
+ NodeControllerFunctions.StartTasksFunction stf = new NodeControllerFunctions.StartTasksFunction(appName, jobId,
+ planBytes, taskDescriptors, connectorPolicies);
+ sync.call(ipcHandle, stf);
+ }
+
+ @Override
+ public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception {
+ SyncRMI sync = new SyncRMI();
+ NodeControllerFunctions.AbortTasksFunction atf = new NodeControllerFunctions.AbortTasksFunction(jobId, tasks);
+ sync.call(ipcHandle, atf);
+ }
+
+ @Override
+ public void cleanUpJoblet(JobId jobId, JobStatus status) throws Exception {
+ SyncRMI sync = new SyncRMI();
+ NodeControllerFunctions.CleanupJobletFunction cjf = new NodeControllerFunctions.CleanupJobletFunction(jobId,
+ status);
+ sync.call(ipcHandle, cjf);
+ }
+
+ @Override
+ public void createApplication(String appName, boolean deployHar, byte[] serializedDistributedState)
+ throws Exception {
+ SyncRMI sync = new SyncRMI();
+ NodeControllerFunctions.CreateApplicationFunction caf = new NodeControllerFunctions.CreateApplicationFunction(
+ appName, deployHar, serializedDistributedState);
+ sync.call(ipcHandle, caf);
+ }
+
+ @Override
+ public void destroyApplication(String appName) throws Exception {
+ SyncRMI sync = new SyncRMI();
+ NodeControllerFunctions.DestroyApplicationFunction daf = new NodeControllerFunctions.DestroyApplicationFunction(
+ appName);
+ sync.call(ipcHandle, daf);
+ }
+
+ @Override
+ public void reportPartitionAvailability(PartitionId pid, NetworkAddress networkAddress) throws Exception {
+ SyncRMI sync = new SyncRMI();
+ NodeControllerFunctions.ReportPartitionAvailabilityFunction rpaf = new NodeControllerFunctions.ReportPartitionAvailabilityFunction(
+ pid, networkAddress);
+ sync.call(ipcHandle, rpaf);
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 8027af5..d869515 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -23,8 +23,7 @@
import java.lang.management.RuntimeMXBean;
import java.lang.management.ThreadMXBean;
import java.net.InetAddress;
-import java.rmi.registry.LocateRegistry;
-import java.rmi.registry.Registry;
+import java.net.InetSocketAddress;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Hashtable;
@@ -58,6 +57,8 @@
import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatSchema;
+import edu.uci.ics.hyracks.control.common.ipc.ClusterControllerRemoteProxy;
+import edu.uci.ics.hyracks.control.common.ipc.NodeControllerDelegateIPCI;
import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
import edu.uci.ics.hyracks.control.common.work.FutureValue;
@@ -74,18 +75,20 @@
import edu.uci.ics.hyracks.control.nc.work.DestroyApplicationWork;
import edu.uci.ics.hyracks.control.nc.work.ReportPartitionAvailabilityWork;
import edu.uci.ics.hyracks.control.nc.work.StartTasksWork;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
public class NodeControllerService extends AbstractRemoteService implements INodeController {
private static Logger LOGGER = Logger.getLogger(NodeControllerService.class.getName());
- private static final long serialVersionUID = 1L;
-
private NCConfig ncConfig;
private final String id;
private final IHyracksRootContext ctx;
+ private final IPCSystem ipc;
+
private final PartitionManager partitionManager;
private final ConnectionManager connectionManager;
@@ -122,6 +125,8 @@
this.ncConfig = ncConfig;
id = ncConfig.nodeId;
executor = Executors.newCachedThreadPool();
+ NodeControllerDelegateIPCI ipci = new NodeControllerDelegateIPCI(this);
+ ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, 0), ipci, executor);
this.ctx = new RootHyracksContext(ncConfig.frameSize, new IOManager(getDevices(ncConfig.ioDevices), executor));
if (id == null) {
throw new Exception("id not set");
@@ -160,28 +165,28 @@
@Override
public void start() throws Exception {
LOGGER.log(Level.INFO, "Starting NodeControllerService");
+ ipc.start();
connectionManager.start();
- Registry registry = LocateRegistry.getRegistry(ncConfig.ccHost, ncConfig.ccPort);
- IClusterController cc = (IClusterController) registry.lookup(IClusterController.class.getName());
+ IIPCHandle ccIPCHandle = ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort));
+ this.ccs = new ClusterControllerRemoteProxy(ccIPCHandle);
HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
for (int i = 0; i < gcInfos.length; ++i) {
gcInfos[i] = new HeartbeatSchema.GarbageCollectorInfo(gcMXBeans.get(i).getName());
}
HeartbeatSchema hbSchema = new HeartbeatSchema(gcInfos);
- this.nodeParameters = cc.registerNode(new NodeRegistration(this, id, ncConfig, connectionManager
- .getNetworkAddress(), osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean
- .getAvailableProcessors(), hbSchema));
- this.ccs = nodeParameters.getClusterController();
+ this.nodeParameters = ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig,
+ connectionManager.getNetworkAddress(), osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(),
+ osMXBean.getAvailableProcessors(), hbSchema));
queue.start();
- heartbeatTask = new HeartbeatTask(cc);
+ heartbeatTask = new HeartbeatTask(ccs);
// Schedule heartbeat generator.
timer.schedule(heartbeatTask, 0, nodeParameters.getHeartbeatPeriod());
if (nodeParameters.getProfileDumpPeriod() > 0) {
// Schedule profile dump generator.
- timer.schedule(new ProfileDumpTask(cc), 0, nodeParameters.getProfileDumpPeriod());
+ timer.schedule(new ProfileDumpTask(ccs), 0, nodeParameters.getProfileDumpPeriod());
}
LOGGER.log(Level.INFO, "Started NodeControllerService");
@@ -197,7 +202,6 @@
LOGGER.log(Level.INFO, "Stopped NodeControllerService");
}
- @Override
public String getId() {
return id;
}
@@ -237,7 +241,7 @@
@Override
public void startTasks(String appName, final JobId jobId, byte[] jagBytes,
List<TaskAttemptDescriptor> taskDescriptors,
- Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap, byte[] ctxVarBytes) throws Exception {
+ Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap) throws Exception {
StartTasksWork stw = new StartTasksWork(this, appName, jobId, jagBytes, taskDescriptors, connectorPoliciesMap);
queue.schedule(stw);
}
@@ -248,7 +252,6 @@
queue.schedule(cjw);
}
- @Override
public NCConfig getConfiguration() throws Exception {
return ncConfig;
}
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/ConnectionManager.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/ConnectionManager.java
index b9eaf1f..6e38ef7 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/ConnectionManager.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/ConnectionManager.java
@@ -63,7 +63,8 @@
this.ctx = ctx;
serverChannel = ServerSocketChannel.open();
ServerSocket serverSocket = serverChannel.socket();
- serverSocket.bind(new InetSocketAddress(inetAddress, 0));
+ serverSocket.bind(new InetSocketAddress(inetAddress, 0), 0);
+ serverSocket.setReuseAddress(true);
stopped = false;
connectionListener = new ConnectionListenerThread();
dataListener = new DataListenerThread();
@@ -97,6 +98,7 @@
public ConnectionListenerThread() {
super("Hyracks NC Connection Listener");
setDaemon(true);
+ setPriority(MAX_PRIORITY);
}
@Override
@@ -159,6 +161,7 @@
if (!pendingIncomingConnections.isEmpty()) {
for (SocketChannel sc : pendingIncomingConnections) {
sc.configureBlocking(false);
+ sc.socket().setReuseAddress(true);
SelectionKey scKey = sc.register(selector, SelectionKey.OP_READ);
ByteBuffer buffer = ByteBuffer.allocate(INITIAL_MESSAGE_SIZE);
scKey.attach(buffer);
@@ -170,6 +173,7 @@
for (INetworkChannel nc : pendingOutgoingConnections) {
SocketChannel sc = SocketChannel.open();
sc.configureBlocking(false);
+ sc.socket().setReuseAddress(true);
SelectionKey scKey = sc.register(selector, 0);
scKey.attach(nc);
nc.setSelectionKey(scKey);
@@ -202,6 +206,7 @@
} catch (HyracksException e) {
key.cancel();
sc.close();
+ channel.abort();
}
} else {
buffer.compact();
@@ -210,15 +215,19 @@
} else {
INetworkChannel channel = (INetworkChannel) key.attachment();
boolean close = false;
+ boolean error = false;
try {
close = channel.dispatchNetworkEvent();
} catch (IOException e) {
e.printStackTrace();
- close = true;
+ error = true;
}
- if (close) {
+ if (close || error) {
key.cancel();
sc.close();
+ if (error) {
+ channel.abort();
+ }
}
}
}
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
index 58f7088..23cf514 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
@@ -130,7 +130,7 @@
public synchronized boolean dispatchNetworkEvent() throws IOException {
if (aborted) {
eos = true;
- monitor.notifyEndOfStream(this);
+ monitor.notifyFailure(this);
return true;
}
if (key.isConnectable()) {
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionCollector.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionCollector.java
index 33819a8..2b37c7b 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionCollector.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionCollector.java
@@ -172,6 +172,7 @@
PartitionId pid = (PartitionId) channel.getAttachment();
int senderIndex = pid.getSenderIndex();
failSenders.set(senderIndex);
+ eosSenders.set(senderIndex);
NonDeterministicPartitionCollector.this.notifyAll();
}
}
diff --git a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java
index 8255c58..626bb98 100644
--- a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java
+++ b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java
@@ -18,7 +18,7 @@
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
-import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
@@ -86,7 +86,7 @@
CmdLineParser parser = new CmdLineParser(options);
parser.parseArgument(args);
- IHyracksClientConnection hcc = new HyracksRMIConnection(options.host, options.port);
+ IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
JobSpecification job = createJob(options);
diff --git a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
index ae988cd..5a0c90d 100644
--- a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
+++ b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
@@ -17,7 +17,7 @@
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
-import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
@@ -84,7 +84,7 @@
CmdLineParser parser = new CmdLineParser(options);
parser.parseArgument(args);
- IHyracksClientConnection hcc = new HyracksRMIConnection(options.host, options.port);
+ IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
JobSpecification job = createJob(options);
diff --git a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java
index dbc0bca..ec62b1f 100644
--- a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java
+++ b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java
@@ -19,7 +19,7 @@
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
-import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
@@ -77,7 +77,7 @@
CmdLineParser parser = new CmdLineParser(options);
parser.parseArgument(args);
- IHyracksClientConnection hcc = new HyracksRMIConnection(options.host, options.port);
+ IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
JobSpecification job = createJob(options);
diff --git a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
index bf2c2e1..8075f89 100644
--- a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
+++ b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
@@ -17,7 +17,7 @@
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
-import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
@@ -79,7 +79,7 @@
CmdLineParser parser = new CmdLineParser(options);
parser.parseArgument(args);
- IHyracksClientConnection hcc = new HyracksRMIConnection(options.host, options.port);
+ IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
JobSpecification job = createJob(options);
diff --git a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
index 29f8257..573c078 100644
--- a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
+++ b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
@@ -19,7 +19,7 @@
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
-import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
@@ -80,7 +80,7 @@
CmdLineParser parser = new CmdLineParser(options);
parser.parseArgument(args);
- IHyracksClientConnection hcc = new HyracksRMIConnection(options.host, options.port);
+ IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
JobSpecification job = createJob(options);
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
index abedf4a..fac141c 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -28,7 +28,7 @@
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
-import edu.uci.ics.hyracks.api.client.HyracksLocalConnection;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
@@ -61,7 +61,10 @@
@BeforeClass
public static void init() throws Exception {
CCConfig ccConfig = new CCConfig();
- ccConfig.port = 39001;
+ ccConfig.clientNetIpAddress = "127.0.0.1";
+ ccConfig.clientNetPort = 39000;
+ ccConfig.clusterNetIpAddress = "127.0.0.1";
+ ccConfig.clusterNetPort = 39001;
ccConfig.profileDumpPeriod = 10000;
File outDir = new File("target/ClusterController");
outDir.mkdirs();
@@ -75,6 +78,7 @@
NCConfig ncConfig1 = new NCConfig();
ncConfig1.ccHost = "localhost";
ncConfig1.ccPort = 39001;
+ ncConfig1.clusterNetIPAddress = "127.0.0.1";
ncConfig1.dataIPAddress = "127.0.0.1";
ncConfig1.nodeId = NC1_ID;
nc1 = new NodeControllerService(ncConfig1);
@@ -83,12 +87,13 @@
NCConfig ncConfig2 = new NCConfig();
ncConfig2.ccHost = "localhost";
ncConfig2.ccPort = 39001;
+ ncConfig2.clusterNetIPAddress = "127.0.0.1";
ncConfig2.dataIPAddress = "127.0.0.1";
ncConfig2.nodeId = NC2_ID;
nc2 = new NodeControllerService(ncConfig2);
nc2.start();
- hcc = new HyracksLocalConnection(cc);
+ hcc = new HyracksConnection(ccConfig.clientNetIpAddress, ccConfig.clientNetPort);
hcc.createApplication("test", null);
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Starting CC in " + ccRoot.getAbsolutePath());
diff --git a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
index a1a5179..f594d7e 100644
--- a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
+++ b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
@@ -19,7 +19,7 @@
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
-import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
@@ -112,7 +112,7 @@
CmdLineParser parser = new CmdLineParser(options);
parser.parseArgument(args);
- IHyracksClientConnection hcc = new HyracksRMIConnection(options.host, options.port);
+ IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
JobSpecification job;
diff --git a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java
index 17acc10..4f78928 100644
--- a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java
+++ b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java
@@ -20,7 +20,7 @@
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
-import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
@@ -95,7 +95,7 @@
CmdLineParser parser = new CmdLineParser(options);
parser.parseArgument(args);
- IHyracksClientConnection hcc = new HyracksRMIConnection(options.host, options.port);
+ IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
JobSpecification job = createJob(parseFileSplits(options.inFileSplits), parseFileSplits(options.outFileSplits),
options.algo, options.htSize, options.sbSize, options.format);
diff --git a/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java b/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
index cffa0f4..8e63e0a 100644
--- a/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
+++ b/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
@@ -20,7 +20,7 @@
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
-import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
@@ -118,7 +118,7 @@
CmdLineParser parser = new CmdLineParser(options);
parser.parseArgument(args);
- IHyracksClientConnection hcc = new HyracksRMIConnection(options.host, options.port);
+ IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
JobSpecification job = createJob(parseFileSplits(options.inFileCustomerSplits),
parseFileSplits(options.inFileOrderSplits), parseFileSplits(options.outFileSplits),
diff --git a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java
index 9e76a3e..81e9f84 100644
--- a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java
+++ b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java
@@ -5,7 +5,7 @@
import java.util.Properties;
import java.util.Set;
-import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
@@ -15,7 +15,7 @@
public class HyracksClient {
- private static HyracksRMIConnection connection;
+ private static HyracksConnection connection;
private static final String jobProfilingKey = "jobProfilingKey";
Set<String> systemLibs;
@@ -25,7 +25,7 @@
private void initialize(Properties properties) throws Exception {
String clusterController = (String) properties.get(ConfigurationConstants.clusterControllerHost);
- connection = new HyracksRMIConnection(clusterController, 1099);
+ connection = new HyracksConnection(clusterController, 1099);
systemLibs = new HashSet<String>();
for (String systemLib : ConfigurationConstants.systemLibs) {
String systemLibPath = properties.getProperty(systemLib);
diff --git a/hyracks-ipc/pom.xml b/hyracks-ipc/pom.xml
new file mode 100644
index 0000000..49c4323
--- /dev/null
+++ b/hyracks-ipc/pom.xml
@@ -0,0 +1,31 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>hyracks-ipc</artifactId>
+ <parent>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks</artifactId>
+ <version>0.2.0-SNAPSHOT</version>
+ </parent>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.8.1</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCHandle.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCHandle.java
new file mode 100644
index 0000000..a4adf23
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCHandle.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.api;
+
+import java.net.InetSocketAddress;
+
+import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
+
+public interface IIPCHandle {
+ public InetSocketAddress getRemoteAddress();
+
+ public void send(Object request, IResponseCallback callback) throws IPCException;
+
+ public void setAttachment(Object attachment);
+
+ public Object getAttachment();
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCI.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCI.java
new file mode 100644
index 0000000..ba9f343
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCI.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.api;
+
+public interface IIPCI {
+ public Object call(IIPCHandle caller, Object req) throws Exception;
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IResponseCallback.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IResponseCallback.java
new file mode 100644
index 0000000..7d25f88
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IResponseCallback.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.api;
+
+public interface IResponseCallback {
+ public void callback(IIPCHandle handle, Object response, Exception exception);
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/SyncRMI.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/SyncRMI.java
new file mode 100644
index 0000000..180b1dd
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/SyncRMI.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.api;
+
+public final class SyncRMI implements IResponseCallback {
+ private boolean pending;
+
+ private Object response;
+
+ private Exception exception;
+
+ public SyncRMI() {
+ }
+
+ @Override
+ public synchronized void callback(IIPCHandle handle, Object response, Exception exception) {
+ pending = false;
+ this.response = response;
+ this.exception = exception;
+ notifyAll();
+ }
+
+ public synchronized Object call(IIPCHandle handle, Object request) throws Exception {
+ pending = true;
+ response = null;
+ exception = null;
+ handle.send(request, this);
+ while (pending) {
+ wait();
+ }
+ if (exception != null) {
+ throw exception;
+ }
+ return response;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/exceptions/IPCException.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/exceptions/IPCException.java
new file mode 100644
index 0000000..9ecf015
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/exceptions/IPCException.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.exceptions;
+
+public class IPCException extends Exception {
+ private static final long serialVersionUID = 1L;
+
+ public IPCException() {
+ super();
+ }
+
+ public IPCException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public IPCException(String message) {
+ super(message);
+ }
+
+ public IPCException(Throwable cause) {
+ super(cause);
+ }
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/HandleState.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/HandleState.java
new file mode 100644
index 0000000..47c3d4a
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/HandleState.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.impl;
+
+enum HandleState {
+ INITIAL,
+ CONNECT_SENT,
+ CONNECT_RECEIVED,
+ CONNECTED,
+ CLOSED,
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
new file mode 100644
index 0000000..cbbe718
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
@@ -0,0 +1,270 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class IPCConnectionManager {
+ private static final Logger LOGGER = Logger.getLogger(IPCConnectionManager.class.getName());
+
+ private final IPCSystem system;
+
+ private final NetworkThread networkThread;
+
+ private final ServerSocketChannel serverSocketChannel;
+
+ private final Map<InetSocketAddress, IPCHandle> ipcHandleMap;
+
+ private final List<IPCHandle>[] pendingConnections;
+
+ private final List<Message>[] sendList;
+
+ private int writerIndex;
+
+ private int readerIndex;
+
+ private final InetSocketAddress address;
+
+ private volatile boolean stopped;
+
+ IPCConnectionManager(IPCSystem system, InetSocketAddress socketAddress) throws IOException {
+ this.system = system;
+ this.networkThread = new NetworkThread();
+ this.serverSocketChannel = ServerSocketChannel.open();
+ serverSocketChannel.configureBlocking(false);
+ ServerSocket socket = serverSocketChannel.socket();
+ socket.bind(socketAddress);
+ address = new InetSocketAddress(socket.getInetAddress(), socket.getLocalPort());
+ ipcHandleMap = new HashMap<InetSocketAddress, IPCHandle>();
+ pendingConnections = new ArrayList[] { new ArrayList<IPCHandle>(), new ArrayList<IPCHandle>() };
+ sendList = new ArrayList[] { new ArrayList<Message>(), new ArrayList<Message>() };
+ writerIndex = 0;
+ readerIndex = 1;
+ }
+
+ InetSocketAddress getAddress() {
+ return address;
+ }
+
+ void start() {
+ stopped = false;
+ networkThread.start();
+ }
+
+ void stop() throws IOException {
+ stopped = true;
+ serverSocketChannel.close();
+ }
+
+ IPCHandle getIPCHandle(InetSocketAddress remoteAddress) throws IOException, InterruptedException {
+ IPCHandle handle;
+ synchronized (this) {
+ handle = ipcHandleMap.get(remoteAddress);
+ if (handle == null) {
+ handle = new IPCHandle(system, remoteAddress);
+ pendingConnections[writerIndex].add(handle);
+ networkThread.selector.wakeup();
+ }
+ }
+ handle.waitTillConnected();
+ return handle;
+ }
+
+ synchronized void registerHandle(IPCHandle handle) {
+ ipcHandleMap.put(handle.getRemoteAddress(), handle);
+ }
+
+ synchronized void write(Message msg) {
+ sendList[writerIndex].add(msg);
+ networkThread.selector.wakeup();
+ }
+
+ private synchronized void swapReadersAndWriters() {
+ int temp = readerIndex;
+ readerIndex = writerIndex;
+ writerIndex = temp;
+ }
+
+ private Message createInitialReqMessage(IPCHandle handle) {
+ Message msg = new Message(handle);
+ msg.setMessageId(system.createMessageId());
+ msg.setRequestMessageId(-1);
+ msg.setFlag(Message.INITIAL_REQ);
+ msg.setPayload(address);
+ return msg;
+ }
+
+ private Message createInitialAckMessage(IPCHandle handle, Message req) {
+ Message msg = new Message(handle);
+ msg.setMessageId(system.createMessageId());
+ msg.setRequestMessageId(req.getMessageId());
+ msg.setFlag(Message.INITIAL_ACK);
+ msg.setPayload(null);
+ return msg;
+ }
+
+ void ack(IPCHandle handle, Message req) {
+ write(createInitialAckMessage(handle, req));
+ }
+
+ private class NetworkThread extends Thread {
+ private final Selector selector;
+
+ public NetworkThread() {
+ super("IPC Network Listener Thread");
+ setDaemon(true);
+ try {
+ selector = Selector.open();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void run() {
+ try {
+ serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
+ } catch (ClosedChannelException e) {
+ throw new RuntimeException(e);
+ }
+ while (!stopped) {
+ try {
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Starting Select");
+ }
+ int n = selector.select();
+ swapReadersAndWriters();
+ if (!pendingConnections[readerIndex].isEmpty()) {
+ for (IPCHandle handle : pendingConnections[readerIndex]) {
+ SocketChannel channel = SocketChannel.open();
+ channel.configureBlocking(false);
+ SelectionKey cKey = null;
+ if (channel.connect(handle.getRemoteAddress())) {
+ cKey = channel.register(selector, SelectionKey.OP_READ);
+ handle.setState(HandleState.CONNECT_SENT);
+ write(createInitialReqMessage(handle));
+ } else {
+ cKey = channel.register(selector, SelectionKey.OP_CONNECT);
+ }
+ handle.setKey(cKey);
+ cKey.attach(handle);
+ }
+ pendingConnections[readerIndex].clear();
+ }
+ if (!sendList[readerIndex].isEmpty()) {
+ for (Iterator<Message> i = sendList[readerIndex].iterator(); i.hasNext();) {
+ Message msg = i.next();
+ IPCHandle handle = msg.getIPCHandle();
+ if (handle.getState() == HandleState.CLOSED) {
+ i.remove();
+ } else if (!handle.full()) {
+ while (true) {
+ ByteBuffer buffer = handle.getOutBuffer();
+ buffer.compact();
+ boolean success = msg.write(buffer);
+ buffer.flip();
+ if (success) {
+ i.remove();
+ SelectionKey key = handle.getKey();
+ key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+ } else {
+ if (buffer.position() == 0) {
+ handle.resizeOutBuffer();
+ continue;
+ }
+ handle.markFull();
+ }
+ break;
+ }
+ }
+ }
+ }
+ if (n > 0) {
+ for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();) {
+ SelectionKey key = i.next();
+ i.remove();
+ SelectableChannel sc = key.channel();
+ if (key.isReadable()) {
+ SocketChannel channel = (SocketChannel) sc;
+ IPCHandle handle = (IPCHandle) key.attachment();
+ ByteBuffer readBuffer = handle.getInBuffer();
+ int len = channel.read(readBuffer);
+ if (len < 0) {
+ key.cancel();
+ channel.close();
+ handle.close();
+ } else {
+ handle.processIncomingMessages();
+ if (!readBuffer.hasRemaining()) {
+ handle.resizeInBuffer();
+ }
+ }
+ } else if (key.isWritable()) {
+ SocketChannel channel = (SocketChannel) sc;
+ IPCHandle handle = (IPCHandle) key.attachment();
+ ByteBuffer writeBuffer = handle.getOutBuffer();
+ int len = channel.write(writeBuffer);
+ if (len < 0) {
+ key.cancel();
+ channel.close();
+ handle.close();
+ } else if (!writeBuffer.hasRemaining()) {
+ key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
+ }
+ handle.clearFull();
+ } else if (key.isAcceptable()) {
+ assert sc == serverSocketChannel;
+ SocketChannel channel = serverSocketChannel.accept();
+ channel.configureBlocking(false);
+ IPCHandle handle = new IPCHandle(system, null);
+ SelectionKey cKey = channel.register(selector, SelectionKey.OP_READ);
+ handle.setKey(cKey);
+ cKey.attach(handle);
+ handle.setState(HandleState.CONNECT_RECEIVED);
+ } else if (key.isConnectable()) {
+ SocketChannel channel = (SocketChannel) sc;
+ if (channel.finishConnect()) {
+ IPCHandle handle = (IPCHandle) key.attachment();
+ handle.setState(HandleState.CONNECT_SENT);
+ registerHandle(handle);
+ key.interestOps(SelectionKey.OP_READ);
+ write(createInitialReqMessage(handle));
+ }
+ }
+ }
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
new file mode 100644
index 0000000..481a0b0
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
@@ -0,0 +1,204 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.impl;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.util.HashMap;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.IResponseCallback;
+import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
+
+final class IPCHandle implements IIPCHandle {
+ private static final int DEFAULT_BUFFER_SIZE = 1024 * 1024;
+
+ private final IPCSystem system;
+
+ private InetSocketAddress remoteAddress;
+
+ private final Map<Long, IResponseCallback> pendingRequestMap;
+
+ private HandleState state;
+
+ private SelectionKey key;
+
+ private Object attachment;
+
+ private ByteBuffer inBuffer;
+
+ private ByteBuffer outBuffer;
+
+ private boolean full;
+
+ IPCHandle(IPCSystem system, InetSocketAddress remoteAddress) {
+ this.system = system;
+ this.remoteAddress = remoteAddress;
+ pendingRequestMap = new HashMap<Long, IResponseCallback>();
+ inBuffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
+ outBuffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
+ outBuffer.flip();
+ state = HandleState.INITIAL;
+ }
+
+ @Override
+ public InetSocketAddress getRemoteAddress() {
+ return remoteAddress;
+ }
+
+ void setRemoteAddress(InetSocketAddress remoteAddress) {
+ this.remoteAddress = remoteAddress;
+ }
+
+ @Override
+ public synchronized void send(Object req, IResponseCallback callback) throws IPCException {
+ if (state != HandleState.CONNECTED) {
+ throw new IPCException("Handle is not in Connected state");
+ }
+ Message msg = new Message(this);
+ long mid = system.createMessageId();
+ msg.setMessageId(mid);
+ msg.setRequestMessageId(-1);
+ msg.setPayload(req);
+ if (callback != null) {
+ pendingRequestMap.put(mid, callback);
+ }
+ system.getConnectionManager().write(msg);
+ }
+
+ @Override
+ public void setAttachment(Object attachment) {
+ this.attachment = attachment;
+ }
+
+ @Override
+ public Object getAttachment() {
+ return attachment;
+ }
+
+ SelectionKey getKey() {
+ return key;
+ }
+
+ void setKey(SelectionKey key) {
+ this.key = key;
+ }
+
+ public synchronized boolean isConnected() {
+ return state == HandleState.CONNECTED;
+ }
+
+ synchronized HandleState getState() {
+ return state;
+ }
+
+ synchronized void setState(HandleState state) {
+ this.state = state;
+ notifyAll();
+ }
+
+ synchronized void waitTillConnected() throws InterruptedException {
+ while (!isConnected()) {
+ wait();
+ }
+ }
+
+ ByteBuffer getInBuffer() {
+ return inBuffer;
+ }
+
+ ByteBuffer getOutBuffer() {
+ return outBuffer;
+ }
+
+ synchronized void close() {
+ setState(HandleState.CLOSED);
+ for (IResponseCallback cb : pendingRequestMap.values()) {
+ cb.callback(this, null, new IPCException("IPC Handle Closed"));
+ }
+ }
+
+ synchronized void processIncomingMessages() {
+ inBuffer.flip();
+ while (Message.hasMessage(inBuffer)) {
+ Message message = new Message(this);
+ try {
+ message.read(inBuffer);
+ } catch (Exception e) {
+ message.setFlag(Message.ERROR);
+ message.setPayload(e);
+ }
+
+ if (state == HandleState.CONNECT_RECEIVED) {
+ remoteAddress = (InetSocketAddress) message.getPayload();
+ system.getConnectionManager().registerHandle(this);
+ setState(HandleState.CONNECTED);
+ system.getConnectionManager().ack(this, message);
+ continue;
+ } else if (state == HandleState.CONNECT_SENT) {
+ if (message.getFlag() == Message.INITIAL_ACK) {
+ setState(HandleState.CONNECTED);
+ } else {
+ throw new IllegalStateException();
+ }
+ continue;
+ }
+ long requestMessageId = message.getRequestMessageId();
+ if (requestMessageId < 0) {
+ system.deliverIncomingMessage(message);
+ } else {
+ Long rid = Long.valueOf(requestMessageId);
+ IResponseCallback cb = pendingRequestMap.remove(rid);
+ if (cb != null) {
+ byte flag = message.getFlag();
+ Object payload = flag == Message.ERROR ? null : message.getPayload();
+ Exception exception = (Exception) (flag == Message.ERROR ? message.getPayload() : null);
+ cb.callback(this, payload, exception);
+ }
+ }
+ }
+ inBuffer.compact();
+ }
+
+ void resizeInBuffer() {
+ inBuffer.flip();
+ ByteBuffer readBuffer = ByteBuffer.allocate(inBuffer.capacity() * 2);
+ readBuffer.put(inBuffer);
+ readBuffer.compact();
+ inBuffer = readBuffer;
+ }
+
+ void resizeOutBuffer() {
+ ByteBuffer writeBuffer = ByteBuffer.allocate(outBuffer.capacity() * 2);
+ writeBuffer.put(outBuffer);
+ writeBuffer.compact();
+ writeBuffer.flip();
+ outBuffer = writeBuffer;
+ }
+
+ void markFull() {
+ full = true;
+ }
+
+ void clearFull() {
+ full = false;
+ }
+
+ boolean full() {
+ return full;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
new file mode 100644
index 0000000..9eef8ff
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicLong;
+
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.IIPCI;
+import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
+
+public class IPCSystem {
+ private final IPCConnectionManager cMgr;
+
+ private final IIPCI ipci;
+
+ private final Executor executor;
+
+ private final AtomicLong midFactory;
+
+ public IPCSystem(InetSocketAddress socketAddress) throws IOException {
+ this(socketAddress, null, null);
+ }
+
+ public IPCSystem(InetSocketAddress socketAddress, IIPCI ipci, Executor executor) throws IOException {
+ cMgr = new IPCConnectionManager(this, socketAddress);
+ this.ipci = ipci;
+ this.executor = executor;
+ midFactory = new AtomicLong();
+ }
+
+ public InetSocketAddress getSocketAddress() {
+ return cMgr.getAddress();
+ }
+
+ public void start() {
+ cMgr.start();
+ }
+
+ public IIPCHandle getHandle(InetSocketAddress remoteAddress) throws IPCException {
+ try {
+ return cMgr.getIPCHandle(remoteAddress);
+ } catch (IOException e) {
+ throw new IPCException(e);
+ } catch (InterruptedException e) {
+ throw new IPCException(e);
+ }
+ }
+
+ long createMessageId() {
+ return midFactory.incrementAndGet();
+ }
+
+ void deliverIncomingMessage(final Message message) {
+ assert message.getFlag() == Message.NORMAL;
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ IPCHandle handle = message.getIPCHandle();
+ Message response = new Message(handle);
+ response.setMessageId(createMessageId());
+ response.setRequestMessageId(message.getMessageId());
+ response.setFlag(Message.NORMAL);
+ try {
+ Object result = ipci.call(handle, message.getPayload());
+ response.setPayload(result);
+ } catch (Exception e) {
+ response.setFlag(Message.ERROR);
+ response.setPayload(e);
+ }
+ cMgr.write(response);
+ }
+ });
+ }
+
+ IPCConnectionManager getConnectionManager() {
+ return cMgr;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/Message.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/Message.java
new file mode 100644
index 0000000..ab3428e
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/Message.java
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
+
+class Message {
+ private static final int MSG_SIZE_SIZE = 4;
+
+ private static final int HEADER_SIZE = 17;
+
+ static final byte INITIAL_REQ = 1;
+
+ static final byte INITIAL_ACK = 2;
+
+ static final byte ERROR = 3;
+
+ static final byte NORMAL = 0;
+
+ private IPCHandle ipcHandle;
+
+ private long messageId;
+
+ private long requestMessageId;
+
+ private byte flag;
+
+ private Object payload;
+
+ Message(IPCHandle ipcHandle) {
+ this.ipcHandle = ipcHandle;
+ }
+
+ IPCHandle getIPCHandle() {
+ return ipcHandle;
+ }
+
+ void setMessageId(long messageId) {
+ this.messageId = messageId;
+ }
+
+ long getMessageId() {
+ return messageId;
+ }
+
+ void setRequestMessageId(long requestMessageId) {
+ this.requestMessageId = requestMessageId;
+ }
+
+ long getRequestMessageId() {
+ return requestMessageId;
+ }
+
+ void setFlag(byte flag) {
+ this.flag = flag;
+ }
+
+ byte getFlag() {
+ return flag;
+ }
+
+ void setPayload(Object payload) {
+ this.payload = payload;
+ }
+
+ Object getPayload() {
+ return payload;
+ }
+
+ static boolean hasMessage(ByteBuffer buffer) {
+ if (buffer.remaining() < MSG_SIZE_SIZE) {
+ return false;
+ }
+ int msgSize = buffer.getInt(buffer.position());
+ return buffer.remaining() >= msgSize + MSG_SIZE_SIZE;
+ }
+
+ void read(ByteBuffer buffer) throws IOException, ClassNotFoundException {
+ assert hasMessage(buffer);
+ int msgSize = buffer.getInt();
+ messageId = buffer.getLong();
+ requestMessageId = buffer.getLong();
+ flag = buffer.get();
+ int finalPosition = buffer.position() + msgSize - HEADER_SIZE;
+ try {
+ ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(buffer.array(), buffer.position(),
+ msgSize - HEADER_SIZE));
+ payload = ois.readObject();
+ ois.close();
+ } finally {
+ buffer.position(finalPosition);
+ }
+ }
+
+ boolean write(ByteBuffer buffer) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeObject(payload);
+ oos.close();
+ byte[] bytes = baos.toByteArray();
+ if (buffer.remaining() >= MSG_SIZE_SIZE + HEADER_SIZE + bytes.length) {
+ buffer.putInt(HEADER_SIZE + bytes.length);
+ buffer.putLong(messageId);
+ buffer.putLong(requestMessageId);
+ buffer.put(flag);
+ buffer.put(bytes);
+ return true;
+ }
+ return false;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/test/java/edu/uci/ics/ipc/tests/IPCTest.java b/hyracks-ipc/src/test/java/edu/uci/ics/ipc/tests/IPCTest.java
new file mode 100644
index 0000000..1fc1f6f
--- /dev/null
+++ b/hyracks-ipc/src/test/java/edu/uci/ics/ipc/tests/IPCTest.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.ipc.tests;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import junit.framework.Assert;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.IIPCI;
+import edu.uci.ics.hyracks.ipc.api.SyncRMI;
+import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
+
+public class IPCTest {
+ @Test
+ public void test() throws Exception {
+ IPCSystem server = createServerIPCSystem();
+ server.start();
+ InetSocketAddress serverAddr = server.getSocketAddress();
+
+ IPCSystem client = createClientIPCSystem();
+ client.start();
+
+ IIPCHandle handle = client.getHandle(serverAddr);
+
+ SyncRMI rmi = new SyncRMI();
+ for (int i = 0; i < 100; ++i) {
+ Assert.assertEquals(rmi.call(handle, Integer.valueOf(i)), Integer.valueOf(2 * i));
+ }
+
+ IIPCHandle rHandle = server.getHandle(client.getSocketAddress());
+
+ try {
+ rmi.call(rHandle, "Foo");
+ Assert.assertTrue(false);
+ } catch (Exception e) {
+ Assert.assertTrue(true);
+ }
+ }
+
+ private IPCSystem createServerIPCSystem() throws IOException {
+ Executor executor = Executors.newCachedThreadPool();
+ IIPCI ipci = new IIPCI() {
+ @Override
+ public Object call(IIPCHandle caller, Object req) throws Exception {
+ Integer i = (Integer) req;
+ return i.intValue() * 2;
+ }
+ };
+ return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), ipci, executor);
+ }
+
+ private IPCSystem createClientIPCSystem() throws IOException {
+ Executor executor = Executors.newCachedThreadPool();
+ IIPCI ipci = new IIPCI() {
+ @Override
+ public Object call(IIPCHandle caller, Object req) throws Exception {
+ throw new IllegalStateException();
+ }
+ };
+ return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), ipci, executor);
+ }
+}
\ No newline at end of file
diff --git a/hyracks-server/src/main/java/edu/uci/ics/hyracks/server/drivers/VirtualClusterDriver.java b/hyracks-server/src/main/java/edu/uci/ics/hyracks/server/drivers/VirtualClusterDriver.java
index 8ed7bb7..4faa619 100644
--- a/hyracks-server/src/main/java/edu/uci/ics/hyracks/server/drivers/VirtualClusterDriver.java
+++ b/hyracks-server/src/main/java/edu/uci/ics/hyracks/server/drivers/VirtualClusterDriver.java
@@ -27,8 +27,11 @@
@Option(name = "-n", required = false, usage = "Number of node controllers (default: 2)")
public int n = 2;
- @Option(name = "-cc-port", required = false, usage = "CC Port (default: 1099)")
- public int ccPort = 1099;
+ @Option(name = "-cc-client-net-port", required = false, usage = "CC Port (default: 1098)")
+ public int ccClientNetPort = 1098;
+
+ @Option(name = "-cc-cluster-net-port", required = false, usage = "CC Port (default: 1099)")
+ public int ccClusterNetPort = 1099;
@Option(name = "-cc-http-port", required = false, usage = "CC Port (default: 19001)")
public int ccHttpPort = 19001;
@@ -46,7 +49,10 @@
}
CCConfig ccConfig = new CCConfig();
- ccConfig.port = options.ccPort;
+ ccConfig.clusterNetIpAddress = "127.0.0.1";
+ ccConfig.clusterNetPort = options.ccClusterNetPort;
+ ccConfig.clientNetIpAddress = "127.0.0.1";
+ ccConfig.clientNetPort = options.ccClientNetPort;
ccConfig.httpPort = options.ccHttpPort;
HyracksCCProcess ccp = new HyracksCCProcess(ccConfig);
ccp.start();
@@ -56,7 +62,9 @@
HyracksNCProcess ncps[] = new HyracksNCProcess[options.n];
for (int i = 0; i < options.n; ++i) {
NCConfig ncConfig = new NCConfig();
- ncConfig.ccHost = "localhost";
+ ncConfig.ccHost = "127.0.0.1";
+ ncConfig.ccPort = options.ccClusterNetPort;
+ ncConfig.clusterNetIPAddress = "127.0.0.1";
ncConfig.nodeId = "nc" + i;
ncConfig.dataIPAddress = "127.0.0.1";
ncps[i] = new HyracksNCProcess(ncConfig);
diff --git a/pom.xml b/pom.xml
index 18c5cb0..890f42d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -79,6 +79,7 @@
</pluginRepositories>
<modules>
+ <module>hyracks-ipc</module>
<module>hyracks-api</module>
<module>hyracks-dataflow-common</module>
<module>hyracks-dataflow-std</module>