Added ability to turn off connector-policy-aware task cluster construction

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1061 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
index 929462b..e6e6c2f 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
@@ -49,6 +49,8 @@
 import edu.uci.ics.hyracks.control.cc.job.TaskClusterId;
 
 public class ActivityClusterPlanner {
+    private static final boolean USE_CONNECTOR_POLICY_IN_TASK_CLUSTER_CONSTRUCTION = true;
+
     private static final Logger LOGGER = Logger.getLogger(ActivityClusterPlanner.class.getName());
 
     private final JobScheduler scheduler;
@@ -64,13 +66,28 @@
         JobRun jobRun = scheduler.getJobRun();
         Map<ActivityId, ActivityPartitionDetails> pcMap = computePartitionCounts(ac);
 
+        Map<ActivityId, ActivityPlan> activityPlanMap = buildActivityPlanMap(ac, jobRun, pcMap);
+
+        assignConnectorPolicy(ac, activityPlanMap);
+
+        TaskCluster[] taskClusters = computeTaskClusters(ac, jobRun, activityPlanMap);
+
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Plan for " + ac);
+            LOGGER.info("Built " + taskClusters.length + " Task Clusters");
+            for (TaskCluster tc : taskClusters) {
+                LOGGER.info("Tasks: " + Arrays.toString(tc.getTasks()));
+            }
+        }
+
+        ac.setPlan(new ActivityClusterPlan(taskClusters, activityPlanMap));
+    }
+
+    private Map<ActivityId, ActivityPlan> buildActivityPlanMap(ActivityCluster ac, JobRun jobRun,
+            Map<ActivityId, ActivityPartitionDetails> pcMap) {
         Map<ActivityId, ActivityPlan> activityPlanMap = new HashMap<ActivityId, ActivityPlan>();
-        Set<ActivityId> activities = ac.getActivities();
-
-        Map<TaskId, Set<TaskId>> taskClusterMap = new HashMap<TaskId, Set<TaskId>>();
-
         Set<ActivityId> depAnIds = new HashSet<ActivityId>();
-        for (ActivityId anId : activities) {
+        for (ActivityId anId : ac.getActivities()) {
             depAnIds.clear();
             getDependencyActivityIds(depAnIds, anId);
             ActivityPartitionDetails apd = pcMap.get(anId);
@@ -94,53 +111,21 @@
                     tasks[i].getDependencies().add(dTaskId);
                     dTask.getDependents().add(tid);
                 }
-                Set<TaskId> cluster = new HashSet<TaskId>();
-                cluster.add(tid);
-                taskClusterMap.put(tid, cluster);
             }
             activityPlan.setTasks(tasks);
             activityPlanMap.put(anId, activityPlan);
         }
+        return activityPlanMap;
+    }
 
