Lazy planning scheduler implemented

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_scheduling@513 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
index f7f3cb1..91626f7 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
@@ -14,7 +14,6 @@
  */
 package edu.uci.ics.hyracks.control.cc.job;
 
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -22,7 +21,6 @@
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
-import edu.uci.ics.hyracks.api.partitions.PartitionId;
 
 public class ActivityCluster {
     private final JobRun jobRun;
@@ -33,11 +31,7 @@
 
     private final Set<ActivityCluster> dependents;
 
-    private final Map<ActivityId, Task[]> taskStateMap;
-
-    private TaskCluster[] taskClusters;
-
-    private Map<PartitionId, TaskCluster> partitionProducingTaskClusterMap;
+    private ActivityClusterPlan acp;
 
     private Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies;
 
@@ -48,8 +42,6 @@
         this.activities = activities;
         dependencies = new HashSet<ActivityCluster>();
         dependents = new HashSet<ActivityCluster>();
-        taskStateMap = new HashMap<ActivityId, Task[]>();
-        partitionProducingTaskClusterMap = new HashMap<PartitionId, TaskCluster>();
         inProgressTaskClusters = new HashSet<TaskCluster>();
     }
 
@@ -73,22 +65,6 @@
         return dependents;
     }
 
-    public Map<ActivityId, Task[]> getTaskMap() {
-        return taskStateMap;
-    }
-
-    public TaskCluster[] getTaskClusters() {
-        return taskClusters;
-    }
-
-    public void setTaskClusters(TaskCluster[] taskClusters) {
-        this.taskClusters = taskClusters;
-    }
-
-    public Map<PartitionId, TaskCluster> getPartitionProducingTaskClusterMap() {
-        return partitionProducingTaskClusterMap;
-    }
-
     public JobRun getJobRun() {
         return jobRun;
     }
@@ -97,6 +73,14 @@
         return jobRun.getJobActivityGraph().getJobSpecification().getMaxAttempts();
     }
 
