Implement lazy planning in the job scheduler.

Task-level planning state (the activity-to-task map, the TaskCluster
array, and the partition-producing TaskCluster map) moves out of
ActivityCluster into a new immutable ActivityClusterPlan; a null plan now
marks a cluster as not yet planned. TaskClusterBuilder becomes
ActivityClusterPlanner and runs lazily: an ActivityCluster is planned
only once all of its dependency clusters are planned and their source
TaskClusters have completed. Tasks and TaskClusters gain explicit
dependency/dependent edges derived from the blocked-to-blocker map,
connector policies become job-wide state on JobRun, and the scheduler
now tracks progress per TaskCluster rather than per ActivityCluster.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_scheduling@513 123451ca-8445-de46-9d55-352943316053
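In outline: an ActivityCluster is now planned only when the scheduler first reaches it with every dependency cluster already planned and the dependency's source TaskClusters completed, instead of task clusters being built eagerly for every cluster. The toy program below is a hedged, self-contained sketch of that control flow; every name in it is a hypothetical stand-in for the real ActivityCluster/ActivityClusterPlan/JobScheduler types changed in this patch.

    import java.util.ArrayList;
    import java.util.List;

    // Stand-ins: Cluster ~ ActivityCluster, plan ~ ActivityClusterPlan (sketch only).
    class LazyPlanningDemo {
        static final class Cluster {
            final String name;
            final List<Cluster> dependencies = new ArrayList<Cluster>();
            Object plan;       // null means "not planned yet", as with getPlan() below
            boolean completed; // stands in for "all source TaskClusters completed"
            Cluster(String name) { this.name = name; }
        }

        // Mirrors the shape of findRunnableTaskClusterRoots: descend into unfinished
        // dependencies; plan a cluster lazily the first time it becomes reachable.
        static void schedule(Cluster c) {
            boolean depsComplete = true;
            for (Cluster dep : c.dependencies) {
                if (dep.plan == null || !dep.completed) {
                    depsComplete = false;
                    schedule(dep);
                }
            }
            if (depsComplete && c.plan == null) {
                c.plan = "plan(" + c.name + ")"; // the lazy planning step
                System.out.println("planned " + c.name);
            }
        }

        public static void main(String[] args) {
            Cluster a = new Cluster("A");
            Cluster b = new Cluster("B");
            b.dependencies.add(a);
            schedule(b); // plans A only; B waits until A completes
            a.completed = true;
            schedule(b); // now plans B
        }
    }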
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
index f7f3cb1..91626f7 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
@@ -14,7 +14,6 @@
*/
package edu.uci.ics.hyracks.control.cc.job;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -22,7 +21,6 @@
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
-import edu.uci.ics.hyracks.api.partitions.PartitionId;
public class ActivityCluster {
private final JobRun jobRun;
@@ -33,11 +31,7 @@
private final Set<ActivityCluster> dependents;
- private final Map<ActivityId, Task[]> taskStateMap;
-
- private TaskCluster[] taskClusters;
-
- private Map<PartitionId, TaskCluster> partitionProducingTaskClusterMap;
+ private ActivityClusterPlan acp;
private Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies;
@@ -48,8 +42,6 @@
this.activities = activities;
dependencies = new HashSet<ActivityCluster>();
dependents = new HashSet<ActivityCluster>();
- taskStateMap = new HashMap<ActivityId, Task[]>();
- partitionProducingTaskClusterMap = new HashMap<PartitionId, TaskCluster>();
inProgressTaskClusters = new HashSet<TaskCluster>();
}
@@ -73,22 +65,6 @@
return dependents;
}
- public Map<ActivityId, Task[]> getTaskMap() {
- return taskStateMap;
- }
-
- public TaskCluster[] getTaskClusters() {
- return taskClusters;
- }
-
- public void setTaskClusters(TaskCluster[] taskClusters) {
- this.taskClusters = taskClusters;
- }
-
- public Map<PartitionId, TaskCluster> getPartitionProducingTaskClusterMap() {
- return partitionProducingTaskClusterMap;
- }
-
public JobRun getJobRun() {
return jobRun;
}
@@ -97,6 +73,14 @@
return jobRun.getJobActivityGraph().getJobSpecification().getMaxAttempts();
}
+ public ActivityClusterPlan getPlan() {
+ return acp;
+ }
+
+ public void setPlan(ActivityClusterPlan acp) {
+ this.acp = acp;
+ }
+
public void setConnectorPolicyMap(Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) {
this.connectorPolicies = connectorPolicies;
}
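The three planning fields removed above are folded behind a single getPlan()/setPlan() pair, so a null plan doubles as the "not yet planned" signal. A minimal sketch of the resulting idiom, for any ActivityCluster ac (this is exactly the test the isPlanned helper added to JobScheduler below performs):

    if (ac.getPlan() == null) {
        // not planned yet: the scheduler descends into dependency clusters first
    } else {
        TaskCluster[] taskClusters = ac.getPlan().getTaskClusters();
    }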
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityClusterPlan.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityClusterPlan.java
new file mode 100644
index 0000000..3f97651
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityClusterPlan.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.job;
+
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+
+public class ActivityClusterPlan {
+ private final Map<ActivityId, Task[]> taskStateMap;
+
+ private final Map<PartitionId, TaskCluster> partitionProducingTaskClusterMap;
+
+ private final TaskCluster[] taskClusters;
+
+ public ActivityClusterPlan(TaskCluster[] taskClusters, Map<ActivityId, Task[]> taskStateMap,
+ Map<PartitionId, TaskCluster> partitionProducingTaskClusterMap) {
+ this.taskStateMap = taskStateMap;
+ this.partitionProducingTaskClusterMap = partitionProducingTaskClusterMap;
+ this.taskClusters = taskClusters;
+ }
+
+ public Map<ActivityId, Task[]> getTaskMap() {
+ return taskStateMap;
+ }
+
+ public TaskCluster[] getTaskClusters() {
+ return taskClusters;
+ }
+
+ public Map<PartitionId, TaskCluster> getPartitionProducingTaskClusterMap() {
+ return partitionProducingTaskClusterMap;
+ }
+}
\ No newline at end of file
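ActivityClusterPlan is a write-once snapshot: the planner builds all three structures and installs them in one setPlan call. Resolving a TaskId through a plan then takes two lookups plus an array index, as in this sketch (run is assumed to be the owning JobRun; the planner's getTaskCluster helper below does the same thing):

    ActivityCluster ac = run.getActivityClusterMap().get(tid.getActivityId());
    Task task = ac.getPlan().getTaskMap().get(tid.getActivityId())[tid.getPartition()];
    TaskCluster tc = task.getTaskCluster();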
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
index d4a9283..7b02baf 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
@@ -21,6 +21,8 @@
import java.util.UUID;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobActivityGraph;
import edu.uci.ics.hyracks.api.job.JobStatus;
@@ -43,6 +45,8 @@
private final Map<ActivityId, ActivityCluster> activityClusterMap;
+ private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicyMap;
+
private JobScheduler js;
private JobStatus status;
@@ -56,6 +60,7 @@
participatingNodeIds = new HashSet<String>();
profile = new JobProfile(jobId);
activityClusterMap = new HashMap<ActivityId, ActivityCluster>();
+ connectorPolicyMap = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
}
public UUID getJobId() {
@@ -121,4 +126,8 @@
public void setActivityClusters(Set<ActivityCluster> activityClusters) {
this.activityClusters = activityClusters;
}
+
+ public Map<ConnectorDescriptorId, IConnectorPolicy> getConnectorPolicyMap() {
+ return connectorPolicyMap;
+ }
}
\ No newline at end of file
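Connector policies move from per-ActivityCluster state to one job-wide map, since with lazy planning a cluster may need the policy of a connector whose producer was planned earlier in a different cluster. The planner publishes into the map and the scheduler reads back from it, as in this two-line sketch mirroring code later in the patch:

    jobRun.getConnectorPolicyMap().putAll(connectorPolicies);            // planner side
    IConnectorPolicy cPolicy = jobRun.getConnectorPolicyMap().get(cdId); // scheduler side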
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/Task.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/Task.java
index 3ca75b8..28deb44 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/Task.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/Task.java
@@ -14,6 +14,9 @@
*/
package edu.uci.ics.hyracks.control.cc.job;
+import java.util.HashSet;
+import java.util.Set;
+
import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.control.cc.scheduler.ActivityPartitionDetails;
@@ -22,11 +25,17 @@
private final ActivityPartitionDetails apd;
+ private final Set<TaskId> dependencies;
+
+ private final Set<TaskId> dependents;
+
private TaskCluster taskCluster;
public Task(TaskId taskId, ActivityPartitionDetails apd) {
this.taskId = taskId;
this.apd = apd;
+ this.dependencies = new HashSet<TaskId>();
+ this.dependents = new HashSet<TaskId>();
}
public TaskId getTaskId() {
@@ -37,6 +46,14 @@
return apd;
}
+ public Set<TaskId> getDependencies() {
+ return dependencies;
+ }
+
+ public Set<TaskId> getDependents() {
+ return dependents;
+ }
+
public TaskCluster getTaskCluster() {
return taskCluster;
}
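Each Task now records both directions of its blocker edges. The planner keeps the two sets symmetric while wiring partition i of a blocked activity to partition i of each blocker; a sketch of the invariant, where dTask is the dependency activity's task for the same partition:

    tasks[i].getDependencies().add(dTask.getTaskId()); // blocked -> blocker
    dTask.getDependents().add(tasks[i].getTaskId());   // blocker -> blocked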
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java
index ae2533e..d3de5a1 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java
@@ -31,13 +31,19 @@
private final Set<PartitionId> requiredPartitions;
+ private final Set<TaskCluster> dependencyTaskClusters;
+
+ private final Set<TaskCluster> dependentTaskClusters;
+
private final List<TaskClusterAttempt> taskClusterAttempts;
public TaskCluster(ActivityCluster ac, Task[] tasks) {
this.ac = ac;
this.tasks = tasks;
- this.producedPartitions = new HashSet<PartitionId>();
- this.requiredPartitions = new HashSet<PartitionId>();
+ producedPartitions = new HashSet<PartitionId>();
+ requiredPartitions = new HashSet<PartitionId>();
+ dependencyTaskClusters = new HashSet<TaskCluster>();
+ dependentTaskClusters = new HashSet<TaskCluster>();
taskClusterAttempts = new ArrayList<TaskClusterAttempt>();
}
@@ -57,6 +63,14 @@
return requiredPartitions;
}
+ public Set<TaskCluster> getDependencyTaskClusters() {
+ return dependencyTaskClusters;
+ }
+
+ public Set<TaskCluster> getDependentTaskClusters() {
+ return dependentTaskClusters;
+ }
+
public List<TaskClusterAttempt> getAttempts() {
return taskClusterAttempts;
}
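These cluster-level sets are derived from the task-level edges during planning, and they give abort handling a graph to walk: a TaskCluster whose dependency cluster is doomed is itself doomed. A sketch of that propagation, matching the recursion added to findDoomedTaskClusters below:

    for (TaskCluster depTC : tc.getDependencyTaskClusters()) {
        if (findDoomedTaskClusters(depTC, doomedTaskClusters)) {
            doomed = true; // a doomed dependency dooms this cluster as well
        }
    }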
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/AbstractTaskLifecycleEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/AbstractTaskLifecycleEvent.java
index 643cc18..c90b8ff 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/AbstractTaskLifecycleEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/AbstractTaskLifecycleEvent.java
@@ -51,7 +51,7 @@
Map<ActivityId, ActivityCluster> activityClusterMap = run.getActivityClusterMap();
ActivityCluster ac = activityClusterMap.get(tid.getActivityId());
if (ac != null) {
- Map<ActivityId, Task[]> taskStateMap = ac.getTaskMap();
+ Map<ActivityId, Task[]> taskStateMap = ac.getPlan().getTaskMap();
Task[] taskStates = taskStateMap.get(tid.getActivityId());
if (taskStates != null && taskStates.length > tid.getPartition()) {
Task ts = taskStates[tid.getPartition()];
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskCompleteEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskCompleteEvent.java
index 0d33c21..895135f 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskCompleteEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskCompleteEvent.java
@@ -31,7 +31,7 @@
protected void performEvent(TaskAttempt ta) {
try {
ActivityCluster ac = ta.getTaskState().getTaskCluster().getActivityCluster();
- ac.getJobRun().getScheduler().notifyTaskComplete(ta, ac);
+ ac.getJobRun().getScheduler().notifyTaskComplete(ta);
} catch (HyracksException e) {
e.printStackTrace();
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/TaskClusterBuilder.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
similarity index 72%
rename from hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/TaskClusterBuilder.java
rename to hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
index bbcb83d..7231842 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/TaskClusterBuilder.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
@@ -40,57 +40,75 @@
import edu.uci.ics.hyracks.api.partitions.PartitionId;
import edu.uci.ics.hyracks.api.util.Pair;
import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
+import edu.uci.ics.hyracks.control.cc.job.ActivityClusterPlan;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
import edu.uci.ics.hyracks.control.cc.job.Task;
import edu.uci.ics.hyracks.control.cc.job.TaskCluster;
-public class TaskClusterBuilder {
- private static final Logger LOGGER = Logger.getLogger(TaskClusterBuilder.class.getName());
+public class ActivityClusterPlanner {
+ private static final Logger LOGGER = Logger.getLogger(ActivityClusterPlanner.class.getName());
- private final JobRun jobRun;
- private final PartitionConstraintSolver solver;
+ private final JobScheduler scheduler;
- public TaskClusterBuilder(JobRun jobRun, PartitionConstraintSolver solver) {
- this.jobRun = jobRun;
- this.solver = solver;
+ public ActivityClusterPlanner(JobScheduler newJobScheduler) {
+ this.scheduler = newJobScheduler;
}
- public void buildTaskClusters(ActivityCluster ac) throws HyracksException {
+ public void planActivityCluster(ActivityCluster ac) throws HyracksException {
+ JobRun jobRun = scheduler.getJobRun();
Map<ActivityId, ActivityPartitionDetails> pcMap = computePartitionCounts(ac);
- Map<ActivityId, Task[]> taskStateMap = ac.getTaskMap();
+ Map<ActivityId, Task[]> taskMap = new HashMap<ActivityId, Task[]>();
Set<ActivityId> activities = ac.getActivities();
Map<TaskId, Set<TaskId>> taskClusterMap = new HashMap<TaskId, Set<TaskId>>();
+ Set<ActivityId> depAnIds = new HashSet<ActivityId>();
for (ActivityId anId : activities) {
+ depAnIds.clear();
+ getDependencyActivityIds(depAnIds, anId);
ActivityPartitionDetails apd = pcMap.get(anId);
- Task[] taskStates = new Task[apd.getPartitionCount()];
- for (int i = 0; i < taskStates.length; ++i) {
+ Task[] tasks = new Task[apd.getPartitionCount()];
+ for (int i = 0; i < tasks.length; ++i) {
TaskId tid = new TaskId(anId, i);
- taskStates[i] = new Task(tid, apd);
+ tasks[i] = new Task(tid, apd);
+ for (ActivityId danId : depAnIds) {
+ ActivityCluster dAC = jobRun.getActivityClusterMap().get(danId);
+ ActivityClusterPlan dACP = dAC.getPlan();
+ assert dACP != null : "IllegalStateEncountered: Dependent AC is being planned without a plan for dependency AC: Encountered no plan for ActivityID "
+ + danId;
+ Task[] dATasks = dACP.getTaskMap().get(danId);
+ assert dATasks != null : "IllegalStateEncountered: Dependent AC is being planned without a plan for dependency AC: Encountered no tasks for ActivityID "
+ + danId;
+ assert dATasks.length == tasks.length : "Dependency activity partitioned differently from dependent: "
+ + dATasks.length + " != " + tasks.length;
+ Task dTask = dATasks[i];
+ TaskId dTaskId = dTask.getTaskId();
+ tasks[i].getDependencies().add(dTaskId);
+ dTask.getDependents().add(tid);
+ }
Set<TaskId> cluster = new HashSet<TaskId>();
cluster.add(tid);
taskClusterMap.put(tid, cluster);
}
- taskStateMap.put(anId, taskStates);
+ taskMap.put(anId, tasks);
}
- Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = assignConnectorPolicy(ac, pcMap);
- ac.setConnectorPolicyMap(connectorPolicies);
+ Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = assignConnectorPolicy(ac, taskMap, pcMap);
+ scheduler.getJobRun().getConnectorPolicyMap().putAll(connectorPolicies);
Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity = new HashMap<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>>();
JobActivityGraph jag = jobRun.getJobActivityGraph();
BitSet targetBitmap = new BitSet();
for (ActivityId ac1 : activities) {
- Task[] ac1TaskStates = taskStateMap.get(ac1);
+ Task[] ac1TaskStates = taskMap.get(ac1);
int nProducers = ac1TaskStates.length;
List<IConnectorDescriptor> outputConns = jag.getActivityOutputConnectorDescriptors(ac1);
if (outputConns != null) {
for (IConnectorDescriptor c : outputConns) {
ConnectorDescriptorId cdId = c.getConnectorId();
ActivityId ac2 = jag.getConsumerActivity(cdId);
- Task[] ac2TaskStates = taskStateMap.get(ac2);
+ Task[] ac2TaskStates = taskMap.get(ac2);
int nConsumers = ac2TaskStates.length;
for (int i = 0; i < nProducers; ++i) {
c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
@@ -146,24 +164,25 @@
for (Set<TaskId> cluster : clusters) {
Set<Task> taskStates = new HashSet<Task>();
for (TaskId tid : cluster) {
- taskStates.add(taskStateMap.get(tid.getActivityId())[tid.getPartition()]);
+ taskStates.add(taskMap.get(tid.getActivityId())[tid.getPartition()]);
}
TaskCluster tc = new TaskCluster(ac, taskStates.toArray(new Task[taskStates.size()]));
tcSet.add(tc);
for (TaskId tid : cluster) {
- taskStateMap.get(tid.getActivityId())[tid.getPartition()].setTaskCluster(tc);
+ taskMap.get(tid.getActivityId())[tid.getPartition()].setTaskCluster(tc);
}
}
- ac.setTaskClusters(tcSet.toArray(new TaskCluster[tcSet.size()]));
+ TaskCluster[] taskClusters = tcSet.toArray(new TaskCluster[tcSet.size()]);
- Map<PartitionId, TaskCluster> partitionProducingTaskClusterMap = ac.getPartitionProducingTaskClusterMap();
- for (TaskCluster tc : ac.getTaskClusters()) {
+ Map<PartitionId, TaskCluster> partitionProducingTaskClusterMap = new HashMap<PartitionId, TaskCluster>();
+ for (TaskCluster tc : taskClusters) {
+ Set<TaskCluster> tcDependencyTaskClusters = tc.getDependencyTaskClusters();
for (Task ts : tc.getTasks()) {
TaskId tid = ts.getTaskId();
List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = taskConnectivity.get(tid);
if (cInfoList != null) {
for (Pair<TaskId, ConnectorDescriptorId> p : cInfoList) {
- Task targetTS = taskStateMap.get(p.first.getActivityId())[p.first.getPartition()];
+ Task targetTS = taskMap.get(p.first.getActivityId())[p.first.getPartition()];
TaskCluster targetTC = targetTS.getTaskCluster();
if (targetTC != tc) {
ConnectorDescriptorId cdId = p.second;
@@ -175,6 +194,12 @@
}
}
}
+
+ for (TaskId dTid : ts.getDependencies()) {
+ TaskCluster dTC = getTaskCluster(dTid);
+ dTC.getDependentTaskClusters().add(tc);
+ tcDependencyTaskClusters.add(dTC);
+ }
}
}
@@ -185,24 +210,42 @@
LOGGER.info("Tasks: " + Arrays.toString(tc.getTasks()));
}
}
+
+ ac.setPlan(new ActivityClusterPlan(taskClusters, taskMap, partitionProducingTaskClusterMap));
+ }
+
+ private TaskCluster getTaskCluster(TaskId tid) {
+ ActivityCluster ac = scheduler.getJobRun().getActivityClusterMap().get(tid.getActivityId());
+ ActivityClusterPlan acp = ac.getPlan();
+ Task[] tasks = acp.getTaskMap().get(tid.getActivityId());
+ Task task = tasks[tid.getPartition()];
+ assert task.getTaskId().equals(tid);
+ return task.getTaskCluster();
+ }
+
+ private void getDependencyActivityIds(Set<ActivityId> depAnIds, ActivityId anId) {
+ JobActivityGraph jag = scheduler.getJobRun().getJobActivityGraph();
+ Set<ActivityId> blockers = jag.getBlocked2BlockerMap().get(anId);
+ if (blockers != null) {
+ depAnIds.addAll(blockers);
+ }
}
private Map<ConnectorDescriptorId, IConnectorPolicy> assignConnectorPolicy(ActivityCluster ac,
- Map<ActivityId, ActivityPartitionDetails> pcMap) {
- JobActivityGraph jag = jobRun.getJobActivityGraph();
+ Map<ActivityId, Task[]> taskMap, Map<ActivityId, ActivityPartitionDetails> pcMap) {
+ JobActivityGraph jag = scheduler.getJobRun().getJobActivityGraph();
Map<ConnectorDescriptorId, IConnectorPolicy> cPolicyMap = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
Set<ActivityId> activities = ac.getActivities();
- Map<ActivityId, Task[]> taskStateMap = ac.getTaskMap();
BitSet targetBitmap = new BitSet();
- for (ActivityId ac1 : activities) {
- Task[] ac1TaskStates = taskStateMap.get(ac1);
+ for (ActivityId a1 : activities) {
+ Task[] ac1TaskStates = taskMap.get(a1);
int nProducers = ac1TaskStates.length;
- List<IConnectorDescriptor> outputConns = jag.getActivityOutputConnectorDescriptors(ac1);
+ List<IConnectorDescriptor> outputConns = jag.getActivityOutputConnectorDescriptors(a1);
if (outputConns != null) {
for (IConnectorDescriptor c : outputConns) {
ConnectorDescriptorId cdId = c.getConnectorId();
- ActivityId ac2 = jag.getConsumerActivity(cdId);
- Task[] ac2TaskStates = taskStateMap.get(ac2);
+ ActivityId a2 = jag.getConsumerActivity(cdId);
+ Task[] ac2TaskStates = taskMap.get(a2);
int nConsumers = ac2TaskStates.length;
int[] fanouts = new int[nProducers];
@@ -219,7 +262,7 @@
}
private IConnectorPolicy assignConnectorPolicy(IConnectorDescriptor c, int nProducers, int nConsumers, int[] fanouts) {
- IConnectorPolicyAssignmentPolicy cpap = jobRun.getJobActivityGraph().getJobSpecification()
+ IConnectorPolicyAssignmentPolicy cpap = scheduler.getJobRun().getJobActivityGraph().getJobSpecification()
.getConnectorPolicyAssignmentPolicy();
if (cpap != null) {
return cpap.getConnectorPolicyAssignment(c, nProducers, nConsumers, fanouts);
@@ -229,6 +272,8 @@
private Map<ActivityId, ActivityPartitionDetails> computePartitionCounts(ActivityCluster ac)
throws HyracksException {
+ PartitionConstraintSolver solver = scheduler.getSolver();
+ JobRun jobRun = scheduler.getJobRun();
Set<LValueConstraintExpression> lValues = new HashSet<LValueConstraintExpression>();
for (ActivityId anId : ac.getActivities()) {
lValues.add(new PartitionCountExpression(anId.getOperatorDescriptorId()));
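planActivityCluster wires dependencies one partition at a time: each blocker activity found through the JobActivityGraph's blocked-to-blocker map must already be planned with the same partition count, and partition i of the blocked activity then depends on partition i of the blocker; this one-to-one pairing is what the dATasks.length == tasks.length assert above enforces. A self-contained toy rendering of that loop, with Strings standing in for TaskId (all names hypothetical):

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    class PartitionWiseWiringDemo {
        public static void main(String[] args) {
            int nPartitions = 3;                 // asserted equal on both sides
            String blocker = "A", blocked = "B"; // hypothetical activity ids
            Map<String, List<String>> dependencies = new LinkedHashMap<String, List<String>>();
            for (int i = 0; i < nPartitions; ++i) {
                List<String> deps = new ArrayList<String>();
                deps.add(blocker + ":" + i);     // partition i depends on partition i
                dependencies.put(blocked + ":" + i, deps);
            }
            System.out.println(dependencies);    // {B:0=[A:0], B:1=[A:1], B:2=[A:2]}
        }
    }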
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index 3c19397..28a28ad 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -69,20 +69,28 @@
private final JobRun jobRun;
- private final Set<ActivityCluster> completedClusters;
-
- private final Set<ActivityCluster> inProgressClusters;
-
private final PartitionConstraintSolver solver;
+ private final Map<PartitionId, TaskCluster> partitionProducingTaskClusterMap;
+
+ private final Set<TaskCluster> inProgressTaskClusters;
+
private Set<ActivityCluster> rootActivityClusters;
public JobScheduler(ClusterControllerService ccs, JobRun jobRun) {
this.ccs = ccs;
this.jobRun = jobRun;
- completedClusters = new HashSet<ActivityCluster>();
- inProgressClusters = new HashSet<ActivityCluster>();
solver = new PartitionConstraintSolver();
+ partitionProducingTaskClusterMap = new HashMap<PartitionId, TaskCluster>();
+ inProgressTaskClusters = new HashSet<TaskCluster>();
+ }
+
+ public JobRun getJobRun() {
+ return jobRun;
+ }
+
+ public PartitionConstraintSolver getSolver() {
+ return solver;
}
public void startJob() throws HyracksException {
@@ -120,95 +128,165 @@
rootActivityClusters = acgb.inferActivityClusters(jag);
}
- private void findPrerequisiteActivities(Set<ActivityId> prereqs, ActivityCluster ac) {
- TaskCluster[] taskClusters = ac.getTaskClusters();
- if (taskClusters == null) {
- JobActivityGraph jag = jobRun.getJobActivityGraph();
- for (ActivityId aid : ac.getActivities()) {
- Set<ActivityId> deps = jag.getBlocked2BlockerMap().get(aid);
- if (deps != null) {
- prereqs.addAll(deps);
- }
- }
- } else {
-
- }
- }
-
- private void findRunnableActivityClusters(Set<ActivityCluster> frontier, Set<ActivityCluster> roots) {
+ private void findRunnableTaskClusterRoots(Set<TaskCluster> frontier, Set<ActivityCluster> roots)
+ throws HyracksException {
for (ActivityCluster root : roots) {
- findRunnableActivityClusters(frontier, root);
+ findRunnableTaskClusterRoots(frontier, root);
}
}
- private void findRunnableActivityClusters(Set<ActivityCluster> frontier, ActivityCluster candidate) {
- if (frontier.contains(candidate) || inProgressClusters.contains(candidate)
- || completedClusters.contains(candidate)) {
- return;
- }
+ private void findRunnableTaskClusterRoots(Set<TaskCluster> frontier, ActivityCluster candidate)
+ throws HyracksException {
boolean depsComplete = true;
- Set<ActivityId> prereqs = new HashSet<ActivityId>();
- findPrerequisiteActivities(prereqs, candidate);
- Map<ActivityCluster, Set<ActivityId>> prereqACMap = new HashMap<ActivityCluster, Set<ActivityId>>();
- for (ActivityId aid : prereqs) {
- ActivityCluster ac = jobRun.getActivityClusterMap().get(aid);
- Set<ActivityId> pSet = prereqACMap.get(ac);
- if (pSet == null) {
- pSet = new HashSet<ActivityId>();
- prereqACMap.put(ac, pSet);
- }
- pSet.add(aid);
- }
for (ActivityCluster depAC : candidate.getDependencies()) {
- if (!completedClusters.contains(depAC)) {
+ if (!isPlanned(depAC)) {
depsComplete = false;
- findRunnableActivityClusters(frontier, depAC);
+ findRunnableTaskClusterRoots(frontier, depAC);
} else {
- Set<ActivityId> pSet = prereqACMap.get(depAC);
- if (pSet != null) {
-
+ boolean tcRootsComplete = true;
+ Set<TaskCluster> depACTCRoots = new HashSet<TaskCluster>();
+ for (TaskCluster tc : depAC.getPlan().getTaskClusters()) {
+ if (tc.getProducedPartitions().isEmpty()) {
+ if (findLastTaskClusterAttempt(tc).getStatus() != TaskClusterAttempt.TaskClusterStatus.COMPLETED) {
+ tcRootsComplete = false;
+ }
+ depACTCRoots.add(tc);
+ }
}
- // TODO
+ if (!tcRootsComplete) {
+ depsComplete = false;
+ findRunnableTaskClusterRoots(frontier, depAC);
+ }
}
}
if (depsComplete) {
- frontier.add(candidate);
- }
- }
-
- private void startRunnableActivityClusters() throws HyracksException {
- Set<ActivityCluster> runnableClusters = new HashSet<ActivityCluster>();
- findRunnableActivityClusters(runnableClusters, rootActivityClusters);
- if (runnableClusters.isEmpty() && inProgressClusters.isEmpty()) {
- ccs.getJobQueue().schedule(new JobCleanupEvent(ccs, jobRun.getJobId(), JobStatus.TERMINATED, null));
- return;
- }
- for (ActivityCluster ac : runnableClusters) {
- inProgressClusters.add(ac);
- TaskClusterBuilder tcb = new TaskClusterBuilder(jobRun, solver);
- tcb.buildTaskClusters(ac);
- startRunnableTaskClusters(ac);
- }
- }
-
- private void abortActivityCluster(ActivityCluster ac) {
- TaskCluster[] taskClusters = ac.getTaskClusters();
- for (TaskCluster tc : taskClusters) {
- List<TaskClusterAttempt> tcAttempts = tc.getAttempts();
- if (!tcAttempts.isEmpty()) {
- TaskClusterAttempt tcAttempt = tcAttempts.get(tcAttempts.size() - 1);
- if (tcAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING) {
- abortTaskCluster(tcAttempt, ac);
- tcAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.ABORTED);
+ if (!isPlanned(candidate)) {
+ ActivityClusterPlanner acp = new ActivityClusterPlanner(this);
+ acp.planActivityCluster(candidate);
+ }
+ for (TaskCluster tc : candidate.getPlan().getTaskClusters()) {
+ if (tc.getProducedPartitions().isEmpty()) {
+ TaskClusterAttempt tca = findLastTaskClusterAttempt(tc);
+ if (tca == null || tca.getStatus() != TaskClusterAttempt.TaskClusterStatus.COMPLETED) {
+ frontier.add(tc);
+ }
}
}
}
- inProgressClusters.remove(ac);
}
- private void assignTaskLocations(ActivityCluster ac, TaskCluster tc,
- Map<String, List<TaskAttemptDescriptor>> taskAttemptMap) throws HyracksException {
- JobRun jobRun = ac.getJobRun();
+ private boolean isPlanned(ActivityCluster ac) {
+ return ac.getPlan() != null;
+ }
+
+ private void startRunnableActivityClusters() throws HyracksException {
+ Set<TaskCluster> taskClusterRoots = new HashSet<TaskCluster>();
+ findRunnableTaskClusterRoots(taskClusterRoots, rootActivityClusters);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Runnable TC roots: " + taskClusterRoots + ", inProgressTaskClusters: "
+ + inProgressTaskClusters);
+ }
+ if (taskClusterRoots.isEmpty() && inProgressTaskClusters.isEmpty()) {
+ ccs.getJobQueue().schedule(new JobCleanupEvent(ccs, jobRun.getJobId(), JobStatus.TERMINATED, null));
+ return;
+ }
+ startRunnableTaskClusters(taskClusterRoots);
+ }
+
+ private void startRunnableTaskClusters(Set<TaskCluster> tcRoots) throws HyracksException {
+ Map<TaskCluster, Runnability> runnabilityMap = new HashMap<TaskCluster, Runnability>();
+ for (TaskCluster tc : tcRoots) {
+ assignRunnabilityRank(tc, runnabilityMap);
+ }
+
+ PriorityQueue<RankedRunnableTaskCluster> queue = new PriorityQueue<RankedRunnableTaskCluster>();
+ for (Map.Entry<TaskCluster, Runnability> e : runnabilityMap.entrySet()) {
+ TaskCluster tc = e.getKey();
+ Runnability runnability = e.getValue();
+ assert runnability.getTag() != Runnability.Tag.UNSATISFIED_PREREQUISITES;
+ if (runnability.getTag() != Runnability.Tag.RUNNABLE) {
+ continue;
+ }
+ int priority = runnability.getPriority();
+ if (priority >= 0 && priority < Integer.MAX_VALUE) {
+ queue.add(new RankedRunnableTaskCluster(priority, tc));
+ }
+ }
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Ranked TCs: " + queue);
+ }
+
+ Map<String, List<TaskAttemptDescriptor>> taskAttemptMap = new HashMap<String, List<TaskAttemptDescriptor>>();
+ for (RankedRunnableTaskCluster rrtc : queue) {
+ TaskCluster tc = rrtc.getTaskCluster();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Found runnable TC: " + tc);
+ List<TaskClusterAttempt> attempts = tc.getAttempts();
+ LOGGER.info("Attempts so far:" + attempts.size());
+ for (TaskClusterAttempt tcAttempt : attempts) {
+ LOGGER.info("Status: " + tcAttempt.getStatus());
+ }
+ }
+ assignTaskLocations(tc, taskAttemptMap);
+ }
+
+ if (taskAttemptMap.isEmpty()) {
+ return;
+ }
+
+ startTasks(taskAttemptMap);
+ }
+
+ /*
+ * Runnability rank has the following semantics:
+ * Runnability(Runnable TaskCluster depending on completed TaskClusters) = {RUNNABLE, 0}
+ * Runnability(Runnable TaskCluster) = max(Rank(TaskClusters it depends on)) + 1
+ * Runnability(Non-schedulable TaskCluster) = {NOT_RUNNABLE, _}
+ */
+ private Runnability assignRunnabilityRank(TaskCluster goal, Map<TaskCluster, Runnability> runnabilityMap) {
+ if (runnabilityMap.containsKey(goal)) {
+ return runnabilityMap.get(goal);
+ }
+ TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(goal);
+ if (lastAttempt != null) {
+ if (lastAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED) {
+ Runnability runnability = new Runnability(Runnability.Tag.COMPLETED, Integer.MIN_VALUE);
+ runnabilityMap.put(goal, runnability);
+ return runnability;
+ }
+ if (lastAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING) {
+ Runnability runnability = new Runnability(Runnability.Tag.RUNNING, Integer.MIN_VALUE);
+ runnabilityMap.put(goal, runnability);
+ return runnability;
+ }
+ }
+ Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicyMap = jobRun.getConnectorPolicyMap();
+ PartitionMatchMaker pmm = jobRun.getPartitionMatchMaker();
+ Runnability aggregateRunnability = new Runnability(Runnability.Tag.RUNNABLE, 0);
+ for (PartitionId pid : goal.getRequiredPartitions()) {
+ Runnability runnability;
+ ConnectorDescriptorId cdId = pid.getConnectorDescriptorId();
+ IConnectorPolicy cPolicy = connectorPolicyMap.get(cdId);
+ PartitionState maxState = pmm.getMaximumAvailableState(pid);
+ if (PartitionState.COMMITTED.equals(maxState)) {
+ runnability = new Runnability(Runnability.Tag.RUNNABLE, 0);
+ } else if (PartitionState.STARTED.equals(maxState) && !cPolicy.consumerWaitsForProducerToFinish()) {
+ runnability = new Runnability(Runnability.Tag.RUNNABLE, 1);
+ } else {
+ runnability = assignRunnabilityRank(partitionProducingTaskClusterMap.get(pid), runnabilityMap);
+ if (runnability.getTag() == Runnability.Tag.RUNNABLE && runnability.getPriority() > 0
+ && cPolicy.consumerWaitsForProducerToFinish()) {
+ runnability = new Runnability(Runnability.Tag.NOT_RUNNABLE, Integer.MAX_VALUE);
+ }
+ }
+ aggregateRunnability = Runnability.getWorstCase(aggregateRunnability, runnability);
+ }
+ runnabilityMap.put(goal, aggregateRunnability);
+ return aggregateRunnability;
+ }
+
+ private void assignTaskLocations(TaskCluster tc, Map<String, List<TaskAttemptDescriptor>> taskAttemptMap)
+ throws HyracksException {
JobActivityGraph jag = jobRun.getJobActivityGraph();
Task[] tasks = tc.getTasks();
List<TaskClusterAttempt> tcAttempts = tc.getAttempts();
@@ -289,13 +367,13 @@
tcAttempt.initializePendingTaskCounter();
tcAttempts.add(tcAttempt);
tcAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.RUNNING);
- ac.getInProgressTaskClusters().add(tc);
+ tc.getActivityCluster().getInProgressTaskClusters().add(tc);
}
private static String findLocationOfBlocker(JobRun jobRun, JobActivityGraph jag, TaskId tid) {
ActivityId blockerAID = tid.getActivityId();
ActivityCluster blockerAC = jobRun.getActivityClusterMap().get(blockerAID);
- Task[] blockerTasks = blockerAC.getTaskMap().get(blockerAID);
+ Task[] blockerTasks = blockerAC.getPlan().getTaskMap().get(blockerAID);
List<TaskClusterAttempt> tcAttempts = blockerTasks[tid.getPartition()].getTaskCluster().getAttempts();
if (tcAttempts == null || tcAttempts.isEmpty()) {
return null;
@@ -318,152 +396,12 @@
return null;
}
- public void notifyTaskComplete(TaskAttempt ta, ActivityCluster ac) throws HyracksException {
- TaskAttemptId taId = ta.getTaskAttemptId();
- TaskCluster tc = ta.getTaskState().getTaskCluster();
- TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
- if (lastAttempt != null && taId.getAttempt() == lastAttempt.getAttempt()) {
- TaskAttempt.TaskStatus taStatus = ta.getStatus();
- if (taStatus == TaskAttempt.TaskStatus.RUNNING) {
- ta.setStatus(TaskAttempt.TaskStatus.COMPLETED, null);
- if (lastAttempt.decrementPendingTasksCounter() == 0) {
- lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.COMPLETED);
- ac.getInProgressTaskClusters().remove(tc);
- startRunnableTaskClusters(ac);
- }
- } else {
- LOGGER.warning("Spurious task complete notification: " + taId + " Current state = " + taStatus);
- }
- } else {
- LOGGER.warning("Ignoring task complete notification: " + taId + " -- Current last attempt = " + lastAttempt);
- }
- }
-
- private void startRunnableTaskClusters(ActivityCluster ac) throws HyracksException {
- Map<TaskCluster, Runnability> runnabilityMap = computeRunnabilityRanks(ac);
-
- PriorityQueue<RankedRunnableTaskCluster> queue = new PriorityQueue<RankedRunnableTaskCluster>();
- for (Map.Entry<TaskCluster, Runnability> e : runnabilityMap.entrySet()) {
- TaskCluster tc = e.getKey();
- Runnability runnability = e.getValue();
- assert runnability.getTag() != Runnability.Tag.UNSATISFIED_PREREQUISITES;
- if (runnability.getTag() != Runnability.Tag.RUNNABLE) {
- continue;
- }
- int priority = runnability.getPriority();
- if (priority >= 0 && priority < Integer.MAX_VALUE) {
- queue.add(new RankedRunnableTaskCluster(priority, tc));
- }
- }
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Ranked TCs: " + queue);
- }
-
- Map<String, List<TaskAttemptDescriptor>> taskAttemptMap = new HashMap<String, List<TaskAttemptDescriptor>>();
- for (RankedRunnableTaskCluster rrtc : queue) {
- TaskCluster tc = rrtc.getTaskCluster();
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Found runnable TC: " + tc);
- List<TaskClusterAttempt> attempts = tc.getAttempts();
- LOGGER.info("Attempts so far:" + attempts.size());
- for (TaskClusterAttempt tcAttempt : attempts) {
- LOGGER.info("Status: " + tcAttempt.getStatus());
- }
- }
- assignTaskLocations(ac, tc, taskAttemptMap);
- }
-
- if (taskAttemptMap.isEmpty()) {
- if (ac.getInProgressTaskClusters().isEmpty()) {
- completedClusters.add(ac);
- inProgressClusters.remove(ac);
- startRunnableActivityClusters();
- }
- return;
- }
-
- startTasks(ac, taskAttemptMap);
- }
-
- private void abortJob(Exception exception) {
- for (ActivityCluster ac : inProgressClusters) {
- abortActivityCluster(ac);
- }
- jobRun.setStatus(JobStatus.FAILURE, exception);
- }
-
- private Map<TaskCluster, Runnability> computeRunnabilityRanks(ActivityCluster ac) {
- TaskCluster[] taskClusters = ac.getTaskClusters();
-
- Map<TaskCluster, Runnability> runnabilityMap = new HashMap<TaskCluster, Runnability>();
- for (TaskCluster tc : taskClusters) {
- // Start search at TCs that produce no outputs (sinks)
- if (tc.getProducedPartitions().isEmpty()) {
- assignRunnabilityRank(tc, runnabilityMap, ac);
- }
- }
- return runnabilityMap;
- }
-
- /*
- * Runnability rank has the following semantics
- * Runnability(Runnable TaskCluster depending on completed TaskClusters) = {RUNNABLE, 0}
- * Runnability(Runnable TaskCluster) = max(Rank(Dependent TaskClusters)) + 1
- * Runnability(Non-schedulable TaskCluster) = {NOT_RUNNABLE, _}
- */
- private Runnability assignRunnabilityRank(TaskCluster goal, Map<TaskCluster, Runnability> runnabilityMap,
- ActivityCluster ac) {
- if (runnabilityMap.containsKey(goal)) {
- return runnabilityMap.get(goal);
- }
- TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(goal);
- if (lastAttempt != null) {
- if (lastAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED) {
- Runnability runnability = new Runnability(Runnability.Tag.COMPLETED, Integer.MIN_VALUE);
- runnabilityMap.put(goal, runnability);
- return runnability;
- }
- if (lastAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING) {
- Runnability runnability = new Runnability(Runnability.Tag.RUNNING, Integer.MIN_VALUE);
- runnabilityMap.put(goal, runnability);
- return runnability;
- }
- }
- Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicyMap = ac.getConnectorPolicyMap();
- JobRun jobRun = ac.getJobRun();
- PartitionMatchMaker pmm = jobRun.getPartitionMatchMaker();
- Runnability aggregateRunnability = new Runnability(Runnability.Tag.RUNNABLE, 0);
- for (PartitionId pid : goal.getRequiredPartitions()) {
- Runnability runnability;
- ConnectorDescriptorId cdId = pid.getConnectorDescriptorId();
- IConnectorPolicy cPolicy = connectorPolicyMap.get(cdId);
- PartitionState maxState = pmm.getMaximumAvailableState(pid);
- if (PartitionState.COMMITTED.equals(maxState)) {
- runnability = new Runnability(Runnability.Tag.RUNNABLE, 0);
- } else if (PartitionState.STARTED.equals(maxState) && !cPolicy.consumerWaitsForProducerToFinish()) {
- runnability = new Runnability(Runnability.Tag.RUNNABLE, 1);
- } else {
- runnability = assignRunnabilityRank(ac.getPartitionProducingTaskClusterMap().get(pid), runnabilityMap,
- ac);
- if (runnability.getTag() == Runnability.Tag.RUNNABLE && runnability.getPriority() > 0
- && cPolicy.consumerWaitsForProducerToFinish()) {
- runnability = new Runnability(Runnability.Tag.NOT_RUNNABLE, Integer.MAX_VALUE);
- }
- }
- aggregateRunnability = Runnability.getWorstCase(aggregateRunnability, runnability);
- }
- runnabilityMap.put(goal, aggregateRunnability);
- return aggregateRunnability;
- }
-
- private void startTasks(ActivityCluster ac, Map<String, List<TaskAttemptDescriptor>> taskAttemptMap)
- throws HyracksException {
+ private void startTasks(Map<String, List<TaskAttemptDescriptor>> taskAttemptMap) throws HyracksException {
Executor executor = ccs.getExecutor();
- JobRun jobRun = ac.getJobRun();
final UUID jobId = jobRun.getJobId();
final JobActivityGraph jag = jobRun.getJobActivityGraph();
final String appName = jag.getApplicationName();
- final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = ac.getConnectorPolicyMap();
+ final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = jobRun.getConnectorPolicyMap();
for (Map.Entry<String, List<TaskAttemptDescriptor>> e : taskAttemptMap.entrySet()) {
String nodeId = e.getKey();
final List<TaskAttemptDescriptor> taskDescriptors = e.getValue();
@@ -488,7 +426,14 @@
}
}
- private void abortTaskCluster(TaskClusterAttempt tcAttempt, ActivityCluster ac) {
+ private void abortJob(Exception exception) {
+ for (TaskCluster tc : inProgressTaskClusters) {
+ abortTaskCluster(findLastTaskClusterAttempt(tc));
+ }
+ jobRun.setStatus(JobStatus.FAILURE, exception);
+ }
+
+ private void abortTaskCluster(TaskClusterAttempt tcAttempt) {
Set<TaskAttemptId> abortTaskIds = new HashSet<TaskAttemptId>();
Map<String, List<TaskAttemptId>> abortTaskAttemptMap = new HashMap<String, List<TaskAttemptId>>();
for (TaskAttempt ta : tcAttempt.getTaskAttempts()) {
@@ -504,7 +449,6 @@
abortTaskAttempts.add(taId);
}
}
- JobRun jobRun = ac.getJobRun();
final UUID jobId = jobRun.getJobId();
for (Map.Entry<String, List<TaskAttemptId>> e : abortTaskAttemptMap.entrySet()) {
final NodeControllerState node = ccs.getNodeMap().get(e.getKey());
@@ -523,32 +467,30 @@
});
}
}
- ac.getInProgressTaskClusters().remove(tcAttempt.getTaskCluster());
+ inProgressTaskClusters.remove(tcAttempt.getTaskCluster());
TaskCluster tc = tcAttempt.getTaskCluster();
PartitionMatchMaker pmm = jobRun.getPartitionMatchMaker();
pmm.removeUncommittedPartitions(tc.getProducedPartitions(), abortTaskIds);
pmm.removePartitionRequests(tc.getRequiredPartitions(), abortTaskIds);
}
- private void abortDoomedTaskClusters(ActivityCluster ac) throws HyracksException {
- TaskCluster[] taskClusters = ac.getTaskClusters();
-
+ private void abortDoomedTaskClusters() throws HyracksException {
Set<TaskCluster> doomedTaskClusters = new HashSet<TaskCluster>();
- for (TaskCluster tc : taskClusters) {
+ for (TaskCluster tc : inProgressTaskClusters) {
// Start search at TCs that produce no outputs (sinks)
if (tc.getProducedPartitions().isEmpty()) {
- findDoomedTaskClusters(tc, ac, doomedTaskClusters);
+ findDoomedTaskClusters(tc, doomedTaskClusters);
}
}
for (TaskCluster tc : doomedTaskClusters) {
TaskClusterAttempt tca = findLastTaskClusterAttempt(tc);
- abortTaskCluster(tca, ac);
+ abortTaskCluster(tca);
tca.setStatus(TaskClusterAttempt.TaskClusterStatus.ABORTED);
}
}
- private boolean findDoomedTaskClusters(TaskCluster tc, ActivityCluster ac, Set<TaskCluster> doomedTaskClusters) {
+ private boolean findDoomedTaskClusters(TaskCluster tc, Set<TaskCluster> doomedTaskClusters) {
if (doomedTaskClusters.contains(tc)) {
return true;
}
@@ -563,17 +505,21 @@
return false;
}
}
- Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicyMap = ac.getConnectorPolicyMap();
- JobRun jobRun = ac.getJobRun();
+ Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicyMap = jobRun.getConnectorPolicyMap();
PartitionMatchMaker pmm = jobRun.getPartitionMatchMaker();
boolean doomed = false;
+ for (TaskCluster depTC : tc.getDependencyTaskClusters()) {
+ if (findDoomedTaskClusters(depTC, doomedTaskClusters)) {
+ doomed = true;
+ }
+ }
for (PartitionId pid : tc.getRequiredPartitions()) {
ConnectorDescriptorId cdId = pid.getConnectorDescriptorId();
IConnectorPolicy cPolicy = connectorPolicyMap.get(cdId);
PartitionState maxState = pmm.getMaximumAvailableState(pid);
if (maxState == null
|| (cPolicy.consumerWaitsForProducerToFinish() && maxState != PartitionState.COMMITTED)) {
- if (findDoomedTaskClusters(ac.getPartitionProducingTaskClusterMap().get(pid), ac, doomedTaskClusters)) {
+ if (findDoomedTaskClusters(partitionProducingTaskClusterMap.get(pid), doomedTaskClusters)) {
doomed = true;
}
}
@@ -584,6 +530,27 @@
return doomed;
}
+ public void notifyTaskComplete(TaskAttempt ta) throws HyracksException {
+ TaskAttemptId taId = ta.getTaskAttemptId();
+ TaskCluster tc = ta.getTaskState().getTaskCluster();
+ TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
+ if (lastAttempt != null && taId.getAttempt() == lastAttempt.getAttempt()) {
+ TaskAttempt.TaskStatus taStatus = ta.getStatus();
+ if (taStatus == TaskAttempt.TaskStatus.RUNNING) {
+ ta.setStatus(TaskAttempt.TaskStatus.COMPLETED, null);
+ if (lastAttempt.decrementPendingTasksCounter() == 0) {
+ lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.COMPLETED);
+ inProgressTaskClusters.remove(tc);
+ startRunnableActivityClusters();
+ }
+ } else {
+ LOGGER.warning("Spurious task complete notification: " + taId + " Current state = " + taStatus);
+ }
+ } else {
+ LOGGER.warning("Ignoring task complete notification: " + taId + " -- Current last attempt = " + lastAttempt);
+ }
+ }
+
/**
* Indicates that a single task attempt has encountered a failure.
*
@@ -603,14 +570,14 @@
TaskAttempt.TaskStatus taStatus = ta.getStatus();
if (taStatus == TaskAttempt.TaskStatus.RUNNING) {
ta.setStatus(TaskAttempt.TaskStatus.FAILED, exception);
- abortTaskCluster(lastAttempt, ac);
+ abortTaskCluster(lastAttempt);
lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.FAILED);
- abortDoomedTaskClusters(ac);
+ abortDoomedTaskClusters();
if (lastAttempt.getAttempt() >= ac.getMaxTaskClusterAttempts()) {
- abortActivityCluster(ac);
+ abortJob(null);
return;
}
- startRunnableTaskClusters(ac);
+ startRunnableActivityClusters();
} else {
LOGGER.warning("Spurious task failure notification: " + taId + " Current state = " + taStatus);
}
@@ -633,7 +600,7 @@
try {
jobRun.getPartitionMatchMaker().notifyNodeFailures(deadNodes);
for (ActivityCluster ac : jobRun.getActivityClusters()) {
- TaskCluster[] taskClusters = ac.getTaskClusters();
+ TaskCluster[] taskClusters = ac.getPlan().getTaskClusters();
if (taskClusters != null) {
for (TaskCluster tc : taskClusters) {
TaskClusterAttempt lastTaskClusterAttempt = findLastTaskClusterAttempt(tc);
@@ -646,18 +613,15 @@
if (deadNodes.contains(ta.getNodeId())) {
ta.setStatus(TaskAttempt.TaskStatus.FAILED,
new HyracksException("Node " + ta.getNodeId() + " failed"));
- TaskId tid = ta.getTaskAttemptId().getTaskId();
- ActivityId aid = tid.getActivityId();
-
abort = true;
}
}
if (abort) {
- abortTaskCluster(lastTaskClusterAttempt, ac);
+ abortTaskCluster(lastTaskClusterAttempt);
}
}
}
- abortDoomedTaskClusters(ac);
+ abortDoomedTaskClusters();
}
}
startRunnableActivityClusters();
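The rank computation folds one Runnability per required partition into an aggregate with Runnability.getWorstCase. The Runnability class itself is not part of this diff, so the sketch below re-declares a minimal version with only the members the scheduler uses; the tag ordering and the exact worst-case rule are assumptions chosen to match the semantics comment above, not the real implementation.

    import java.util.Arrays;
    import java.util.List;

    class RunnabilityDemo {
        // Assumed ordering, best to worst; the real enum also has
        // UNSATISFIED_PREREQUISITES, which the scheduler asserts is unreachable.
        enum Tag { COMPLETED, RUNNING, RUNNABLE, NOT_RUNNABLE }

        static final class Runnability {
            final Tag tag;
            final int priority;
            Runnability(Tag tag, int priority) { this.tag = tag; this.priority = priority; }
            // Assumed rule: the later (worse) tag wins; on a tie, the higher priority wins.
            static Runnability getWorstCase(Runnability a, Runnability b) {
                if (a.tag != b.tag) {
                    return a.tag.ordinal() > b.tag.ordinal() ? a : b;
                }
                return a.priority >= b.priority ? a : b;
            }
        }

        public static void main(String[] args) {
            // One input COMMITTED ({RUNNABLE, 0}), one STARTED under a pipelining
            // policy ({RUNNABLE, 1}), one behind a producer that cannot run yet:
            List<Runnability> perPartition = Arrays.asList(
                    new Runnability(Tag.RUNNABLE, 0),
                    new Runnability(Tag.RUNNABLE, 1),
                    new Runnability(Tag.NOT_RUNNABLE, Integer.MAX_VALUE));
            Runnability aggregate = new Runnability(Tag.RUNNABLE, 0);
            for (Runnability r : perPartition) {
                aggregate = Runnability.getWorstCase(aggregate, r);
            }
            // One blocked input blocks the whole TaskCluster:
            System.out.println(aggregate.tag + " / " + aggregate.priority);
        }
    }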