changes for statistics component

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_statistics@1373 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java
index 9f29fbd..aee63a4 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java
@@ -38,6 +38,7 @@
 
 import edu.uci.ics.hyracks.api.application.IApplicationContext;
 import edu.uci.ics.hyracks.api.application.IBootstrap;
+import edu.uci.ics.hyracks.api.messages.IMessageBroker;
 import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
 import edu.uci.ics.hyracks.control.common.context.ServerContext;
 
@@ -54,6 +55,7 @@
     protected Properties deploymentDescriptor;
     protected IBootstrap bootstrap;
     protected Serializable distributedState;
+    protected IMessageBroker statsConnector;
 
     public ApplicationContext(ServerContext serverCtx, String appName) throws IOException {
         this.serverCtx = serverCtx;
@@ -205,4 +207,14 @@
     public ApplicationStatus getStatus() {
         return status;
     }
+
+    @Override
+    public void setMessageBroker(IMessageBroker staticticsConnector) {
+        this.statsConnector = staticticsConnector;
+    }
+
+    @Override
+    public IMessageBroker getMessageBroker() {
+        return this.statsConnector;
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
index a03f5c9..fa01595 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -18,6 +18,7 @@
 
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.messages.IMessage;
 import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
 import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
 import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
@@ -47,4 +48,6 @@
     public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception;
 
     public void notifyApplicationStateChange(String nodeId, String appName, ApplicationStatus status) throws Exception;
+
+    public void sendMessage(byte[] data, String appName, String nodeId) throws Exception;
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
index 9ea4636..53cd4ec 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -70,10 +70,47 @@
         CREATE_APPLICATION,
         DESTROY_APPLICATION,
         REPORT_PARTITION_AVAILABILITY,
+        SEND_APPLICATION_MESSAGE,
 
         OTHER
     }
 
+    public static class SendApplicationMessageFunction extends Function {
+        private static final long serialVersionUID = 1L;
+        private byte[] serializedMessage;
+        private String nodeId;
+        private String appName;
+
+        public String getNodeId() {
+            return nodeId;
+        }
+
+        public void setNodeId(String nodeId) {
+            this.nodeId = nodeId;
+        }
+
+        public byte[] getMessage() {
+            return serializedMessage;
+        }
+
+        public SendApplicationMessageFunction(byte[] data, String appName, String nodeId) {
+            super();
+            this.serializedMessage = data;
+            this.nodeId = nodeId;
+            this.appName = appName;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.SEND_APPLICATION_MESSAGE;
+        }
+
+        public String getAppName() {
+            return appName;
+        }
+
+    }
+
     public static abstract class Function implements Serializable {
         private static final long serialVersionUID = 1L;
 
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index a0dabdd..d02948c 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -43,44 +43,40 @@
 
     @Override
     public void unregisterNode(String nodeId) throws Exception {
-        CCNCFunctions.UnregisterNodeFunction fn = new CCNCFunctions.UnregisterNodeFunction(
-                nodeId);
+        CCNCFunctions.UnregisterNodeFunction fn = new CCNCFunctions.UnregisterNodeFunction(nodeId);
         ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics)
             throws Exception {
-        CCNCFunctions.NotifyTaskCompleteFunction fn = new CCNCFunctions.NotifyTaskCompleteFunction(
-                jobId, taskId, nodeId, statistics);
+        CCNCFunctions.NotifyTaskCompleteFunction fn = new CCNCFunctions.NotifyTaskCompleteFunction(jobId, taskId,
+                nodeId, statistics);
         ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, String details) throws Exception {
-        CCNCFunctions.NotifyTaskFailureFunction fn = new CCNCFunctions.NotifyTaskFailureFunction(
-                jobId, taskId, nodeId, details);
+        CCNCFunctions.NotifyTaskFailureFunction fn = new CCNCFunctions.NotifyTaskFailureFunction(jobId, taskId, nodeId,
+                details);
         ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception {
-        CCNCFunctions.NotifyJobletCleanupFunction fn = new CCNCFunctions.NotifyJobletCleanupFunction(
-                jobId, nodeId);
+        CCNCFunctions.NotifyJobletCleanupFunction fn = new CCNCFunctions.NotifyJobletCleanupFunction(jobId, nodeId);
         ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception {
-        CCNCFunctions.NodeHeartbeatFunction fn = new CCNCFunctions.NodeHeartbeatFunction(id,
-                hbData);
+        CCNCFunctions.NodeHeartbeatFunction fn = new CCNCFunctions.NodeHeartbeatFunction(id, hbData);
         ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void reportProfile(String id, List<JobProfile> profiles) throws Exception {
-        CCNCFunctions.ReportProfileFunction fn = new CCNCFunctions.ReportProfileFunction(id,
-                profiles);
+        CCNCFunctions.ReportProfileFunction fn = new CCNCFunctions.ReportProfileFunction(id, profiles);
         ipcHandle.send(-1, fn, null);
     }
 
@@ -104,4 +100,10 @@
                 nodeId, appName, status);
         ipcHandle.send(-1, fn, null);
     }
+
+    @Override
+    public void sendMessage(byte[] data, String appName, String nodeId) throws Exception {
+        CCNCFunctions.SendApplicationMessageFunction fn = new CCNCFunctions.SendApplicationMessageFunction(data, appName, nodeId);
+        ipcHandle.send(-1, fn, null);
+    }
 }
\ No newline at end of file