+    public ActivityClusterPlan getPlan() {
+        return acp;
+    }
+
+    public void setPlan(ActivityClusterPlan acp) {
+        this.acp = acp;
+    }
+
     public void setConnectorPolicyMap(Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) {
         this.connectorPolicies = connectorPolicies;
     }
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityClusterPlan.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityClusterPlan.java
new file mode 100644
index 0000000..3f97651
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityClusterPlan.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.job;
+
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+
+public class ActivityClusterPlan {
+    private final Map<ActivityId, Task[]> taskStateMap;
+
+    private final Map<PartitionId, TaskCluster> partitionProducingTaskClusterMap;
+
+    private final TaskCluster[] taskClusters;
+
+    public ActivityClusterPlan(TaskCluster[] taskClusters, Map<ActivityId, Task[]> taskStateMap,
+            Map<PartitionId, TaskCluster> partitionProducingTaskClusterMap) {
+        this.taskStateMap = taskStateMap;
+        this.partitionProducingTaskClusterMap = partitionProducingTaskClusterMap;
+        this.taskClusters = taskClusters;
+    }
+
+    public Map<ActivityId, Task[]> getTaskMap() {
+        return taskStateMap;
+    }
+
+    public TaskCluster[] getTaskClusters() {
+        return taskClusters;
+    }
+
+    public Map<PartitionId, TaskCluster> getPartitionProducingTaskClusterMap() {
+        return partitionProducingTaskClusterMap;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
index d4a9283..7b02baf 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
@@ -21,6 +21,8 @@
 import java.util.UUID;
 
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.JobActivityGraph;
 import edu.uci.ics.hyracks.api.job.JobStatus;
@@ -43,6 +45,8 @@
 
     private final Map<ActivityId, ActivityCluster> activityClusterMap;
 
+    private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicyMap;
+
     private JobScheduler js;
 
     private JobStatus status;
@@ -56,6 +60,7 @@
         participatingNodeIds = new HashSet<String>();
         profile = new JobProfile(jobId);
         activityClusterMap = new HashMap<ActivityId, ActivityCluster>();
+        connectorPolicyMap = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
     }
 
     public UUID getJobId() {
@@ -121,4 +126,8 @@
     public void setActivityClusters(Set<ActivityCluster> activityClusters) {
         this.activityClusters = activityClusters;
     }
+
+    public Map<ConnectorDescriptorId, IConnectorPolicy> getConnectorPolicyMap() {
+        return connectorPolicyMap;
+    }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/Task.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/Task.java
index 3ca75b8..28deb44 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/Task.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/Task.java
@@ -14,6 +14,9 @@
  */
 package edu.uci.ics.hyracks.control.cc.job;
 
+import java.util.HashSet;
+import java.util.Set;
+
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.control.cc.scheduler.ActivityPartitionDetails;
 
@@ -22,11 +25,17 @@
 
     private final ActivityPartitionDetails apd;
 
+    private final Set<TaskId> dependencies;
+
+    private final Set<TaskId> dependents;
+
     private TaskCluster taskCluster;
 
     public Task(TaskId taskId, ActivityPartitionDetails apd) {
         this.taskId = taskId;
         this.apd = apd;
+        this.dependencies = new HashSet<TaskId>();
+        this.dependents = new HashSet<TaskId>();
     }
 
     public TaskId getTaskId() {
@@ -37,6 +46,14 @@
         return apd;
     }
 
+    public Set<TaskId> getDependencies() {
+        return dependencies;
+    }
+
+    public Set<TaskId> getDependents() {
+        return dependents;
+    }
+
     public TaskCluster getTaskCluster() {
         return taskCluster;
     }
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java
index ae2533e..d3de5a1 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java
@@ -31,13 +31,19 @@
 
     private final Set<PartitionId> requiredPartitions;
 
+    private final Set<TaskCluster> dependencyTaskClusters;
+
+    private final Set<TaskCluster> dependentTaskClusters;
+
     private final List<TaskClusterAttempt> taskClusterAttempts;
 
     public TaskCluster(ActivityCluster ac, Task[] tasks) {
         this.ac = ac;
         this.tasks = tasks;
-        this.producedPartitions = new HashSet<PartitionId>();
-        this.requiredPartitions = new HashSet<PartitionId>();
+        producedPartitions = new HashSet<PartitionId>();
+        requiredPartitions = new HashSet<PartitionId>();
+        dependencyTaskClusters = new HashSet<TaskCluster>();
+        dependentTaskClusters = new HashSet<TaskCluster>();
         taskClusterAttempts = new ArrayList<TaskClusterAttempt>();
     }
 
@@ -57,6 +63,14 @@
         return requiredPartitions;
     }
 
+    public Set<TaskCluster> getDependencyTaskClusters() {
+        return dependencyTaskClusters;
+    }
+
+    public Set<TaskCluster> getDependentTaskClusters() {
+        return dependentTaskClusters;
+    }
+
     public List<TaskClusterAttempt> getAttempts() {
         return taskClusterAttempts;
     }
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/AbstractTaskLifecycleEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/AbstractTaskLifecycleEvent.java
index 643cc18..c90b8ff 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/AbstractTaskLifecycleEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/AbstractTaskLifecycleEvent.java
@@ -51,7 +51,7 @@
             Map<ActivityId, ActivityCluster> activityClusterMap = run.getActivityClusterMap();
             ActivityCluster ac = activityClusterMap.get(tid.getActivityId());
             if (ac != null) {
-                Map<ActivityId, Task[]> taskStateMap = ac.getTaskMap();
+                Map<ActivityId, Task[]> taskStateMap = ac.getPlan().getTaskMap();
                 Task[] taskStates = taskStateMap.get(tid.getActivityId());
                 if (taskStates != null && taskStates.length > tid.getPartition()) {
                     Task ts = taskStates[tid.getPartition()];
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskCompleteEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskCompleteEvent.java
index 0d33c21..895135f 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskCompleteEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskCompleteEvent.java
@@ -31,7 +31,7 @@
     protected void performEvent(TaskAttempt ta) {
         try {
             ActivityCluster ac = ta.getTaskState().getTaskCluster().getActivityCluster();
-            ac.getJobRun().getScheduler().notifyTaskComplete(ta, ac);
+            ac.getJobRun().getScheduler().notifyTaskComplete(ta);
         } catch (HyracksException e) {
             e.printStackTrace();
         }
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/TaskClusterBuilder.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
similarity index 72%
rename from hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/TaskClusterBuilder.java
rename to hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
index bbcb83d..7231842 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/TaskClusterBuilder.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
@@ -40,57 +40,75 @@
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
 import edu.uci.ics.hyracks.api.util.Pair;
 import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
+import edu.uci.ics.hyracks.control.cc.job.ActivityClusterPlan;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
 import edu.uci.ics.hyracks.control.cc.job.Task;
 import edu.uci.ics.hyracks.control.cc.job.TaskCluster;
 
-public class TaskClusterBuilder {
-    private static final Logger LOGGER = Logger.getLogger(TaskClusterBuilder.class.getName());
+public class ActivityClusterPlanner {
+    private static final Logger LOGGER = Logger.getLogger(ActivityClusterPlanner.class.getName());
 
-    private final JobRun jobRun;
-    private final PartitionConstraintSolver solver;
+    private final JobScheduler scheduler;
 
-    public TaskClusterBuilder(JobRun jobRun, PartitionConstraintSolver solver) {
-        this.jobRun = jobRun;
-        this.solver = solver;
+    public ActivityClusterPlanner(JobScheduler newJobScheduler) {
+        this.scheduler = newJobScheduler;
     }
 
-    public void buildTaskClusters(ActivityCluster ac) throws HyracksException {
+    public void planActivityCluster(ActivityCluster ac) throws HyracksException {
+        JobRun jobRun = scheduler.getJobRun();
         Map<ActivityId, ActivityPartitionDetails> pcMap = computePartitionCounts(ac);
 
-        Map<ActivityId, Task[]> taskStateMap = ac.getTaskMap();
+        Map<ActivityId, Task[]> taskMap = new HashMap<ActivityId, Task[]>();
         Set<ActivityId> activities = ac.getActivities();
 
         Map<TaskId, Set<TaskId>> taskClusterMap = new HashMap<TaskId, Set<TaskId>>();
 
+        Set<ActivityId> depAnIds = new HashSet<ActivityId>();
         for (ActivityId anId : activities) {
+            depAnIds.clear();
+            getDependencyActivityIds(depAnIds, anId);
             ActivityPartitionDetails apd = pcMap.get(anId);
-            Task[] taskStates = new Task[apd.getPartitionCount()];
-            for (int i = 0; i < taskStates.length; ++i) {
+            Task[] tasks = new Task[apd.getPartitionCount()];
+            for (int i = 0; i < tasks.length; ++i) {
                 TaskId tid = new TaskId(anId, i);
-                taskStates[i] = new Task(tid, apd);
+                tasks[i] = new Task(tid, apd);
+                for (ActivityId danId : depAnIds) {
+                    ActivityCluster dAC = jobRun.getActivityClusterMap().get(danId);
+                    ActivityClusterPlan dACP = dAC.getPlan();
+                    assert dACP != null : "IllegalStateEncountered: Dependent AC is being planned without a plan for dependency AC: Encountered no plan for ActivityID "
+                            + danId;
+                    Task[] dATasks = dACP.getTaskMap().get(danId);
+                    assert dATasks != null : "IllegalStateEncountered: Dependent AC is being planned without a plan for dependency AC: Encountered no plan for ActivityID "
+                            + danId;
+                    assert dATasks.length == tasks.length : "Dependency activity partitioned differently from dependent: "
+                            + dATasks.length + " != " + tasks.length;
+                    Task dTask = dATasks[i];
+                    TaskId dTaskId = dTask.getTaskId();
+                    tasks[i].getDependencies().add(dTaskId);
+                    dTask.getDependents().add(tid);
+                }
                 Set<TaskId> cluster = new HashSet<TaskId>();
                 cluster.add(tid);
                 taskClusterMap.put(tid, cluster);
             }
-            taskStateMap.put(anId, taskStates);
+            taskMap.put(anId, tasks);
         }
 
-        Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = assignConnectorPolicy(ac, pcMap);
-        ac.setConnectorPolicyMap(connectorPolicies);
+        Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = assignConnectorPolicy(ac, taskMap, pcMap);
+        scheduler.getJobRun().getConnectorPolicyMap().putAll(connectorPolicies);
 
         Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity = new HashMap<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>>();
         JobActivityGraph jag = jobRun.getJobActivityGraph();
         BitSet targetBitmap = new BitSet();
         for (ActivityId ac1 : activities) {
-            Task[] ac1TaskStates = taskStateMap.get(ac1);
+            Task[] ac1TaskStates = taskMap.get(ac1);
             int nProducers = ac1TaskStates.length;
             List<IConnectorDescriptor> outputConns = jag.getActivityOutputConnectorDescriptors(ac1);
             if (outputConns != null) {
                 for (IConnectorDescriptor c : outputConns) {
                     ConnectorDescriptorId cdId = c.getConnectorId();
                     ActivityId ac2 = jag.getConsumerActivity(cdId);
-                    Task[] ac2TaskStates = taskStateMap.get(ac2);
+                    Task[] ac2TaskStates = taskMap.get(ac2);
                     int nConsumers = ac2TaskStates.length;
                     for (int i = 0; i < nProducers; ++i) {
                         c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
@@ -146,24 +164,25 @@
         for (Set<TaskId> cluster : clusters) {
             Set<Task> taskStates = new HashSet<Task>();
             for (TaskId tid : cluster) {
-                taskStates.add(taskStateMap.get(tid.getActivityId())[tid.getPartition()]);
+                taskStates.add(taskMap.get(tid.getActivityId())[tid.getPartition()]);
             }
             TaskCluster tc = new TaskCluster(ac, taskStates.toArray(new Task[taskStates.size()]));
             tcSet.add(tc);
             for (TaskId tid : cluster) {
-                taskStateMap.get(tid.getActivityId())[tid.getPartition()].setTaskCluster(tc);
+                taskMap.get(tid.getActivityId())[tid.getPartition()].setTaskCluster(tc);
             }
         }
-        ac.setTaskClusters(tcSet.toArray(new TaskCluster[tcSet.size()]));
+        TaskCluster[] taskClusters = tcSet.toArray(new TaskCluster[tcSet.size()]);
 
-        Map<PartitionId, TaskCluster> partitionProducingTaskClusterMap = ac.getPartitionProducingTaskClusterMap();
-        for (TaskCluster tc : ac.getTaskClusters()) {
+        Map<PartitionId, TaskCluster> partitionProducingTaskClusterMap = new HashMap<PartitionId, TaskCluster>();
+        for (TaskCluster tc : taskClusters) {
+            Set<TaskCluster> tcDependencyTaskClusters = tc.getDependencyTaskClusters();
             for (Task ts : tc.getTasks()) {
                 TaskId tid = ts.getTaskId();
                 List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = taskConnectivity.get(tid);
                 if (cInfoList != null) {
                     for (Pair<TaskId, ConnectorDescriptorId> p : cInfoList) {
-                        Task targetTS = taskStateMap.get(p.first.getActivityId())[p.first.getPartition()];
+                        Task targetTS = taskMap.get(p.first.getActivityId())[p.first.getPartition()];
                         TaskCluster targetTC = targetTS.getTaskCluster();
                         if (targetTC != tc) {
                             ConnectorDescriptorId cdId = p.second;
@@ -175,6 +194,12 @@
                         }
                     }
                 }
+
+                for (TaskId dTid : ts.getDependencies()) {
+                    TaskCluster dTC = getTaskCluster(dTid);
+                    dTC.getDependentTaskClusters().add(tc);
+                    tcDependencyTaskClusters.add(dTC);
+                }
             }
         }
 
@@ -185,24 +210,42 @@
                 LOGGER.info("Tasks: " + Arrays.toString(tc.getTasks()));
             }
         }
+
+        ac.setPlan(new ActivityClusterPlan(taskClusters, taskMap, partitionProducingTaskClusterMap));
+    }
+
+    private TaskCluster getTaskCluster(TaskId tid) {
+        ActivityCluster ac = scheduler.getJobRun().getActivityClusterMap().get(tid.getActivityId());
+        ActivityClusterPlan acp = ac.getPlan();
+        Task[] tasks = acp.getTaskMap().get(tid.getActivityId());
+        Task task = tasks[tid.getPartition()];
+        assert task.getTaskId().equals(tid);
+        return task.getTaskCluster();
+    }
+
+    private void getDependencyActivityIds(Set<ActivityId> depAnIds, ActivityId anId) {
+        JobActivityGraph jag = scheduler.getJobRun().getJobActivityGraph();
+        Set<ActivityId> blockers = jag.getBlocked2BlockerMap().get(anId);
+        if (blockers != null) {
+            depAnIds.addAll(blockers);
+        }
     }
 
     private Map<ConnectorDescriptorId, IConnectorPolicy> assignConnectorPolicy(ActivityCluster ac,
-            Map<ActivityId, ActivityPartitionDetails> pcMap) {
-        JobActivityGraph jag = jobRun.getJobActivityGraph();
+            Map<ActivityId, Task[]> taskMap, Map<ActivityId, ActivityPartitionDetails> pcMap) {
+        JobActivityGraph jag = scheduler.getJobRun().getJobActivityGraph();
         Map<ConnectorDescriptorId, IConnectorPolicy> cPolicyMap = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
         Set<ActivityId> activities = ac.getActivities();
-        Map<ActivityId, Task[]> taskStateMap = ac.getTaskMap();
         BitSet targetBitmap = new BitSet();
-        for (ActivityId ac1 : activities) {
-            Task[] ac1TaskStates = taskStateMap.get(ac1);
+        for (ActivityId a1 : activities) {
+            Task[] ac1TaskStates = taskMap.get(a1);
             int nProducers = ac1TaskStates.length;
-            List<IConnectorDescriptor> outputConns = jag.getActivityOutputConnectorDescriptors(ac1);
+            List<IConnectorDescriptor> outputConns = jag.getActivityOutputConnectorDescriptors(a1);
             if (outputConns != null) {
                 for (IConnectorDescriptor c : outputConns) {
                     ConnectorDescriptorId cdId = c.getConnectorId();
-                    ActivityId ac2 = jag.getConsumerActivity(cdId);
-                    Task[] ac2TaskStates = taskStateMap.get(ac2);
+                    ActivityId a2 = jag.getConsumerActivity(cdId);
+                    Task[] ac2TaskStates = taskMap.get(a2);
                     int nConsumers = ac2TaskStates.length;
 
                     int[] fanouts = new int[nProducers];
@@ -219,7 +262,7 @@
     }
 
     private IConnectorPolicy assignConnectorPolicy(IConnectorDescriptor c, int nProducers, int nConsumers, int[] fanouts) {
-        IConnectorPolicyAssignmentPolicy cpap = jobRun.getJobActivityGraph().getJobSpecification()
+        IConnectorPolicyAssignmentPolicy cpap = scheduler.getJobRun().getJobActivityGraph().getJobSpecification()
                 .getConnectorPolicyAssignmentPolicy();
         if (cpap != null) {
             return cpap.getConnectorPolicyAssignment(c, nProducers, nConsumers, fanouts);
@@ -229,6 +272,8 @@
 
     private Map<ActivityId, ActivityPartitionDetails> computePartitionCounts(ActivityCluster ac)
             throws HyracksException {
+        PartitionConstraintSolver solver = scheduler.getSolver();
+        JobRun jobRun = scheduler.getJobRun();
         Set<LValueConstraintExpression> lValues = new HashSet<LValueConstraintExpression>();
         for (ActivityId anId : ac.getActivities()) {
             lValues.add(new PartitionCountExpression(anId.getOperatorDescriptorId()));
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index 3c19397..28a28ad 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -69,20 +69,28 @@
 
     private final JobRun jobRun;
 
-    private final Set<ActivityCluster> completedClusters;
-
-    private final Set<ActivityCluster> inProgressClusters;
-
     private final PartitionConstraintSolver solver;
 
+    private final Map<PartitionId, TaskCluster> partitionProducingTaskClusterMap;
+
+    private final Set<TaskCluster> inProgressTaskClusters;
+
     private Set<ActivityCluster> rootActivityClusters;
 
     public JobScheduler(ClusterControllerService ccs, JobRun jobRun) {
         this.ccs = ccs;
         this.jobRun = jobRun;
-        completedClusters = new HashSet<ActivityCluster>();
-        inProgressClusters = new HashSet<ActivityCluster>();
         solver = new PartitionConstraintSolver();
+        partitionProducingTaskClusterMap = new HashMap<PartitionId, TaskCluster>();
+        inProgressTaskClusters = new HashSet<TaskCluster>();
+    }
+
+    public JobRun getJobRun() {
+        return jobRun;
+    }
+
+    public PartitionConstraintSolver getSolver() {
+        return solver;
     }
 
     public void startJob() throws HyracksException {
@@ -120,95 +128,165 @@
         rootActivityClusters = acgb.inferActivityClusters(jag);
     }
 
-    private void findPrerequisiteActivities(Set<ActivityId> prereqs, ActivityCluster ac) {
-        TaskCluster[] taskClusters = ac.getTaskClusters();
-        if (taskClusters == null) {
-            JobActivityGraph jag = jobRun.getJobActivityGraph();
-            for (ActivityId aid : ac.getActivities()) {
-                Set<ActivityId> deps = jag.getBlocked2BlockerMap().get(aid);
-                if (deps != null) {
-                    prereqs.addAll(deps);
-                }
-            }
-        } else {
-
-        }
-    }
-
-    private void findRunnableActivityClusters(Set<ActivityCluster> frontier, Set<ActivityCluster> roots) {
+    private void findRunnableTaskClusterRoots(Set<TaskCluster> frontier, Set<ActivityCluster> roots)
+            throws HyracksException {
         for (ActivityCluster root : roots) {
-            findRunnableActivityClusters(frontier, root);
+            findRunnableTaskClusterRoots(frontier, root);
         }
     }
 
-    private void findRunnableActivityClusters(Set<ActivityCluster> frontier, ActivityCluster candidate) {
-        if (frontier.contains(candidate) || inProgressClusters.contains(candidate)
-                || completedClusters.contains(candidate)) {
-            return;
-        }
+    private void findRunnableTaskClusterRoots(Set<TaskCluster> frontier, ActivityCluster candidate)
+            throws HyracksException {
         boolean depsComplete = true;
-        Set<ActivityId> prereqs = new HashSet<ActivityId>();
-        findPrerequisiteActivities(prereqs, candidate);
-        Map<ActivityCluster, Set<ActivityId>> prereqACMap = new HashMap<ActivityCluster, Set<ActivityId>>();
-        for (ActivityId aid : prereqs) {
-            ActivityCluster ac = jobRun.getActivityClusterMap().get(aid);
-            Set<ActivityId> pSet = prereqACMap.get(ac);
-            if (pSet == null) {
-                pSet = new HashSet<ActivityId>();
-                prereqACMap.put(ac, pSet);
-            }
-            pSet.add(aid);
-        }
         for (ActivityCluster depAC : candidate.getDependencies()) {
-            if (!completedClusters.contains(depAC)) {
+            if (!isPlanned(depAC)) {
                 depsComplete = false;
-                findRunnableActivityClusters(frontier, depAC);
+                findRunnableTaskClusterRoots(frontier, depAC);
             } else {
-                Set<ActivityId> pSet = prereqACMap.get(depAC);
-                if (pSet != null) {
-
+                boolean tcRootsComplete = true;
+                Set<TaskCluster> depACTCRoots = new HashSet<TaskCluster>();
+                for (TaskCluster tc : depAC.getPlan().getTaskClusters()) {
+                    if (tc.getProducedPartitions().isEmpty()) {
+                        if (findLastTaskClusterAttempt(tc).getStatus() != TaskClusterAttempt.TaskClusterStatus.COMPLETED) {
+                            tcRootsComplete = false;
+                        }
+                        depACTCRoots.add(tc);
+                    }
                 }
-                // TODO
+                if (!tcRootsComplete) {
+                    depsComplete = false;
+                    findRunnableTaskClusterRoots(frontier, depAC);
+                }
             }
         }
         if (depsComplete) {
-            frontier.add(candidate);
-        }
-    }
-
-    private void startRunnableActivityClusters() throws HyracksException {
-        Set<ActivityCluster> runnableClusters = new HashSet<ActivityCluster>();
-        findRunnableActivityClusters(runnableClusters, rootActivityClusters);
-        if (runnableClusters.isEmpty() && inProgressClusters.isEmpty()) {
-            ccs.getJobQueue().schedule(new JobCleanupEvent(ccs, jobRun.getJobId(), JobStatus.TERMINATED, null));
-            return;
-        }
-        for (ActivityCluster ac : runnableClusters) {
-            inProgressClusters.add(ac);
-            TaskClusterBuilder tcb = new TaskClusterBuilder(jobRun, solver);
-            tcb.buildTaskClusters(ac);
-            startRunnableTaskClusters(ac);
-        }
-    }
-
-    private void abortActivityCluster(ActivityCluster ac) {
-        TaskCluster[] taskClusters = ac.getTaskClusters();
-        for (TaskCluster tc : taskClusters) {
-            List<TaskClusterAttempt> tcAttempts = tc.getAttempts();
-            if (!tcAttempts.isEmpty()) {
-                TaskClusterAttempt tcAttempt = tcAttempts.get(tcAttempts.size() - 1);
-                if (tcAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING) {
-                    abortTaskCluster(tcAttempt, ac);
-                    tcAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.ABORTED);
+            if (!isPlanned(candidate)) {
+                ActivityClusterPlanner acp = new ActivityClusterPlanner(this);
+                acp.planActivityCluster(candidate);
+            }
+            for (TaskCluster tc : candidate.getPlan().getTaskClusters()) {
+                if (tc.getProducedPartitions().isEmpty()) {
+                    TaskClusterAttempt tca = findLastTaskClusterAttempt(tc);
+                    if (tca == null || tca.getStatus() != TaskClusterAttempt.TaskClusterStatus.COMPLETED) {
+                        frontier.add(tc);
+                    }
                 }
             }
         }
-        inProgressClusters.remove(ac);
     }
 
-    private void assignTaskLocations(ActivityCluster ac, TaskCluster tc,
-            Map<String, List<TaskAttemptDescriptor>> taskAttemptMap) throws HyracksException {
-        JobRun jobRun = ac.getJobRun();
+    private boolean isPlanned(ActivityCluster ac) {
+        return ac.getPlan() != null;
+    }
+
+    private void startRunnableActivityClusters() throws HyracksException {
+        Set<TaskCluster> taskClusterRoots = new HashSet<TaskCluster>();
+        findRunnableTaskClusterRoots(taskClusterRoots, rootActivityClusters);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Runnable TC roots: " + taskClusterRoots + ", inProgressTaskClusters: "
+                    + inProgressTaskClusters);
+        }
+        if (taskClusterRoots.isEmpty() && inProgressTaskClusters.isEmpty()) {
+            ccs.getJobQueue().schedule(new JobCleanupEvent(ccs, jobRun.getJobId(), JobStatus.TERMINATED, null));
+            return;
+        }
+        startRunnableTaskClusters(taskClusterRoots);
+    }
+
+    private void startRunnableTaskClusters(Set<TaskCluster> tcRoots) throws HyracksException {
+        Map<TaskCluster, Runnability> runnabilityMap = new HashMap<TaskCluster, Runnability>();
+        for (TaskCluster tc : tcRoots) {
+            assignRunnabilityRank(tc, runnabilityMap);
+        }
+
+        PriorityQueue<RankedRunnableTaskCluster> queue = new PriorityQueue<RankedRunnableTaskCluster>();
+        for (Map.Entry<TaskCluster, Runnability> e : runnabilityMap.entrySet()) {
+            TaskCluster tc = e.getKey();
+            Runnability runnability = e.getValue();
+            assert runnability.getTag() != Runnability.Tag.UNSATISFIED_PREREQUISITES;
+            if (runnability.getTag() != Runnability.Tag.RUNNABLE) {
+                continue;
+            }
+            int priority = runnability.getPriority();
+            if (priority >= 0 && priority < Integer.MAX_VALUE) {
+                queue.add(new RankedRunnableTaskCluster(priority, tc));
+            }
+        }
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Ranked TCs: " + queue);
+        }
+
+        Map<String, List<TaskAttemptDescriptor>> taskAttemptMap = new HashMap<String, List<TaskAttemptDescriptor>>();
+        for (RankedRunnableTaskCluster rrtc : queue) {
+            TaskCluster tc = rrtc.getTaskCluster();
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Found runnable TC: " + tc);
+                List<TaskClusterAttempt> attempts = tc.getAttempts();
+                LOGGER.info("Attempts so far:" + attempts.size());
+                for (TaskClusterAttempt tcAttempt : attempts) {
+                    LOGGER.info("Status: " + tcAttempt.getStatus());
+                }
+            }
+            assignTaskLocations(tc, taskAttemptMap);
+        }
+
+        if (taskAttemptMap.isEmpty()) {
+            return;
+        }
+
+        startTasks(taskAttemptMap);
+    }
+
+    /*
+     * Runnability rank has the following semantics
+     * Runnability(Runnable TaskCluster depending on completed TaskClusters) = {RUNNABLE, 0}
+     * Runnability(Runnable TaskCluster) = max(Rank(Dependent TaskClusters)) + 1
+     * Runnability(Non-schedulable TaskCluster) = {NOT_RUNNABLE, _} 
+     */
+    private Runnability assignRunnabilityRank(TaskCluster goal, Map<TaskCluster, Runnability> runnabilityMap) {
+        if (runnabilityMap.containsKey(goal)) {
+            return runnabilityMap.get(goal);
+        }
+        TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(goal);
+        if (lastAttempt != null) {
+            if (lastAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED) {
+                Runnability runnability = new Runnability(Runnability.Tag.COMPLETED, Integer.MIN_VALUE);
+                runnabilityMap.put(goal, runnability);
+                return runnability;
+            }
+            if (lastAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING) {
+                Runnability runnability = new Runnability(Runnability.Tag.RUNNING, Integer.MIN_VALUE);
+                runnabilityMap.put(goal, runnability);
+                return runnability;
+            }
+        }
+        Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicyMap = jobRun.getConnectorPolicyMap();
+        PartitionMatchMaker pmm = jobRun.getPartitionMatchMaker();
+        Runnability aggregateRunnability = new Runnability(Runnability.Tag.RUNNABLE, 0);
+        for (PartitionId pid : goal.getRequiredPartitions()) {
+            Runnability runnability;
+            ConnectorDescriptorId cdId = pid.getConnectorDescriptorId();
+            IConnectorPolicy cPolicy = connectorPolicyMap.get(cdId);
+            PartitionState maxState = pmm.getMaximumAvailableState(pid);
+            if (PartitionState.COMMITTED.equals(maxState)) {
+                runnability = new Runnability(Runnability.Tag.RUNNABLE, 0);
+            } else if (PartitionState.STARTED.equals(maxState) && !cPolicy.consumerWaitsForProducerToFinish()) {
+                runnability = new Runnability(Runnability.Tag.RUNNABLE, 1);
+            } else {
+                runnability = assignRunnabilityRank(partitionProducingTaskClusterMap.get(pid), runnabilityMap);
+                if (runnability.getTag() == Runnability.Tag.RUNNABLE && runnability.getPriority() > 0
+                        && cPolicy.consumerWaitsForProducerToFinish()) {
+                    runnability = new Runnability(Runnability.Tag.NOT_RUNNABLE, Integer.MAX_VALUE);
+                }
+            }
+            aggregateRunnability = Runnability.getWorstCase(aggregateRunnability, runnability);
+        }
+        runnabilityMap.put(goal, aggregateRunnability);
+        return aggregateRunnability;
+    }
+
+    private void assignTaskLocations(TaskCluster tc, Map<String, List<TaskAttemptDescriptor>> taskAttemptMap)
+            throws HyracksException {
         JobActivityGraph jag = jobRun.getJobActivityGraph();
         Task[] tasks = tc.getTasks();
         List<TaskClusterAttempt> tcAttempts = tc.getAttempts();
@@ -289,13 +367,13 @@
         tcAttempt.initializePendingTaskCounter();
         tcAttempts.add(tcAttempt);
         tcAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.RUNNING);
-        ac.getInProgressTaskClusters().add(tc);
+        tc.getActivityCluster().getInProgressTaskClusters().add(tc);
     }
 
     private static String findLocationOfBlocker(JobRun jobRun, JobActivityGraph jag, TaskId tid) {
         ActivityId blockerAID = tid.getActivityId();
         ActivityCluster blockerAC = jobRun.getActivityClusterMap().get(blockerAID);
-        Task[] blockerTasks = blockerAC.getTaskMap().get(blockerAID);
+        Task[] blockerTasks = blockerAC.getPlan().getTaskMap().get(blockerAID);
         List<TaskClusterAttempt> tcAttempts = blockerTasks[tid.getPartition()].getTaskCluster().getAttempts();
         if (tcAttempts == null || tcAttempts.isEmpty()) {
             return null;
@@ -318,152 +396,12 @@
         return null;
     }
 
-    public void notifyTaskComplete(TaskAttempt ta, ActivityCluster ac) throws HyracksException {
-        TaskAttemptId taId = ta.getTaskAttemptId();
-        TaskCluster tc = ta.getTaskState().getTaskCluster();
-        TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
-        if (lastAttempt != null && taId.getAttempt() == lastAttempt.getAttempt()) {
-            TaskAttempt.TaskStatus taStatus = ta.getStatus();
-            if (taStatus == TaskAttempt.TaskStatus.RUNNING) {
-                ta.setStatus(TaskAttempt.TaskStatus.COMPLETED, null);
-                if (lastAttempt.decrementPendingTasksCounter() == 0) {
-                    lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.COMPLETED);
-                    ac.getInProgressTaskClusters().remove(tc);
-                    startRunnableTaskClusters(ac);
-                }
-            } else {
-                LOGGER.warning("Spurious task complete notification: " + taId + " Current state = " + taStatus);
-            }
-        } else {
-            LOGGER.warning("Ignoring task complete notification: " + taId + " -- Current last attempt = " + lastAttempt);
-        }
-    }
-
-    private void startRunnableTaskClusters(ActivityCluster ac) throws HyracksException {
-        Map<TaskCluster, Runnability> runnabilityMap = computeRunnabilityRanks(ac);
-
-        PriorityQueue<RankedRunnableTaskCluster> queue = new PriorityQueue<RankedRunnableTaskCluster>();
-        for (Map.Entry<TaskCluster, Runnability> e : runnabilityMap.entrySet()) {
-            TaskCluster tc = e.getKey();
-            Runnability runnability = e.getValue();
-            assert runnability.getTag() != Runnability.Tag.UNSATISFIED_PREREQUISITES;
-            if (runnability.getTag() != Runnability.Tag.RUNNABLE) {
-                continue;
-            }
-            int priority = runnability.getPriority();
-            if (priority >= 0 && priority < Integer.MAX_VALUE) {
-                queue.add(new RankedRunnableTaskCluster(priority, tc));
-            }
-        }
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Ranked TCs: " + queue);
-        }
-
-        Map<String, List<TaskAttemptDescriptor>> taskAttemptMap = new HashMap<String, List<TaskAttemptDescriptor>>();
-        for (RankedRunnableTaskCluster rrtc : queue) {
-            TaskCluster tc = rrtc.getTaskCluster();
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Found runnable TC: " + tc);
-                List<TaskClusterAttempt> attempts = tc.getAttempts();
-                LOGGER.info("Attempts so far:" + attempts.size());
-                for (TaskClusterAttempt tcAttempt : attempts) {
-                    LOGGER.info("Status: " + tcAttempt.getStatus());
-                }
-            }
-            assignTaskLocations(ac, tc, taskAttemptMap);
-        }
-
-        if (taskAttemptMap.isEmpty()) {
-            if (ac.getInProgressTaskClusters().isEmpty()) {
-                completedClusters.add(ac);
-                inProgressClusters.remove(ac);
-                startRunnableActivityClusters();
-            }
-            return;
-        }
-
-        startTasks(ac, taskAttemptMap);
-    }
-
-    private void abortJob(Exception exception) {
-        for (ActivityCluster ac : inProgressClusters) {
-            abortActivityCluster(ac);
-        }
-        jobRun.setStatus(JobStatus.FAILURE, exception);
-    }
-
-    private Map<TaskCluster, Runnability> computeRunnabilityRanks(ActivityCluster ac) {
-        TaskCluster[] taskClusters = ac.getTaskClusters();
-
-        Map<TaskCluster, Runnability> runnabilityMap = new HashMap<TaskCluster, Runnability>();
-        for (TaskCluster tc : taskClusters) {
-            // Start search at TCs that produce no outputs (sinks)
-            if (tc.getProducedPartitions().isEmpty()) {
-                assignRunnabilityRank(tc, runnabilityMap, ac);
-            }
-        }
-        return runnabilityMap;
-    }
-
-    /*
-     * Runnability rank has the following semantics
-     * Runnability(Runnable TaskCluster depending on completed TaskClusters) = {RUNNABLE, 0}
-     * Runnability(Runnable TaskCluster) = max(Rank(Dependent TaskClusters)) + 1
-     * Runnability(Non-schedulable TaskCluster) = {NOT_RUNNABLE, _} 
-     */
-    private Runnability assignRunnabilityRank(TaskCluster goal, Map<TaskCluster, Runnability> runnabilityMap,
-            ActivityCluster ac) {
-        if (runnabilityMap.containsKey(goal)) {
-            return runnabilityMap.get(goal);
-        }
-        TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(goal);
-        if (lastAttempt != null) {
-            if (lastAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED) {
-                Runnability runnability = new Runnability(Runnability.Tag.COMPLETED, Integer.MIN_VALUE);
-                runnabilityMap.put(goal, runnability);
-                return runnability;
-            }
-            if (lastAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING) {
-                Runnability runnability = new Runnability(Runnability.Tag.RUNNING, Integer.MIN_VALUE);
-                runnabilityMap.put(goal, runnability);
-                return runnability;
-            }
-        }
-        Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicyMap = ac.getConnectorPolicyMap();
-        JobRun jobRun = ac.getJobRun();
-        PartitionMatchMaker pmm = jobRun.getPartitionMatchMaker();
-        Runnability aggregateRunnability = new Runnability(Runnability.Tag.RUNNABLE, 0);
-        for (PartitionId pid : goal.getRequiredPartitions()) {
-            Runnability runnability;
-            ConnectorDescriptorId cdId = pid.getConnectorDescriptorId();
-            IConnectorPolicy cPolicy = connectorPolicyMap.get(cdId);
-            PartitionState maxState = pmm.getMaximumAvailableState(pid);
-            if (PartitionState.COMMITTED.equals(maxState)) {
-                runnability = new Runnability(Runnability.Tag.RUNNABLE, 0);
-            } else if (PartitionState.STARTED.equals(maxState) && !cPolicy.consumerWaitsForProducerToFinish()) {
-                runnability = new Runnability(Runnability.Tag.RUNNABLE, 1);
-            } else {
-                runnability = assignRunnabilityRank(ac.getPartitionProducingTaskClusterMap().get(pid), runnabilityMap,
-                        ac);
-                if (runnability.getTag() == Runnability.Tag.RUNNABLE && runnability.getPriority() > 0
-                        && cPolicy.consumerWaitsForProducerToFinish()) {
-                    runnability = new Runnability(Runnability.Tag.NOT_RUNNABLE, Integer.MAX_VALUE);
-                }
-            }
-            aggregateRunnability = Runnability.getWorstCase(aggregateRunnability, runnability);
-        }
-        runnabilityMap.put(goal, aggregateRunnability);
-        return aggregateRunnability;
-    }
-
-    private void startTasks(ActivityCluster ac, Map<String, List<TaskAttemptDescriptor>> taskAttemptMap)
-            throws HyracksException {
+    private void startTasks(Map<String, List<TaskAttemptDescriptor>> taskAttemptMap) throws HyracksException {
         Executor executor = ccs.getExecutor();
-        JobRun jobRun = ac.getJobRun();
         final UUID jobId = jobRun.getJobId();
         final JobActivityGraph jag = jobRun.getJobActivityGraph();
         final String appName = jag.getApplicationName();
-        final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = ac.getConnectorPolicyMap();
+        final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = jobRun.getConnectorPolicyMap();
         for (Map.Entry<String, List<TaskAttemptDescriptor>> e : taskAttemptMap.entrySet()) {
             String nodeId = e.getKey();
             final List<TaskAttemptDescriptor> taskDescriptors = e.getValue();
@@ -488,7 +426,14 @@
         }
     }
 
-    private void abortTaskCluster(TaskClusterAttempt tcAttempt, ActivityCluster ac) {
+    private void abortJob(Exception exception) {
+        for (TaskCluster tc : inProgressTaskClusters) {
+            abortTaskCluster(findLastTaskClusterAttempt(tc));
+        }
+        jobRun.setStatus(JobStatus.FAILURE, exception);
+    }
+
+    private void abortTaskCluster(TaskClusterAttempt tcAttempt) {
         Set<TaskAttemptId> abortTaskIds = new HashSet<TaskAttemptId>();
         Map<String, List<TaskAttemptId>> abortTaskAttemptMap = new HashMap<String, List<TaskAttemptId>>();
         for (TaskAttempt ta : tcAttempt.getTaskAttempts()) {
@@ -504,7 +449,6 @@
                 abortTaskAttempts.add(taId);
             }
         }
-        JobRun jobRun = ac.getJobRun();
         final UUID jobId = jobRun.getJobId();
         for (Map.Entry<String, List<TaskAttemptId>> e : abortTaskAttemptMap.entrySet()) {
             final NodeControllerState node = ccs.getNodeMap().get(e.getKey());
@@ -523,32 +467,30 @@
                 });
             }
         }
-        ac.getInProgressTaskClusters().remove(tcAttempt.getTaskCluster());
+        inProgressTaskClusters.remove(tcAttempt.getTaskCluster());
         TaskCluster tc = tcAttempt.getTaskCluster();
         PartitionMatchMaker pmm = jobRun.getPartitionMatchMaker();
         pmm.removeUncommittedPartitions(tc.getProducedPartitions(), abortTaskIds);
         pmm.removePartitionRequests(tc.getRequiredPartitions(), abortTaskIds);
     }
 
-    private void abortDoomedTaskClusters(ActivityCluster ac) throws HyracksException {
-        TaskCluster[] taskClusters = ac.getTaskClusters();
-
+    private void abortDoomedTaskClusters() throws HyracksException {
         Set<TaskCluster> doomedTaskClusters = new HashSet<TaskCluster>();
-        for (TaskCluster tc : taskClusters) {
+        for (TaskCluster tc : inProgressTaskClusters) {
             // Start search at TCs that produce no outputs (sinks)
             if (tc.getProducedPartitions().isEmpty()) {
-                findDoomedTaskClusters(tc, ac, doomedTaskClusters);
+                findDoomedTaskClusters(tc, doomedTaskClusters);
             }
         }
 
         for (TaskCluster tc : doomedTaskClusters) {
             TaskClusterAttempt tca = findLastTaskClusterAttempt(tc);
-            abortTaskCluster(tca, ac);
+            abortTaskCluster(tca);
             tca.setStatus(TaskClusterAttempt.TaskClusterStatus.ABORTED);
         }
     }
 
-    private boolean findDoomedTaskClusters(TaskCluster tc, ActivityCluster ac, Set<TaskCluster> doomedTaskClusters) {
+    private boolean findDoomedTaskClusters(TaskCluster tc, Set<TaskCluster> doomedTaskClusters) {
         if (doomedTaskClusters.contains(tc)) {
             return true;
         }
@@ -563,17 +505,21 @@
                     return false;
             }
         }
-        Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicyMap = ac.getConnectorPolicyMap();
-        JobRun jobRun = ac.getJobRun();
+        Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicyMap = jobRun.getConnectorPolicyMap();
         PartitionMatchMaker pmm = jobRun.getPartitionMatchMaker();
         boolean doomed = false;
+        for (TaskCluster depTC : tc.getDependencyTaskClusters()) {
+            if (findDoomedTaskClusters(depTC, doomedTaskClusters)) {
+                doomed = true;
+            }
+        }
         for (PartitionId pid : tc.getRequiredPartitions()) {
             ConnectorDescriptorId cdId = pid.getConnectorDescriptorId();
             IConnectorPolicy cPolicy = connectorPolicyMap.get(cdId);
             PartitionState maxState = pmm.getMaximumAvailableState(pid);
             if (maxState == null
                     || (cPolicy.consumerWaitsForProducerToFinish() && maxState != PartitionState.COMMITTED)) {
-                if (findDoomedTaskClusters(ac.getPartitionProducingTaskClusterMap().get(pid), ac, doomedTaskClusters)) {
+                if (findDoomedTaskClusters(partitionProducingTaskClusterMap.get(pid), doomedTaskClusters)) {
                     doomed = true;
                 }
             }
@@ -584,6 +530,27 @@
         return doomed;
     }
 
+    public void notifyTaskComplete(TaskAttempt ta) throws HyracksException {
+        TaskAttemptId taId = ta.getTaskAttemptId();
+        TaskCluster tc = ta.getTaskState().getTaskCluster();
+        TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
+        if (lastAttempt != null && taId.getAttempt() == lastAttempt.getAttempt()) {
+            TaskAttempt.TaskStatus taStatus = ta.getStatus();
+            if (taStatus == TaskAttempt.TaskStatus.RUNNING) {
+                ta.setStatus(TaskAttempt.TaskStatus.COMPLETED, null);
+                if (lastAttempt.decrementPendingTasksCounter() == 0) {
+                    lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.COMPLETED);
+                    inProgressTaskClusters.remove(tc);
+                    startRunnableActivityClusters();
+                }
+            } else {
+                LOGGER.warning("Spurious task complete notification: " + taId + " Current state = " + taStatus);
+            }
+        } else {
+            LOGGER.warning("Ignoring task complete notification: " + taId + " -- Current last attempt = " + lastAttempt);
+        }
+    }
+
     /**
      * Indicates that a single task attempt has encountered a failure.
      * 
@@ -603,14 +570,14 @@
                 TaskAttempt.TaskStatus taStatus = ta.getStatus();
                 if (taStatus == TaskAttempt.TaskStatus.RUNNING) {
                     ta.setStatus(TaskAttempt.TaskStatus.FAILED, exception);
-                    abortTaskCluster(lastAttempt, ac);
+                    abortTaskCluster(lastAttempt);
                     lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.FAILED);
-                    abortDoomedTaskClusters(ac);
+                    abortDoomedTaskClusters();
                     if (lastAttempt.getAttempt() >= ac.getMaxTaskClusterAttempts()) {
-                        abortActivityCluster(ac);
+                        abortJob(null);
                         return;
                     }
-                    startRunnableTaskClusters(ac);
+                    startRunnableActivityClusters();
                 } else {
                     LOGGER.warning("Spurious task failure notification: " + taId + " Current state = " + taStatus);
                 }
@@ -633,7 +600,7 @@
         try {
             jobRun.getPartitionMatchMaker().notifyNodeFailures(deadNodes);
             for (ActivityCluster ac : jobRun.getActivityClusters()) {
-                TaskCluster[] taskClusters = ac.getTaskClusters();
+                TaskCluster[] taskClusters = ac.getPlan().getTaskClusters();
                 if (taskClusters != null) {
                     for (TaskCluster tc : taskClusters) {
                         TaskClusterAttempt lastTaskClusterAttempt = findLastTaskClusterAttempt(tc);
@@ -646,18 +613,15 @@
                                 if (deadNodes.contains(ta.getNodeId())) {
                                     ta.setStatus(TaskAttempt.TaskStatus.FAILED,
                                             new HyracksException("Node " + ta.getNodeId() + " failed"));
-                                    TaskId tid = ta.getTaskAttemptId().getTaskId();
-                                    ActivityId aid = tid.getActivityId();
-
                                     abort = true;
                                 }
                             }
                             if (abort) {
-                                abortTaskCluster(lastTaskClusterAttempt, ac);
+                                abortTaskCluster(lastTaskClusterAttempt);
                             }
                         }
                     }
-                    abortDoomedTaskClusters(ac);
+                    abortDoomedTaskClusters();
                 }
             }
             startRunnableActivityClusters();