Connect the whole register result generating partition path from the NodeController through the ClusterController by implementing the necessary RPC interfaces.
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2476 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 5a33891..3fe2f6a 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -35,6 +35,7 @@
import edu.uci.ics.hyracks.api.client.ClusterControllerInfo;
import edu.uci.ics.hyracks.api.client.HyracksClientInterfaceFunctions;
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.context.ICCContext;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
@@ -48,15 +49,18 @@
import edu.uci.ics.hyracks.control.cc.work.ApplicationMessageWork;
import edu.uci.ics.hyracks.control.cc.work.ApplicationStartWork;
import edu.uci.ics.hyracks.control.cc.work.ApplicationStateChangeWork;
+import edu.uci.ics.hyracks.control.cc.work.GetDatasetDirectoryServiceInfoWork;
import edu.uci.ics.hyracks.control.cc.work.GetIpAddressNodeNameMapWork;
import edu.uci.ics.hyracks.control.cc.work.GetJobStatusWork;
import edu.uci.ics.hyracks.control.cc.work.GetNodeControllersInfoWork;
+import edu.uci.ics.hyracks.control.cc.work.GetResultPartitionLocationsWork;
import edu.uci.ics.hyracks.control.cc.work.JobStartWork;
import edu.uci.ics.hyracks.control.cc.work.JobletCleanupNotificationWork;
import edu.uci.ics.hyracks.control.cc.work.NodeHeartbeatWork;
import edu.uci.ics.hyracks.control.cc.work.RegisterNodeWork;
import edu.uci.ics.hyracks.control.cc.work.RegisterPartitionAvailibilityWork;
import edu.uci.ics.hyracks.control.cc.work.RegisterPartitionRequestWork;
+import edu.uci.ics.hyracks.control.cc.work.RegisterResultPartitionLocationWork;
import edu.uci.ics.hyracks.control.cc.work.RemoveDeadNodesWork;
import edu.uci.ics.hyracks.control.cc.work.ReportProfilesWork;
import edu.uci.ics.hyracks.control.cc.work.TaskCompleteWork;
@@ -416,6 +420,13 @@
return;
}
+ case REGISTER_RESULT_PARTITION_LOCATION: {
+ CCNCFunctions.RegisterResultPartitionLocationFunction rrplf = (CCNCFunctions.RegisterResultPartitionLocationFunction) fn;
+ workQueue.schedule(new RegisterResultPartitionLocationWork(ClusterControllerService.this, rrplf
+ .getJobId(), rrplf.getPartition(), rrplf.getNPartitions(), rrplf.getNetworkAddress()));
+ return;
+ }
+
case APPLICATION_STATE_CHANGE_RESPONSE: {
CCNCFunctions.ApplicationStateChangeResponseFunction astrf = (CCNCFunctions.ApplicationStateChangeResponseFunction) fn;
workQueue.schedule(new ApplicationStateChangeWork(ClusterControllerService.this, astrf));
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
index 0c5bb2f..b0b81f4 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -16,6 +16,7 @@
import java.util.List;
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
@@ -46,6 +47,9 @@
public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception;
+ public void registerResultPartitionLocation(JobId jobId, int partition, int nPartitions, NetworkAddress addr)
+ throws Exception;
+
public void notifyApplicationStateChange(String nodeId, String appName, ApplicationStatus status) throws Exception;
public void sendApplicationMessageToCC(byte[] data, String appName, String nodeId) throws Exception;
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
index 557a8cb..56e9830 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -68,6 +68,7 @@
REPORT_PROFILE,
REGISTER_PARTITION_PROVIDER,
REGISTER_PARTITION_REQUEST,
+ REGISTER_RESULT_PARTITION_LOCATION,
APPLICATION_STATE_CHANGE_RESPONSE,
NODE_REGISTRATION_RESULT,
@@ -438,6 +439,46 @@
}
}
+ public static class RegisterResultPartitionLocationFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+
+ private final int partition;
+
+ private final int nPartitions;
+
+ private NetworkAddress networkAddress;
+
+ public RegisterResultPartitionLocationFunction(JobId jobId, int partition, int nPartitions, NetworkAddress networkAddress) {
+ this.jobId = jobId;
+ this.partition = partition;
+ this.nPartitions = nPartitions;
+ this.networkAddress = networkAddress;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.REGISTER_RESULT_PARTITION_LOCATION;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public int getPartition() {
+ return partition;
+ }
+
+ public int getNPartitions() {
+ return nPartitions;
+ }
+
+ public NetworkAddress getNetworkAddress() {
+ return networkAddress;
+ }
+ }
+
public static class ApplicationStateChangeResponseFunction extends Function {
private static final long serialVersionUID = 1L;
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index bbaab4e..76b7043 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -16,6 +16,7 @@
import java.util.List;
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
@@ -95,6 +96,14 @@
}
@Override
+ public void registerResultPartitionLocation(JobId jobId, int partition, int nPartitions,
+ NetworkAddress networkAddress) throws Exception {
+ CCNCFunctions.RegisterResultPartitionLocationFunction fn = new CCNCFunctions.RegisterResultPartitionLocationFunction(
+ jobId, partition, nPartitions, networkAddress);
+ ipcHandle.send(-1, fn, null);
+ }
+
+ @Override
public void notifyApplicationStateChange(String nodeId, String appName, ApplicationStatus status) throws Exception {
CCNCFunctions.ApplicationStateChangeResponseFunction fn = new CCNCFunctions.ApplicationStateChangeResponseFunction(
nodeId, appName, status);