Reduced the shipping around of Job Activity Graphs to at most once per NC per job.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1075 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index ba3b41c..eada1f5 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -443,13 +443,13 @@
             final NodeControllerState node = ccs.getNodeMap().get(nodeId);
             if (node != null) {
                 node.getActiveJobIds().add(jobRun.getJobId());
-                jobRun.getParticipatingNodeIds().add(nodeId);
+                boolean changed = jobRun.getParticipatingNodeIds().add(nodeId);
                 if (LOGGER.isLoggable(Level.FINE)) {
                     LOGGER.fine("Starting: " + taskDescriptors + " at " + entry.getKey());
                 }
                 try {
-                    node.getNodeController().startTasks(appName, jobId, JavaSerializationUtils.serialize(jag),
-                            taskDescriptors, connectorPolicies);
+                    byte[] jagBytes = changed ? JavaSerializationUtils.serialize(jag) : null;
+                    node.getNodeController().startTasks(appName, jobId, jagBytes, taskDescriptors, connectorPolicies);
                 } catch (Exception e) {
                     e.printStackTrace();
                 }
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java
index 2a844ad..2456c25 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java
@@ -15,6 +15,7 @@
 package edu.uci.ics.hyracks.control.cc.work;
 
 import java.util.List;
+import java.util.logging.Level;
 
 import org.apache.commons.lang3.tuple.Pair;
 
@@ -56,6 +57,11 @@
     }
 
     @Override
+    public Level logLevel() {
+        return Level.FINEST;
+    }
+
+    @Override
     public String toString() {
         return "PartitionAvailable@" + partitionDescriptor;
     }
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index 961a9e3..8176cb7 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -38,8 +38,8 @@
     @Override
     public void startTasks(String appName, JobId jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors,
             Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) throws Exception {
-        CCNCFunctions.StartTasksFunction stf = new CCNCFunctions.StartTasksFunction(appName, jobId,
-                planBytes, taskDescriptors, connectorPolicies);
+        CCNCFunctions.StartTasksFunction stf = new CCNCFunctions.StartTasksFunction(appName, jobId, planBytes,
+                taskDescriptors, connectorPolicies);
         ipcHandle.send(-1, stf, null);
     }
 
@@ -51,23 +51,21 @@
 
     @Override
     public void cleanUpJoblet(JobId jobId, JobStatus status) throws Exception {
-        CCNCFunctions.CleanupJobletFunction cjf = new CCNCFunctions.CleanupJobletFunction(jobId,
-                status);
+        CCNCFunctions.CleanupJobletFunction cjf = new CCNCFunctions.CleanupJobletFunction(jobId, status);
         ipcHandle.send(-1, cjf, null);
     }
 
     @Override
     public void createApplication(String appName, boolean deployHar, byte[] serializedDistributedState)
             throws Exception {
-        CCNCFunctions.CreateApplicationFunction caf = new CCNCFunctions.CreateApplicationFunction(
-                appName, deployHar, serializedDistributedState);
+        CCNCFunctions.CreateApplicationFunction caf = new CCNCFunctions.CreateApplicationFunction(appName, deployHar,
+                serializedDistributedState);
         ipcHandle.send(-1, caf, null);
     }
 
     @Override
     public void destroyApplication(String appName) throws Exception {
-        CCNCFunctions.DestroyApplicationFunction daf = new CCNCFunctions.DestroyApplicationFunction(
-                appName);
+        CCNCFunctions.DestroyApplicationFunction daf = new CCNCFunctions.DestroyApplicationFunction(appName);
         ipcHandle.send(-1, daf, null);
     }
 
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index cfa0aaa..2532e2a 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -36,6 +36,7 @@
 import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
 import edu.uci.ics.hyracks.api.job.IJobletEventListener;
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobActivityGraph;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
@@ -59,6 +60,8 @@
 
     private final JobId jobId;
 
+    private final JobActivityGraph jag;
+
     private final Map<PartitionId, IPartitionCollector> partitionRequestMap;
 
     private final Map<OperatorDescriptorId, Map<Integer, IOperatorEnvironment>> envMap;
@@ -79,10 +82,11 @@
 
     private boolean cleanupPending;
 
-    public Joblet(NodeControllerService nodeController, JobId jobId, INCApplicationContext appCtx) {
+    public Joblet(NodeControllerService nodeController, JobId jobId, INCApplicationContext appCtx, JobActivityGraph jag) {
         this.nodeController = nodeController;
         this.appCtx = appCtx;
         this.jobId = jobId;
+        this.jag = jag;
         partitionRequestMap = new HashMap<PartitionId, IPartitionCollector>();
         envMap = new HashMap<OperatorDescriptorId, Map<Integer, IOperatorEnvironment>>();
         taskStateMap = new HashMap<TaskId, ITaskState>();
@@ -98,6 +102,10 @@
         return jobId;
     }
 
+    public JobActivityGraph getJobActivityGraph() {
+        return jag;
+    }
+
     public synchronized IOperatorEnvironment getEnvironment(OperatorDescriptorId opId, int partition) {
         if (!envMap.containsKey(opId)) {
             envMap.put(opId, new HashMap<Integer, IOperatorEnvironment>());
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
index 034be8b..a101612 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
@@ -86,7 +86,9 @@
         try {
             Map<String, NCApplicationContext> applications = ncs.getApplications();
             NCApplicationContext appCtx = applications.get(appName);
-            final JobActivityGraph jag = (JobActivityGraph) appCtx.deserialize(jagBytes);
+            final Joblet joblet = getOrCreateLocalJoblet(jobId, appCtx, jagBytes == null ? null
+                    : (JobActivityGraph) appCtx.deserialize(jagBytes));
+            final JobActivityGraph jag = joblet.getJobActivityGraph();
 
             IRecordDescriptorProvider rdp = new IRecordDescriptorProvider() {
                 @Override
@@ -100,8 +102,6 @@
                 }
             };
 
-            final Joblet joblet = getOrCreateLocalJoblet(jobId, appCtx, jag);
-
             for (TaskAttemptDescriptor td : taskDescriptors) {
                 TaskAttemptId taId = td.getTaskAttemptId();
                 TaskId tid = taId.getTaskId();
@@ -164,7 +164,10 @@
         Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
         Joblet ji = jobletMap.get(jobId);
         if (ji == null) {
-            ji = new Joblet(ncs, jobId, appCtx);
+            if (jag == null) {
+                throw new NullPointerException("JobActivityGraph was null");
+            }
+            ji = new Joblet(ncs, jobId, appCtx, jag);
             IJobletEventListenerFactory jelf = jag.getJobSpecification().getJobletEventListenerFactory();
             if (jelf != null) {
                 IJobletEventListener listener = jelf.createListener(ji);