-        Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = assignConnectorPolicy(ac, activityPlanMap);
-        scheduler.getJobRun().getConnectorPolicyMap().putAll(connectorPolicies);
+    private TaskCluster[] computeTaskClusters(ActivityCluster ac, JobRun jobRun,
+            Map<ActivityId, ActivityPlan> activityPlanMap) {
+        Set<ActivityId> activities = ac.getActivities();
+        Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity = computeTaskConnectivity(jobRun,
+                activityPlanMap, activities);
 
-        Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity = new HashMap<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>>();
-        JobActivityGraph jag = jobRun.getJobActivityGraph();
-        BitSet targetBitmap = new BitSet();
-        for (ActivityId ac1 : activities) {
-            Task[] ac1TaskStates = activityPlanMap.get(ac1).getTasks();
-            int nProducers = ac1TaskStates.length;
-            List<IConnectorDescriptor> outputConns = jag.getActivityOutputConnectorDescriptors(ac1);
-            if (outputConns != null) {
-                for (IConnectorDescriptor c : outputConns) {
-                    ConnectorDescriptorId cdId = c.getConnectorId();
-                    ActivityId ac2 = jag.getConsumerActivity(cdId);
-                    Task[] ac2TaskStates = activityPlanMap.get(ac2).getTasks();
-                    int nConsumers = ac2TaskStates.length;
-                    for (int i = 0; i < nProducers; ++i) {
-                        c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
-                        List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = taskConnectivity.get(ac1TaskStates[i]
-                                .getTaskId());
-                        if (cInfoList == null) {
-                            cInfoList = new ArrayList<Pair<TaskId, ConnectorDescriptorId>>();
-                            taskConnectivity.put(ac1TaskStates[i].getTaskId(), cInfoList);
-                        }
-                        Set<TaskId> cluster = taskClusterMap.get(ac1TaskStates[i].getTaskId());
-                        for (int j = targetBitmap.nextSetBit(0); j >= 0; j = targetBitmap.nextSetBit(j + 1)) {
-                            TaskId targetTID = ac2TaskStates[j].getTaskId();
-                            cInfoList.add(Pair.<TaskId, ConnectorDescriptorId> of(targetTID, cdId));
-                            IConnectorPolicy cPolicy = connectorPolicies.get(cdId);
-                            if (cPolicy.requiresProducerConsumerCoscheduling()) {
-                                cluster.add(targetTID);
-                            }
-                        }
-                    }
-                }
-            }
-        }
-
-        TaskCluster[] taskClusters = buildTaskClusters(ac, activityPlanMap, taskClusterMap);
+        TaskCluster[] taskClusters = USE_CONNECTOR_POLICY_IN_TASK_CLUSTER_CONSTRUCTION ? buildConnectorPolicyAwareTaskClusters(
+                ac, activityPlanMap, taskConnectivity) : buildConnectorPolicyUnawareTaskClusters(ac, activityPlanMap);
 
         for (TaskCluster tc : taskClusters) {
             Set<TaskCluster> tcDependencyTaskClusters = tc.getDependencyTaskClusters();
@@ -170,20 +155,88 @@
                 }
             }
         }
