Cleaned up partition management: partition availability/request matching on
the CC is factored out of the event handlers into a per-job
PartitionMatchMaker, and the send-side materialized connector policies are
renamed to make their blocking behavior explicit. Made fault recovery more
robust: node controllers now track their active jobs, the dead-node sweeper
notifies the affected job runs, and task-cluster scheduling derives
runnability ranks on demand instead of precomputing blocker/dependency
closures.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_scheduling@465 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedConnectorPolicy.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedBlockingConnectorPolicy.java
similarity index 92%
rename from hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedConnectorPolicy.java
rename to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedBlockingConnectorPolicy.java
index ffd3722..a5017de 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedConnectorPolicy.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedBlockingConnectorPolicy.java
@@ -14,7 +14,7 @@
  */
 package edu.uci.ics.hyracks.api.dataflow.connectors;
 
-public final class SendSideMaterializedConnectorPolicy implements IConnectorPolicy {
+public final class SendSideMaterializedBlockingConnectorPolicy implements IConnectorPolicy {
     private static final long serialVersionUID = 1L;
 
     @Override
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedReceiveSideMaterializedConnectorPolicy.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedReceiveSideMaterializedBlockingConnectorPolicy.java
similarity index 90%
rename from hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedReceiveSideMaterializedConnectorPolicy.java
rename to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedReceiveSideMaterializedBlockingConnectorPolicy.java
index 0a02e58..82608c4 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedReceiveSideMaterializedConnectorPolicy.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedReceiveSideMaterializedBlockingConnectorPolicy.java
@@ -14,7 +14,7 @@
  */
 package edu.uci.ics.hyracks.api.dataflow.connectors;
 
-public final class SendSideMaterializedReceiveSideMaterializedConnectorPolicy implements IConnectorPolicy {
+public final class SendSideMaterializedReceiveSideMaterializedBlockingConnectorPolicy implements IConnectorPolicy {
     private static final long serialVersionUID = 1L;
 
     @Override
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 7de7ab8..79d709e 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -40,7 +40,6 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobStatus;
-import edu.uci.ics.hyracks.api.partitions.PartitionId;
 import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
 import edu.uci.ics.hyracks.control.cc.job.IJobStatusConditionVariable;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
@@ -72,7 +71,8 @@
 import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
 import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
 import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
-import edu.uci.ics.hyracks.control.common.job.PartitionState;
+import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
+import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
 
@@ -296,13 +296,13 @@
     }
 
     @Override
-    public void registerPartitionProvider(PartitionId pid, String nodeId, PartitionState state) throws Exception {
-        jobQueue.schedule(new RegisterPartitionAvailibilityEvent(this, pid, nodeId, state));
+    public void registerPartitionProvider(PartitionDescriptor partitionDescriptor) {
+        jobQueue.schedule(new RegisterPartitionAvailibilityEvent(this, partitionDescriptor));
     }
 
     @Override
-    public void registerPartitionRequest(PartitionId pid, String nodeId, PartitionState minState) {
-        jobQueue.schedule(new RegisterPartitionRequestEvent(this, pid, nodeId, minState));
+    public void registerPartitionRequest(PartitionRequest partitionRequest) {
+        jobQueue.schedule(new RegisterPartitionRequestEvent(this, partitionRequest));
     }
 
     private class DeadNodeSweeper extends TimerTask {
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
index c802eb8..edc8579 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
@@ -117,4 +117,8 @@
     public Map<ConnectorDescriptorId, IConnectorPolicy> getConnectorPolicyMap() {
         return connectorPolicies;
     }
+
+    public void notifyNodeFailures(Set<String> deadNodes) throws HyracksException {
+        acsm.notifyNodeFailures(deadNodes);
+    }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
index acf6750..40f4ae5 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
@@ -24,9 +24,8 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.JobActivityGraph;
 import edu.uci.ics.hyracks.api.job.JobStatus;
-import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.cc.partitions.PartitionMatchMaker;
 import edu.uci.ics.hyracks.control.cc.scheduler.IJobRunStateMachine;
-import edu.uci.ics.hyracks.control.common.job.PartitionState;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
 
 public class JobRun implements IJobStatusConditionVariable {
@@ -34,9 +33,7 @@
 
     private final JobActivityGraph jag;
 
-    private final Map<PartitionId, Map<String, PartitionState>> partitionAvailabilityMap;
-
-    private final Map<PartitionId, Map<String, PartitionState>> partitionRequestorMap;
+    private final PartitionMatchMaker pmm;
 
     private final Set<String> participatingNodeIds;
 
@@ -53,8 +50,7 @@
     public JobRun(UUID jobId, JobActivityGraph plan) {
         this.jobId = jobId;
         this.jag = plan;
-        partitionAvailabilityMap = new HashMap<PartitionId, Map<String, PartitionState>>();
-        partitionRequestorMap = new HashMap<PartitionId, Map<String, PartitionState>>();
+        pmm = new PartitionMatchMaker();
         participatingNodeIds = new HashSet<String>();
         profile = new JobProfile(jobId);
         activityClusterMap = new HashMap<ActivityId, ActivityCluster>();
@@ -68,6 +64,10 @@
         return jag;
     }
 
+    public PartitionMatchMaker getPartitionMatchMaker() {
+        return pmm;
+    }
+
     public synchronized void setStatus(JobStatus status, Exception exception) {
         this.status = status;
         this.exception = exception;
@@ -108,15 +108,11 @@
         return jsm;
     }
 
-    public Map<PartitionId, Map<String, PartitionState>> getPartitionAvailabilityMap() {
-        return partitionAvailabilityMap;
-    }
-
-    public Map<PartitionId, Map<String, PartitionState>> getPartitionRequestorMap() {
-        return partitionRequestorMap;
-    }
-
     public Map<ActivityId, ActivityCluster> getActivityClusterMap() {
         return activityClusterMap;
     }
+
+    public void notifyNodeFailures(Set<String> deadNodes) throws HyracksException {
+        jsm.notifyNodeFailures(deadNodes);
+    }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java
index b82dafd..a0b2b43 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java
@@ -15,6 +15,7 @@
 package edu.uci.ics.hyracks.control.cc.job;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -31,10 +32,6 @@
 
     private final Set<PartitionId> requiredPartitions;
 
-    private final Set<TaskCluster> blockers;
-
-    private final Set<TaskCluster> dependencies;
-
     private final List<TaskClusterAttempt> taskClusterAttempts;
 
     public TaskCluster(ActivityCluster activityCluster, Task[] tasks) {
@@ -42,8 +39,6 @@
         this.tasks = tasks;
         this.producedPartitions = new HashSet<PartitionId>();
         this.requiredPartitions = new HashSet<PartitionId>();
-        this.blockers = new HashSet<TaskCluster>();
-        this.dependencies = new HashSet<TaskCluster>();
         taskClusterAttempts = new ArrayList<TaskClusterAttempt>();
     }
 
@@ -51,10 +46,6 @@
         return tasks;
     }
 
-    public Set<TaskCluster> getDependencies() {
-        return dependencies;
-    }
-
     public Set<PartitionId> getProducedPartitions() {
         return producedPartitions;
     }
@@ -63,10 +54,6 @@
         return requiredPartitions;
     }
 
-    public Set<TaskCluster> getBlockers() {
-        return blockers;
-    }
-
     public List<TaskClusterAttempt> getAttempts() {
         return taskClusterAttempts;
     }
@@ -78,4 +65,9 @@
     public void notifyTaskFailure(TaskAttempt ta, Exception exception) throws HyracksException {
         activityCluster.getStateMachine().notifyTaskFailure(ta, exception);
     }
+
+    @Override
+    public String toString() {
+        return "TC:" + Arrays.toString(tasks);
+    }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCleanupEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCleanupEvent.java
index f59e9cc..d1829e9 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCleanupEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCleanupEvent.java
@@ -20,6 +20,7 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.NodeControllerState;
 import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
 import edu.uci.ics.hyracks.control.cc.jobqueue.AbstractEvent;
@@ -47,6 +48,10 @@
         int i = 0;
         for (String n : targetNodes) {
             jcns[i++] = new JobCompleteNotifier(n, jobId);
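+            // The job is winding down on these nodes, so stop counting it as
+            // active there for node-failure recovery purposes.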
+            NodeControllerState ncs = ccs.getNodeMap().get(n);
+            if (ncs != null) {
+                ncs.getActiveJobIds().remove(jobId);
+            }
         }
         ccs.getExecutor().execute(new Runnable() {
             @Override
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionAvailibilityEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionAvailibilityEvent.java
index 8a35cf1..3f49b97 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionAvailibilityEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionAvailibilityEvent.java
@@ -14,72 +14,44 @@
  */
 package edu.uci.ics.hyracks.control.cc.job.manager.events;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
+import java.util.List;
 
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.api.util.Pair;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.NodeControllerState;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
 import edu.uci.ics.hyracks.control.cc.jobqueue.AbstractEvent;
-import edu.uci.ics.hyracks.control.common.base.INodeController;
-import edu.uci.ics.hyracks.control.common.job.PartitionState;
+import edu.uci.ics.hyracks.control.cc.partitions.PartitionMatchMaker;
+import edu.uci.ics.hyracks.control.cc.partitions.PartitionUtils;
+import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
+import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
 
 public class RegisterPartitionAvailibilityEvent extends AbstractEvent {
     private final ClusterControllerService ccs;
-    private final PartitionId pid;
-    private final String nodeId;
-    private final PartitionState state;
+    private final PartitionDescriptor partitionDescriptor;
 
-    public RegisterPartitionAvailibilityEvent(ClusterControllerService ccs, PartitionId pid, String nodeId,
-            PartitionState state) {
+    public RegisterPartitionAvailibilityEvent(ClusterControllerService ccs, PartitionDescriptor partitionDescriptor) {
         this.ccs = ccs;
-        this.pid = pid;
-        this.nodeId = nodeId;
-        this.state = state;
+        this.partitionDescriptor = partitionDescriptor;
     }
 
     @Override
     public void run() {
+        final PartitionId pid = partitionDescriptor.getPartitionId();
         JobRun run = ccs.getRunMap().get(pid.getJobId());
         if (run == null) {
             return;
         }
-        Map<PartitionId, Map<String, PartitionState>> partitionAvailabilityMap = run.getPartitionAvailabilityMap();
-        Map<String, PartitionState> paMap = partitionAvailabilityMap.get(pid);
-        if (paMap == null) {
-            paMap = new HashMap<String, PartitionState>();
-            partitionAvailabilityMap.put(pid, paMap);
-        }
-        paMap.put(nodeId, state);
-
-        NodeControllerState availNcs = ccs.getNodeMap().get(nodeId);
-        Map<PartitionId, Map<String, PartitionState>> partitionRequestorMap = run.getPartitionRequestorMap();
-        Map<String, PartitionState> prMap = partitionRequestorMap.get(pid);
-        if (prMap != null) {
-            Iterator<Map.Entry<String, PartitionState>> i = prMap.entrySet().iterator();
-            while (i.hasNext()) {
-                Map.Entry<String, PartitionState> requestor = i.next();
-                PartitionState minState = requestor.getValue();
-                if (state.isAtLeast(minState)) {
-                    i.remove();
-                    NodeControllerState ncs = ccs.getNodeMap().get(requestor.getKey());
-                    if (ncs != null) {
-                        try {
-                            INodeController nc = ncs.getNodeController();
-                            nc.reportPartitionAvailability(pid, availNcs.getDataPort());
-                        } catch (Exception e) {
-                            e.printStackTrace();
-                        }
-                    }
-                }
-            }
+        PartitionMatchMaker pmm = run.getPartitionMatchMaker();
+        List<Pair<PartitionDescriptor, PartitionRequest>> matches = pmm
+                .registerPartitionDescriptor(partitionDescriptor);
+        for (Pair<PartitionDescriptor, PartitionRequest> match : matches) {
+            PartitionUtils.reportPartitionMatch(ccs, pid, match);
         }
     }
 
     @Override
     public String toString() {
-        return "PartitionAvailable@" + nodeId + "[" + pid + "]" + state;
+        return "PartitionAvailable@" + partitionDescriptor;
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionRequestEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionRequestEvent.java
index 2f2adc0..bac8dd1 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionRequestEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionRequestEvent.java
@@ -14,77 +14,41 @@
  */
 package edu.uci.ics.hyracks.control.cc.job.manager.events;
 
-import java.util.HashMap;
-import java.util.Map;
-
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.api.util.Pair;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.NodeControllerState;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
 import edu.uci.ics.hyracks.control.cc.jobqueue.AbstractEvent;
-import edu.uci.ics.hyracks.control.common.base.INodeController;
-import edu.uci.ics.hyracks.control.common.job.PartitionState;
+import edu.uci.ics.hyracks.control.cc.partitions.PartitionMatchMaker;
+import edu.uci.ics.hyracks.control.cc.partitions.PartitionUtils;
+import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
+import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
 
 public class RegisterPartitionRequestEvent extends AbstractEvent {
     private final ClusterControllerService ccs;
-    private final PartitionId pid;
-    private final String nodeId;
-    private final PartitionState minState;
+    private final PartitionRequest partitionRequest;
 
-    public RegisterPartitionRequestEvent(ClusterControllerService ccs, PartitionId pid, String nodeId,
-            PartitionState minState) {
+    public RegisterPartitionRequestEvent(ClusterControllerService ccs, PartitionRequest partitionRequest) {
         this.ccs = ccs;
-        this.pid = pid;
-        this.nodeId = nodeId;
-        this.minState = minState;
+        this.partitionRequest = partitionRequest;
     }
 
     @Override
     public void run() {
-        NodeControllerState ncs = ccs.getNodeMap().get(nodeId);
-        if (ncs != null) {
-            JobRun run = ccs.getRunMap().get(pid.getJobId());
-            if (run == null) {
-                return;
-            }
-
-            Map<PartitionId, Map<String, PartitionState>> partitionAvailabilityMap = run.getPartitionAvailabilityMap();
-            Map<String, PartitionState> paMap = partitionAvailabilityMap.get(pid);
-            boolean matched = false;
-            if (paMap != null && !paMap.isEmpty()) {
-                for (Map.Entry<String, PartitionState> pa : paMap.entrySet()) {
-                    PartitionState state = pa.getValue();
-                    if (state.isAtLeast(minState)) {
-                        NodeControllerState availNcs = ccs.getNodeMap().get(pa.getKey());
-                        if (availNcs != null) {
-                            NetworkAddress networkAddress = availNcs.getDataPort();
-                            try {
-                                INodeController nc = ncs.getNodeController();
-                                nc.reportPartitionAvailability(pid, networkAddress);
-                                matched = true;
-                                break;
-                            } catch (Exception e) {
-                                e.printStackTrace();
-                            }
-                        }
-                    }
-                }
-            }
-            if (!matched) {
-                Map<PartitionId, Map<String, PartitionState>> partitionRequestorMap = run.getPartitionRequestorMap();
-                Map<String, PartitionState> prMap = partitionRequestorMap.get(pid);
-                if (prMap == null) {
-                    prMap = new HashMap<String, PartitionState>();
-                    partitionRequestorMap.put(pid, prMap);
-                }
-                prMap.put(nodeId, minState);
-            }
+        PartitionId pid = partitionRequest.getPartitionId();
+        JobRun run = ccs.getRunMap().get(pid.getJobId());
+        if (run == null) {
+            return;
+        }
+        PartitionMatchMaker pmm = run.getPartitionMatchMaker();
+        Pair<PartitionDescriptor, PartitionRequest> match = pmm.matchPartitionRequest(partitionRequest);
+        if (match != null) {
+            PartitionUtils.reportPartitionMatch(ccs, pid, match);
         }
     }
 
     @Override
     public String toString() {
-        return "PartitionRequest@[" + nodeId + "][" + pid + "]" + minState;
+        return "PartitionRequest@" + partitionRequest;
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java
index 6e000cc..27322f5 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java
@@ -17,11 +17,14 @@
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.NodeControllerState;
+import edu.uci.ics.hyracks.control.cc.job.JobRun;
 import edu.uci.ics.hyracks.control.cc.jobqueue.AbstractEvent;
 
 public class RemoveDeadNodesEvent extends AbstractEvent {
@@ -44,10 +47,14 @@
                 LOGGER.info(e.getKey() + " considered dead");
             }
         }
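+        // Collect the jobs that had tasks running on the dead nodes so that
+        // their state machines can abort and reschedule the affected task clusters.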
+        Set<UUID> affectedJobIds = new HashSet<UUID>();
         Map<String, Set<String>> ipAddressNodeNameMap = ccs.getIPAddressNodeNameMap();
         for (String deadNode : deadNodes) {
             NodeControllerState state = nodeMap.remove(deadNode);
+
             // Deal with dead tasks.
+            affectedJobIds.addAll(state.getActiveJobIds());
+
             String ipAddress = state.getNCConfig().dataIPAddress;
             Set<String> ipNodes = ipAddressNodeNameMap.get(ipAddress);
             if (ipNodes != null) {
@@ -56,6 +63,19 @@
                 }
             }
         }
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Number of affected jobs: " + affectedJobIds.size());
+        }
+        for (UUID jobId : affectedJobIds) {
+            JobRun run = ccs.getRunMap().get(jobId);
+            if (run != null) {
+                try {
+                    run.notifyNodeFailures(deadNodes);
+                } catch (HyracksException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
     }
 
     @Override
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/partitions/PartitionMatchMaker.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/partitions/PartitionMatchMaker.java
new file mode 100644
index 0000000..1bfb69e
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/partitions/PartitionMatchMaker.java
@@ -0,0 +1,187 @@
+package edu.uci.ics.hyracks.control.cc.partitions;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.api.util.Pair;
+import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
+import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
+
+public class PartitionMatchMaker {
+    private final Map<PartitionId, List<PartitionDescriptor>> partitionDescriptors;
+
+    private final Map<PartitionId, List<PartitionRequest>> partitionRequests;
+
+    public PartitionMatchMaker() {
+        partitionDescriptors = new HashMap<PartitionId, List<PartitionDescriptor>>();
+        partitionRequests = new HashMap<PartitionId, List<PartitionRequest>>();
+    }
+
+    public List<Pair<PartitionDescriptor, PartitionRequest>> registerPartitionDescriptor(
+            PartitionDescriptor partitionDescriptor) {
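+        // A freshly available partition may satisfy several queued requests when
+        // it is reusable (e.g. materialized); a non-reusable partition is consumed
+        // by the first request it matches.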
+        List<Pair<PartitionDescriptor, PartitionRequest>> matches = new ArrayList<Pair<PartitionDescriptor, PartitionRequest>>();
+        PartitionId pid = partitionDescriptor.getPartitionId();
+        boolean matched = false;
+        List<PartitionRequest> requests = partitionRequests.get(pid);
+        if (requests != null) {
+            Iterator<PartitionRequest> i = requests.iterator();
+            while (i.hasNext()) {
+                PartitionRequest req = i.next();
+                if (partitionDescriptor.getState().isAtLeast(req.getMinimumState())) {
+                    matches.add(new Pair<PartitionDescriptor, PartitionRequest>(partitionDescriptor, req));
+                    i.remove();
+                    matched = true;
+                    if (!partitionDescriptor.isReusable()) {
+                        break;
+                    }
+                }
+            }
+            if (requests.isEmpty()) {
+                partitionRequests.remove(pid);
+            }
+        }
+
+        if (!matched) {
+            List<PartitionDescriptor> descriptors = partitionDescriptors.get(pid);
+            if (descriptors == null) {
+                descriptors = new ArrayList<PartitionDescriptor>();
+                partitionDescriptors.put(pid, descriptors);
+            }
+            descriptors.add(partitionDescriptor);
+        }
+
+        return matches;
+    }
+
+    public Pair<PartitionDescriptor, PartitionRequest> matchPartitionRequest(PartitionRequest partitionRequest) {
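+        // Symmetric to registerPartitionDescriptor: satisfy the request from an
+        // already-registered descriptor if possible, otherwise queue it until a
+        // matching descriptor arrives.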
+        Pair<PartitionDescriptor, PartitionRequest> match = null;
+
+        PartitionId pid = partitionRequest.getPartitionId();
+
+        List<PartitionDescriptor> descriptors = partitionDescriptors.get(pid);
+        if (descriptors != null) {
+            Iterator<PartitionDescriptor> i = descriptors.iterator();
+            while (i.hasNext()) {
+                PartitionDescriptor descriptor = i.next();
+                if (descriptor.getState().isAtLeast(partitionRequest.getMinimumState())) {
+                    match = new Pair<PartitionDescriptor, PartitionRequest>(descriptor, partitionRequest);
+                    if (!descriptor.isReusable()) {
+                        i.remove();
+                    }
+                    break;
+                }
+            }
+            if (descriptors.isEmpty()) {
+                partitionDescriptors.remove(pid);
+            }
+        }
+
+        if (match == null) {
+            List<PartitionRequest> requests = partitionRequests.get(pid);
+            if (requests == null) {
+                requests = new ArrayList<PartitionRequest>();
+                partitionRequests.put(pid, requests);
+            }
+            requests.add(partitionRequest);
+        }
+
+        return match;
+    }
+
+    public PartitionState getMaximumAvailableState(PartitionId pid) {
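+        // null: no producer registered yet; STARTED: some producer has begun;
+        // COMMITTED: at least one producer has finished writing the partition.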
+        List<PartitionDescriptor> descriptors = partitionDescriptors.get(pid);
+        if (descriptors == null) {
+            return null;
+        }
+        for (PartitionDescriptor descriptor : descriptors) {
+            if (descriptor.getState() == PartitionState.COMMITTED) {
+                return PartitionState.COMMITTED;
+            }
+        }
+        return PartitionState.STARTED;
+    }
+
+    private interface IEntryFilter<T> {
+        public boolean matches(T o);
+    }
+
+    private static <T> void removeEntries(List<T> list, IEntryFilter<T> filter) {
+        Iterator<T> j = list.iterator();
+        while (j.hasNext()) {
+            T o = j.next();
+            if (filter.matches(o)) {
+                j.remove();
+            }
+        }
+    }
+
+    private static <T> void removeEntries(Map<PartitionId, List<T>> map, IEntryFilter<T> filter) {
+        Iterator<Map.Entry<PartitionId, List<T>>> i = map.entrySet().iterator();
+        while (i.hasNext()) {
+            Map.Entry<PartitionId, List<T>> e = i.next();
+            List<T> list = e.getValue();
+            removeEntries(list, filter);
+            if (list.isEmpty()) {
+                i.remove();
+            }
+        }
+    }
+
+    public void notifyNodeFailures(final Set<String> deadNodes) {
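+        // Drop every descriptor advertised by, and every request issued from, a
+        // dead node.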
+        removeEntries(partitionDescriptors, new IEntryFilter<PartitionDescriptor>() {
+            @Override
+            public boolean matches(PartitionDescriptor o) {
+                return deadNodes.contains(o.getNodeId());
+            }
+        });
+        removeEntries(partitionRequests, new IEntryFilter<PartitionRequest>() {
+            @Override
+            public boolean matches(PartitionRequest o) {
+                return deadNodes.contains(o.getNodeId());
+            }
+        });
+    }
+
+    public void removeUncommittedPartitions(Set<PartitionId> partitionIds, final Set<TaskAttemptId> taIds) {
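+        // Partitions that aborted task attempts produced but never committed are
+        // garbage; they must not satisfy future requests.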
+        IEntryFilter<PartitionDescriptor> filter = new IEntryFilter<PartitionDescriptor>() {
+            @Override
+            public boolean matches(PartitionDescriptor o) {
+                return o.getState() != PartitionState.COMMITTED && taIds.contains(o.getProducingTaskAttemptId());
+            }
+        };
+        for (PartitionId pid : partitionIds) {
+            List<PartitionDescriptor> descriptors = partitionDescriptors.get(pid);
+            if (descriptors != null) {
+                removeEntries(descriptors, filter);
+                if (descriptors.isEmpty()) {
+                    partitionDescriptors.remove(pid);
+                }
+            }
+        }
+    }
+
+    public void removePartitionRequests(Set<PartitionId> partitionIds, final Set<TaskAttemptId> taIds) {
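+        // Requests issued by aborted task attempts are stale; a retried attempt
+        // will issue fresh ones.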
+        IEntryFilter<PartitionRequest> filter = new IEntryFilter<PartitionRequest>() {
+            @Override
+            public boolean matches(PartitionRequest o) {
+                return taIds.contains(o.getRequestingTaskAttemptId());
+            }
+        };
+        for (PartitionId pid : partitionIds) {
+            List<PartitionRequest> requests = partitionRequests.get(pid);
+            if (requests != null) {
+                removeEntries(requests, filter);
+                if (requests.isEmpty()) {
+                    partitionRequests.remove(pid);
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/partitions/PartitionUtils.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/partitions/PartitionUtils.java
new file mode 100644
index 0000000..90c6ad1
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/partitions/PartitionUtils.java
@@ -0,0 +1,33 @@
+package edu.uci.ics.hyracks.control.cc.partitions;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.api.util.Pair;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.NodeControllerState;
+import edu.uci.ics.hyracks.control.common.base.INodeController;
+import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
+import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
+
+public class PartitionUtils {
+    public static void reportPartitionMatch(ClusterControllerService ccs, final PartitionId pid,
+            Pair<PartitionDescriptor, PartitionRequest> match) {
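+        // Tell the requesting NC where to pull the partition from. The RPC runs
+        // on the executor so the job queue thread never blocks on a slow node.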
+        PartitionDescriptor desc = match.first;
+        PartitionRequest req = match.second;
+
+        NodeControllerState producerNCS = ccs.getNodeMap().get(desc.getNodeId());
+        NodeControllerState requestorNCS = ccs.getNodeMap().get(req.getNodeId());
+        // Either side of the match may have died since it was made; report only
+        // when both node controllers are still registered.
+        if (producerNCS == null || requestorNCS == null) {
+            return;
+        }
+        final NetworkAddress dataport = producerNCS.getDataPort();
+        final INodeController requestorNC = requestorNCS.getNodeController();
+        ccs.getExecutor().execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    requestorNC.reportPartitionAvailability(pid, dataport);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultActivityClusterStateMachine.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultActivityClusterStateMachine.java
index 15c2db7..8d2040e 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultActivityClusterStateMachine.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultActivityClusterStateMachine.java
@@ -16,11 +16,12 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.PriorityQueue;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
@@ -37,6 +38,7 @@
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
 import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.NodeControllerState;
@@ -46,6 +48,8 @@
 import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
 import edu.uci.ics.hyracks.control.cc.job.TaskCluster;
 import edu.uci.ics.hyracks.control.cc.job.TaskClusterAttempt;
+import edu.uci.ics.hyracks.control.cc.partitions.PartitionMatchMaker;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
 import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
 
 public class DefaultActivityClusterStateMachine implements IActivityClusterStateMachine {
@@ -57,6 +61,8 @@
 
     private final ActivityCluster ac;
 
+    private final PriorityQueue<RankedRunnableTaskCluster> runnableQueue;
+
     private final Set<TaskCluster> inProgressTaskClusters;
 
     public DefaultActivityClusterStateMachine(ClusterControllerService ccs, DefaultJobRunStateMachine jsm,
@@ -64,6 +70,14 @@
         this.ccs = ccs;
         this.jsm = jsm;
         this.ac = ac;
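+        // Runnable TaskClusters are started in order of increasing rank: the
+        // fewer unsatisfied inputs a cluster has, the earlier it is dispatched.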
+        runnableQueue = new PriorityQueue<RankedRunnableTaskCluster>(ac.getTaskClusters().length,
+                new Comparator<RankedRunnableTaskCluster>() {
+                    @Override
+                    public int compare(RankedRunnableTaskCluster o1, RankedRunnableTaskCluster o2) {
+                        int cmp = o1.getRank() - o2.getRank();
+                        return cmp < 0 ? -1 : (cmp > 0 ? 1 : 0);
+                    }
+                });
         inProgressTaskClusters = new HashSet<TaskCluster>();
     }
 
@@ -177,12 +191,12 @@
     }
 
     private void startRunnableTaskClusters() throws HyracksException {
-        Set<TaskCluster> runnableTaskClusters = new HashSet<TaskCluster>();
-        findRunnableTaskClusters(runnableTaskClusters);
+        findRunnableTaskClusters();
         Map<String, List<TaskAttemptDescriptor>> taskAttemptMap = new HashMap<String, List<TaskAttemptDescriptor>>();
-        for (TaskCluster tc : runnableTaskClusters) {
+        // A PriorityQueue's iterator returns elements in arbitrary order, so drain
+        // the queue with poll() to visit TaskClusters in increasing rank order.
+        while (!runnableQueue.isEmpty()) {
+            TaskCluster tc = runnableQueue.poll().getTaskCluster();
             if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Found runnable TC: " + Arrays.toString(tc.getTasks()));
+                LOGGER.info("Found runnable TC: " + tc);
                 List<TaskClusterAttempt> attempts = tc.getAttempts();
                 LOGGER.info("Attempts so far:" + attempts.size());
                 for (TaskClusterAttempt tcAttempt : attempts) {
@@ -202,60 +216,74 @@
         startTasks(taskAttemptMap);
     }
 
-    private void findRunnableTaskClusters(Set<TaskCluster> runnableTaskClusters) {
+    private void findRunnableTaskClusters() {
         TaskCluster[] taskClusters = ac.getTaskClusters();
 
+        Map<TaskCluster, Integer> runnabilityRanks = new HashMap<TaskCluster, Integer>();
         for (TaskCluster tc : taskClusters) {
-            Set<TaskCluster> blockers = tc.getBlockers();
-            TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
-            if (lastAttempt != null
-                    && (lastAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED || lastAttempt
-                            .getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING)) {
-                continue;
+            // Start search at TCs that produce no outputs (sinks)
+            if (tc.getProducedPartitions().isEmpty()) {
+                assignRunnabilityRank(tc, runnabilityRanks);
             }
-            boolean runnable = true;
-            for (TaskCluster blockerTC : blockers) {
-                List<TaskClusterAttempt> tcAttempts = blockerTC.getAttempts();
-                if (tcAttempts.isEmpty()) {
-                    runnable = false;
-                    break;
-                }
-                TaskClusterAttempt tcAttempt = tcAttempts.get(tcAttempts.size() - 1);
-                if (tcAttempt.getStatus() != TaskClusterAttempt.TaskClusterStatus.COMPLETED) {
-                    runnable = false;
-                    break;
-                }
+        }
+
+        runnableQueue.clear();
+        for (Map.Entry<TaskCluster, Integer> e : runnabilityRanks.entrySet()) {
+            TaskCluster tc = e.getKey();
+            int rank = e.getValue();
+            if (rank >= 0 && rank < Integer.MAX_VALUE) {
+                runnableQueue.add(new RankedRunnableTaskCluster(rank, tc));
             }
-            if (runnable) {
-                runnableTaskClusters.add(tc);
-            }
+        }
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Ranked TCs: " + runnableQueue);
         }
     }
 
-    private void findCascadingAbortTaskClusterAttempts(TaskClusterAttempt abortedTCAttempt,
-            Set<TaskClusterAttempt> cascadingAbortTaskClusterAttempts) {
-        boolean changed = true;
-        cascadingAbortTaskClusterAttempts.add(abortedTCAttempt);
-        while (changed) {
-            changed = false;
-            for (TaskCluster tc : ac.getTaskClusters()) {
-                TaskClusterAttempt tca = findLastTaskClusterAttempt(tc);
-                if (tca != null && tca.getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING) {
-                    boolean abort = false;
-                    for (TaskClusterAttempt catca : cascadingAbortTaskClusterAttempts) {
-                        TaskCluster catc = catca.getTaskCluster();
-                        if (tc.getDependencies().contains(catc)) {
-                            abort = true;
-                            break;
-                        }
-                    }
-                    if (abort) {
-                        changed = cascadingAbortTaskClusterAttempts.add(tca) || changed;
-                    }
-                }
+    /*
+     * Runnability rank semantics:
+     * Rank(Completed or Running TaskCluster) = -1
+     * Rank(Runnable TaskCluster all of whose inputs are already satisfied) = 0
+     * Rank(Runnable TaskCluster) = max(Rank(producing TaskClusters)) + 1
+     * Rank(Non-schedulable TaskCluster) = Integer.MAX_VALUE
+     */
+    private int assignRunnabilityRank(TaskCluster goal, Map<TaskCluster, Integer> runnabilityRank) {
+        if (runnabilityRank.containsKey(goal)) {
+            return runnabilityRank.get(goal);
+        }
+        TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(goal);
+        if (lastAttempt != null) {
+            if (lastAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED
+                    || lastAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING) {
+                int rank = -1;
+                runnabilityRank.put(goal, rank);
+                return rank;
             }
         }
-        cascadingAbortTaskClusterAttempts.remove(abortedTCAttempt);
+        Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicyMap = ac.getConnectorPolicyMap();
+        JobRun jobRun = ac.getJobRun();
+        PartitionMatchMaker pmm = jobRun.getPartitionMatchMaker();
+        int maxInputRank = -1;
+        for (PartitionId pid : goal.getRequiredPartitions()) {
+            int rank = -1;
+            ConnectorDescriptorId cdId = pid.getConnectorDescriptorId();
+            IConnectorPolicy cPolicy = connectorPolicyMap.get(cdId);
+            PartitionState maxState = pmm.getMaximumAvailableState(pid);
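+            // An input is satisfied if it is committed, or if it has started and
+            // this connector's consumer need not wait for the producer to finish.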
+            if (PartitionState.COMMITTED.equals(maxState)
+                    || (PartitionState.STARTED.equals(maxState) && !cPolicy.consumerWaitsForProducerToFinish())) {
+                rank = -1;
+            } else {
+                rank = assignRunnabilityRank(ac.getPartitionProducingTaskClusterMap().get(pid), runnabilityRank);
+                if (rank >= 0 && cPolicy.consumerWaitsForProducerToFinish()) {
+                    rank = Integer.MAX_VALUE;
+                }
+            }
+            maxInputRank = Math.max(maxInputRank, rank);
+        }
+        int rank = maxInputRank < Integer.MAX_VALUE ? maxInputRank + 1 : Integer.MAX_VALUE;
+        runnabilityRank.put(goal, rank);
+        return rank;
     }
 
     private void startTasks(Map<String, List<TaskAttemptDescriptor>> taskAttemptMap) throws HyracksException {
@@ -270,6 +298,7 @@
             final List<TaskAttemptDescriptor> taskDescriptors = e.getValue();
             final NodeControllerState node = ccs.getNodeMap().get(nodeId);
             if (node != null) {
+                node.getActiveJobIds().add(jobRun.getJobId());
                 jobRun.getParticipatingNodeIds().add(nodeId);
                 executor.execute(new Runnable() {
                     @Override
@@ -289,16 +318,19 @@
     }
 
     private void abortTaskCluster(TaskClusterAttempt tcAttempt) throws HyracksException {
+        Set<TaskAttemptId> abortTaskIds = new HashSet<TaskAttemptId>();
         Map<String, List<TaskAttemptId>> abortTaskAttemptMap = new HashMap<String, List<TaskAttemptId>>();
-        for (TaskAttempt ta2 : tcAttempt.getTaskAttempts()) {
-            if (ta2.getStatus() == TaskAttempt.TaskStatus.RUNNING) {
-                ta2.setStatus(TaskAttempt.TaskStatus.ABORTED, null);
-                List<TaskAttemptId> abortTaskAttempts = abortTaskAttemptMap.get(ta2.getNodeId());
+        for (TaskAttempt ta : tcAttempt.getTaskAttempts()) {
+            TaskAttemptId taId = ta.getTaskAttemptId();
+            abortTaskIds.add(taId);
+            if (ta.getStatus() == TaskAttempt.TaskStatus.RUNNING) {
+                ta.setStatus(TaskAttempt.TaskStatus.ABORTED, null);
+                List<TaskAttemptId> abortTaskAttempts = abortTaskAttemptMap.get(ta.getNodeId());
                 if (abortTaskAttempts == null) {
                     abortTaskAttempts = new ArrayList<TaskAttemptId>();
-                    abortTaskAttemptMap.put(ta2.getNodeId(), abortTaskAttempts);
+                    abortTaskAttemptMap.put(ta.getNodeId(), abortTaskAttempts);
                 }
-                abortTaskAttempts.add(ta2.getTaskAttemptId());
+                abortTaskAttempts.add(taId);
             }
         }
         JobRun jobRun = ac.getJobRun();
@@ -320,6 +352,10 @@
                 });
             }
         }
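+        // Purge the aborted attempts' uncommitted partitions and outstanding
+        // requests from the match maker so they cannot pair with future matches.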
+        TaskCluster tc = tcAttempt.getTaskCluster();
+        PartitionMatchMaker pmm = jobRun.getPartitionMatchMaker();
+        pmm.removeUncommittedPartitions(tc.getProducedPartitions(), abortTaskIds);
+        pmm.removePartitionRequests(tc.getRequiredPartitions(), abortTaskIds);
     }
 
     @Override
@@ -333,21 +369,70 @@
                 ta.setStatus(TaskAttempt.TaskStatus.FAILED, exception);
                 abortTaskCluster(lastAttempt);
                 lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.FAILED);
-                Set<TaskClusterAttempt> cascadingAbortTaskClusterAttempts = new HashSet<TaskClusterAttempt>();
-                findCascadingAbortTaskClusterAttempts(lastAttempt, cascadingAbortTaskClusterAttempts);
-                for (TaskClusterAttempt tca : cascadingAbortTaskClusterAttempts) {
-                    abortTaskCluster(tca);
-                    tca.setStatus(TaskClusterAttempt.TaskClusterStatus.ABORTED);
-                }
+                abortDoomedTaskClusters();
                 ac.notifyTaskClusterFailure(lastAttempt, exception);
             } else {
-                LOGGER.warning("Spurious task complete notification: " + taId + " Current state = " + taStatus);
+                LOGGER.warning("Spurious task failure notification: " + taId + " Current state = " + taStatus);
             }
         } else {
-            LOGGER.warning("Ignoring task complete notification: " + taId + " -- Current last attempt = " + lastAttempt);
+            LOGGER.warning("Ignoring task failure notification: " + taId + " -- Current last attempt = " + lastAttempt);
         }
     }
 
+    private void abortDoomedTaskClusters() throws HyracksException {
+        TaskCluster[] taskClusters = ac.getTaskClusters();
+
+        Set<TaskCluster> doomedTaskClusters = new HashSet<TaskCluster>();
+        for (TaskCluster tc : taskClusters) {
+            // Start search at TCs that produce no outputs (sinks)
+            if (tc.getProducedPartitions().isEmpty()) {
+                findDoomedTaskClusters(tc, doomedTaskClusters);
+            }
+        }
+
+        for (TaskCluster tc : doomedTaskClusters) {
+            TaskClusterAttempt tca = findLastTaskClusterAttempt(tc);
+            // A doomed TaskCluster may never have been attempted; only an
+            // existing attempt needs to be aborted.
+            if (tca != null) {
+                abortTaskCluster(tca);
+                tca.setStatus(TaskClusterAttempt.TaskClusterStatus.ABORTED);
+            }
+        }
+    }
+
+    private boolean findDoomedTaskClusters(TaskCluster tc, Set<TaskCluster> doomedTaskClusters) {
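+        // A TaskCluster is doomed if its last attempt failed or was aborted, or if
+        // some required input it still lacks must come from a doomed producer.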
+        if (doomedTaskClusters.contains(tc)) {
+            return true;
+        }
+        TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
+        if (lastAttempt != null) {
+            switch (lastAttempt.getStatus()) {
+                case ABORTED:
+                case FAILED:
+                    return true;
+
+                case COMPLETED:
+                    return false;
+            }
+        }
+        Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicyMap = ac.getConnectorPolicyMap();
+        JobRun jobRun = ac.getJobRun();
+        PartitionMatchMaker pmm = jobRun.getPartitionMatchMaker();
+        boolean doomed = false;
+        for (PartitionId pid : tc.getRequiredPartitions()) {
+            ConnectorDescriptorId cdId = pid.getConnectorDescriptorId();
+            IConnectorPolicy cPolicy = connectorPolicyMap.get(cdId);
+            PartitionState maxState = pmm.getMaximumAvailableState(pid);
+            if (maxState == null
+                    || (cPolicy.consumerWaitsForProducerToFinish() && maxState != PartitionState.COMMITTED)) {
+                if (findDoomedTaskClusters(ac.getPartitionProducingTaskClusterMap().get(pid), doomedTaskClusters)) {
+                    doomed = true;
+                }
+            }
+        }
+        if (doomed) {
+            doomedTaskClusters.add(tc);
+        }
+        return doomed;
+    }
+
     @Override
     public void abort() throws HyracksException {
         TaskCluster[] taskClusters = ac.getTaskClusters();
@@ -375,4 +460,28 @@
         assignTaskLocations(tc, taskAttemptMap);
         startTasks(taskAttemptMap);
     }
+
+    @Override
+    public void notifyNodeFailures(Set<String> deadNodes) throws HyracksException {
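+        // Fail every task attempt placed on a dead node, abort the task clusters
+        // those attempts belonged to (plus any clusters doomed as a consequence),
+        // and then reschedule whatever became runnable.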
+        for (TaskCluster tc : ac.getTaskClusters()) {
+            TaskClusterAttempt lastTaskClusterAttempt = findLastTaskClusterAttempt(tc);
+            if (lastTaskClusterAttempt != null
+                    && (lastTaskClusterAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED || lastTaskClusterAttempt
+                            .getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING)) {
+                boolean abort = false;
+                for (TaskAttempt ta : lastTaskClusterAttempt.getTaskAttempts()) {
+                    if (deadNodes.contains(ta.getNodeId())) {
+                        ta.setStatus(TaskAttempt.TaskStatus.FAILED, new HyracksException("Node " + ta.getNodeId()
+                                + " failed"));
+                        abort = true;
+                    }
+                }
+                if (abort) {
+                    abortTaskCluster(lastTaskClusterAttempt);
+                }
+            }
+        }
+        abortDoomedTaskClusters();
+        startRunnableTaskClusters();
+    }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobRunStateMachine.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobRunStateMachine.java
index 12215ff..16b7f5c 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobRunStateMachine.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobRunStateMachine.java
@@ -205,7 +205,7 @@
         }
     }
 
-    private void findRunnableClusters(Set<ActivityCluster> frontier, ActivityCluster candidate) {
+    private void findRunnableActivityClusters(Set<ActivityCluster> frontier, ActivityCluster candidate) {
         if (completedClusters.contains(candidate) || frontier.contains(candidate)
                 || inProgressClusters.contains(candidate)) {
             return;
@@ -214,7 +214,7 @@
         for (ActivityCluster s : candidate.getDependencies()) {
             if (!completedClusters.contains(s)) {
                 runnable = false;
-                findRunnableClusters(frontier, s);
+                findRunnableActivityClusters(frontier, s);
             }
         }
         if (runnable && candidate != rootActivityCluster) {
@@ -222,8 +222,8 @@
         }
     }
 
-    private void findRunnableClusters(Set<ActivityCluster> frontier) {
-        findRunnableClusters(frontier, rootActivityCluster);
+    private void findRunnableActivityClusters(Set<ActivityCluster> frontier) {
+        findRunnableActivityClusters(frontier, rootActivityCluster);
     }
 
     @Override
@@ -266,7 +266,7 @@
 
     private void startRunnableActivityClusters() throws HyracksException {
         Set<ActivityCluster> runnableClusters = new HashSet<ActivityCluster>();
-        findRunnableClusters(runnableClusters);
+        findRunnableActivityClusters(runnableClusters);
         if (runnableClusters.isEmpty() && inProgressClusters.isEmpty()) {
             ccs.getJobQueue().schedule(new JobCleanupEvent(ccs, jobRun.getJobId(), JobStatus.TERMINATED, null));
             return;
@@ -448,20 +448,12 @@
                             tc.getProducedPartitions().add(pid);
                             targetTC.getRequiredPartitions().add(pid);
                             partitionProducingTaskClusterMap.put(pid, tc);
-                            IConnectorPolicy cPolicy = connectorPolicies.get(cdId);
-                            targetTC.getDependencies().add(tc);
-                            if (cPolicy.consumerWaitsForProducerToFinish()) {
-                                targetTC.getBlockers().add(tc);
-                            }
                         }
                     }
                 }
             }
         }
 
-        computeBlockerClosure(tcSet);
-        computeDependencyClosure(tcSet);
-
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Plan for " + ac);
             LOGGER.info("Built " + tcSet.size() + " Task Clusters");
@@ -511,48 +503,6 @@
         return new PipelinedConnectorPolicy();
     }
 
-    private void computeDependencyClosure(Set<TaskCluster> tcSet) {
-        boolean done = false;
-        while (!done) {
-            done = true;
-            for (TaskCluster tc : tcSet) {
-                Set<TaskCluster> deps = tc.getDependencies();
-                if (!deps.isEmpty()) {
-                    Set<TaskCluster> copy = new HashSet<TaskCluster>(deps);
-                    for (TaskCluster tc2 : copy) {
-                        for (TaskCluster tc3 : tc2.getDependencies()) {
-                            if (!deps.contains(tc3)) {
-                                deps.add(tc3);
-                                done = false;
-                            }
-                        }
-                    }
-                }
-            }
-        }
-    }
-
-    private void computeBlockerClosure(Set<TaskCluster> tcSet) {
-        boolean done = false;
-        while (!done) {
-            done = true;
-            for (TaskCluster tc : tcSet) {
-                Set<TaskCluster> blockers = tc.getBlockers();
-                if (!blockers.isEmpty()) {
-                    Set<TaskCluster> copy = new HashSet<TaskCluster>(blockers);
-                    for (TaskCluster tc2 : copy) {
-                        for (TaskCluster tc3 : tc2.getBlockers()) {
-                            if (!blockers.contains(tc3)) {
-                                blockers.add(tc3);
-                                done = false;
-                            }
-                        }
-                    }
-                }
-            }
-        }
-    }
-
     @Override
     public void notifyActivityClusterFailure(ActivityCluster ac, Exception exception) throws HyracksException {
         for (ActivityCluster ac2 : inProgressClusters) {
@@ -571,4 +521,14 @@
         inProgressClusters.remove(ac);
         startRunnableActivityClusters();
     }
+
+    @Override
+    public void notifyNodeFailures(Set<String> deadNodes) throws HyracksException {
+        jobRun.getPartitionMatchMaker().notifyNodeFailures(deadNodes);
+        if (!inProgressClusters.isEmpty()) {
+            for (ActivityCluster ac : inProgressClusters) {
+                ac.notifyNodeFailures(deadNodes);
+            }
+        }
+    }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IActivityClusterStateMachine.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IActivityClusterStateMachine.java
index 02adfa5..c6d85dd 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IActivityClusterStateMachine.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IActivityClusterStateMachine.java
@@ -14,6 +14,8 @@
  */
 package edu.uci.ics.hyracks.control.cc.scheduler;
 
+import java.util.Set;
+
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
 import edu.uci.ics.hyracks.control.cc.job.TaskClusterAttempt;
@@ -28,4 +30,6 @@
     public void notifyTaskFailure(TaskAttempt ta, Exception exception) throws HyracksException;
 
     public void notifyTaskClusterFailure(TaskClusterAttempt tcAttempt, Exception exception) throws HyracksException;
+
+    public void notifyNodeFailures(Set<String> deadNodes) throws HyracksException;
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IJobRunStateMachine.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IJobRunStateMachine.java
index 97ca9fe..bbf23d5 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IJobRunStateMachine.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IJobRunStateMachine.java
@@ -14,6 +14,8 @@
  */
 package edu.uci.ics.hyracks.control.cc.scheduler;
 
+import java.util.Set;
+
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
 
@@ -23,4 +25,6 @@
     public void notifyActivityClusterFailure(ActivityCluster ac, Exception exception) throws HyracksException;
 
     public void notifyActivityClusterComplete(ActivityCluster activityCluster) throws HyracksException;
+
+    public void notifyNodeFailures(Set<String> deadNodes) throws HyracksException;
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/RankedRunnableTaskCluster.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/RankedRunnableTaskCluster.java
new file mode 100644
index 0000000..fa11643
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/RankedRunnableTaskCluster.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.scheduler;
+
+import edu.uci.ics.hyracks.control.cc.job.TaskCluster;
+
+public class RankedRunnableTaskCluster {
+    private final int rank;
+
+    private final TaskCluster taskCluster;
+
+    public RankedRunnableTaskCluster(int rank, TaskCluster taskCluster) {
+        this.rank = rank;
+        this.taskCluster = taskCluster;
+    }
+
+    public int getRank() {
+        return rank;
+    }
+
+    public TaskCluster getTaskCluster() {
+        return taskCluster;
+    }
+
+    @Override
+    public String toString() {
+        return "[" + rank + ":" + taskCluster + "]";
+    }
+}
\ No newline at end of file
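RankedRunnableTaskCluster pairs a runnable task cluster with its scheduling rank. A usage sketch, assuming the scheduler orders candidates by ascending rank; the comparator and the tcEarly/tcLate placeholders are illustrative, not part of this change:

    List<RankedRunnableTaskCluster> runnable = new ArrayList<RankedRunnableTaskCluster>();
    runnable.add(new RankedRunnableTaskCluster(2, tcLate));
    runnable.add(new RankedRunnableTaskCluster(0, tcEarly));
    Collections.sort(runnable, new Comparator<RankedRunnableTaskCluster>() {
        @Override
        public int compare(RankedRunnableTaskCluster a, RankedRunnableTaskCluster b) {
            return a.getRank() - b.getRank(); // lower rank first
        }
    });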
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
index 8dbb180..8dae67d 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -19,10 +19,10 @@
 import java.util.UUID;
 
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
-import edu.uci.ics.hyracks.api.partitions.PartitionId;
 import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
 import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
-import edu.uci.ics.hyracks.control.common.job.PartitionState;
+import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
+import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
 
@@ -40,7 +40,7 @@
 
     public void reportProfile(String id, List<JobProfile> profiles) throws Exception;
 
-    public void registerPartitionProvider(PartitionId pid, String nodeId, PartitionState state) throws Exception;
+    public void registerPartitionProvider(PartitionDescriptor partitionDescriptor) throws Exception;
 
-    public void registerPartitionRequest(PartitionId pid, String nodeId, PartitionState minState) throws Exception;
+    public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception;
 }
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionDescriptor.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionDescriptor.java
new file mode 100644
index 0000000..63b7ad3
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionDescriptor.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.job;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+
+public class PartitionDescriptor implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final PartitionId pid;
+
+    private final String nodeId;
+
+    private final TaskAttemptId producingTaskAttemptId;
+
+    private final boolean reusable;
+
+    private PartitionState state;
+
+    public PartitionDescriptor(PartitionId pid, String nodeId, TaskAttemptId producingTaskAttemptId, boolean reusable) {
+        this.pid = pid;
+        this.nodeId = nodeId;
+        this.producingTaskAttemptId = producingTaskAttemptId;
+        this.reusable = reusable;
+    }
+
+    public PartitionId getPartitionId() {
+        return pid;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    public TaskAttemptId getProducingTaskAttemptId() {
+        return producingTaskAttemptId;
+    }
+
+    public PartitionState getState() {
+        return state;
+    }
+
+    public void setState(PartitionState state) {
+        this.state = state;
+    }
+
+    public boolean isReusable() {
+        return reusable;
+    }
+
+    @Override
+    public String toString() {
+        return "[" + pid + ":" + nodeId + ":" + producingTaskAttemptId + ":"
+                + (reusable ? "reusable" : "non-reusable") + ":" + state + "]";
+    }
+}
\ No newline at end of file
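A PartitionDescriptor now carries everything the CC needs to track a provider: the partition id, the hosting node, the producing task attempt, reusability, and a mutable state. A sketch of the NC-side registration flow it enables, mirroring PartitionManager.updatePartitionState further down in this diff; the node id and attempt id are placeholders:

    PartitionDescriptor desc = new PartitionDescriptor(pid, "nc-1", producerTaId, partition.isReusable());
    desc.setState(PartitionState.STARTED); // promoted to COMMITTED when the writer closes
    clusterController.registerPartitionProvider(desc);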
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionRequest.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionRequest.java
new file mode 100644
index 0000000..ca34501
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionRequest.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.job;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+
+public class PartitionRequest implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final PartitionId pid;
+
+    private final String requestingNodeId;
+
+    private final TaskAttemptId requestingTaskAttemptId;
+
+    private final PartitionState minState;
+
+    public PartitionRequest(PartitionId pid, String requestingNodeId, TaskAttemptId requestingTaskAttemptId,
+            PartitionState minState) {
+        this.pid = pid;
+        this.requestingNodeId = requestingNodeId;
+        this.requestingTaskAttemptId = requestingTaskAttemptId;
+        this.minState = minState;
+    }
+
+    public PartitionId getPartitionId() {
+        return pid;
+    }
+
+    public String getNodeId() {
+        return requestingNodeId;
+    }
+
+    public TaskAttemptId getRequestingTaskAttemptId() {
+        return requestingTaskAttemptId;
+    }
+
+    public PartitionState getMinimumState() {
+        return minState;
+    }
+
+    @Override
+    public String toString() {
+        return "[" + pid + ":" + requestingNodeId + ":" + requestingTaskAttemptId + ":" + minState + "]";
+    }
+}
\ No newline at end of file
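PartitionRequest is the consumer-side counterpart, bundling the requesting node, the requesting task attempt, and the weakest provider state the consumer will accept. A sketch mirroring Joblet.advertisePartitionRequest below, with placeholder ids:

    PartitionRequest req = new PartitionRequest(pid, "nc-2", consumerTaId, PartitionState.STARTED);
    clusterController.registerPartitionRequest(req);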
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionState.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionState.java
index 412cbac..edde0b8 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionState.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionState.java
@@ -1,3 +1,17 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package edu.uci.ics.hyracks.control.common.job;
 
 public enum PartitionState {
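Since a request carries a minimum acceptable state, the match-maker presumably compares lifecycle stages. A hedged one-liner of that check, assuming the enum declares STARTED before COMMITTED so that ordinal order tracks lifecycle order:

    boolean satisfies = descriptor.getState().ordinal() >= request.getMinimumState().ordinal();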
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 9c12cd9..8ef0f3e 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -38,6 +38,7 @@
 import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
 import edu.uci.ics.hyracks.api.resources.IDeallocatable;
+import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
 import edu.uci.ics.hyracks.control.common.job.PartitionState;
 import edu.uci.ics.hyracks.control.common.job.profiling.counters.Counter;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.JobletProfile;
@@ -213,11 +214,12 @@
         return counter;
     }
 
-    public synchronized void advertisePartitionRequest(Collection<PartitionId> pids, IPartitionCollector collector,
-            PartitionState minState) throws Exception {
+    public synchronized void advertisePartitionRequest(TaskAttemptId taId, Collection<PartitionId> pids,
+            IPartitionCollector collector, PartitionState minState) throws Exception {
         for (PartitionId pid : pids) {
             partitionRequestMap.put(pid, collector);
-            nodeController.getClusterController().registerPartitionRequest(pid, nodeController.getId(), minState);
+            PartitionRequest req = new PartitionRequest(pid, nodeController.getId(), taId, minState);
+            nodeController.getClusterController().registerPartitionRequest(req);
         }
     }
 
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index a26988f..3223907 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -286,7 +286,7 @@
                         IConnectorPolicy cPolicy = connectorPoliciesMap.get(conn.getConnectorId());
 
                         IPartitionWriterFactory pwFactory = createPartitionWriterFactory(cPolicy, jobId, conn,
-                                partition);
+                                partition, taId);
 
                         if (LOGGER.isLoggable(Level.INFO)) {
                             LOGGER.info("output: " + i + ": " + conn.getConnectorId());
@@ -314,20 +314,21 @@
         IPartitionCollector collector = conn.createPartitionCollector(task, recordDesc, partition,
                 td.getInputPartitionCounts()[i], td.getPartitionCount());
         if (cPolicy.materializeOnReceiveSide()) {
-            return new ReceiveSideMaterializingCollector(ctx, partitionManager, collector, executor);
+            return new ReceiveSideMaterializingCollector(ctx, partitionManager, collector, task.getTaskAttemptId(),
+                    executor);
         } else {
             return collector;
         }
     }
 
     private IPartitionWriterFactory createPartitionWriterFactory(IConnectorPolicy cPolicy, final UUID jobId,
-            final IConnectorDescriptor conn, final int senderIndex) {
+            final IConnectorDescriptor conn, final int senderIndex, final TaskAttemptId taId) {
         if (cPolicy.materializeOnSendSide()) {
             return new IPartitionWriterFactory() {
                 @Override
                 public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
                     return new MaterializedPartitionWriter(ctx, partitionManager, new PartitionId(jobId,
-                            conn.getConnectorId(), senderIndex, receiverIndex), executor);
+                            conn.getConnectorId(), senderIndex, receiverIndex), taId, executor);
                 }
             };
         } else {
@@ -335,7 +336,7 @@
                 @Override
                 public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
                     return new PipelinedPartition(partitionManager, new PartitionId(jobId, conn.getConnectorId(),
-                            senderIndex, receiverIndex));
+                            senderIndex, receiverIndex), taId);
                 }
             };
         }
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index 20bf9da..e787a97 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -207,7 +207,7 @@
         try {
             collector.open();
             try {
-                joblet.advertisePartitionRequest(collector.getRequiredPartitionIds(), collector, PartitionState.STARTED);
+                joblet.advertisePartitionRequest(taskAttemptId, collector.getRequiredPartitionIds(), collector, PartitionState.STARTED);
                 IFrameReader reader = collector.getReader();
                 reader.open();
                 try {
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
index 353ba32..db2e405 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
@@ -19,11 +19,13 @@
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileHandle;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.io.IIOManager;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
 import edu.uci.ics.hyracks.control.nc.io.IOManager;
 
 public class MaterializedPartitionWriter implements IFrameWriter {
@@ -33,6 +35,8 @@
 
     protected final PartitionId pid;
 
+    protected final TaskAttemptId taId;
+
     protected final Executor executor;
 
     private FileReference fRef;
@@ -42,10 +46,11 @@
     private long size;
 
     public MaterializedPartitionWriter(IHyracksRootContext ctx, PartitionManager manager, PartitionId pid,
-            Executor executor) {
+            TaskAttemptId taId, Executor executor) {
         this.ctx = ctx;
         this.manager = manager;
         this.pid = pid;
+        this.taId = taId;
         this.executor = executor;
     }
 
@@ -69,7 +74,8 @@
     @Override
     public void close() throws HyracksDataException {
         ctx.getIOManager().close(handle);
-        manager.registerPartition(pid, new MaterializedPartition(ctx, fRef, executor, (IOManager) ctx.getIOManager()));
-        manager.notifyPartitionCommit(pid);
+        manager.registerPartition(pid, taId,
+                new MaterializedPartition(ctx, fRef, executor, (IOManager) ctx.getIOManager()),
+                PartitionState.COMMITTED);
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
index b676e33..63ae790 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
@@ -22,11 +22,13 @@
 import java.util.UUID;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
 import edu.uci.ics.hyracks.api.partitions.IPartition;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
 import edu.uci.ics.hyracks.control.common.job.PartitionState;
 import edu.uci.ics.hyracks.control.nc.NodeControllerService;
 import edu.uci.ics.hyracks.control.nc.io.IOManager;
@@ -49,7 +51,8 @@
         fileFactory = new WorkspaceFileFactory(deallocatableRegistry, (IOManager) ncs.getRootContext().getIOManager());
     }
 
-    public void registerPartition(PartitionId pid, IPartition partition) throws HyracksDataException {
+    public void registerPartition(PartitionId pid, TaskAttemptId taId, IPartition partition, PartitionState state)
+            throws HyracksDataException {
         synchronized (this) {
             List<IPartition> pList = partitionMap.get(pid);
             if (pList == null) {
@@ -58,16 +61,15 @@
             }
             pList.add(partition);
         }
-        try {
-            ncs.getClusterController().registerPartitionProvider(pid, ncs.getId(), PartitionState.STARTED);
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
+        updatePartitionState(pid, taId, partition, state);
     }
 
-    public void notifyPartitionCommit(PartitionId pid) throws HyracksDataException {
+    public void updatePartitionState(PartitionId pid, TaskAttemptId taId, IPartition partition, PartitionState state)
+            throws HyracksDataException {
+        PartitionDescriptor desc = new PartitionDescriptor(pid, ncs.getId(), taId, partition.isReusable());
+        desc.setState(state);
         try {
-            ncs.getClusterController().registerPartitionProvider(pid, ncs.getId(), PartitionState.COMMITTED);
+            ncs.getClusterController().registerPartitionProvider(desc);
         } catch (Exception e) {
             throw new HyracksDataException(e);
         }
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java
index ec6e25d..1a9fb53 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java
@@ -17,20 +17,25 @@
 import java.nio.ByteBuffer;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.partitions.IPartition;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
 
 public class PipelinedPartition implements IFrameWriter, IPartition {
     private final PartitionManager manager;
 
     private final PartitionId pid;
 
+    private final TaskAttemptId taId;
+
     private IFrameWriter delegate;
 
-    public PipelinedPartition(PartitionManager manager, PartitionId pid) {
+    public PipelinedPartition(PartitionManager manager, PartitionId pid, TaskAttemptId taId) {
         this.manager = manager;
         this.pid = pid;
+        this.taId = taId;
     }
 
     @Override
@@ -51,7 +56,7 @@
 
     @Override
     public synchronized void open() throws HyracksDataException {
-        manager.registerPartition(pid, this);
+        manager.registerPartition(pid, taId, this, PartitionState.STARTED);
         while (delegate == null) {
             try {
                 wait();
@@ -74,7 +79,7 @@
 
     @Override
     public void close() throws HyracksDataException {
-        manager.notifyPartitionCommit(pid);
+        manager.updatePartitionState(pid, taId, this, PartitionState.COMMITTED);
         delegate.close();
     }
 }
\ No newline at end of file
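Taken together, the two hunks above give a pipelined partition a two-step lifecycle: STARTED on open, COMMITTED on close. An illustrative driver, where the buffer and ids are placeholders and nextFrame comes from IFrameWriter:

    PipelinedPartition p = new PipelinedPartition(partitionManager, pid, taId);
    p.open();            // registers (pid, taId) as STARTED; blocks until a consumer attaches
    p.nextFrame(buffer); // frames stream straight through to the consumer's delegate writer
    p.close();           // promotes the partition to COMMITTED, then closes the delegate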
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
index 5f3a3f4..6b7eb4e 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
@@ -27,6 +27,7 @@
 import edu.uci.ics.hyracks.api.comm.PartitionChannel;
 import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
 
@@ -37,13 +38,16 @@
 
     private final IPartitionCollector delegate;
 
+    private final TaskAttemptId taId;
+
     private final Executor executor;
 
     public ReceiveSideMaterializingCollector(IHyracksRootContext ctx, PartitionManager manager,
-            IPartitionCollector collector, Executor executor) {
+            IPartitionCollector collector, TaskAttemptId taId, Executor executor) {
         this.ctx = ctx;
         this.manager = manager;
         this.delegate = collector;
+        this.taId = taId;
         this.executor = executor;
     }
 
@@ -103,7 +107,7 @@
         @Override
         public synchronized void run() {
             PartitionId pid = pc.getPartitionId();
-            MaterializedPartitionWriter mpw = new MaterializedPartitionWriter(ctx, manager, pid, executor);
+            MaterializedPartitionWriter mpw = new MaterializedPartitionWriter(ctx, manager, pid, taId, executor);
             IInputChannel channel = pc.getInputChannel();
             try {
                 channel.registerMonitor(this);