Added ability to turn off connector policy aware task cluster construction
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1061 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
index 929462b..e6e6c2f 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
@@ -49,6 +49,8 @@
import edu.uci.ics.hyracks.control.cc.job.TaskClusterId;
public class ActivityClusterPlanner {
+ private static final boolean USE_CONNECTOR_POLICY_IN_TASK_CLUSTER_CONSTRUCTION = true;
+
private static final Logger LOGGER = Logger.getLogger(ActivityClusterPlanner.class.getName());
private final JobScheduler scheduler;
@@ -64,13 +66,28 @@
JobRun jobRun = scheduler.getJobRun();
Map<ActivityId, ActivityPartitionDetails> pcMap = computePartitionCounts(ac);
+ Map<ActivityId, ActivityPlan> activityPlanMap = buildActivityPlanMap(ac, jobRun, pcMap);
+
+ assignConnectorPolicy(ac, activityPlanMap);
+
+ TaskCluster[] taskClusters = computeTaskClusters(ac, jobRun, activityPlanMap);
+
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Plan for " + ac);
+ LOGGER.info("Built " + taskClusters.length + " Task Clusters");
+ for (TaskCluster tc : taskClusters) {
+ LOGGER.info("Tasks: " + Arrays.toString(tc.getTasks()));
+ }
+ }
+
+ ac.setPlan(new ActivityClusterPlan(taskClusters, activityPlanMap));
+ }
+
+ private Map<ActivityId, ActivityPlan> buildActivityPlanMap(ActivityCluster ac, JobRun jobRun,
+ Map<ActivityId, ActivityPartitionDetails> pcMap) {
Map<ActivityId, ActivityPlan> activityPlanMap = new HashMap<ActivityId, ActivityPlan>();
- Set<ActivityId> activities = ac.getActivities();
-
- Map<TaskId, Set<TaskId>> taskClusterMap = new HashMap<TaskId, Set<TaskId>>();
-
Set<ActivityId> depAnIds = new HashSet<ActivityId>();
- for (ActivityId anId : activities) {
+ for (ActivityId anId : ac.getActivities()) {
depAnIds.clear();
getDependencyActivityIds(depAnIds, anId);
ActivityPartitionDetails apd = pcMap.get(anId);
@@ -94,53 +111,21 @@
tasks[i].getDependencies().add(dTaskId);
dTask.getDependents().add(tid);
}
- Set<TaskId> cluster = new HashSet<TaskId>();
- cluster.add(tid);
- taskClusterMap.put(tid, cluster);
}
activityPlan.setTasks(tasks);
activityPlanMap.put(anId, activityPlan);
}
+ return activityPlanMap;
+ }
- Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = assignConnectorPolicy(ac, activityPlanMap);
- scheduler.getJobRun().getConnectorPolicyMap().putAll(connectorPolicies);
+ private TaskCluster[] computeTaskClusters(ActivityCluster ac, JobRun jobRun,
+ Map<ActivityId, ActivityPlan> activityPlanMap) {
+ Set<ActivityId> activities = ac.getActivities();
+ Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity = computeTaskConnectivity(jobRun,
+ activityPlanMap, activities);
- Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity = new HashMap<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>>();
- JobActivityGraph jag = jobRun.getJobActivityGraph();
- BitSet targetBitmap = new BitSet();
- for (ActivityId ac1 : activities) {
- Task[] ac1TaskStates = activityPlanMap.get(ac1).getTasks();
- int nProducers = ac1TaskStates.length;
- List<IConnectorDescriptor> outputConns = jag.getActivityOutputConnectorDescriptors(ac1);
- if (outputConns != null) {
- for (IConnectorDescriptor c : outputConns) {
- ConnectorDescriptorId cdId = c.getConnectorId();
- ActivityId ac2 = jag.getConsumerActivity(cdId);
- Task[] ac2TaskStates = activityPlanMap.get(ac2).getTasks();
- int nConsumers = ac2TaskStates.length;
- for (int i = 0; i < nProducers; ++i) {
- c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
- List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = taskConnectivity.get(ac1TaskStates[i]
- .getTaskId());
- if (cInfoList == null) {
- cInfoList = new ArrayList<Pair<TaskId, ConnectorDescriptorId>>();
- taskConnectivity.put(ac1TaskStates[i].getTaskId(), cInfoList);
- }
- Set<TaskId> cluster = taskClusterMap.get(ac1TaskStates[i].getTaskId());
- for (int j = targetBitmap.nextSetBit(0); j >= 0; j = targetBitmap.nextSetBit(j + 1)) {
- TaskId targetTID = ac2TaskStates[j].getTaskId();
- cInfoList.add(Pair.<TaskId, ConnectorDescriptorId> of(targetTID, cdId));
- IConnectorPolicy cPolicy = connectorPolicies.get(cdId);
- if (cPolicy.requiresProducerConsumerCoscheduling()) {
- cluster.add(targetTID);
- }
- }
- }
- }
- }
- }
-
- TaskCluster[] taskClusters = buildTaskClusters(ac, activityPlanMap, taskClusterMap);
+ TaskCluster[] taskClusters = USE_CONNECTOR_POLICY_IN_TASK_CLUSTER_CONSTRUCTION ? buildConnectorPolicyAwareTaskClusters(
+ ac, activityPlanMap, taskConnectivity) : buildConnectorPolicyUnawareTaskClusters(ac, activityPlanMap);
for (TaskCluster tc : taskClusters) {
Set<TaskCluster> tcDependencyTaskClusters = tc.getDependencyTaskClusters();
@@ -170,20 +155,88 @@
}
}
}
+ return taskClusters;
+ }
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Plan for " + ac);
- LOGGER.info("Built " + taskClusters.length + " Task Clusters");
- for (TaskCluster tc : taskClusters) {
- LOGGER.info("Tasks: " + Arrays.toString(tc.getTasks()));
+ private TaskCluster[] buildConnectorPolicyUnawareTaskClusters(ActivityCluster ac,
+ Map<ActivityId, ActivityPlan> activityPlanMap) {
+ List<Task> taskStates = new ArrayList<Task>();
+ for (ActivityId anId : ac.getActivities()) {
+ ActivityPlan ap = activityPlanMap.get(anId);
+ Task[] tasks = ap.getTasks();
+ for (Task t : tasks) {
+ taskStates.add(t);
+ }
+ }
+ TaskCluster tc = new TaskCluster(new TaskClusterId(ac.getActivityClusterId(), 0), ac,
+ taskStates.toArray(new Task[taskStates.size()]));
+ for (Task t : tc.getTasks()) {
+ t.setTaskCluster(tc);
+ }
+ return new TaskCluster[] { tc };
+ }
+
+ private Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> computeTaskConnectivity(JobRun jobRun,
+ Map<ActivityId, ActivityPlan> activityPlanMap, Set<ActivityId> activities) {
+ Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity = new HashMap<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>>();
+ JobActivityGraph jag = jobRun.getJobActivityGraph();
+ BitSet targetBitmap = new BitSet();
+ for (ActivityId ac1 : activities) {
+ Task[] ac1TaskStates = activityPlanMap.get(ac1).getTasks();
+ int nProducers = ac1TaskStates.length;
+ List<IConnectorDescriptor> outputConns = jag.getActivityOutputConnectorDescriptors(ac1);
+ if (outputConns != null) {
+ for (IConnectorDescriptor c : outputConns) {
+ ConnectorDescriptorId cdId = c.getConnectorId();
+ ActivityId ac2 = jag.getConsumerActivity(cdId);
+ Task[] ac2TaskStates = activityPlanMap.get(ac2).getTasks();
+ int nConsumers = ac2TaskStates.length;
+ for (int i = 0; i < nProducers; ++i) {
+ c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
+ List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = taskConnectivity.get(ac1TaskStates[i]
+ .getTaskId());
+ if (cInfoList == null) {
+ cInfoList = new ArrayList<Pair<TaskId, ConnectorDescriptorId>>();
+ taskConnectivity.put(ac1TaskStates[i].getTaskId(), cInfoList);
+ }
+ for (int j = targetBitmap.nextSetBit(0); j >= 0; j = targetBitmap.nextSetBit(j + 1)) {
+ TaskId targetTID = ac2TaskStates[j].getTaskId();
+ cInfoList.add(Pair.<TaskId, ConnectorDescriptorId> of(targetTID, cdId));
+ }
+ }
+ }
+ }
+ }
+ return taskConnectivity;
+ }
+
+ private TaskCluster[] buildConnectorPolicyAwareTaskClusters(ActivityCluster ac,
+ Map<ActivityId, ActivityPlan> activityPlanMap,
+ Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity) {
+ Map<TaskId, Set<TaskId>> taskClusterMap = new HashMap<TaskId, Set<TaskId>>();
+ for (ActivityId anId : ac.getActivities()) {
+ ActivityPlan ap = activityPlanMap.get(anId);
+ Task[] tasks = ap.getTasks();
+ for (Task t : tasks) {
+ Set<TaskId> cluster = new HashSet<TaskId>();
+ TaskId tid = t.getTaskId();
+ cluster.add(tid);
+ taskClusterMap.put(tid, cluster);
}
}
- ac.setPlan(new ActivityClusterPlan(taskClusters, activityPlanMap));
- }
+ JobRun jobRun = ac.getJobRun();
+ Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = jobRun.getConnectorPolicyMap();
+ for (Map.Entry<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> e : taskConnectivity.entrySet()) {
+ Set<TaskId> cluster = taskClusterMap.get(e.getKey());
+ for (Pair<TaskId, ConnectorDescriptorId> p : e.getValue()) {
+ IConnectorPolicy cPolicy = connectorPolicies.get(p.getRight());
+ if (cPolicy.requiresProducerConsumerCoscheduling()) {
+ cluster.add(p.getLeft());
+ }
+ }
+ }
- private TaskCluster[] buildTaskClusters(ActivityCluster ac, Map<ActivityId, ActivityPlan> activityPlanMap,
- Map<TaskId, Set<TaskId>> taskClusterMap) {
/*
* taskClusterMap contains for every TID x, x -> { coscheduled consumer TIDs U x }
* We compute the transitive closure of this relation to find the largest set of
@@ -273,8 +326,7 @@
}
}
- private Map<ConnectorDescriptorId, IConnectorPolicy> assignConnectorPolicy(ActivityCluster ac,
- Map<ActivityId, ActivityPlan> taskMap) {
+ private void assignConnectorPolicy(ActivityCluster ac, Map<ActivityId, ActivityPlan> taskMap) {
JobActivityGraph jag = scheduler.getJobRun().getJobActivityGraph();
Map<ConnectorDescriptorId, IConnectorPolicy> cPolicyMap = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
Set<ActivityId> activities = ac.getActivities();
@@ -300,7 +352,7 @@
}
}
}
- return cPolicyMap;
+ ac.getJobRun().getConnectorPolicyMap().putAll(cPolicyMap);
}
private IConnectorPolicy assignConnectorPolicy(IConnectorDescriptor c, int nProducers, int nConsumers, int[] fanouts) {