+        return taskClusters;
+    }
 
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Plan for " + ac);
-            LOGGER.info("Built " + taskClusters.length + " Task Clusters");
-            for (TaskCluster tc : taskClusters) {
-                LOGGER.info("Tasks: " + Arrays.toString(tc.getTasks()));
+    private TaskCluster[] buildConnectorPolicyUnawareTaskClusters(ActivityCluster ac,
+            Map<ActivityId, ActivityPlan> activityPlanMap) {
+        List<Task> taskStates = new ArrayList<Task>();
+        for (ActivityId anId : ac.getActivities()) {
+            ActivityPlan ap = activityPlanMap.get(anId);
+            Task[] tasks = ap.getTasks();
+            for (Task t : tasks) {
+                taskStates.add(t);
+            }
+        }
+        TaskCluster tc = new TaskCluster(new TaskClusterId(ac.getActivityClusterId(), 0), ac,
+                taskStates.toArray(new Task[taskStates.size()]));
+        for (Task t : tc.getTasks()) {
+            t.setTaskCluster(tc);
+        }
+        return new TaskCluster[] { tc };
+    }
+
+    private Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> computeTaskConnectivity(JobRun jobRun,
+            Map<ActivityId, ActivityPlan> activityPlanMap, Set<ActivityId> activities) {
+        Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity = new HashMap<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>>();
+        JobActivityGraph jag = jobRun.getJobActivityGraph();
+        BitSet targetBitmap = new BitSet();
+        for (ActivityId ac1 : activities) {
+            Task[] ac1TaskStates = activityPlanMap.get(ac1).getTasks();
+            int nProducers = ac1TaskStates.length;
+            List<IConnectorDescriptor> outputConns = jag.getActivityOutputConnectorDescriptors(ac1);
+            if (outputConns != null) {
+                for (IConnectorDescriptor c : outputConns) {
+                    ConnectorDescriptorId cdId = c.getConnectorId();
+                    ActivityId ac2 = jag.getConsumerActivity(cdId);
+                    Task[] ac2TaskStates = activityPlanMap.get(ac2).getTasks();
+                    int nConsumers = ac2TaskStates.length;
+                    for (int i = 0; i < nProducers; ++i) {
+                        c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
+                        List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = taskConnectivity.get(ac1TaskStates[i]
+                                .getTaskId());
+                        if (cInfoList == null) {
+                            cInfoList = new ArrayList<Pair<TaskId, ConnectorDescriptorId>>();
+                            taskConnectivity.put(ac1TaskStates[i].getTaskId(), cInfoList);
+                        }
+                        for (int j = targetBitmap.nextSetBit(0); j >= 0; j = targetBitmap.nextSetBit(j + 1)) {
+                            TaskId targetTID = ac2TaskStates[j].getTaskId();
+                            cInfoList.add(Pair.<TaskId, ConnectorDescriptorId> of(targetTID, cdId));
+                        }
+                    }
+                }
+            }
+        }
+        return taskConnectivity;
+    }
+
+    private TaskCluster[] buildConnectorPolicyAwareTaskClusters(ActivityCluster ac,
+            Map<ActivityId, ActivityPlan> activityPlanMap,
+            Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity) {
+        Map<TaskId, Set<TaskId>> taskClusterMap = new HashMap<TaskId, Set<TaskId>>();
+        for (ActivityId anId : ac.getActivities()) {
+            ActivityPlan ap = activityPlanMap.get(anId);
+            Task[] tasks = ap.getTasks();
+            for (Task t : tasks) {
+                Set<TaskId> cluster = new HashSet<TaskId>();
+                TaskId tid = t.getTaskId();
+                cluster.add(tid);
+                taskClusterMap.put(tid, cluster);
             }
         }
 
-        ac.setPlan(new ActivityClusterPlan(taskClusters, activityPlanMap));
-    }
+        JobRun jobRun = ac.getJobRun();
+        Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = jobRun.getConnectorPolicyMap();
+        for (Map.Entry<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> e : taskConnectivity.entrySet()) {
+            Set<TaskId> cluster = taskClusterMap.get(e.getKey());
+            for (Pair<TaskId, ConnectorDescriptorId> p : e.getValue()) {
+                IConnectorPolicy cPolicy = connectorPolicies.get(p.getRight());
+                if (cPolicy.requiresProducerConsumerCoscheduling()) {
+                    cluster.add(p.getLeft());
+                }
+            }
+        }
 
-    private TaskCluster[] buildTaskClusters(ActivityCluster ac, Map<ActivityId, ActivityPlan> activityPlanMap,
-            Map<TaskId, Set<TaskId>> taskClusterMap) {
         /*
          * taskClusterMap contains for every TID x, x -> { coscheduled consumer TIDs U x }
          * We compute the transitive closure of this relation to find the largest set of
@@ -273,8 +326,7 @@
         }
     }
 
-    private Map<ConnectorDescriptorId, IConnectorPolicy> assignConnectorPolicy(ActivityCluster ac,
-            Map<ActivityId, ActivityPlan> taskMap) {
+    private void assignConnectorPolicy(ActivityCluster ac, Map<ActivityId, ActivityPlan> taskMap) {
         JobActivityGraph jag = scheduler.getJobRun().getJobActivityGraph();
         Map<ConnectorDescriptorId, IConnectorPolicy> cPolicyMap = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
         Set<ActivityId> activities = ac.getActivities();
@@ -300,7 +352,7 @@
                 }
             }
         }
-        return cPolicyMap;
+        ac.getJobRun().getConnectorPolicyMap().putAll(cPolicyMap);
     }
 
     private IConnectorPolicy assignConnectorPolicy(IConnectorDescriptor c, int nProducers, int nConsumers, int[] fanouts) {