changes for statistics component

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_statistics@1372 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index f3d00ac..a5c28ed 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -61,6 +61,7 @@
 import edu.uci.ics.hyracks.control.nc.partitions.PartitionManager;
 import edu.uci.ics.hyracks.control.nc.runtime.RootHyracksContext;
 import edu.uci.ics.hyracks.control.nc.work.AbortTasksWork;
+import edu.uci.ics.hyracks.control.nc.work.ApplicationMessageWork;
 import edu.uci.ics.hyracks.control.nc.work.BuildJobProfilesWork;
 import edu.uci.ics.hyracks.control.nc.work.CleanupJobletWork;
 import edu.uci.ics.hyracks.control.nc.work.CreateApplicationWork;
@@ -365,6 +366,12 @@
         public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception) {
             CCNCFunctions.Function fn = (CCNCFunctions.Function) payload;
             switch (fn.getFunctionId()) {
+                case SEND_APPLICATION_MESSAGE: {
+                    CCNCFunctions.SendApplicationMessageFunction amf = (CCNCFunctions.SendApplicationMessageFunction) fn;
+                    queue.schedule(new ApplicationMessageWork(NodeControllerService.this, amf.getMessage(), amf
+                            .getAppName(), amf.getNodeId()));
+                    return;
+                }
                 case START_TASKS: {
                     CCNCFunctions.StartTasksFunction stf = (CCNCFunctions.StartTasksFunction) fn;
                     queue.schedule(new StartTasksWork(NodeControllerService.this, stf.getAppName(), stf.getJobId(), stf
@@ -414,4 +421,8 @@
 
         }
     }
+
+    public void sendMessage(byte[] data, String appName, String nodeId) throws Exception {
+        ccs.sendMessage(data, appName, nodeId);
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index 1bb0b2f..aead881 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -88,7 +88,9 @@
 
     private volatile boolean aborted;
 
-    public Task(Joblet joblet, TaskAttemptId taskId, String displayName, Executor executor) {
+    private NodeControllerService ncs;
+
+    public Task(Joblet joblet, TaskAttemptId taskId, String displayName, Executor executor, NodeControllerService ncs) {
         this.joblet = joblet;
         this.taskAttemptId = taskId;
         this.displayName = displayName;
@@ -102,6 +104,7 @@
         failed = false;
         errorBaos = new ByteArrayOutputStream();
         errorWriter = new PrintWriter(errorBaos, true);
+        this.ncs = ncs;
     }
 
     public void setTaskRuntime(IPartitionCollector[] collectors, IOperatorNodePushable operator) {
@@ -341,4 +344,9 @@
     public ITaskState getTaskState(TaskId taskId) {
         return opEnv.getTaskState(taskId);
     }
+
+    @Override
+    public void sendMessage(byte[] message, String nodeId) throws Exception {
+        this.ncs.sendMessage(message, this.getJobletContext().getApplicationContext().getApplicationName(), nodeId);
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/application/NCApplicationContext.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/application/NCApplicationContext.java
index 1121c6c..cb0f149 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/application/NCApplicationContext.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/application/NCApplicationContext.java
@@ -6,6 +6,7 @@
 import edu.uci.ics.hyracks.api.application.INCApplicationContext;
 import edu.uci.ics.hyracks.api.application.INCBootstrap;
 import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.messages.IMessageBroker;
 import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
 import edu.uci.ics.hyracks.control.common.context.ServerContext;
 
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ApplicationMessageWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ApplicationMessageWork.java
new file mode 100644
index 0000000..1588d8c
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ApplicationMessageWork.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.messages.IMessage;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
+
+/**
+ * @author rico
+ * 
+ */
+public class ApplicationMessageWork extends AbstractWork {
+
+    private byte[] message;
+    private String nodeId;
+    private NodeControllerService ncs;
+    private String appName;
+
+    public ApplicationMessageWork(NodeControllerService ncs, byte[] message, String appName, String nodeId) {
+        this.ncs = ncs;
+        this.nodeId = nodeId;
+        this.message = message;
+        this.appName = appName;
+    }
+
+    @Override
+    public void run() {
+
+        NCApplicationContext ctx = ncs.getApplications().get(appName);
+        try {
+            IMessage data = (IMessage) ctx.deserialize(message);
+            ctx.getMessageBroker().receivedMessageFromNC(data, nodeId);
+        } catch (IOException e) {
+            Logger.getLogger(this.getClass().getName()).log(Level.WARNING, "Error in stats reporting", e);
+        } catch (ClassNotFoundException e) {
+            Logger.getLogger(this.getClass().getName()).log(Level.WARNING, "Error in stats reporting", e);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "nodeID: " + nodeId;
+    }
+
+}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java
index eb982df..159ecb0 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java
@@ -18,6 +18,7 @@
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.util.Map;
+import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.commons.io.IOUtils;
@@ -86,6 +87,7 @@
                     .notifyApplicationStateChange(ncs.getId(), appName, ApplicationStatus.INITIALIZED);
         } catch (Exception e) {
             LOGGER.warning("Error creating application: " + e.getMessage());
+            LOGGER.log(Level.WARNING, e.getLocalizedMessage(), e);
         }
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
index a101612..e085c89 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
@@ -110,7 +110,7 @@
                     LOGGER.info("Initializing " + taId + " -> " + han);
                 }
                 final int partition = tid.getPartition();
-                Task task = new Task(joblet, taId, han.getClass().getName(), ncs.getExecutor());
+                Task task = new Task(joblet, taId, han.getClass().getName(), ncs.getExecutor(), ncs);
                 IOperatorNodePushable operator = han.createPushRuntime(task, rdp, partition, td.getPartitionCount());
 
                 List<IPartitionCollector> collectors = new ArrayList<IPartitionCollector>();