Reduced the shipping around of Job Activity Graphs to at most once per NC per job.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1075 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index ba3b41c..eada1f5 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -443,13 +443,13 @@
final NodeControllerState node = ccs.getNodeMap().get(nodeId);
if (node != null) {
node.getActiveJobIds().add(jobRun.getJobId());
- jobRun.getParticipatingNodeIds().add(nodeId);
+ boolean changed = jobRun.getParticipatingNodeIds().add(nodeId);
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("Starting: " + taskDescriptors + " at " + entry.getKey());
}
try {
- node.getNodeController().startTasks(appName, jobId, JavaSerializationUtils.serialize(jag),
- taskDescriptors, connectorPolicies);
+ byte[] jagBytes = changed ? JavaSerializationUtils.serialize(jag) : null;
+ node.getNodeController().startTasks(appName, jobId, jagBytes, taskDescriptors, connectorPolicies);
} catch (Exception e) {
e.printStackTrace();
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java
index 2a844ad..2456c25 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java
@@ -15,6 +15,7 @@
package edu.uci.ics.hyracks.control.cc.work;
import java.util.List;
+import java.util.logging.Level;
import org.apache.commons.lang3.tuple.Pair;
@@ -56,6 +57,11 @@
}
@Override
+ public Level logLevel() {
+ return Level.FINEST;
+ }
+
+ @Override
public String toString() {
return "PartitionAvailable@" + partitionDescriptor;
}
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index 961a9e3..8176cb7 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -38,8 +38,8 @@
@Override
public void startTasks(String appName, JobId jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors,
Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) throws Exception {
- CCNCFunctions.StartTasksFunction stf = new CCNCFunctions.StartTasksFunction(appName, jobId,
- planBytes, taskDescriptors, connectorPolicies);
+ CCNCFunctions.StartTasksFunction stf = new CCNCFunctions.StartTasksFunction(appName, jobId, planBytes,
+ taskDescriptors, connectorPolicies);
ipcHandle.send(-1, stf, null);
}
@@ -51,23 +51,21 @@
@Override
public void cleanUpJoblet(JobId jobId, JobStatus status) throws Exception {
- CCNCFunctions.CleanupJobletFunction cjf = new CCNCFunctions.CleanupJobletFunction(jobId,
- status);
+ CCNCFunctions.CleanupJobletFunction cjf = new CCNCFunctions.CleanupJobletFunction(jobId, status);
ipcHandle.send(-1, cjf, null);
}
@Override
public void createApplication(String appName, boolean deployHar, byte[] serializedDistributedState)
throws Exception {
- CCNCFunctions.CreateApplicationFunction caf = new CCNCFunctions.CreateApplicationFunction(
- appName, deployHar, serializedDistributedState);
+ CCNCFunctions.CreateApplicationFunction caf = new CCNCFunctions.CreateApplicationFunction(appName, deployHar,
+ serializedDistributedState);
ipcHandle.send(-1, caf, null);
}
@Override
public void destroyApplication(String appName) throws Exception {
- CCNCFunctions.DestroyApplicationFunction daf = new CCNCFunctions.DestroyApplicationFunction(
- appName);
+ CCNCFunctions.DestroyApplicationFunction daf = new CCNCFunctions.DestroyApplicationFunction(appName);
ipcHandle.send(-1, daf, null);
}
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index cfa0aaa..2532e2a 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -36,6 +36,7 @@
import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
import edu.uci.ics.hyracks.api.job.IJobletEventListener;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobActivityGraph;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
@@ -59,6 +60,8 @@
private final JobId jobId;
+ private final JobActivityGraph jag;
+
private final Map<PartitionId, IPartitionCollector> partitionRequestMap;
private final Map<OperatorDescriptorId, Map<Integer, IOperatorEnvironment>> envMap;
@@ -79,10 +82,11 @@
private boolean cleanupPending;
- public Joblet(NodeControllerService nodeController, JobId jobId, INCApplicationContext appCtx) {
+ public Joblet(NodeControllerService nodeController, JobId jobId, INCApplicationContext appCtx, JobActivityGraph jag) {
this.nodeController = nodeController;
this.appCtx = appCtx;
this.jobId = jobId;
+ this.jag = jag;
partitionRequestMap = new HashMap<PartitionId, IPartitionCollector>();
envMap = new HashMap<OperatorDescriptorId, Map<Integer, IOperatorEnvironment>>();
taskStateMap = new HashMap<TaskId, ITaskState>();
@@ -98,6 +102,10 @@
return jobId;
}
+ public JobActivityGraph getJobActivityGraph() {
+ return jag;
+ }
+
public synchronized IOperatorEnvironment getEnvironment(OperatorDescriptorId opId, int partition) {
if (!envMap.containsKey(opId)) {
envMap.put(opId, new HashMap<Integer, IOperatorEnvironment>());
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
index 034be8b..a101612 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
@@ -86,7 +86,9 @@
try {
Map<String, NCApplicationContext> applications = ncs.getApplications();
NCApplicationContext appCtx = applications.get(appName);
- final JobActivityGraph jag = (JobActivityGraph) appCtx.deserialize(jagBytes);
+ final Joblet joblet = getOrCreateLocalJoblet(jobId, appCtx, jagBytes == null ? null
+ : (JobActivityGraph) appCtx.deserialize(jagBytes));
+ final JobActivityGraph jag = joblet.getJobActivityGraph();
IRecordDescriptorProvider rdp = new IRecordDescriptorProvider() {
@Override
@@ -100,8 +102,6 @@
}
};
- final Joblet joblet = getOrCreateLocalJoblet(jobId, appCtx, jag);
-
for (TaskAttemptDescriptor td : taskDescriptors) {
TaskAttemptId taId = td.getTaskAttemptId();
TaskId tid = taId.getTaskId();
@@ -164,7 +164,10 @@
Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
Joblet ji = jobletMap.get(jobId);
if (ji == null) {
- ji = new Joblet(ncs, jobId, appCtx);
+ if (jag == null) {
+ throw new NullPointerException("JobActivityGraph was null");
+ }
+ ji = new Joblet(ncs, jobId, appCtx, jag);
IJobletEventListenerFactory jelf = jag.getJobSpecification().getJobletEventListenerFactory();
if (jelf != null) {
IJobletEventListener listener = jelf.createListener(ji);