changes for statistics component
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_statistics@1372 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index f3d00ac..a5c28ed 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -61,6 +61,7 @@
import edu.uci.ics.hyracks.control.nc.partitions.PartitionManager;
import edu.uci.ics.hyracks.control.nc.runtime.RootHyracksContext;
import edu.uci.ics.hyracks.control.nc.work.AbortTasksWork;
+import edu.uci.ics.hyracks.control.nc.work.ApplicationMessageWork;
import edu.uci.ics.hyracks.control.nc.work.BuildJobProfilesWork;
import edu.uci.ics.hyracks.control.nc.work.CleanupJobletWork;
import edu.uci.ics.hyracks.control.nc.work.CreateApplicationWork;
@@ -365,6 +366,12 @@
public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception) {
CCNCFunctions.Function fn = (CCNCFunctions.Function) payload;
switch (fn.getFunctionId()) {
+ case SEND_APPLICATION_MESSAGE: {
+ CCNCFunctions.SendApplicationMessageFunction amf = (CCNCFunctions.SendApplicationMessageFunction) fn;
+ queue.schedule(new ApplicationMessageWork(NodeControllerService.this, amf.getMessage(), amf
+ .getAppName(), amf.getNodeId()));
+ return;
+ }
case START_TASKS: {
CCNCFunctions.StartTasksFunction stf = (CCNCFunctions.StartTasksFunction) fn;
queue.schedule(new StartTasksWork(NodeControllerService.this, stf.getAppName(), stf.getJobId(), stf
@@ -414,4 +421,8 @@
}
}
+
+ public void sendMessage(byte[] data, String appName, String nodeId) throws Exception {
+ ccs.sendMessage(data, appName, nodeId);
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index 1bb0b2f..aead881 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -88,7 +88,9 @@
private volatile boolean aborted;
- public Task(Joblet joblet, TaskAttemptId taskId, String displayName, Executor executor) {
+ private NodeControllerService ncs;
+
+ public Task(Joblet joblet, TaskAttemptId taskId, String displayName, Executor executor, NodeControllerService ncs) {
this.joblet = joblet;
this.taskAttemptId = taskId;
this.displayName = displayName;
@@ -102,6 +104,7 @@
failed = false;
errorBaos = new ByteArrayOutputStream();
errorWriter = new PrintWriter(errorBaos, true);
+ this.ncs = ncs;
}
public void setTaskRuntime(IPartitionCollector[] collectors, IOperatorNodePushable operator) {
@@ -341,4 +344,9 @@
public ITaskState getTaskState(TaskId taskId) {
return opEnv.getTaskState(taskId);
}
+
+ @Override
+ public void sendMessage(byte[] message, String nodeId) throws Exception {
+ this.ncs.sendMessage(message, this.getJobletContext().getApplicationContext().getApplicationName(), nodeId);
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/application/NCApplicationContext.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/application/NCApplicationContext.java
index 1121c6c..cb0f149 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/application/NCApplicationContext.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/application/NCApplicationContext.java
@@ -6,6 +6,7 @@
import edu.uci.ics.hyracks.api.application.INCApplicationContext;
import edu.uci.ics.hyracks.api.application.INCBootstrap;
import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.messages.IMessageBroker;
import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
import edu.uci.ics.hyracks.control.common.context.ServerContext;
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ApplicationMessageWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ApplicationMessageWork.java
new file mode 100644
index 0000000..1588d8c
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ApplicationMessageWork.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.messages.IMessage;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
+
+/**
+ * @author rico
+ *
+ */
+public class ApplicationMessageWork extends AbstractWork {
+
+ private byte[] message;
+ private String nodeId;
+ private NodeControllerService ncs;
+ private String appName;
+
+ public ApplicationMessageWork(NodeControllerService ncs, byte[] message, String appName, String nodeId) {
+ this.ncs = ncs;
+ this.nodeId = nodeId;
+ this.message = message;
+ this.appName = appName;
+ }
+
+ @Override
+ public void run() {
+
+ NCApplicationContext ctx = ncs.getApplications().get(appName);
+ try {
+ IMessage data = (IMessage) ctx.deserialize(message);
+ ctx.getMessageBroker().receivedMessageFromNC(data, nodeId);
+ } catch (IOException e) {
+ Logger.getLogger(this.getClass().getName()).log(Level.WARNING, "Error in stats reporting", e);
+ } catch (ClassNotFoundException e) {
+ Logger.getLogger(this.getClass().getName()).log(Level.WARNING, "Error in stats reporting", e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "nodeID: " + nodeId;
+ }
+
+}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java
index eb982df..159ecb0 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java
@@ -18,6 +18,7 @@
import java.io.OutputStream;
import java.io.Serializable;
import java.util.Map;
+import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.io.IOUtils;
@@ -86,6 +87,7 @@
.notifyApplicationStateChange(ncs.getId(), appName, ApplicationStatus.INITIALIZED);
} catch (Exception e) {
LOGGER.warning("Error creating application: " + e.getMessage());
+ LOGGER.log(Level.WARNING, e.getLocalizedMessage(), e);
}
}
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
index a101612..e085c89 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
@@ -110,7 +110,7 @@
LOGGER.info("Initializing " + taId + " -> " + han);
}
final int partition = tid.getPartition();
- Task task = new Task(joblet, taId, han.getClass().getName(), ncs.getExecutor());
+ Task task = new Task(joblet, taId, han.getClass().getName(), ncs.getExecutor(), ncs);
IOperatorNodePushable operator = han.createPushRuntime(task, rdp, partition, td.getPartitionCount());
List<IPartitionCollector> collectors = new ArrayList<IPartitionCollector>();