Refactored cluster controller

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_scheduling@492 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksTaskContext.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksTaskContext.java
index 8584d36..7609951 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksTaskContext.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksTaskContext.java
@@ -14,7 +14,9 @@
  */
 package edu.uci.ics.hyracks.api.context;
 
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
 import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
 import edu.uci.ics.hyracks.api.resources.IDeallocatableRegistry;
@@ -25,4 +27,12 @@
     public TaskAttemptId getTaskAttemptId();
 
     public ICounterContext getCounterContext();
+
+    public Object lookupGlobalVariable(ActivityId producerActivity, int partition, String varName)
+            throws HyracksDataException;
+
+    public Object lookupLocalVariable(ActivityId producerActivity, int partition, String varName)
+            throws HyracksDataException;
+
+    public void setVariable(String name, Object value) throws HyracksDataException;
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/naming/MultipartName.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/naming/MultipartName.java
new file mode 100644
index 0000000..7310606
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/naming/MultipartName.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.naming;
+
+import java.util.Arrays;
+
+public final class MultipartName {
+    private Object[] parts;
+
+    public MultipartName(Object... parts) {
+        this.parts = parts;
+    }
+
+    public Object[] getParts() {
+        return parts;
+    }
+
+    @Override
+    public int hashCode() {
+        return Arrays.deepHashCode(parts);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof MultipartName)) {
+            return false;
+        }
+        return Arrays.deepEquals(parts, ((MultipartName) o).parts);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/things/ThingDescriptorId.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/things/ThingDescriptorId.java
index 9adf482..5d42ce9 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/things/ThingDescriptorId.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/things/ThingDescriptorId.java
@@ -31,7 +31,7 @@
 
     @Override
     public String toString() {
-        return "TID: " + id;
+        return "ThID: " + id;
     }
 
     public int hashCode() {
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/util/JavaSerializationUtils.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/util/JavaSerializationUtils.java
index 1451c69..1185b5e 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/util/JavaSerializationUtils.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/util/JavaSerializationUtils.java
@@ -34,6 +34,9 @@
     }
 
     public static Object deserialize(byte[] bytes, ClassLoader classLoader) throws IOException, ClassNotFoundException {
+        if (bytes == null) {
+            return null;
+        }
         ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
         try {
             Thread.currentThread().setContextClassLoader(classLoader);
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/workflow/variables/WorkflowVariableDescriptor.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/workflow/variables/WorkflowVariableDescriptor.java
new file mode 100644
index 0000000..b13afe4
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/workflow/variables/WorkflowVariableDescriptor.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.workflow.variables;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.naming.MultipartName;
+
+public final class WorkflowVariableDescriptor implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final MultipartName name;
+
+    private final WorkflowVariableValueScope scope;
+
+    public WorkflowVariableDescriptor(MultipartName name, WorkflowVariableValueScope scope) {
+        this.name = name;
+        this.scope = scope;
+    }
+
+    public MultipartName getName() {
+        return name;
+    }
+
+    public WorkflowVariableValueScope getScope() {
+        return scope;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IJobScheduler.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/workflow/variables/WorkflowVariableValueScope.java
similarity index 70%
rename from hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IJobScheduler.java
rename to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/workflow/variables/WorkflowVariableValueScope.java
index d359731..44ed129 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IJobScheduler.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/workflow/variables/WorkflowVariableValueScope.java
@@ -12,11 +12,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.control.cc.scheduler;
+package edu.uci.ics.hyracks.api.workflow.variables;
 
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.control.cc.job.JobRun;
-
-public interface IJobScheduler {
-    public void notifyJobCreation(JobRun run) throws HyracksException;
+public enum WorkflowVariableValueScope {
+    LOCAL,
+    GLOBAL,
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 79d709e..e0c3ee1 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -60,8 +60,7 @@
 import edu.uci.ics.hyracks.control.cc.job.manager.events.UnregisterNodeEvent;
 import edu.uci.ics.hyracks.control.cc.jobqueue.FutureValue;
 import edu.uci.ics.hyracks.control.cc.jobqueue.JobQueue;
-import edu.uci.ics.hyracks.control.cc.scheduler.DefaultJobScheduler;
-import edu.uci.ics.hyracks.control.cc.scheduler.IJobScheduler;
+import edu.uci.ics.hyracks.control.cc.scheduler.JobScheduler;
 import edu.uci.ics.hyracks.control.cc.web.WebServer;
 import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
 import edu.uci.ics.hyracks.control.common.base.IClusterController;
@@ -100,7 +99,7 @@
 
     private final JobQueue jobQueue;
 
-    private final IJobScheduler scheduler;
+    private final JobScheduler scheduler;
 
     private final Executor taskExecutor;
 
@@ -121,7 +120,7 @@
         webServer = new WebServer(this);
         runMap = new HashMap<UUID, JobRun>();
         jobQueue = new JobQueue();
-        scheduler = new DefaultJobScheduler(this);
+        scheduler = new JobScheduler(this);
         this.timer = new Timer(true);
         ccci = new CCClientInterface(this);
         ccContext = new ICCContext() {
@@ -165,7 +164,7 @@
         return jobQueue;
     }
 
-    public IJobScheduler getScheduler() {
+    public JobScheduler getScheduler() {
         return scheduler;
     }
 
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
index edc8579..be4945d 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
@@ -22,9 +22,8 @@
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
-import edu.uci.ics.hyracks.control.cc.scheduler.IActivityClusterStateMachine;
+import edu.uci.ics.hyracks.control.cc.scheduler.ActivityClusterStateMachine;
 
 public class ActivityCluster {
     private final JobRun jobRun;
@@ -41,10 +40,12 @@
 
     private Map<PartitionId, TaskCluster> partitionProducingTaskClusterMap;
 
-    private IActivityClusterStateMachine acsm;
+    private ActivityClusterStateMachine acsm;
 
     private Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies;
 
+    private Set<TaskCluster> inProgressTaskClusters;
+
     public ActivityCluster(JobRun jobRun, Set<ActivityId> activities) {
         this.jobRun = jobRun;
         this.activities = activities;
@@ -52,6 +53,7 @@
         dependents = new HashSet<ActivityCluster>();
         taskStateMap = new HashMap<ActivityId, Task[]>();
         partitionProducingTaskClusterMap = new HashMap<PartitionId, TaskCluster>();
+        inProgressTaskClusters = new HashSet<TaskCluster>();
     }
 
     public Set<ActivityId> getActivities() {
@@ -86,11 +88,11 @@
         return partitionProducingTaskClusterMap;
     }
 
-    public IActivityClusterStateMachine getStateMachine() {
+    public ActivityClusterStateMachine getStateMachine() {
         return acsm;
     }
 
-    public void setStateMachine(IActivityClusterStateMachine acsm) {
+    public void setStateMachine(ActivityClusterStateMachine acsm) {
         this.acsm = acsm;
     }
 
@@ -102,14 +104,6 @@
         return jobRun.getJobActivityGraph().getJobSpecification().getMaxAttempts();
     }
 
-    public void notifyTaskClusterFailure(TaskClusterAttempt tcAttempt, Exception exception) throws HyracksException {
-        acsm.notifyTaskClusterFailure(tcAttempt, exception);
-    }
-
-    public void notifyActivityClusterComplete() throws HyracksException {
-        jobRun.getStateMachine().notifyActivityClusterComplete(this);
-    }
-
     public void setConnectorPolicyMap(Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) {
         this.connectorPolicies = connectorPolicies;
     }
@@ -118,7 +112,7 @@
         return connectorPolicies;
     }
 
-    public void notifyNodeFailures(Set<String> deadNodes) throws HyracksException {
-        acsm.notifyNodeFailures(deadNodes);
+    public Set<TaskCluster> getInProgressTaskClusters() {
+        return inProgressTaskClusters;
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
index 40f4ae5..b16b4c1 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
@@ -25,7 +25,7 @@
 import edu.uci.ics.hyracks.api.job.JobActivityGraph;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.control.cc.partitions.PartitionMatchMaker;
-import edu.uci.ics.hyracks.control.cc.scheduler.IJobRunStateMachine;
+import edu.uci.ics.hyracks.control.cc.scheduler.JobRunStateMachine;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
 
 public class JobRun implements IJobStatusConditionVariable {
@@ -41,7 +41,7 @@
 
     private final Map<ActivityId, ActivityCluster> activityClusterMap;
 
-    private IJobRunStateMachine jsm;
+    private JobRunStateMachine jsm;
 
     private JobStatus status;
 
@@ -100,18 +100,18 @@
         return profile;
     }
 
-    public void setStateMachine(IJobRunStateMachine jsm) {
+    public void setStateMachine(JobRunStateMachine jsm) {
         this.jsm = jsm;
     }
 
-    public IJobRunStateMachine getStateMachine() {
+    public JobRunStateMachine getStateMachine() {
         return jsm;
     }
 
     public Map<ActivityId, ActivityCluster> getActivityClusterMap() {
         return activityClusterMap;
     }
-    
+
     public void notifyNodeFailures(Set<String> deadNodes) throws HyracksException {
         jsm.notifyNodeFailures(deadNodes);
     }
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/partitions/PartitionMatchMaker.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/partitions/PartitionMatchMaker.java
index 1bfb69e..d2c14b2 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/partitions/PartitionMatchMaker.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/partitions/PartitionMatchMaker.java
@@ -1,3 +1,17 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package edu.uci.ics.hyracks.control.cc.partitions;
 
 import java.util.ArrayList;
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterGraphBuilder.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterGraphBuilder.java
new file mode 100644
index 0000000..704ab51
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterGraphBuilder.java
@@ -0,0 +1,159 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.scheduler;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.util.Pair;
+import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
+import edu.uci.ics.hyracks.control.cc.job.JobRun;
+
+public class ActivityClusterGraphBuilder {
+    private static final Logger LOGGER = Logger.getLogger(ActivityClusterGraphBuilder.class.getName());
+
+    private final JobRun jobRun;
+
+    public ActivityClusterGraphBuilder(JobRun jobRun) {
+        this.jobRun = jobRun;
+    }
+
+    private static Pair<ActivityId, ActivityId> findMergePair(JobActivityGraph jag, JobSpecification spec,
+            Set<ActivityCluster> eqSets) {
+        Map<ActivityId, IActivity> activityNodeMap = jag.getActivityNodeMap();
+        for (ActivityCluster eqSet : eqSets) {
+            for (ActivityId t : eqSet.getActivities()) {
+                IActivity activity = activityNodeMap.get(t);
+                List<Integer> inputList = jag.getActivityInputMap().get(t);
+                if (inputList != null) {
+                    for (Integer idx : inputList) {
+                        IConnectorDescriptor conn = spec.getInputConnectorDescriptor(activity.getActivityId()
+                                .getOperatorDescriptorId(), idx);
+                        OperatorDescriptorId producerId = spec.getProducer(conn).getOperatorId();
+                        int producerOutputIndex = spec.getProducerOutputIndex(conn);
+                        ActivityId inTask = jag.getOperatorOutputMap().get(producerId).get(producerOutputIndex);
+                        if (!eqSet.getActivities().contains(inTask)) {
+                            return new Pair<ActivityId, ActivityId>(t, inTask);
+                        }
+                    }
+                }
+                List<Integer> outputList = jag.getActivityOutputMap().get(t);
+                if (outputList != null) {
+                    for (Integer idx : outputList) {
+                        IConnectorDescriptor conn = spec.getOutputConnectorDescriptor(activity.getActivityId()
+                                .getOperatorDescriptorId(), idx);
+                        OperatorDescriptorId consumerId = spec.getConsumer(conn).getOperatorId();
+                        int consumerInputIndex = spec.getConsumerInputIndex(conn);
+                        ActivityId outTask = jag.getOperatorInputMap().get(consumerId).get(consumerInputIndex);
+                        if (!eqSet.getActivities().contains(outTask)) {
+                            return new Pair<ActivityId, ActivityId>(t, outTask);
+                        }
+                    }
+                }
+            }
+        }
+        return null;
+    }
+
+    public ActivityCluster inferStages(JobActivityGraph jag) {
+        JobSpecification spec = jag.getJobSpecification();
+
+        /*
+         * Build initial equivalence sets map. We create a map such that for each IOperatorTask, t -> { t }
+         */
+        Map<ActivityId, ActivityCluster> stageMap = new HashMap<ActivityId, ActivityCluster>();
+        Set<ActivityCluster> stages = new HashSet<ActivityCluster>();
+        for (Set<ActivityId> taskIds : jag.getOperatorActivityMap().values()) {
+            for (ActivityId taskId : taskIds) {
+                Set<ActivityId> eqSet = new HashSet<ActivityId>();
+                eqSet.add(taskId);
+                ActivityCluster stage = new ActivityCluster(jobRun, eqSet);
+                stageMap.put(taskId, stage);
+                stages.add(stage);
+            }
+        }
+
+        boolean changed = true;
+        while (changed) {
+            changed = false;
+            Pair<ActivityId, ActivityId> pair = findMergePair(jag, spec, stages);
+            if (pair != null) {
+                merge(stageMap, stages, pair.first, pair.second);
+                changed = true;
+            }
+        }
+
+        ActivityCluster endStage = new ActivityCluster(jobRun, new HashSet<ActivityId>());
+        Map<ActivityId, Set<ActivityId>> blocker2BlockedMap = jag.getBlocker2BlockedMap();
+        for (ActivityCluster s : stages) {
+            endStage.addDependency(s);
+            s.addDependent(endStage);
+            Set<ActivityCluster> blockedStages = new HashSet<ActivityCluster>();
+            for (ActivityId t : s.getActivities()) {
+                Set<ActivityId> blockedTasks = blocker2BlockedMap.get(t);
+                if (blockedTasks != null) {
+                    for (ActivityId bt : blockedTasks) {
+                        blockedStages.add(stageMap.get(bt));
+                    }
+                }
+            }
+            for (ActivityCluster bs : blockedStages) {
+                bs.addDependency(s);
+                s.addDependent(bs);
+            }
+        }
+        jobRun.getActivityClusterMap().putAll(stageMap);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Inferred " + (stages.size() + 1) + " stages");
+            for (ActivityCluster s : stages) {
+                LOGGER.info(s.toString());
+            }
+            LOGGER.info("SID: ENDSTAGE");
+        }
+        return endStage;
+    }
+
+    private void merge(Map<ActivityId, ActivityCluster> eqSetMap, Set<ActivityCluster> eqSets, ActivityId t1,
+            ActivityId t2) {
+        ActivityCluster stage1 = eqSetMap.get(t1);
+        Set<ActivityId> s1 = stage1.getActivities();
+        ActivityCluster stage2 = eqSetMap.get(t2);
+        Set<ActivityId> s2 = stage2.getActivities();
+
+        Set<ActivityId> mergedSet = new HashSet<ActivityId>();
+        mergedSet.addAll(s1);
+        mergedSet.addAll(s2);
+
+        eqSets.remove(stage1);
+        eqSets.remove(stage2);
+        ActivityCluster mergedStage = new ActivityCluster(jobRun, mergedSet);
+        eqSets.add(mergedStage);
+
+        for (ActivityId t : mergedSet) {
+            eqSetMap.put(t, mergedStage);
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultActivityClusterStateMachine.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterStateMachine.java
similarity index 85%
rename from hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultActivityClusterStateMachine.java
rename to hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterStateMachine.java
index 8d2040e..47eeb06 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultActivityClusterStateMachine.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterStateMachine.java
@@ -16,7 +16,6 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -31,8 +30,8 @@
 
 import edu.uci.ics.hyracks.api.constraints.expressions.LValueConstraintExpression;
 import edu.uci.ics.hyracks.api.constraints.expressions.PartitionLocationExpression;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
-import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
@@ -52,42 +51,29 @@
 import edu.uci.ics.hyracks.control.common.job.PartitionState;
 import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
 
-public class DefaultActivityClusterStateMachine implements IActivityClusterStateMachine {
-    private static final Logger LOGGER = Logger.getLogger(DefaultActivityClusterStateMachine.class.getName());
+public class ActivityClusterStateMachine {
+    private static final Logger LOGGER = Logger.getLogger(ActivityClusterStateMachine.class.getName());
 
     private final ClusterControllerService ccs;
 
-    private final DefaultJobRunStateMachine jsm;
+    private final JobRunStateMachine jsm;
 
     private final ActivityCluster ac;
 
-    private final PriorityQueue<RankedRunnableTaskCluster> runnableQueue;
-
-    private final Set<TaskCluster> inProgressTaskClusters;
-
-    public DefaultActivityClusterStateMachine(ClusterControllerService ccs, DefaultJobRunStateMachine jsm,
-            ActivityCluster ac) {
+    public ActivityClusterStateMachine(ClusterControllerService ccs, JobRunStateMachine jsm, ActivityCluster ac) {
         this.ccs = ccs;
         this.jsm = jsm;
         this.ac = ac;
-        runnableQueue = new PriorityQueue<RankedRunnableTaskCluster>(ac.getTaskClusters().length,
-                new Comparator<RankedRunnableTaskCluster>() {
-                    @Override
-                    public int compare(RankedRunnableTaskCluster o1, RankedRunnableTaskCluster o2) {
-                        int cmp = o1.getRank() - o2.getRank();
-                        return cmp < 0 ? -1 : (cmp > 0 ? 1 : 0);
-                    }
-                });
-        inProgressTaskClusters = new HashSet<TaskCluster>();
     }
 
-    @Override
     public void schedule() throws HyracksException {
         startRunnableTaskClusters();
     }
 
     private void assignTaskLocations(TaskCluster tc, Map<String, List<TaskAttemptDescriptor>> taskAttemptMap)
             throws HyracksException {
+        JobRun jobRun = ac.getJobRun();
+        JobActivityGraph jag = jobRun.getJobActivityGraph();
         Task[] tasks = tc.getTasks();
         List<TaskClusterAttempt> tcAttempts = tc.getAttempts();
         int attempts = tcAttempts.size();
@@ -107,25 +93,31 @@
         tcAttempt.setTaskAttempts(taskAttempts);
         PartitionConstraintSolver solver = jsm.getSolver();
         solver.solve(locationMap.values());
-        Map<OperatorDescriptorId, String> operatorLocationAssignmentMap = jsm.getOperatorLocationAssignmentMap();
         for (int i = 0; i < tasks.length; ++i) {
             Task ts = tasks[i];
             TaskId tid = ts.getTaskId();
             TaskAttempt taskAttempt = taskAttempts[i];
-            String nodeId = operatorLocationAssignmentMap.get(tid.getActivityId().getOperatorDescriptorId());
+            ActivityId aid = tid.getActivityId();
+            Set<ActivityId> blockers = jag.getBlocked2BlockerMap().get(aid);
+            String nodeId = null;
+            if (blockers != null) {
+                for (ActivityId blocker : blockers) {
+                    nodeId = findLocationOfBlocker(jobRun, jag, new TaskId(blocker, tid.getPartition()));
+                    if (nodeId != null) {
+                        break;
+                    }
+                }
+            }
+            Set<String> liveNodes = ccs.getNodeMap().keySet();
             if (nodeId == null) {
                 LValueConstraintExpression pLocationExpr = locationMap.get(tid);
                 Object location = solver.getValue(pLocationExpr);
-                Set<String> liveNodes = ccs.getNodeMap().keySet();
                 if (location == null) {
                     // pick any
                     nodeId = liveNodes.toArray(new String[liveNodes.size()])[Math.abs(new Random().nextInt())
                             % liveNodes.size()];
                 } else if (location instanceof String) {
                     nodeId = (String) location;
-                    if (!liveNodes.contains(nodeId)) {
-                        throw new HyracksException("Node " + nodeId + " not live");
-                    }
                 } else if (location instanceof String[]) {
                     for (String choice : (String[]) location) {
                         if (liveNodes.contains(choice)) {
@@ -141,7 +133,12 @@
                     throw new HyracksException("Unknown type of value for " + pLocationExpr + ": " + location + "("
                             + location.getClass() + ")");
                 }
-                operatorLocationAssignmentMap.put(tid.getActivityId().getOperatorDescriptorId(), nodeId);
+            }
+            if (nodeId == null) {
+                throw new HyracksException("No satisfiable location found for " + taskAttempt.getTaskAttemptId());
+            }
+            if (!liveNodes.contains(nodeId)) {
+                throw new HyracksException("Node " + nodeId + " not live");
             }
             taskAttempt.setNodeId(nodeId);
             taskAttempt.setStatus(TaskAttempt.TaskStatus.RUNNING, null);
@@ -157,7 +154,25 @@
         tcAttempt.initializePendingTaskCounter();
         tcAttempts.add(tcAttempt);
         tcAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.RUNNING);
-        inProgressTaskClusters.add(tc);
+        ac.getInProgressTaskClusters().add(tc);
+    }
+
+    private String findLocationOfBlocker(JobRun jobRun, JobActivityGraph jag, TaskId tid) {
+        ActivityId blockerAID = tid.getActivityId();
+        ActivityCluster blockerAC = jobRun.getActivityClusterMap().get(blockerAID);
+        Task[] blockerTasks = blockerAC.getTaskMap().get(blockerAID);
+        List<TaskClusterAttempt> tcAttempts = blockerTasks[tid.getPartition()].getTaskCluster().getAttempts();
+        if (tcAttempts == null || tcAttempts.isEmpty()) {
+            return null;
+        }
+        TaskClusterAttempt lastTCA = tcAttempts.get(tcAttempts.size() - 1);
+        for (TaskAttempt ta : lastTCA.getTaskAttempts()) {
+            TaskId blockerTID = ta.getTaskAttemptId().getTaskId();
+            if (tid.equals(blockerTID)) {
+                return ta.getNodeId();
+            }
+        }
+        return null;
     }
 
     private TaskClusterAttempt findLastTaskClusterAttempt(TaskCluster tc) {
@@ -168,7 +183,6 @@
         return null;
     }
 
-    @Override
     public void notifyTaskComplete(TaskAttempt ta) throws HyracksException {
         TaskAttemptId taId = ta.getTaskAttemptId();
         TaskCluster tc = ta.getTaskState().getTaskCluster();
@@ -179,7 +193,7 @@
                 ta.setStatus(TaskAttempt.TaskStatus.COMPLETED, null);
                 if (lastAttempt.decrementPendingTasksCounter() == 0) {
                     lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.COMPLETED);
-                    inProgressTaskClusters.remove(tc);
+                    ac.getInProgressTaskClusters().remove(tc);
                     startRunnableTaskClusters();
                 }
             } else {
@@ -191,9 +205,9 @@
     }
 
     private void startRunnableTaskClusters() throws HyracksException {
-        findRunnableTaskClusters();
+        PriorityQueue<RankedRunnableTaskCluster> queue = findRunnableTaskClusters();
         Map<String, List<TaskAttemptDescriptor>> taskAttemptMap = new HashMap<String, List<TaskAttemptDescriptor>>();
-        for (RankedRunnableTaskCluster rrtc : runnableQueue) {
+        for (RankedRunnableTaskCluster rrtc : queue) {
             TaskCluster tc = rrtc.getTaskCluster();
             if (LOGGER.isLoggable(Level.INFO)) {
                 LOGGER.info("Found runnable TC: " + tc);
@@ -203,12 +217,18 @@
                     LOGGER.info("Status: " + tcAttempt.getStatus());
                 }
             }
-            assignTaskLocations(tc, taskAttemptMap);
+            try {
+                assignTaskLocations(tc, taskAttemptMap);
+            } catch (HyracksException e) {
+                abort();
+                ac.getJobRun().getStateMachine().notifyActivityClusterFailure(ac, e);
+                return;
+            }
         }
 
         if (taskAttemptMap.isEmpty()) {
-            if (inProgressTaskClusters.isEmpty()) {
-                ac.notifyActivityClusterComplete();
+            if (ac.getInProgressTaskClusters().isEmpty()) {
+                jsm.notifyActivityClusterComplete(ac);
             }
             return;
         }
@@ -216,7 +236,7 @@
         startTasks(taskAttemptMap);
     }
 
-    private void findRunnableTaskClusters() {
+    private PriorityQueue<RankedRunnableTaskCluster> findRunnableTaskClusters() {
         TaskCluster[] taskClusters = ac.getTaskClusters();
 
         Map<TaskCluster, Integer> runnabilityRanks = new HashMap<TaskCluster, Integer>();
@@ -227,23 +247,23 @@
             }
         }
 
-        runnableQueue.clear();
+        PriorityQueue<RankedRunnableTaskCluster> result = new PriorityQueue<RankedRunnableTaskCluster>();
         for (Map.Entry<TaskCluster, Integer> e : runnabilityRanks.entrySet()) {
             TaskCluster tc = e.getKey();
             int rank = e.getValue();
             if (rank >= 0 && rank < Integer.MAX_VALUE) {
-                runnableQueue.add(new RankedRunnableTaskCluster(rank, tc));
+                result.add(new RankedRunnableTaskCluster(rank, tc));
             }
         }
         if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Ranked TCs: " + runnableQueue);
+            LOGGER.info("Ranked TCs: " + result);
         }
+        return result;
     }
 
     /*
      * Runnability rank has the following semantics
-     * Rank(Completed TaskCluster) = -2
-     * Rank(Running TaskCluster) = -1
+     * Rank(Running TaskCluster || Completed TaskCluster) = -1
      * Rank(Runnable TaskCluster depending on completed TaskClusters) = 0
      * Rank(Runnable TaskCluster) = max(Rank(Dependent TaskClusters)) + 1
      * Rank(Non-schedulable TaskCluster) = MAX_VALUE 
@@ -305,7 +325,7 @@
                     public void run() {
                         try {
                             node.getNodeController().startTasks(appName, jobId, JavaSerializationUtils.serialize(jag),
-                                    taskDescriptors, connectorPolicies);
+                                    taskDescriptors, connectorPolicies, null);
                         } catch (IOException e) {
                             e.printStackTrace();
                         } catch (Exception e) {
@@ -352,13 +372,13 @@
                 });
             }
         }
+        ac.getInProgressTaskClusters().remove(tcAttempt.getTaskCluster());
         TaskCluster tc = tcAttempt.getTaskCluster();
         PartitionMatchMaker pmm = jobRun.getPartitionMatchMaker();
         pmm.removeUncommittedPartitions(tc.getProducedPartitions(), abortTaskIds);
         pmm.removePartitionRequests(tc.getRequiredPartitions(), abortTaskIds);
     }
 
-    @Override
     public void notifyTaskFailure(TaskAttempt ta, Exception exception) throws HyracksException {
         TaskAttemptId taId = ta.getTaskAttemptId();
         TaskCluster tc = ta.getTaskState().getTaskCluster();
@@ -370,7 +390,7 @@
                 abortTaskCluster(lastAttempt);
                 lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.FAILED);
                 abortDoomedTaskClusters();
-                ac.notifyTaskClusterFailure(lastAttempt, exception);
+                notifyTaskClusterFailure(lastAttempt, exception);
             } else {
                 LOGGER.warning("Spurious task failure notification: " + taId + " Current state = " + taStatus);
             }
@@ -433,7 +453,6 @@
         return doomed;
     }
 
-    @Override
     public void abort() throws HyracksException {
         TaskCluster[] taskClusters = ac.getTaskClusters();
         for (TaskCluster tc : taskClusters) {
@@ -448,7 +467,6 @@
         }
     }
 
-    @Override
     public void notifyTaskClusterFailure(TaskClusterAttempt tcAttempt, Exception exception) throws HyracksException {
         TaskCluster tc = tcAttempt.getTaskCluster();
         if (tcAttempt.getAttempt() >= ac.getMaxTaskClusterAttempts()) {
@@ -457,11 +475,16 @@
             return;
         }
         Map<String, List<TaskAttemptDescriptor>> taskAttemptMap = new HashMap<String, List<TaskAttemptDescriptor>>();
-        assignTaskLocations(tc, taskAttemptMap);
+        try {
+            assignTaskLocations(tc, taskAttemptMap);
+        } catch (HyracksException e) {
+            abort();
+            ac.getJobRun().getStateMachine().notifyActivityClusterFailure(ac, e);
+            return;
+        }
         startTasks(taskAttemptMap);
     }
 
-    @Override
     public void notifyNodeFailures(Set<String> deadNodes) throws HyracksException {
         for (TaskCluster tc : ac.getTaskClusters()) {
             TaskClusterAttempt lastTaskClusterAttempt = findLastTaskClusterAttempt(tc);
@@ -482,6 +505,9 @@
             }
         }
         abortDoomedTaskClusters();
-        startRunnableTaskClusters();
+    }
+
+    public boolean canMakeProgress() {
+        return true;
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobRunStateMachine.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobRunStateMachine.java
deleted file mode 100644
index 16b7f5c..0000000
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobRunStateMachine.java
+++ /dev/null
@@ -1,534 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.scheduler;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
-import edu.uci.ics.hyracks.api.constraints.Constraint;
-import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
-import edu.uci.ics.hyracks.api.constraints.expressions.LValueConstraintExpression;
-import edu.uci.ics.hyracks.api.constraints.expressions.PartitionCountExpression;
-import edu.uci.ics.hyracks.api.dataflow.ActivityId;
-import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
-import edu.uci.ics.hyracks.api.dataflow.IActivity;
-import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
-import edu.uci.ics.hyracks.api.dataflow.TaskId;
-import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
-import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
-import edu.uci.ics.hyracks.api.dataflow.connectors.PipelinedConnectorPolicy;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobActivityGraph;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.api.job.JobStatus;
-import edu.uci.ics.hyracks.api.partitions.PartitionId;
-import edu.uci.ics.hyracks.api.util.Pair;
-import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
-import edu.uci.ics.hyracks.control.cc.job.IConnectorDescriptorVisitor;
-import edu.uci.ics.hyracks.control.cc.job.IOperatorDescriptorVisitor;
-import edu.uci.ics.hyracks.control.cc.job.JobRun;
-import edu.uci.ics.hyracks.control.cc.job.PlanUtils;
-import edu.uci.ics.hyracks.control.cc.job.Task;
-import edu.uci.ics.hyracks.control.cc.job.TaskCluster;
-import edu.uci.ics.hyracks.control.cc.job.manager.events.JobCleanupEvent;
-
-public class DefaultJobRunStateMachine implements IJobRunStateMachine {
-    private static final Logger LOGGER = Logger.getLogger(DefaultJobRunStateMachine.class.getName());
-
-    private final ClusterControllerService ccs;
-
-    private final JobRun jobRun;
-
-    private final Map<OperatorDescriptorId, String> operatorLocationAssignmentMap;
-
-    private final Set<ActivityCluster> completedClusters;
-
-    private final Set<ActivityCluster> inProgressClusters;
-
-    private PartitionConstraintSolver solver;
-
-    private ActivityCluster rootActivityCluster;
-
-    public DefaultJobRunStateMachine(ClusterControllerService ccs, JobRun jobRun) {
-        this.ccs = ccs;
-        this.jobRun = jobRun;
-        this.operatorLocationAssignmentMap = new HashMap<OperatorDescriptorId, String>();
-        completedClusters = new HashSet<ActivityCluster>();
-        inProgressClusters = new HashSet<ActivityCluster>();
-    }
-
-    public Map<OperatorDescriptorId, String> getOperatorLocationAssignmentMap() {
-        return operatorLocationAssignmentMap;
-    }
-
-    public PartitionConstraintSolver getSolver() {
-        return solver;
-    }
-
-    private static Pair<ActivityId, ActivityId> findMergePair(JobActivityGraph jag, JobSpecification spec,
-            Set<ActivityCluster> eqSets) {
-        Map<ActivityId, IActivity> activityNodeMap = jag.getActivityNodeMap();
-        for (ActivityCluster eqSet : eqSets) {
-            for (ActivityId t : eqSet.getActivities()) {
-                IActivity activity = activityNodeMap.get(t);
-                List<Integer> inputList = jag.getActivityInputMap().get(t);
-                if (inputList != null) {
-                    for (Integer idx : inputList) {
-                        IConnectorDescriptor conn = spec.getInputConnectorDescriptor(activity.getActivityId()
-                                .getOperatorDescriptorId(), idx);
-                        OperatorDescriptorId producerId = spec.getProducer(conn).getOperatorId();
-                        int producerOutputIndex = spec.getProducerOutputIndex(conn);
-                        ActivityId inTask = jag.getOperatorOutputMap().get(producerId).get(producerOutputIndex);
-                        if (!eqSet.getActivities().contains(inTask)) {
-                            return new Pair<ActivityId, ActivityId>(t, inTask);
-                        }
-                    }
-                }
-                List<Integer> outputList = jag.getActivityOutputMap().get(t);
-                if (outputList != null) {
-                    for (Integer idx : outputList) {
-                        IConnectorDescriptor conn = spec.getOutputConnectorDescriptor(activity.getActivityId()
-                                .getOperatorDescriptorId(), idx);
-                        OperatorDescriptorId consumerId = spec.getConsumer(conn).getOperatorId();
-                        int consumerInputIndex = spec.getConsumerInputIndex(conn);
-                        ActivityId outTask = jag.getOperatorInputMap().get(consumerId).get(consumerInputIndex);
-                        if (!eqSet.getActivities().contains(outTask)) {
-                            return new Pair<ActivityId, ActivityId>(t, outTask);
-                        }
-                    }
-                }
-            }
-        }
-        return null;
-    }
-
-    private ActivityCluster inferStages(JobActivityGraph jag) {
-        JobSpecification spec = jag.getJobSpecification();
-
-        /*
-         * Build initial equivalence sets map. We create a map such that for each IOperatorTask, t -> { t }
-         */
-        Map<ActivityId, ActivityCluster> stageMap = new HashMap<ActivityId, ActivityCluster>();
-        Set<ActivityCluster> stages = new HashSet<ActivityCluster>();
-        for (Set<ActivityId> taskIds : jag.getOperatorActivityMap().values()) {
-            for (ActivityId taskId : taskIds) {
-                Set<ActivityId> eqSet = new HashSet<ActivityId>();
-                eqSet.add(taskId);
-                ActivityCluster stage = new ActivityCluster(jobRun, eqSet);
-                stageMap.put(taskId, stage);
-                stages.add(stage);
-            }
-        }
-
-        boolean changed = true;
-        while (changed) {
-            changed = false;
-            Pair<ActivityId, ActivityId> pair = findMergePair(jag, spec, stages);
-            if (pair != null) {
-                merge(stageMap, stages, pair.first, pair.second);
-                changed = true;
-            }
-        }
-
-        ActivityCluster endStage = new ActivityCluster(jobRun, new HashSet<ActivityId>());
-        Map<ActivityId, Set<ActivityId>> blocker2BlockedMap = jag.getBlocker2BlockedMap();
-        for (ActivityCluster s : stages) {
-            endStage.addDependency(s);
-            s.addDependent(endStage);
-            Set<ActivityCluster> blockedStages = new HashSet<ActivityCluster>();
-            for (ActivityId t : s.getActivities()) {
-                Set<ActivityId> blockedTasks = blocker2BlockedMap.get(t);
-                if (blockedTasks != null) {
-                    for (ActivityId bt : blockedTasks) {
-                        blockedStages.add(stageMap.get(bt));
-                    }
-                }
-            }
-            for (ActivityCluster bs : blockedStages) {
-                bs.addDependency(s);
-                s.addDependent(bs);
-            }
-        }
-        jobRun.getActivityClusterMap().putAll(stageMap);
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Inferred " + (stages.size() + 1) + " stages");
-            for (ActivityCluster s : stages) {
-                LOGGER.info(s.toString());
-            }
-            LOGGER.info("SID: ENDSTAGE");
-        }
-        return endStage;
-    }
-
-    private void merge(Map<ActivityId, ActivityCluster> eqSetMap, Set<ActivityCluster> eqSets, ActivityId t1,
-            ActivityId t2) {
-        ActivityCluster stage1 = eqSetMap.get(t1);
-        Set<ActivityId> s1 = stage1.getActivities();
-        ActivityCluster stage2 = eqSetMap.get(t2);
-        Set<ActivityId> s2 = stage2.getActivities();
-
-        Set<ActivityId> mergedSet = new HashSet<ActivityId>();
-        mergedSet.addAll(s1);
-        mergedSet.addAll(s2);
-
-        eqSets.remove(stage1);
-        eqSets.remove(stage2);
-        ActivityCluster mergedStage = new ActivityCluster(jobRun, mergedSet);
-        eqSets.add(mergedStage);
-
-        for (ActivityId t : mergedSet) {
-            eqSetMap.put(t, mergedStage);
-        }
-    }
-
-    private void findRunnableActivityClusters(Set<ActivityCluster> frontier, ActivityCluster candidate) {
-        if (completedClusters.contains(candidate) || frontier.contains(candidate)
-                || inProgressClusters.contains(candidate)) {
-            return;
-        }
-        boolean runnable = true;
-        for (ActivityCluster s : candidate.getDependencies()) {
-            if (!completedClusters.contains(s)) {
-                runnable = false;
-                findRunnableActivityClusters(frontier, s);
-            }
-        }
-        if (runnable && candidate != rootActivityCluster) {
-            frontier.add(candidate);
-        }
-    }
-
-    private void findRunnableActivityClusters(Set<ActivityCluster> frontier) {
-        findRunnableActivityClusters(frontier, rootActivityCluster);
-    }
-
-    @Override
-    public void schedule() throws HyracksException {
-        try {
-            solver = new PartitionConstraintSolver();
-            final JobActivityGraph jag = jobRun.getJobActivityGraph();
-            final ICCApplicationContext appCtx = ccs.getApplicationMap().get(jag.getApplicationName());
-            JobSpecification spec = jag.getJobSpecification();
-            final Set<Constraint> contributedConstraints = new HashSet<Constraint>();
-            final IConstraintAcceptor acceptor = new IConstraintAcceptor() {
-                @Override
-                public void addConstraint(Constraint constraint) {
-                    contributedConstraints.add(constraint);
-                }
-            };
-            PlanUtils.visit(spec, new IOperatorDescriptorVisitor() {
-                @Override
-                public void visit(IOperatorDescriptor op) {
-                    op.contributeSchedulingConstraints(acceptor, jag, appCtx);
-                }
-            });
-            PlanUtils.visit(spec, new IConnectorDescriptorVisitor() {
-                @Override
-                public void visit(IConnectorDescriptor conn) {
-                    conn.contributeSchedulingConstraints(acceptor, jag, appCtx);
-                }
-            });
-            contributedConstraints.addAll(spec.getUserConstraints());
-            solver.addConstraints(contributedConstraints);
-
-            rootActivityCluster = inferStages(jag);
-            startRunnableActivityClusters();
-        } catch (Exception e) {
-            e.printStackTrace();
-            ccs.getJobQueue().schedule(new JobCleanupEvent(ccs, jobRun.getJobId(), JobStatus.FAILURE, e));
-            throw new HyracksException(e);
-        }
-    }
-
-    private void startRunnableActivityClusters() throws HyracksException {
-        Set<ActivityCluster> runnableClusters = new HashSet<ActivityCluster>();
-        findRunnableActivityClusters(runnableClusters);
-        if (runnableClusters.isEmpty() && inProgressClusters.isEmpty()) {
-            ccs.getJobQueue().schedule(new JobCleanupEvent(ccs, jobRun.getJobId(), JobStatus.TERMINATED, null));
-            return;
-        }
-        for (ActivityCluster ac : runnableClusters) {
-            inProgressClusters.add(ac);
-            buildTaskClusters(ac);
-            IActivityClusterStateMachine acsm = new DefaultActivityClusterStateMachine(ccs, this, ac);
-            ac.setStateMachine(acsm);
-            acsm.schedule();
-        }
-    }
-
-    private Map<ActivityId, ActivityPartitionDetails> computePartitionCounts(ActivityCluster ac)
-            throws HyracksException {
-        Set<LValueConstraintExpression> lValues = new HashSet<LValueConstraintExpression>();
-        for (ActivityId anId : ac.getActivities()) {
-            lValues.add(new PartitionCountExpression(anId.getOperatorDescriptorId()));
-        }
-        solver.solve(lValues);
-        Map<OperatorDescriptorId, Integer> nPartMap = new HashMap<OperatorDescriptorId, Integer>();
-        for (LValueConstraintExpression lv : lValues) {
-            Object value = solver.getValue(lv);
-            if (value == null) {
-                throw new HyracksException("No value found for " + lv);
-            }
-            if (!(value instanceof Number)) {
-                throw new HyracksException("Unexpected type of value bound to " + lv + ": " + value.getClass() + "("
-                        + value + ")");
-            }
-            int nParts = ((Number) value).intValue();
-            if (nParts <= 0) {
-                throw new HyracksException("Unsatisfiable number of partitions for " + lv + ": " + nParts);
-            }
-            nPartMap.put(((PartitionCountExpression) lv).getOperatorDescriptorId(), Integer.valueOf(nParts));
-        }
-        Map<ActivityId, ActivityPartitionDetails> activityPartsMap = new HashMap<ActivityId, ActivityPartitionDetails>();
-        for (ActivityId anId : ac.getActivities()) {
-            int nParts = nPartMap.get(anId.getOperatorDescriptorId());
-            int[] nInputPartitions = null;
-            List<IConnectorDescriptor> inputs = jobRun.getJobActivityGraph().getActivityInputConnectorDescriptors(anId);
-            if (inputs != null) {
-                nInputPartitions = new int[inputs.size()];
-                for (int i = 0; i < nInputPartitions.length; ++i) {
-                    nInputPartitions[i] = nPartMap.get(jobRun.getJobActivityGraph()
-                            .getProducerActivity(inputs.get(i).getConnectorId()).getOperatorDescriptorId());
-                }
-            }
-            int[] nOutputPartitions = null;
-            List<IConnectorDescriptor> outputs = jobRun.getJobActivityGraph().getActivityOutputConnectorDescriptors(
-                    anId);
-            if (outputs != null) {
-                nOutputPartitions = new int[outputs.size()];
-                for (int i = 0; i < nOutputPartitions.length; ++i) {
-                    nOutputPartitions[i] = nPartMap.get(jobRun.getJobActivityGraph()
-                            .getConsumerActivity(outputs.get(i).getConnectorId()).getOperatorDescriptorId());
-                }
-            }
-            ActivityPartitionDetails apd = new ActivityPartitionDetails(nParts, nInputPartitions, nOutputPartitions);
-            activityPartsMap.put(anId, apd);
-        }
-        return activityPartsMap;
-    }
-
-    private void buildTaskClusters(ActivityCluster ac) throws HyracksException {
-        Map<ActivityId, ActivityPartitionDetails> pcMap = computePartitionCounts(ac);
-
-        Map<ActivityId, Task[]> taskStateMap = ac.getTaskMap();
-        Set<ActivityId> activities = ac.getActivities();
-
-        Map<TaskId, Set<TaskId>> taskClusterMap = new HashMap<TaskId, Set<TaskId>>();
-
-        for (ActivityId anId : activities) {
-            ActivityPartitionDetails apd = pcMap.get(anId);
-            Task[] taskStates = new Task[apd.getPartitionCount()];
-            for (int i = 0; i < taskStates.length; ++i) {
-                TaskId tid = new TaskId(anId, i);
-                taskStates[i] = new Task(tid, apd);
-                Set<TaskId> cluster = new HashSet<TaskId>();
-                cluster.add(tid);
-                taskClusterMap.put(tid, cluster);
-            }
-            taskStateMap.put(anId, taskStates);
-        }
-
-        Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = assignConnectorPolicy(ac, pcMap);
-        ac.setConnectorPolicyMap(connectorPolicies);
-
-        Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity = new HashMap<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>>();
-        JobActivityGraph jag = jobRun.getJobActivityGraph();
-        BitSet targetBitmap = new BitSet();
-        for (ActivityId ac1 : activities) {
-            Task[] ac1TaskStates = taskStateMap.get(ac1);
-            int nProducers = ac1TaskStates.length;
-            List<IConnectorDescriptor> outputConns = jag.getActivityOutputConnectorDescriptors(ac1);
-            if (outputConns != null) {
-                for (IConnectorDescriptor c : outputConns) {
-                    ConnectorDescriptorId cdId = c.getConnectorId();
-                    ActivityId ac2 = jag.getConsumerActivity(cdId);
-                    Task[] ac2TaskStates = taskStateMap.get(ac2);
-                    int nConsumers = ac2TaskStates.length;
-                    for (int i = 0; i < nProducers; ++i) {
-                        c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
-                        List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = taskConnectivity.get(ac1TaskStates[i]
-                                .getTaskId());
-                        if (cInfoList == null) {
-                            cInfoList = new ArrayList<Pair<TaskId, ConnectorDescriptorId>>();
-                            taskConnectivity.put(ac1TaskStates[i].getTaskId(), cInfoList);
-                        }
-                        Set<TaskId> cluster = taskClusterMap.get(ac1TaskStates[i].getTaskId());
-                        for (int j = targetBitmap.nextSetBit(0); j >= 0; j = targetBitmap.nextSetBit(j + 1)) {
-                            cInfoList.add(new Pair<TaskId, ConnectorDescriptorId>(ac2TaskStates[j].getTaskId(), cdId));
-                            IConnectorPolicy cPolicy = connectorPolicies.get(cdId);
-                            if (cPolicy.requiresProducerConsumerCoscheduling()) {
-                                cluster.add(ac2TaskStates[j].getTaskId());
-                            }
-                        }
-                    }
-                }
-            }
-        }
-
-        boolean done = false;
-        while (!done) {
-            done = true;
-            Set<TaskId> set = new HashSet<TaskId>();
-            Set<TaskId> oldSet = null;
-            for (Map.Entry<TaskId, Set<TaskId>> e : taskClusterMap.entrySet()) {
-                set.clear();
-                oldSet = e.getValue();
-                set.addAll(e.getValue());
-                for (TaskId tid : e.getValue()) {
-                    set.addAll(taskClusterMap.get(tid));
-                }
-                for (TaskId tid : set) {
-                    Set<TaskId> targetSet = taskClusterMap.get(tid);
-                    if (!targetSet.equals(set)) {
-                        done = false;
-                        break;
-                    }
-                }
-                if (!done) {
-                    break;
-                }
-            }
-            for (TaskId tid : oldSet) {
-                taskClusterMap.put(tid, set);
-            }
-        }
-
-        Set<Set<TaskId>> clusters = new HashSet<Set<TaskId>>(taskClusterMap.values());
-        Set<TaskCluster> tcSet = new HashSet<TaskCluster>();
-        for (Set<TaskId> cluster : clusters) {
-            Set<Task> taskStates = new HashSet<Task>();
-            for (TaskId tid : cluster) {
-                taskStates.add(taskStateMap.get(tid.getActivityId())[tid.getPartition()]);
-            }
-            TaskCluster tc = new TaskCluster(ac, taskStates.toArray(new Task[taskStates.size()]));
-            tcSet.add(tc);
-            for (TaskId tid : cluster) {
-                taskStateMap.get(tid.getActivityId())[tid.getPartition()].setTaskCluster(tc);
-            }
-        }
-        ac.setTaskClusters(tcSet.toArray(new TaskCluster[tcSet.size()]));
-
-        Map<PartitionId, TaskCluster> partitionProducingTaskClusterMap = ac.getPartitionProducingTaskClusterMap();
-        for (TaskCluster tc : ac.getTaskClusters()) {
-            for (Task ts : tc.getTasks()) {
-                TaskId tid = ts.getTaskId();
-                List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = taskConnectivity.get(tid);
-                if (cInfoList != null) {
-                    for (Pair<TaskId, ConnectorDescriptorId> p : cInfoList) {
-                        Task targetTS = taskStateMap.get(p.first.getActivityId())[p.first.getPartition()];
-                        TaskCluster targetTC = targetTS.getTaskCluster();
-                        if (targetTC != tc) {
-                            ConnectorDescriptorId cdId = p.second;
-                            PartitionId pid = new PartitionId(jobRun.getJobId(), cdId, tid.getPartition(),
-                                    p.first.getPartition());
-                            tc.getProducedPartitions().add(pid);
-                            targetTC.getRequiredPartitions().add(pid);
-                            partitionProducingTaskClusterMap.put(pid, tc);
-                        }
-                    }
-                }
-            }
-        }
-
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Plan for " + ac);
-            LOGGER.info("Built " + tcSet.size() + " Task Clusters");
-            for (TaskCluster tc : tcSet) {
-                LOGGER.info("Tasks: " + Arrays.toString(tc.getTasks()));
-            }
-        }
-    }
-
-    private Map<ConnectorDescriptorId, IConnectorPolicy> assignConnectorPolicy(ActivityCluster ac,
-            Map<ActivityId, ActivityPartitionDetails> pcMap) {
-        JobActivityGraph jag = jobRun.getJobActivityGraph();
-        Map<ConnectorDescriptorId, IConnectorPolicy> cPolicyMap = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
-        Set<ActivityId> activities = ac.getActivities();
-        Map<ActivityId, Task[]> taskStateMap = ac.getTaskMap();
-        BitSet targetBitmap = new BitSet();
-        for (ActivityId ac1 : activities) {
-            Task[] ac1TaskStates = taskStateMap.get(ac1);
-            int nProducers = ac1TaskStates.length;
-            List<IConnectorDescriptor> outputConns = jag.getActivityOutputConnectorDescriptors(ac1);
-            if (outputConns != null) {
-                for (IConnectorDescriptor c : outputConns) {
-                    ConnectorDescriptorId cdId = c.getConnectorId();
-                    ActivityId ac2 = jag.getConsumerActivity(cdId);
-                    Task[] ac2TaskStates = taskStateMap.get(ac2);
-                    int nConsumers = ac2TaskStates.length;
-
-                    int[] fanouts = new int[nProducers];
-                    for (int i = 0; i < nProducers; ++i) {
-                        c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
-                        fanouts[i] = targetBitmap.cardinality();
-                    }
-                    IConnectorPolicy cp = assignConnectorPolicy(c, nProducers, nConsumers, fanouts);
-                    cPolicyMap.put(cdId, cp);
-                }
-            }
-        }
-        return cPolicyMap;
-    }
-
-    private IConnectorPolicy assignConnectorPolicy(IConnectorDescriptor c, int nProducers, int nConsumers, int[] fanouts) {
-        IConnectorPolicyAssignmentPolicy cpap = jobRun.getJobActivityGraph().getJobSpecification()
-                .getConnectorPolicyAssignmentPolicy();
-        if (cpap != null) {
-            return cpap.getConnectorPolicyAssignment(c, nProducers, nConsumers, fanouts);
-        }
-        return new PipelinedConnectorPolicy();
-    }
-
-    @Override
-    public void notifyActivityClusterFailure(ActivityCluster ac, Exception exception) throws HyracksException {
-        for (ActivityCluster ac2 : inProgressClusters) {
-            abortActivityCluster(ac2);
-        }
-        jobRun.setStatus(JobStatus.FAILURE, exception);
-    }
-
-    private void abortActivityCluster(ActivityCluster ac) throws HyracksException {
-        ac.getStateMachine().abort();
-    }
-
-    @Override
-    public void notifyActivityClusterComplete(ActivityCluster ac) throws HyracksException {
-        completedClusters.add(ac);
-        inProgressClusters.remove(ac);
-        startRunnableActivityClusters();
-    }
-
-    @Override
-    public void notifyNodeFailures(Set<String> deadNodes) throws HyracksException {
-        jobRun.getPartitionMatchMaker().notifyNodeFailures(deadNodes);
-        if (!inProgressClusters.isEmpty()) {
-            for (ActivityCluster ac : inProgressClusters) {
-                ac.notifyNodeFailures(deadNodes);
-            }
-        }
-    }
-}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IActivityClusterStateMachine.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IActivityClusterStateMachine.java
deleted file mode 100644
index c6d85dd..0000000
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IActivityClusterStateMachine.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.scheduler;
-
-import java.util.Set;
-
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
-import edu.uci.ics.hyracks.control.cc.job.TaskClusterAttempt;
-
-public interface IActivityClusterStateMachine {
-    public void schedule() throws HyracksException;
-
-    public void abort() throws HyracksException;
-
-    public void notifyTaskComplete(TaskAttempt ta) throws HyracksException;
-
-    public void notifyTaskFailure(TaskAttempt ta, Exception exception) throws HyracksException;
-
-    public void notifyTaskClusterFailure(TaskClusterAttempt tcAttempt, Exception exception) throws HyracksException;
-
-    public void notifyNodeFailures(Set<String> deadNodes) throws HyracksException;
-}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IJobRunStateMachine.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IJobRunStateMachine.java
deleted file mode 100644
index bbf23d5..0000000
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IJobRunStateMachine.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.scheduler;
-
-import java.util.Set;
-
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
-
-public interface IJobRunStateMachine {
-    public void schedule() throws HyracksException;
-
-    public void notifyActivityClusterFailure(ActivityCluster ac, Exception exception) throws HyracksException;
-
-    public void notifyActivityClusterComplete(ActivityCluster activityCluster) throws HyracksException;
-
-    public void notifyNodeFailures(Set<String> deadNodes) throws HyracksException;
-}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobRunStateMachine.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobRunStateMachine.java
new file mode 100644
index 0000000..ce2770a
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobRunStateMachine.java
@@ -0,0 +1,169 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.scheduler;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
+import edu.uci.ics.hyracks.api.constraints.Constraint;
+import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
+import edu.uci.ics.hyracks.control.cc.job.IConnectorDescriptorVisitor;
+import edu.uci.ics.hyracks.control.cc.job.IOperatorDescriptorVisitor;
+import edu.uci.ics.hyracks.control.cc.job.JobRun;
+import edu.uci.ics.hyracks.control.cc.job.PlanUtils;
+import edu.uci.ics.hyracks.control.cc.job.manager.events.JobCleanupEvent;
+
+public class JobRunStateMachine {
+    private final ClusterControllerService ccs;
+
+    private final JobRun jobRun;
+
+    private final Set<ActivityCluster> completedClusters;
+
+    private final Set<ActivityCluster> inProgressClusters;
+
+    private PartitionConstraintSolver solver;
+
+    private ActivityCluster rootActivityCluster;
+
+    public JobRunStateMachine(ClusterControllerService ccs, JobRun jobRun) {
+        this.ccs = ccs;
+        this.jobRun = jobRun;
+        completedClusters = new HashSet<ActivityCluster>();
+        inProgressClusters = new HashSet<ActivityCluster>();
+    }
+
+    public PartitionConstraintSolver getSolver() {
+        return solver;
+    }
+
+    private void findRunnableActivityClusters(Set<ActivityCluster> frontier, ActivityCluster candidate) {
+        if (completedClusters.contains(candidate) || frontier.contains(candidate)
+                || inProgressClusters.contains(candidate)) {
+            return;
+        }
+        boolean runnable = true;
+        for (ActivityCluster s : candidate.getDependencies()) {
+            if (!completedClusters.contains(s)) {
+                runnable = false;
+                findRunnableActivityClusters(frontier, s);
+            }
+        }
+        if (runnable && candidate != rootActivityCluster) {
+            frontier.add(candidate);
+        }
+    }
+
+    private void findRunnableActivityClusters(Set<ActivityCluster> frontier) {
+        findRunnableActivityClusters(frontier, rootActivityCluster);
+    }
+
+    public void schedule() throws HyracksException {
+        try {
+            solver = new PartitionConstraintSolver();
+            final JobActivityGraph jag = jobRun.getJobActivityGraph();
+            final ICCApplicationContext appCtx = ccs.getApplicationMap().get(jag.getApplicationName());
+            JobSpecification spec = jag.getJobSpecification();
+            final Set<Constraint> contributedConstraints = new HashSet<Constraint>();
+            final IConstraintAcceptor acceptor = new IConstraintAcceptor() {
+                @Override
+                public void addConstraint(Constraint constraint) {
+                    contributedConstraints.add(constraint);
+                }
+            };
+            PlanUtils.visit(spec, new IOperatorDescriptorVisitor() {
+                @Override
+                public void visit(IOperatorDescriptor op) {
+                    op.contributeSchedulingConstraints(acceptor, jag, appCtx);
+                }
+            });
+            PlanUtils.visit(spec, new IConnectorDescriptorVisitor() {
+                @Override
+                public void visit(IConnectorDescriptor conn) {
+                    conn.contributeSchedulingConstraints(acceptor, jag, appCtx);
+                }
+            });
+            contributedConstraints.addAll(spec.getUserConstraints());
+            solver.addConstraints(contributedConstraints);
+
+            ActivityClusterGraphBuilder acgb = new ActivityClusterGraphBuilder(jobRun);
+            rootActivityCluster = acgb.inferStages(jag);
+            startRunnableActivityClusters();
+        } catch (Exception e) {
+            e.printStackTrace();
+            ccs.getJobQueue().schedule(new JobCleanupEvent(ccs, jobRun.getJobId(), JobStatus.FAILURE, e));
+            throw new HyracksException(e);
+        }
+    }
+
+    private void startRunnableActivityClusters() throws HyracksException {
+        Set<ActivityCluster> runnableClusters = new HashSet<ActivityCluster>();
+        findRunnableActivityClusters(runnableClusters);
+        if (runnableClusters.isEmpty() && inProgressClusters.isEmpty()) {
+            ccs.getJobQueue().schedule(new JobCleanupEvent(ccs, jobRun.getJobId(), JobStatus.TERMINATED, null));
+            return;
+        }
+        for (ActivityCluster ac : runnableClusters) {
+            inProgressClusters.add(ac);
+            TaskClusterBuilder tcb = new TaskClusterBuilder(jobRun, solver);
+            tcb.buildTaskClusters(ac);
+            ActivityClusterStateMachine acsm = new ActivityClusterStateMachine(ccs, this, ac);
+            ac.setStateMachine(acsm);
+            acsm.schedule();
+        }
+    }
+
+    public void notifyActivityClusterFailure(ActivityCluster ac, Exception exception) throws HyracksException {
+        for (ActivityCluster ac2 : inProgressClusters) {
+            abortActivityCluster(ac2);
+        }
+        jobRun.setStatus(JobStatus.FAILURE, exception);
+    }
+
+    private void abortActivityCluster(ActivityCluster ac) throws HyracksException {
+        ac.getStateMachine().abort();
+    }
+
+    public void notifyActivityClusterComplete(ActivityCluster ac) throws HyracksException {
+        completedClusters.add(ac);
+        inProgressClusters.remove(ac);
+        startRunnableActivityClusters();
+    }
+
+    public void notifyNodeFailures(Set<String> deadNodes) throws HyracksException {
+        jobRun.getPartitionMatchMaker().notifyNodeFailures(deadNodes);
+        for (ActivityCluster ac : completedClusters) {
+            ac.getStateMachine().notifyNodeFailures(deadNodes);
+        }
+        for (ActivityCluster ac : inProgressClusters) {
+            ac.getStateMachine().notifyNodeFailures(deadNodes);
+        }
+        for (ActivityCluster ac : inProgressClusters) {
+            ActivityClusterStateMachine acsm = ac.getStateMachine();
+            if (acsm.canMakeProgress()) {
+                acsm.schedule();
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobScheduler.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
similarity index 83%
rename from hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobScheduler.java
rename to hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index c71b23b..44aa1b8 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobScheduler.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -18,16 +18,15 @@
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
 
-public class DefaultJobScheduler implements IJobScheduler {
+public class JobScheduler {
     private final ClusterControllerService ccs;
 
-    public DefaultJobScheduler(ClusterControllerService ccs) {
+    public JobScheduler(ClusterControllerService ccs) {
         this.ccs = ccs;
     }
 
-    @Override
     public void notifyJobCreation(JobRun run) throws HyracksException {
-        IJobRunStateMachine jsm = new DefaultJobRunStateMachine(ccs, run);
+        JobRunStateMachine jsm = new JobRunStateMachine(ccs, run);
         run.setStateMachine(jsm);
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/RankedRunnableTaskCluster.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/RankedRunnableTaskCluster.java
index fa11643..27e9480 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/RankedRunnableTaskCluster.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/RankedRunnableTaskCluster.java
@@ -16,7 +16,7 @@
 
 import edu.uci.ics.hyracks.control.cc.job.TaskCluster;
 
-public class RankedRunnableTaskCluster {
+public class RankedRunnableTaskCluster implements Comparable<RankedRunnableTaskCluster> {
     private final int rank;
 
     private final TaskCluster taskCluster;
@@ -38,4 +38,10 @@
     public String toString() {
         return "[" + rank + ":" + taskCluster + "]";
     }
+
+    @Override
+    public int compareTo(RankedRunnableTaskCluster o) {
+        int cmp = rank - o.rank;
+        return cmp < 0 ? -1 : (cmp > 0 ? 1 : 0);
+    }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/TaskClusterBuilder.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/TaskClusterBuilder.java
new file mode 100644
index 0000000..bbcb83d
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/TaskClusterBuilder.java
@@ -0,0 +1,280 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.scheduler;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.constraints.expressions.LValueConstraintExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.PartitionCountExpression;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.PipelinedConnectorPolicy;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.api.util.Pair;
+import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
+import edu.uci.ics.hyracks.control.cc.job.JobRun;
+import edu.uci.ics.hyracks.control.cc.job.Task;
+import edu.uci.ics.hyracks.control.cc.job.TaskCluster;
+
+public class TaskClusterBuilder {
+    private static final Logger LOGGER = Logger.getLogger(TaskClusterBuilder.class.getName());
+
+    private final JobRun jobRun;
+    private final PartitionConstraintSolver solver;
+
+    public TaskClusterBuilder(JobRun jobRun, PartitionConstraintSolver solver) {
+        this.jobRun = jobRun;
+        this.solver = solver;
+    }
+
+    public void buildTaskClusters(ActivityCluster ac) throws HyracksException {
+        Map<ActivityId, ActivityPartitionDetails> pcMap = computePartitionCounts(ac);
+
+        Map<ActivityId, Task[]> taskStateMap = ac.getTaskMap();
+        Set<ActivityId> activities = ac.getActivities();
+
+        Map<TaskId, Set<TaskId>> taskClusterMap = new HashMap<TaskId, Set<TaskId>>();
+
+        for (ActivityId anId : activities) {
+            ActivityPartitionDetails apd = pcMap.get(anId);
+            Task[] taskStates = new Task[apd.getPartitionCount()];
+            for (int i = 0; i < taskStates.length; ++i) {
+                TaskId tid = new TaskId(anId, i);
+                taskStates[i] = new Task(tid, apd);
+                Set<TaskId> cluster = new HashSet<TaskId>();
+                cluster.add(tid);
+                taskClusterMap.put(tid, cluster);
+            }
+            taskStateMap.put(anId, taskStates);
+        }
+
+        Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = assignConnectorPolicy(ac, pcMap);
+        ac.setConnectorPolicyMap(connectorPolicies);
+
+        Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity = new HashMap<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>>();
+        JobActivityGraph jag = jobRun.getJobActivityGraph();
+        BitSet targetBitmap = new BitSet();
+        for (ActivityId ac1 : activities) {
+            Task[] ac1TaskStates = taskStateMap.get(ac1);
+            int nProducers = ac1TaskStates.length;
+            List<IConnectorDescriptor> outputConns = jag.getActivityOutputConnectorDescriptors(ac1);
+            if (outputConns != null) {
+                for (IConnectorDescriptor c : outputConns) {
+                    ConnectorDescriptorId cdId = c.getConnectorId();
+                    ActivityId ac2 = jag.getConsumerActivity(cdId);
+                    Task[] ac2TaskStates = taskStateMap.get(ac2);
+                    int nConsumers = ac2TaskStates.length;
+                    for (int i = 0; i < nProducers; ++i) {
+                        c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
+                        List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = taskConnectivity.get(ac1TaskStates[i]
+                                .getTaskId());
+                        if (cInfoList == null) {
+                            cInfoList = new ArrayList<Pair<TaskId, ConnectorDescriptorId>>();
+                            taskConnectivity.put(ac1TaskStates[i].getTaskId(), cInfoList);
+                        }
+                        Set<TaskId> cluster = taskClusterMap.get(ac1TaskStates[i].getTaskId());
+                        for (int j = targetBitmap.nextSetBit(0); j >= 0; j = targetBitmap.nextSetBit(j + 1)) {
+                            cInfoList.add(new Pair<TaskId, ConnectorDescriptorId>(ac2TaskStates[j].getTaskId(), cdId));
+                            IConnectorPolicy cPolicy = connectorPolicies.get(cdId);
+                            if (cPolicy.requiresProducerConsumerCoscheduling()) {
+                                cluster.add(ac2TaskStates[j].getTaskId());
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        boolean done = false;
+        while (!done) {
+            done = true;
+            Set<TaskId> set = new HashSet<TaskId>();
+            Set<TaskId> oldSet = null;
+            for (Map.Entry<TaskId, Set<TaskId>> e : taskClusterMap.entrySet()) {
+                set.clear();
+                oldSet = e.getValue();
+                set.addAll(e.getValue());
+                for (TaskId tid : e.getValue()) {
+                    set.addAll(taskClusterMap.get(tid));
+                }
+                for (TaskId tid : set) {
+                    Set<TaskId> targetSet = taskClusterMap.get(tid);
+                    if (!targetSet.equals(set)) {
+                        done = false;
+                        break;
+                    }
+                }
+                if (!done) {
+                    break;
+                }
+            }
+            for (TaskId tid : oldSet) {
+                taskClusterMap.put(tid, set);
+            }
+        }
+
+        Set<Set<TaskId>> clusters = new HashSet<Set<TaskId>>(taskClusterMap.values());
+        Set<TaskCluster> tcSet = new HashSet<TaskCluster>();
+        for (Set<TaskId> cluster : clusters) {
+            Set<Task> taskStates = new HashSet<Task>();
+            for (TaskId tid : cluster) {
+                taskStates.add(taskStateMap.get(tid.getActivityId())[tid.getPartition()]);
+            }
+            TaskCluster tc = new TaskCluster(ac, taskStates.toArray(new Task[taskStates.size()]));
+            tcSet.add(tc);
+            for (TaskId tid : cluster) {
+                taskStateMap.get(tid.getActivityId())[tid.getPartition()].setTaskCluster(tc);
+            }
+        }
+        ac.setTaskClusters(tcSet.toArray(new TaskCluster[tcSet.size()]));
+
+        Map<PartitionId, TaskCluster> partitionProducingTaskClusterMap = ac.getPartitionProducingTaskClusterMap();
+        for (TaskCluster tc : ac.getTaskClusters()) {
+            for (Task ts : tc.getTasks()) {
+                TaskId tid = ts.getTaskId();
+                List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = taskConnectivity.get(tid);
+                if (cInfoList != null) {
+                    for (Pair<TaskId, ConnectorDescriptorId> p : cInfoList) {
+                        Task targetTS = taskStateMap.get(p.first.getActivityId())[p.first.getPartition()];
+                        TaskCluster targetTC = targetTS.getTaskCluster();
+                        if (targetTC != tc) {
+                            ConnectorDescriptorId cdId = p.second;
+                            PartitionId pid = new PartitionId(jobRun.getJobId(), cdId, tid.getPartition(),
+                                    p.first.getPartition());
+                            tc.getProducedPartitions().add(pid);
+                            targetTC.getRequiredPartitions().add(pid);
+                            partitionProducingTaskClusterMap.put(pid, tc);
+                        }
+                    }
+                }
+            }
+        }
+
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Plan for " + ac);
+            LOGGER.info("Built " + tcSet.size() + " Task Clusters");
+            for (TaskCluster tc : tcSet) {
+                LOGGER.info("Tasks: " + Arrays.toString(tc.getTasks()));
+            }
+        }
+    }
+
+    private Map<ConnectorDescriptorId, IConnectorPolicy> assignConnectorPolicy(ActivityCluster ac,
+            Map<ActivityId, ActivityPartitionDetails> pcMap) {
+        JobActivityGraph jag = jobRun.getJobActivityGraph();
+        Map<ConnectorDescriptorId, IConnectorPolicy> cPolicyMap = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
+        Set<ActivityId> activities = ac.getActivities();
+        Map<ActivityId, Task[]> taskStateMap = ac.getTaskMap();
+        BitSet targetBitmap = new BitSet();
+        for (ActivityId ac1 : activities) {
+            Task[] ac1TaskStates = taskStateMap.get(ac1);
+            int nProducers = ac1TaskStates.length;
+            List<IConnectorDescriptor> outputConns = jag.getActivityOutputConnectorDescriptors(ac1);
+            if (outputConns != null) {
+                for (IConnectorDescriptor c : outputConns) {
+                    ConnectorDescriptorId cdId = c.getConnectorId();
+                    ActivityId ac2 = jag.getConsumerActivity(cdId);
+                    Task[] ac2TaskStates = taskStateMap.get(ac2);
+                    int nConsumers = ac2TaskStates.length;
+
+                    int[] fanouts = new int[nProducers];
+                    for (int i = 0; i < nProducers; ++i) {
+                        c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
+                        fanouts[i] = targetBitmap.cardinality();
+                    }
+                    IConnectorPolicy cp = assignConnectorPolicy(c, nProducers, nConsumers, fanouts);
+                    cPolicyMap.put(cdId, cp);
+                }
+            }
+        }
+        return cPolicyMap;
+    }
+
+    private IConnectorPolicy assignConnectorPolicy(IConnectorDescriptor c, int nProducers, int nConsumers, int[] fanouts) {
+        IConnectorPolicyAssignmentPolicy cpap = jobRun.getJobActivityGraph().getJobSpecification()
+                .getConnectorPolicyAssignmentPolicy();
+        if (cpap != null) {
+            return cpap.getConnectorPolicyAssignment(c, nProducers, nConsumers, fanouts);
+        }
+        return new PipelinedConnectorPolicy();
+    }
+
+    private Map<ActivityId, ActivityPartitionDetails> computePartitionCounts(ActivityCluster ac)
+            throws HyracksException {
+        Set<LValueConstraintExpression> lValues = new HashSet<LValueConstraintExpression>();
+        for (ActivityId anId : ac.getActivities()) {
+            lValues.add(new PartitionCountExpression(anId.getOperatorDescriptorId()));
+        }
+        solver.solve(lValues);
+        Map<OperatorDescriptorId, Integer> nPartMap = new HashMap<OperatorDescriptorId, Integer>();
+        for (LValueConstraintExpression lv : lValues) {
+            Object value = solver.getValue(lv);
+            if (value == null) {
+                throw new HyracksException("No value found for " + lv);
+            }
+            if (!(value instanceof Number)) {
+                throw new HyracksException("Unexpected type of value bound to " + lv + ": " + value.getClass() + "("
+                        + value + ")");
+            }
+            int nParts = ((Number) value).intValue();
+            if (nParts <= 0) {
+                throw new HyracksException("Unsatisfiable number of partitions for " + lv + ": " + nParts);
+            }
+            nPartMap.put(((PartitionCountExpression) lv).getOperatorDescriptorId(), Integer.valueOf(nParts));
+        }
+        Map<ActivityId, ActivityPartitionDetails> activityPartsMap = new HashMap<ActivityId, ActivityPartitionDetails>();
+        for (ActivityId anId : ac.getActivities()) {
+            int nParts = nPartMap.get(anId.getOperatorDescriptorId());
+            int[] nInputPartitions = null;
+            List<IConnectorDescriptor> inputs = jobRun.getJobActivityGraph().getActivityInputConnectorDescriptors(anId);
+            if (inputs != null) {
+                nInputPartitions = new int[inputs.size()];
+                for (int i = 0; i < nInputPartitions.length; ++i) {
+                    nInputPartitions[i] = nPartMap.get(jobRun.getJobActivityGraph()
+                            .getProducerActivity(inputs.get(i).getConnectorId()).getOperatorDescriptorId());
+                }
+            }
+            int[] nOutputPartitions = null;
+            List<IConnectorDescriptor> outputs = jobRun.getJobActivityGraph().getActivityOutputConnectorDescriptors(
+                    anId);
+            if (outputs != null) {
+                nOutputPartitions = new int[outputs.size()];
+                for (int i = 0; i < nOutputPartitions.length; ++i) {
+                    nOutputPartitions[i] = nPartMap.get(jobRun.getJobActivityGraph()
+                            .getConsumerActivity(outputs.get(i).getConnectorId()).getOperatorDescriptorId());
+                }
+            }
+            ActivityPartitionDetails apd = new ActivityPartitionDetails(nParts, nInputPartitions, nOutputPartitions);
+            activityPartsMap.put(anId, apd);
+        }
+        return activityPartsMap;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
index ca2fad9..3979656 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
@@ -36,7 +36,7 @@
     public NodeCapability getNodeCapability() throws Exception;
 
     public void startTasks(String appName, UUID jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors,
-            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) throws Exception;
+            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, byte[] ctxVarBytes) throws Exception;
 
     public void abortTasks(UUID jobId, List<TaskAttemptId> tasks) throws Exception;
 
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 8ef0f3e..c932889 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -36,6 +36,7 @@
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
 import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
 import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
+import edu.uci.ics.hyracks.api.naming.MultipartName;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
 import edu.uci.ics.hyracks.api.resources.IDeallocatable;
 import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
@@ -64,6 +65,8 @@
 
     private final Map<String, Counter> counterMap;
 
+    private final Map<MultipartName, Object> localVariableMap;
+
     private final DefaultDeallocatableRegistry deallocatableRegistry;
 
     private final IWorkspaceFileFactory fileFactory;
@@ -76,6 +79,7 @@
         envMap = new HashMap<OperatorDescriptorId, Map<Integer, IOperatorEnvironment>>();
         taskMap = new HashMap<TaskAttemptId, Task>();
         counterMap = new HashMap<String, Counter>();
+        localVariableMap = new HashMap<MultipartName, Object>();
         deallocatableRegistry = new DefaultDeallocatableRegistry();
         fileFactory = new WorkspaceFileFactory(this, (IOManager) appCtx.getRootContext().getIOManager());
     }
@@ -85,7 +89,7 @@
         return jobId;
     }
 
-    public IOperatorEnvironment getEnvironment(OperatorDescriptorId opId, int partition) {
+    public synchronized IOperatorEnvironment getEnvironment(OperatorDescriptorId opId, int partition) {
         if (!envMap.containsKey(opId)) {
             envMap.put(opId, new HashMap<Integer, IOperatorEnvironment>());
         }
@@ -104,6 +108,17 @@
         return taskMap;
     }
 
+    public synchronized Object lookupLocalVariable(MultipartName name) throws HyracksDataException {
+        if (!localVariableMap.containsKey(name)) {
+            throw new HyracksDataException("Unknown variable: " + name);
+        }
+        return localVariableMap.get(name);
+    }
+
+    public synchronized void setLocalVariable(MultipartName name, Object value) {
+        localVariableMap.put(name, value);
+    }
+
     private static final class OperatorEnvironmentImpl implements IOperatorEnvironment {
         private final String nodeId;
         private final Map<String, Object> map;
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 3223907..e97eede 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -24,6 +24,7 @@
 import java.rmi.registry.Registry;
 import java.text.MessageFormat;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Hashtable;
 import java.util.List;
 import java.util.Map;
@@ -64,7 +65,9 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.io.IODeviceHandle;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
 import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.naming.MultipartName;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
 import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
 import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
@@ -230,10 +233,11 @@
     @Override
     public void startTasks(String appName, final UUID jobId, byte[] jagBytes,
             List<TaskAttemptDescriptor> taskDescriptors,
-            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap) throws Exception {
+            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap, byte[] ctxVarBytes) throws Exception {
         try {
             NCApplicationContext appCtx = applications.get(appName);
             final JobActivityGraph plan = (JobActivityGraph) appCtx.deserialize(jagBytes);
+            Map<MultipartName, Object> ctxVarMap = (Map<MultipartName, Object>) appCtx.deserialize(ctxVarBytes);
 
             IRecordDescriptorProvider rdp = new IRecordDescriptorProvider() {
                 @Override
@@ -257,10 +261,10 @@
                     LOGGER.info("Initializing " + taId + " -> " + han);
                 }
                 final int partition = tid.getPartition();
+                Map<MultipartName, Object> inputGlobalVariables = createInputGlobalVariables(ctxVarMap, han);
                 Task task = new Task(joblet, taId, han.getClass().getName(), executor);
-                IOperatorNodePushable operator = han.createPushRuntime(task,
-                        joblet.getEnvironment(han.getActivityId().getOperatorDescriptorId(), partition), rdp,
-                        partition, td.getPartitionCount());
+                IOperatorEnvironment env = joblet.getEnvironment(tid.getActivityId().getOperatorDescriptorId(), tid.getPartition());
+                IOperatorNodePushable operator = han.createPushRuntime(task, env, rdp, partition, td.getPartitionCount());
 
                 List<IPartitionCollector> collectors = new ArrayList<IPartitionCollector>();
 
@@ -308,6 +312,14 @@
         }
     }
 
+    private Map<MultipartName, Object> createInputGlobalVariables(Map<MultipartName, Object> ctxVarMap, IActivity han) {
+        Map<MultipartName, Object> gVars = new HashMap<MultipartName, Object>();
+//        for (MultipartName inVar : han.getInputVariables()) {
+//            gVars.put(inVar, ctxVarMap.get(inVar));
+//        }
+        return gVars;
+    }
+
     private IPartitionCollector createPartitionCollector(TaskAttemptDescriptor td, final int partition, Task task,
             int i, IConnectorDescriptor conn, RecordDescriptor recordDesc, IConnectorPolicy cPolicy)
             throws HyracksDataException {
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index e787a97..e65f59e 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -15,6 +15,7 @@
 package edu.uci.ics.hyracks.control.nc;
 
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Executor;
@@ -25,6 +26,7 @@
 import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
 import edu.uci.ics.hyracks.api.context.IHyracksJobletContext;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -34,7 +36,9 @@
 import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
 import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
 import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
+import edu.uci.ics.hyracks.api.naming.MultipartName;
 import edu.uci.ics.hyracks.api.resources.IDeallocatable;
+import edu.uci.ics.hyracks.api.workflow.variables.WorkflowVariableDescriptor;
 import edu.uci.ics.hyracks.control.common.job.PartitionState;
 import edu.uci.ics.hyracks.control.common.job.profiling.counters.Counter;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
@@ -57,13 +61,20 @@
 
     private final Map<String, Counter> counterMap;
 
+    private final Map<MultipartName, Object> inputGlobalVariables;
+
+    private final Map<MultipartName, Object> outputVariables;
+
+    private final Map<MultipartName, WorkflowVariableDescriptor> outputVariableDescriptorMap;
+
     private IPartitionCollector[] collectors;
 
     private IOperatorNodePushable operator;
 
     private volatile boolean aborted;
 
-    public Task(Joblet joblet, TaskAttemptId taskId, String displayName, Executor executor) {
+    public Task(Joblet joblet, TaskAttemptId taskId, String displayName,
+            Executor executor) {
         this.joblet = joblet;
         this.taskAttemptId = taskId;
         this.displayName = displayName;
@@ -71,6 +82,13 @@
         fileFactory = new WorkspaceFileFactory(this, (IOManager) joblet.getIOManager());
         deallocatableRegistry = new DefaultDeallocatableRegistry();
         counterMap = new HashMap<String, Counter>();
+//        this.inputGlobalVariables = inputGlobalVariables;
+        inputGlobalVariables = Collections.emptyMap();
+        outputVariables = new HashMap<MultipartName, Object>();
+        outputVariableDescriptorMap = new HashMap<MultipartName, WorkflowVariableDescriptor>();
+//        for (WorkflowVariableDescriptor wvd : outputVariableDescriptors) {
+//            outputVariableDescriptorMap.put(wvd.getName(), wvd);
+//        }
     }
 
     public void setTaskRuntime(IPartitionCollector[] collectors, IOperatorNodePushable operator) {
@@ -108,6 +126,28 @@
         deallocatableRegistry.registerDeallocatable(deallocatable);
     }
 
+    @Override
+    public Object lookupGlobalVariable(ActivityId producerActivity, int partition, String varName)
+            throws HyracksDataException {
+        MultipartName var = new MultipartName(producerActivity, partition, varName);
+        if (!inputGlobalVariables.containsKey(var)) {
+            throw new HyracksDataException("Unknown Variable: " + var);
+        }
+        return inputGlobalVariables.get(var);
+    }
+
+    @Override
+    public Object lookupLocalVariable(ActivityId producerActivity, int partition, String varName)
+            throws HyracksDataException {
+        return joblet.lookupLocalVariable(new MultipartName(producerActivity, partition, varName));
+    }
+
+    @Override
+    public void setVariable(String name, Object value) {
+        outputVariables.put(new MultipartName(taskAttemptId.getTaskId().getActivityId(), taskAttemptId.getTaskId()
+                .getPartition(), name), value);
+    }
+
     public void close() {
         deallocatableRegistry.close();
     }
@@ -190,6 +230,23 @@
             } finally {
                 operator.deinitialize();
             }
+            Map<MultipartName, Object> outputGlobalVariables = new HashMap<MultipartName, Object>();
+            for (Map.Entry<MultipartName, Object> e : outputVariables.entrySet()) {
+                MultipartName varName = e.getKey();
+                WorkflowVariableDescriptor wvd = outputVariableDescriptorMap.get(varName);
+                if (wvd == null) {
+                    throw new HyracksDataException("Unknown variable found: " + varName);
+                }
+                switch (wvd.getScope()) {
+                    case LOCAL:
+                        joblet.setLocalVariable(varName, e.getValue());
+                        break;
+
+                    case GLOBAL:
+                        outputGlobalVariables.put(varName, e.getValue());
+                        break;
+                }
+            }
             joblet.notifyTaskComplete(this);
         } catch (Exception e) {
             e.printStackTrace();
@@ -207,7 +264,8 @@
         try {
             collector.open();
             try {
-                joblet.advertisePartitionRequest(taskAttemptId, collector.getRequiredPartitionIds(), collector, PartitionState.STARTED);
+                joblet.advertisePartitionRequest(taskAttemptId, collector.getRequiredPartitionIds(), collector,
+                        PartitionState.STARTED);
                 IFrameReader reader = collector.getReader();
                 reader.open();
                 try {
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
index ca37f37..6df880a 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
@@ -381,7 +381,7 @@
                             (Class<? extends Writable>) oldReader.createKey().getClass(),
                             (Class<? extends Writable>) oldReader.createValue().getClass());
                 }
-                return createSelfReadingMapper(ctx, env, recordDescriptor, partition);
+                return createSelfReadingMapper(ctx, recordDescriptor, partition);
             } else {
                 return new DeserializedOperatorNodePushable(ctx, new MapperOperator(partition),
                         recordDescProvider.getInputRecordDescriptor(this.odId, 0));
@@ -391,7 +391,7 @@
         }
     }
 
-    private IOperatorNodePushable createSelfReadingMapper(final IHyracksTaskContext ctx, IOperatorEnvironment env,
+    private IOperatorNodePushable createSelfReadingMapper(final IHyracksTaskContext ctx,
             final RecordDescriptor recordDescriptor, final int partition) {
         return new AbstractUnaryOutputSourceOperatorNodePushable() {
             @Override
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
index efb616b..79d9c8a 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
@@ -55,7 +55,7 @@
     }
 
     @Override
-    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, IOperatorEnvironment env,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
         return new BTreeSearchOperatorNodePushable(this, ctx, partition, recordDescProvider, isForward, lowKeyFields,
                 highKeyFields, lowKeyInclusive, highKeyInclusive);
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
index cbab080..83b608f 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
@@ -18,6 +18,7 @@
 
 import edu.uci.ics.hyracks.api.context.IHyracksJobletContext;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
@@ -88,4 +89,21 @@
     public TaskAttemptId getTaskAttemptId() {
         return taskId;
     }
+
+    @Override
+    public Object lookupGlobalVariable(ActivityId producerActivity, int partition, String varName)
+            throws HyracksDataException {
+        return null;
+    }
+
+    @Override
+    public Object lookupLocalVariable(ActivityId producerActivity, int partition, String varName)
+            throws HyracksDataException {
+        return null;
+    }
+
+    @Override
+    public void setVariable(String name, Object value) throws HyracksDataException {
+
+    }
 }
\ No newline at end of file