changes for statistics component
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_statistics@1373 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java
index 9f29fbd..aee63a4 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java
@@ -38,6 +38,7 @@
import edu.uci.ics.hyracks.api.application.IApplicationContext;
import edu.uci.ics.hyracks.api.application.IBootstrap;
+import edu.uci.ics.hyracks.api.messages.IMessageBroker;
import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
import edu.uci.ics.hyracks.control.common.context.ServerContext;
@@ -54,6 +55,7 @@
protected Properties deploymentDescriptor;
protected IBootstrap bootstrap;
protected Serializable distributedState;
+ protected IMessageBroker statsConnector;
public ApplicationContext(ServerContext serverCtx, String appName) throws IOException {
this.serverCtx = serverCtx;
@@ -205,4 +207,14 @@
public ApplicationStatus getStatus() {
return status;
}
+
+ @Override
+ public void setMessageBroker(IMessageBroker staticticsConnector) {
+ this.statsConnector = staticticsConnector;
+ }
+
+ @Override
+ public IMessageBroker getMessageBroker() {
+ return this.statsConnector;
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
index a03f5c9..fa01595 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -18,6 +18,7 @@
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.messages.IMessage;
import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
@@ -47,4 +48,6 @@
public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception;
public void notifyApplicationStateChange(String nodeId, String appName, ApplicationStatus status) throws Exception;
+
+ public void sendMessage(byte[] data, String appName, String nodeId) throws Exception;
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
index 9ea4636..53cd4ec 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -70,10 +70,47 @@
CREATE_APPLICATION,
DESTROY_APPLICATION,
REPORT_PARTITION_AVAILABILITY,
+ SEND_APPLICATION_MESSAGE,
OTHER
}
+ public static class SendApplicationMessageFunction extends Function {
+ private static final long serialVersionUID = 1L;
+ private byte[] serializedMessage;
+ private String nodeId;
+ private String appName;
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public void setNodeId(String nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ public byte[] getMessage() {
+ return serializedMessage;
+ }
+
+ public SendApplicationMessageFunction(byte[] data, String appName, String nodeId) {
+ super();
+ this.serializedMessage = data;
+ this.nodeId = nodeId;
+ this.appName = appName;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.SEND_APPLICATION_MESSAGE;
+ }
+
+ public String getAppName() {
+ return appName;
+ }
+
+ }
+
public static abstract class Function implements Serializable {
private static final long serialVersionUID = 1L;
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index a0dabdd..d02948c 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -43,44 +43,40 @@
@Override
public void unregisterNode(String nodeId) throws Exception {
- CCNCFunctions.UnregisterNodeFunction fn = new CCNCFunctions.UnregisterNodeFunction(
- nodeId);
+ CCNCFunctions.UnregisterNodeFunction fn = new CCNCFunctions.UnregisterNodeFunction(nodeId);
ipcHandle.send(-1, fn, null);
}
@Override
public void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics)
throws Exception {
- CCNCFunctions.NotifyTaskCompleteFunction fn = new CCNCFunctions.NotifyTaskCompleteFunction(
- jobId, taskId, nodeId, statistics);
+ CCNCFunctions.NotifyTaskCompleteFunction fn = new CCNCFunctions.NotifyTaskCompleteFunction(jobId, taskId,
+ nodeId, statistics);
ipcHandle.send(-1, fn, null);
}
@Override
public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, String details) throws Exception {
- CCNCFunctions.NotifyTaskFailureFunction fn = new CCNCFunctions.NotifyTaskFailureFunction(
- jobId, taskId, nodeId, details);
+ CCNCFunctions.NotifyTaskFailureFunction fn = new CCNCFunctions.NotifyTaskFailureFunction(jobId, taskId, nodeId,
+ details);
ipcHandle.send(-1, fn, null);
}
@Override
public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception {
- CCNCFunctions.NotifyJobletCleanupFunction fn = new CCNCFunctions.NotifyJobletCleanupFunction(
- jobId, nodeId);
+ CCNCFunctions.NotifyJobletCleanupFunction fn = new CCNCFunctions.NotifyJobletCleanupFunction(jobId, nodeId);
ipcHandle.send(-1, fn, null);
}
@Override
public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception {
- CCNCFunctions.NodeHeartbeatFunction fn = new CCNCFunctions.NodeHeartbeatFunction(id,
- hbData);
+ CCNCFunctions.NodeHeartbeatFunction fn = new CCNCFunctions.NodeHeartbeatFunction(id, hbData);
ipcHandle.send(-1, fn, null);
}
@Override
public void reportProfile(String id, List<JobProfile> profiles) throws Exception {
- CCNCFunctions.ReportProfileFunction fn = new CCNCFunctions.ReportProfileFunction(id,
- profiles);
+ CCNCFunctions.ReportProfileFunction fn = new CCNCFunctions.ReportProfileFunction(id, profiles);
ipcHandle.send(-1, fn, null);
}
@@ -104,4 +100,10 @@
nodeId, appName, status);
ipcHandle.send(-1, fn, null);
}
+
+ @Override
+ public void sendMessage(byte[] data, String appName, String nodeId) throws Exception {
+ CCNCFunctions.SendApplicationMessageFunction fn = new CCNCFunctions.SendApplicationMessageFunction(data, appName, nodeId);
+ ipcHandle.send(-1, fn, null);
+ }
}
\ No newline at end of file