Added sort-merge collector. Tests pass.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_scheduling@389 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksTaskContext.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksTaskContext.java
index a7646fc..8584d36 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksTaskContext.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksTaskContext.java
@@ -22,7 +22,7 @@
 public interface IHyracksTaskContext extends IHyracksCommonContext, IWorkspaceFileFactory, IDeallocatableRegistry {
     public IHyracksJobletContext getJobletContext();
 
-    public TaskAttemptId getTaskId();
+    public TaskAttemptId getTaskAttemptId();
 
     public ICounterContext getCounterContext();
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index aebd6ce..4210a23 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -206,8 +206,8 @@
     @Override
     public void notifyTaskFailure(UUID jobId, TaskAttemptId taskId, String nodeId, Exception exception)
             throws Exception {
-        TaskFailureEvent sfe = new TaskFailureEvent(this, jobId, taskId, nodeId, exception);
-        jobQueue.schedule(sfe);
+        TaskFailureEvent tfe = new TaskFailureEvent(this, jobId, taskId, nodeId, exception);
+        jobQueue.schedule(tfe);
     }
 
     @Override
@@ -240,9 +240,6 @@
 
     @Override
     public synchronized void nodeHeartbeat(String id) throws Exception {
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Heartbeat from: " + id);
-        }
         jobQueue.schedule(new NodeHeartbeatEvent(this, id));
     }
 
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
index 1e9261b..0c515f4 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
@@ -20,6 +20,7 @@
 import java.util.Set;
 
 import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.control.cc.scheduler.IActivityClusterStateMachine;
 
 public class ActivityCluster {
@@ -31,7 +32,7 @@
 
     private final Set<ActivityCluster> dependents;
 
-    private final Map<ActivityNodeId, TaskState[]> taskStateMap;
+    private final Map<ActivityNodeId, Task[]> taskStateMap;
 
     private TaskCluster[] taskClusters;
 
@@ -42,7 +43,7 @@
         this.activities = activities;
         dependencies = new HashSet<ActivityCluster>();
         dependents = new HashSet<ActivityCluster>();
-        taskStateMap = new HashMap<ActivityNodeId, TaskState[]>();
+        taskStateMap = new HashMap<ActivityNodeId, Task[]>();
     }
 
     public Set<ActivityNodeId> getActivities() {
@@ -61,7 +62,7 @@
         return dependencies;
     }
 
-    public Map<ActivityNodeId, TaskState[]> getTaskStateMap() {
+    public Map<ActivityNodeId, Task[]> getTaskStateMap() {
         return taskStateMap;
     }
 
@@ -84,4 +85,16 @@
     public JobRun getJobRun() {
         return jobRun;
     }
+
+    public int getMaxTaskClusterAttempts() {
+        return 1;
+    }
+
+    public void notifyTaskClusterFailure(TaskClusterAttempt tcAttempt, Exception exception) throws HyracksException {
+        acsm.notifyTaskClusterFailure(tcAttempt, exception);
+    }
+
+    public void notifyActivityClusterComplete() throws HyracksException {
+        jobRun.getStateMachine().notifyActivityClusterComplete(this);
+    }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskState.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/Task.java
similarity index 72%
rename from hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskState.java
rename to hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/Task.java
index 3bbba36..3ca75b8 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskState.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/Task.java
@@ -15,20 +15,28 @@
 package edu.uci.ics.hyracks.control.cc.job;
 
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.control.cc.scheduler.ActivityPartitionDetails;
 
-public class TaskState {
+public class Task {
     private final TaskId taskId;
 
+    private final ActivityPartitionDetails apd;
+
     private TaskCluster taskCluster;
 
-    public TaskState(TaskId taskId) {
+    public Task(TaskId taskId, ActivityPartitionDetails apd) {
         this.taskId = taskId;
+        this.apd = apd;
     }
 
     public TaskId getTaskId() {
         return taskId;
     }
 
+    public ActivityPartitionDetails getActivityPartitionDetails() {
+        return apd;
+    }
+
     public TaskCluster getTaskCluster() {
         return taskCluster;
     }
@@ -36,4 +44,9 @@
     public void setTaskCluster(TaskCluster taskCluster) {
         this.taskCluster = taskCluster;
     }
+
+    @Override
+    public String toString() {
+        return taskId + "(" + apd + ")";
+    }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskAttempt.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskAttempt.java
index 6d61ccb..f17ac12 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskAttempt.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskAttempt.java
@@ -26,9 +26,11 @@
         ABORTED,
     }
 
+    private final TaskClusterAttempt tcAttempt;
+
     private final TaskAttemptId taskId;
 
-    private final TaskState taskState;
+    private final Task taskState;
 
     private String nodeId;
 
@@ -36,16 +38,21 @@
 
     private Exception exception;
 
-    public TaskAttempt(TaskAttemptId taskId, TaskState taskState) {
+    public TaskAttempt(TaskClusterAttempt tcAttempt, TaskAttemptId taskId, Task taskState) {
+        this.tcAttempt = tcAttempt;
         this.taskId = taskId;
         this.taskState = taskState;
     }
 
+    public TaskClusterAttempt getTaskClusterAttempt() {
+        return tcAttempt;
+    }
+
     public TaskAttemptId getTaskAttemptId() {
         return taskId;
     }
 
-    public TaskState getTaskState() {
+    public Task getTaskState() {
         return taskState;
     }
 
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java
index 78ac34b..068ec5f 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java
@@ -24,7 +24,7 @@
 public class TaskCluster {
     private final ActivityCluster activityCluster;
 
-    private final TaskState[] tasks;
+    private final Task[] tasks;
 
     private final Set<TaskCluster> blockers;
 
@@ -32,7 +32,7 @@
 
     private final List<TaskClusterAttempt> taskClusterAttempts;
 
-    public TaskCluster(ActivityCluster activityCluster, TaskState[] tasks) {
+    public TaskCluster(ActivityCluster activityCluster, Task[] tasks) {
         this.activityCluster = activityCluster;
         this.tasks = tasks;
         this.blockers = new HashSet<TaskCluster>();
@@ -40,7 +40,7 @@
         taskClusterAttempts = new ArrayList<TaskClusterAttempt>();
     }
 
-    public TaskState[] getTasks() {
+    public Task[] getTasks() {
         return tasks;
     }
 
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskClusterAttempt.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskClusterAttempt.java
index f0d951e..9d33d12 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskClusterAttempt.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskClusterAttempt.java
@@ -14,22 +14,34 @@
  */
 package edu.uci.ics.hyracks.control.cc.job;
 
-
 public class TaskClusterAttempt {
     public enum TaskClusterStatus {
-        INITIALIZED,
         RUNNING,
         COMPLETED,
         FAILED,
+        ABORTED,
     }
 
-    private final TaskAttempt[] taskAttempts;
+    private final TaskCluster taskCluster;
+
+    private final int attempt;
+
+    private TaskAttempt[] taskAttempts;
 
     private TaskClusterStatus status;
 
     private int pendingTaskCounter;
 
-    public TaskClusterAttempt(TaskAttempt[] taskAttempts) {
+    public TaskClusterAttempt(TaskCluster taskCluster, int attempt) {
+        this.taskCluster = taskCluster;
+        this.attempt = attempt;
+    }
+
+    public TaskCluster getTaskCluster() {
+        return taskCluster;
+    }
+
+    public void setTaskAttempts(TaskAttempt[] taskAttempts) {
         this.taskAttempts = taskAttempts;
     }
 
@@ -37,6 +49,10 @@
         return taskAttempts;
     }
 
+    public int getAttempt() {
+        return attempt;
+    }
+
     public void setStatus(TaskClusterStatus status) {
         this.status = status;
     }
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/AbstractTaskLifecycleEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/AbstractTaskLifecycleEvent.java
new file mode 100644
index 0000000..a69b144
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/AbstractTaskLifecycleEvent.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.job.manager.events;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
+import edu.uci.ics.hyracks.control.cc.job.JobRun;
+import edu.uci.ics.hyracks.control.cc.job.Task;
+import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
+import edu.uci.ics.hyracks.control.cc.job.TaskCluster;
+import edu.uci.ics.hyracks.control.cc.job.TaskClusterAttempt;
+import edu.uci.ics.hyracks.control.cc.jobqueue.AbstractEvent;
+
+public abstract class AbstractTaskLifecycleEvent extends AbstractEvent {
+    protected final ClusterControllerService ccs;
+    protected final UUID jobId;
+    protected final TaskAttemptId taId;
+    protected final String nodeId;
+
+    public AbstractTaskLifecycleEvent(ClusterControllerService ccs, UUID jobId, TaskAttemptId taId, String nodeId) {
+        this.ccs = ccs;
+        this.jobId = jobId;
+        this.taId = taId;
+        this.nodeId = nodeId;
+    }
+
+    @Override
+    public final void run() {
+        JobRun run = ccs.getRunMap().get(jobId);
+        if (run != null) {
+            TaskId tid = taId.getTaskId();
+            Map<ActivityNodeId, ActivityCluster> activityClusterMap = run.getActivityClusterMap();
+            ActivityCluster ac = activityClusterMap.get(tid.getActivityId());
+            if (ac != null) {
+                Map<ActivityNodeId, Task[]> taskStateMap = ac.getTaskStateMap();
+                Task[] taskStates = taskStateMap.get(tid.getActivityId());
+                if (taskStates != null && taskStates.length > tid.getPartition()) {
+                    Task ts = taskStates[tid.getPartition()];
+                    TaskCluster tc = ts.getTaskCluster();
+                    List<TaskClusterAttempt> taskClusterAttempts = tc.getAttempts();
+                    if (taskClusterAttempts != null && taskClusterAttempts.size() > taId.getAttempt()) {
+                        TaskClusterAttempt tca = taskClusterAttempts.get(taId.getAttempt());
+                        for (TaskAttempt ta : tca.getTaskAttempts()) {
+                            if (ta.getTaskAttemptId().equals(taId)) {
+                                performEvent(ta);
+                                break;
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    protected abstract void performEvent(TaskAttempt ta);
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ApplicationDestroyEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ApplicationDestroyEvent.java
index 69c244a..050c6ed 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ApplicationDestroyEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ApplicationDestroyEvent.java
@@ -19,13 +19,14 @@
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.jobqueue.AbstractEvent;
 import edu.uci.ics.hyracks.control.cc.jobqueue.FutureValue;
 import edu.uci.ics.hyracks.control.cc.remote.RemoteOp;
 import edu.uci.ics.hyracks.control.cc.remote.RemoteRunner;
 import edu.uci.ics.hyracks.control.cc.remote.ops.ApplicationDestroyer;
 import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
 
-public class ApplicationDestroyEvent implements Runnable {
+public class ApplicationDestroyEvent extends AbstractEvent {
     private final ClusterControllerService ccs;
     private final String appName;
     private FutureValue fv;
@@ -57,7 +58,7 @@
                     fv.setException(e);
                     return;
                 }
-                ccs.getJobQueue().schedule(new Runnable() {
+                ccs.getJobQueue().schedule(new AbstractEvent() {
                     @Override
                     public void run() {
                         try {
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ApplicationStartEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ApplicationStartEvent.java
index 9f4ad8f..ab05d0f 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ApplicationStartEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ApplicationStartEvent.java
@@ -20,13 +20,14 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.jobqueue.AbstractEvent;
 import edu.uci.ics.hyracks.control.cc.jobqueue.FutureValue;
 import edu.uci.ics.hyracks.control.cc.remote.RemoteOp;
 import edu.uci.ics.hyracks.control.cc.remote.RemoteRunner;
 import edu.uci.ics.hyracks.control.cc.remote.ops.ApplicationStarter;
 import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
 
-public class ApplicationStartEvent implements Runnable {
+public class ApplicationStartEvent extends AbstractEvent {
     private final ClusterControllerService ccs;
     private final String appName;
     private final FutureValue fv;
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobProfileJSONEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobProfileJSONEvent.java
index f53fb5f..e4353f8 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobProfileJSONEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobProfileJSONEvent.java
@@ -19,9 +19,9 @@
 import org.json.JSONObject;
 
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableRunnable;
+import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableEvent;
 
-public class GetJobProfileJSONEvent extends SynchronizableRunnable {
+public class GetJobProfileJSONEvent extends SynchronizableEvent {
     private final ClusterControllerService ccs;
     private final UUID jobId;
     private final int attempt;
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobSpecificationJSONEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobSpecificationJSONEvent.java
index 509dcd2..195b8c1 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobSpecificationJSONEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobSpecificationJSONEvent.java
@@ -20,9 +20,9 @@
 
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
-import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableRunnable;
+import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableEvent;
 
-public class GetJobSpecificationJSONEvent extends SynchronizableRunnable {
+public class GetJobSpecificationJSONEvent extends SynchronizableEvent {
     private final ClusterControllerService ccs;
     private final UUID jobId;
     private JSONObject spec;
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobStatusConditionVariableEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobStatusConditionVariableEvent.java
index c80974a..7768587 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobStatusConditionVariableEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobStatusConditionVariableEvent.java
@@ -18,9 +18,9 @@
 
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.job.IJobStatusConditionVariable;
-import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableRunnable;
+import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableEvent;
 
-public class GetJobStatusConditionVariableEvent extends SynchronizableRunnable {
+public class GetJobStatusConditionVariableEvent extends SynchronizableEvent {
     private final ClusterControllerService ccs;
     private final UUID jobId;
     private IJobStatusConditionVariable cVar;
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobStatusEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobStatusEvent.java
index 9b15e5c..589ac34 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobStatusEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobStatusEvent.java
@@ -19,9 +19,9 @@
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
-import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableRunnable;
+import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableEvent;
 
-public class GetJobStatusEvent extends SynchronizableRunnable {
+public class GetJobStatusEvent extends SynchronizableEvent {
     private final ClusterControllerService ccs;
     private final UUID jobId;
     private JobStatus status;
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobSummariesJSONEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobSummariesJSONEvent.java
index 3c19f59..bf5e5c3 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobSummariesJSONEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobSummariesJSONEvent.java
@@ -19,9 +19,9 @@
 
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
-import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableRunnable;
+import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableEvent;
 
-public class GetJobSummariesJSONEvent extends SynchronizableRunnable {
+public class GetJobSummariesJSONEvent extends SynchronizableEvent {
     private final ClusterControllerService ccs;
     private JSONArray summaries;
 
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetNodeEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetNodeEvent.java
index 7554e62..0db7f2a 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetNodeEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetNodeEvent.java
@@ -16,9 +16,9 @@
 
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.NodeControllerState;
-import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableRunnable;
+import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableEvent;
 
-public class GetNodeEvent extends SynchronizableRunnable {
+public class GetNodeEvent extends SynchronizableEvent {
     private final ClusterControllerService ccs;
     private final String nodeId;
     private NodeControllerState state;
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCleanupEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCleanupEvent.java
index 87ba685..f59e9cc 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCleanupEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCleanupEvent.java
@@ -22,10 +22,11 @@
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
+import edu.uci.ics.hyracks.control.cc.jobqueue.AbstractEvent;
 import edu.uci.ics.hyracks.control.cc.remote.RemoteRunner;
 import edu.uci.ics.hyracks.control.cc.remote.ops.JobCompleteNotifier;
 
-public class JobCleanupEvent implements Runnable {
+public class JobCleanupEvent extends AbstractEvent {
     private ClusterControllerService ccs;
     private UUID jobId;
     private JobStatus status;
@@ -57,7 +58,7 @@
                         e.printStackTrace();
                     }
                 }
-                ccs.getJobQueue().schedule(new Runnable() {
+                ccs.getJobQueue().schedule(new AbstractEvent() {
                     @Override
                     public void run() {
                         CCApplicationContext appCtx = ccs.getApplicationMap().get(
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCreateEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCreateEvent.java
index d6b09db..9e47d8a 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCreateEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCreateEvent.java
@@ -29,9 +29,9 @@
 import edu.uci.ics.hyracks.control.cc.job.JobActivityGraphBuilder;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
 import edu.uci.ics.hyracks.control.cc.job.PlanUtils;
-import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableRunnable;
+import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableEvent;
 
-public class JobCreateEvent extends SynchronizableRunnable {
+public class JobCreateEvent extends SynchronizableEvent {
     private final ClusterControllerService ccs;
     private final byte[] jobSpec;
     private final EnumSet<JobFlag> jobFlags;
@@ -65,7 +65,7 @@
         });
         final JobActivityGraph jag = builder.getActivityGraph();
 
-        JobRun run = new JobRun(UUID.randomUUID(), jag);
+        JobRun run = new JobRun(jobId, jag);
 
         run.setStatus(JobStatus.INITIALIZED, null);
 
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobStartEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobStartEvent.java
index 7b9a3c6..5fd70b4 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobStartEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobStartEvent.java
@@ -19,9 +19,9 @@
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
-import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableRunnable;
+import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableEvent;
 
-public class JobStartEvent extends SynchronizableRunnable {
+public class JobStartEvent extends SynchronizableEvent {
     private final ClusterControllerService ccs;
     private final UUID jobId;
 
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/NodeHeartbeatEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/NodeHeartbeatEvent.java
index 348d703..8643e6a 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/NodeHeartbeatEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/NodeHeartbeatEvent.java
@@ -15,12 +15,13 @@
 package edu.uci.ics.hyracks.control.cc.job.manager.events;
 
 import java.util.Map;
+import java.util.logging.Level;
 
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.NodeControllerState;
-import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableRunnable;
+import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableEvent;
 
-public class NodeHeartbeatEvent extends SynchronizableRunnable {
+public class NodeHeartbeatEvent extends SynchronizableEvent {
     private final ClusterControllerService ccs;
     private final String nodeId;
 
@@ -37,4 +38,9 @@
             state.notifyHeartbeat();
         }
     }
+
+    @Override
+    public Level logLevel() {
+        return Level.FINEST;
+    }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterNodeEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterNodeEvent.java
index 0754dc1..6316b3e 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterNodeEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterNodeEvent.java
@@ -18,9 +18,9 @@
 
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.NodeControllerState;
-import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableRunnable;
+import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableEvent;
 
-public class RegisterNodeEvent extends SynchronizableRunnable {
+public class RegisterNodeEvent extends SynchronizableEvent {
     private final ClusterControllerService ccs;
     private final String nodeId;
     private final NodeControllerState state;
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionAvailibilityEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionAvailibilityEvent.java
index 0269122..e9861d8 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionAvailibilityEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionAvailibilityEvent.java
@@ -21,9 +21,10 @@
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.NodeControllerState;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
+import edu.uci.ics.hyracks.control.cc.jobqueue.AbstractEvent;
 import edu.uci.ics.hyracks.control.common.base.INodeController;
 
-public class RegisterPartitionAvailibilityEvent implements Runnable {
+public class RegisterPartitionAvailibilityEvent extends AbstractEvent {
     private final ClusterControllerService ccs;
     private final PartitionId pid;
     private final NetworkAddress networkAddress;
@@ -58,4 +59,9 @@
             }
         }
     }
+
+    @Override
+    public String toString() {
+        return "PartitionAvailable@" + networkAddress + "[" + pid + "]";
+    }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionRequestEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionRequestEvent.java
index 4d34e6e..e7c92e7 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionRequestEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionRequestEvent.java
@@ -22,9 +22,10 @@
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.NodeControllerState;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
+import edu.uci.ics.hyracks.control.cc.jobqueue.AbstractEvent;
 import edu.uci.ics.hyracks.control.common.base.INodeController;
 
-public class RegisterPartitionRequestEvent implements Runnable {
+public class RegisterPartitionRequestEvent extends AbstractEvent {
     private final ClusterControllerService ccs;
     private final Collection<PartitionId> requiredPartitionIds;
     private final String nodeId;
@@ -61,4 +62,9 @@
             }
         }
     }
+
+    @Override
+    public String toString() {
+        return "PartitionRequest@[" + nodeId + "][" + requiredPartitionIds + "]";
+    }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java
index 8a1703b..b64a784 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java
@@ -17,12 +17,14 @@
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.NodeControllerState;
+import edu.uci.ics.hyracks.control.cc.jobqueue.AbstractEvent;
 
-public class RemoveDeadNodesEvent implements Runnable {
+public class RemoveDeadNodesEvent extends AbstractEvent {
     private static Logger LOGGER = Logger.getLogger(RemoveDeadNodesEvent.class.getName());
 
     private final ClusterControllerService ccs;
@@ -47,4 +49,9 @@
             // Deal with dead tasks.
         }
     }
+
+    @Override
+    public Level logLevel() {
+        return Level.FINE;
+    }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ReportProfilesEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ReportProfilesEvent.java
index 24f884a..a103bdd 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ReportProfilesEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ReportProfilesEvent.java
@@ -17,12 +17,14 @@
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.logging.Level;
 
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
+import edu.uci.ics.hyracks.control.cc.jobqueue.AbstractEvent;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
 
-public class ReportProfilesEvent implements Runnable {
+public class ReportProfilesEvent extends AbstractEvent {
     private final ClusterControllerService ccs;
     private final List<JobProfile> profiles;
 
@@ -38,4 +40,9 @@
             JobRun run = runMap.get(profile.getJobId());
         }
     }
+
+    @Override
+    public Level logLevel() {
+        return Level.FINEST;
+    }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskCompleteEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskCompleteEvent.java
index f9fa355..29a341f 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskCompleteEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskCompleteEvent.java
@@ -14,65 +14,29 @@
  */
 package edu.uci.ics.hyracks.control.cc.job.manager.events;
 
-import java.util.List;
-import java.util.Map;
 import java.util.UUID;
 
-import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
-import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
-import edu.uci.ics.hyracks.control.cc.job.JobRun;
 import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
-import edu.uci.ics.hyracks.control.cc.job.TaskCluster;
-import edu.uci.ics.hyracks.control.cc.job.TaskClusterAttempt;
-import edu.uci.ics.hyracks.control.cc.job.TaskState;
 
-public class TaskCompleteEvent implements Runnable {
-    private final ClusterControllerService ccs;
-    private final UUID jobId;
-    private final TaskAttemptId taId;
-    private final String nodeId;
-
+public class TaskCompleteEvent extends AbstractTaskLifecycleEvent {
     public TaskCompleteEvent(ClusterControllerService ccs, UUID jobId, TaskAttemptId taId, String nodeId) {
-        this.ccs = ccs;
-        this.jobId = jobId;
-        this.taId = taId;
-        this.nodeId = nodeId;
+        super(ccs, jobId, taId, nodeId);
     }
 
     @Override
-    public void run() {
-        JobRun run = ccs.getRunMap().get(jobId);
-        if (run != null) {
-            TaskId tid = taId.getTaskId();
-            Map<ActivityNodeId, ActivityCluster> activityClusterMap = run.getActivityClusterMap();
-            ActivityCluster ac = activityClusterMap.get(tid.getActivityId());
-            if (ac != null) {
-                Map<ActivityNodeId, TaskState[]> taskStateMap = ac.getTaskStateMap();
-                TaskState[] taskStates = taskStateMap.get(tid.getActivityId());
-                if (taskStates != null && taskStates.length > tid.getPartition()) {
-                    TaskState ts = taskStates[tid.getPartition()];
-                    TaskCluster tc = ts.getTaskCluster();
-                    List<TaskClusterAttempt> taskClusterAttempts = tc.getAttempts();
-                    if (taskClusterAttempts != null && taskClusterAttempts.size() > taId.getAttempt()) {
-                        TaskClusterAttempt tca = taskClusterAttempts.get(taId.getAttempt());
-                        TaskAttempt ta = tca.getTaskAttempts()[tid.getPartition()];
-                        try {
-                            ta.notifyTaskComplete();
-                        } catch (HyracksException e) {
-                            e.printStackTrace();
-                        }
-                    }
-                }
-            }
+    protected void performEvent(TaskAttempt ta) {
+        try {
+            ta.notifyTaskComplete();
+        } catch (HyracksException e) {
+            e.printStackTrace();
         }
     }
 
     @Override
     public String toString() {
-        return "TaskCompleteEvent[" + jobId + ":" + taId + ":" + nodeId + "]";
+        return "TaskCompleteEvent@[" + nodeId + "][" + jobId + ":" + taId + "]";
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskFailureEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskFailureEvent.java
index 710c2b1..bd6e71e 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskFailureEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskFailureEvent.java
@@ -14,67 +14,33 @@
  */
 package edu.uci.ics.hyracks.control.cc.job.manager.events;
 
-import java.util.List;
-import java.util.Map;
 import java.util.UUID;
 
-import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
-import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
-import edu.uci.ics.hyracks.control.cc.job.JobRun;
 import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
-import edu.uci.ics.hyracks.control.cc.job.TaskCluster;
-import edu.uci.ics.hyracks.control.cc.job.TaskClusterAttempt;
-import edu.uci.ics.hyracks.control.cc.job.TaskState;
 
-public class TaskFailureEvent implements Runnable {
-    private final ClusterControllerService ccs;
-    private final UUID jobId;
-    private final TaskAttemptId taId;
-    private final String nodeId;
+public class TaskFailureEvent extends AbstractTaskLifecycleEvent {
     private final Exception exception;
 
-    public TaskFailureEvent(ClusterControllerService ccs, UUID jobId, TaskAttemptId taId, String nodeId, Exception exception) {
-        this.ccs = ccs;
-        this.jobId = jobId;
-        this.taId = taId;
-        this.nodeId = nodeId;
+    public TaskFailureEvent(ClusterControllerService ccs, UUID jobId, TaskAttemptId taId, String nodeId,
+            Exception exception) {
+        super(ccs, jobId, taId, nodeId);
         this.exception = exception;
     }
 
     @Override
-    public void run() {
-        JobRun run = ccs.getRunMap().get(jobId);
-        if (run != null) {
-            TaskId tid = taId.getTaskId();
-            Map<ActivityNodeId, ActivityCluster> activityClusterMap = run.getActivityClusterMap();
-            ActivityCluster ac = activityClusterMap.get(tid.getActivityId());
-            if (ac != null) {
-                Map<ActivityNodeId, TaskState[]> taskStateMap = ac.getTaskStateMap();
-                TaskState[] taskStates = taskStateMap.get(tid.getActivityId());
-                if (taskStates != null && taskStates.length > tid.getPartition()) {
-                    TaskState ts = taskStates[tid.getPartition()];
-                    TaskCluster tc = ts.getTaskCluster();
-                    List<TaskClusterAttempt> taskClusterAttempts = tc.getAttempts();
-                    if (taskClusterAttempts != null && taskClusterAttempts.size() > taId.getAttempt()) {
-                        TaskClusterAttempt tca = taskClusterAttempts.get(taId.getAttempt());
-                        TaskAttempt ta = tca.getTaskAttempts()[tid.getPartition()];
-                        try {
-                            ta.notifyTaskFailure(exception);
-                        } catch (HyracksException e) {
-                            e.printStackTrace();
-                        }
-                    }
-                }
-            }
+    protected void performEvent(TaskAttempt ta) {
+        try {
+            ta.notifyTaskFailure(exception);
+        } catch (HyracksException e) {
+            e.printStackTrace();
         }
     }
 
     @Override
     public String toString() {
-        return "TaskCompleteEvent[" + jobId + ":" + taId + ":" + nodeId + "]";
+        return "TaskFailureEvent[" + jobId + ":" + taId + ":" + nodeId + "]";
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/UnregisterNodeEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/UnregisterNodeEvent.java
index fc1ba03..b89de07 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/UnregisterNodeEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/UnregisterNodeEvent.java
@@ -18,9 +18,9 @@
 
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.NodeControllerState;
-import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableRunnable;
+import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableEvent;
 
-public class UnregisterNodeEvent extends SynchronizableRunnable {
+public class UnregisterNodeEvent extends SynchronizableEvent {
     private final ClusterControllerService ccs;
     private final String nodeId;
 
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/jobqueue/AbstractEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/jobqueue/AbstractEvent.java
new file mode 100644
index 0000000..5fcd56a
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/jobqueue/AbstractEvent.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.jobqueue;
+
+import java.util.logging.Level;
+
+public abstract class AbstractEvent implements Runnable {
+    public Level logLevel() {
+        return Level.INFO;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/jobqueue/JobQueue.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/jobqueue/JobQueue.java
index f533ad0..84a844c 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/jobqueue/JobQueue.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/jobqueue/JobQueue.java
@@ -20,21 +20,23 @@
 public class JobQueue {
     private static final Logger LOGGER = Logger.getLogger(JobQueue.class.getName());
 
-    private final LinkedBlockingQueue<Runnable> queue;
+    private final LinkedBlockingQueue<AbstractEvent> queue;
     private final JobThread thread;
 
     public JobQueue() {
-        queue = new LinkedBlockingQueue<Runnable>();
+        queue = new LinkedBlockingQueue<AbstractEvent>();
         thread = new JobThread();
         thread.start();
     }
 
-    public void schedule(Runnable runnable) {
-        LOGGER.info("Scheduling: " + runnable);
-        queue.offer(runnable);
+    public void schedule(AbstractEvent event) {
+        if (LOGGER.isLoggable(event.logLevel())) {
+            LOGGER.log(event.logLevel(), "Scheduling: " + event);
+        }
+        queue.offer(event);
     }
 
-    public void scheduleAndSync(SynchronizableRunnable sRunnable) throws Exception {
+    public void scheduleAndSync(SynchronizableEvent sRunnable) throws Exception {
         schedule(sRunnable);
         sRunnable.sync();
     }
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/jobqueue/SynchronizableEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/jobqueue/SynchronizableEvent.java
new file mode 100644
index 0000000..88c9097
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/jobqueue/SynchronizableEvent.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.jobqueue;
+
+public abstract class SynchronizableEvent extends AbstractEvent {
+    private boolean done;
+
+    private Exception e;
+
+    protected abstract void doRun() throws Exception;
+
+    public void init() {
+        done = false;
+        e = null;
+    }
+
+    @Override
+    public final void run() {
+        try {
+            doRun();
+        } catch (Exception e) {
+            this.e = e;
+        } finally {
+            synchronized (this) {
+                done = true;
+                notifyAll();
+            }
+        }
+    }
+
+    public final synchronized void sync() throws Exception {
+        while (!done) {
+            try {
+                wait();
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+        if (e != null) {
+            throw e;
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/jobqueue/SynchronizableRunnable.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/jobqueue/SynchronizableRunnable.java
deleted file mode 100644
index 53315b7..0000000
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/jobqueue/SynchronizableRunnable.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package edu.uci.ics.hyracks.control.cc.jobqueue;
-
-public abstract class SynchronizableRunnable implements Runnable {
-    private boolean done;
-
-    private Exception e;
-
-    protected abstract void doRun() throws Exception;
-
-    public void init() {
-        done = false;
-        e = null;
-    }
-
-    @Override
-    public final void run() {
-        try {
-            doRun();
-        } catch (Exception e) {
-            this.e = e;
-        } finally {
-            synchronized (this) {
-                done = true;
-                notifyAll();
-            }
-        }
-    }
-
-    public final synchronized void sync() throws Exception {
-        while (!done) {
-            try {
-                wait();
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
-        }
-        if (e != null) {
-            throw e;
-        }
-    }
-}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityPartitionDetails.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityPartitionDetails.java
new file mode 100644
index 0000000..d3fc871
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityPartitionDetails.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.scheduler;
+
+import java.util.Arrays;
+
+public class ActivityPartitionDetails {
+    private final int nPartitions;
+
+    private final int[] nInputPartitions;
+
+    private final int[] nOutputPartitions;
+
+    public ActivityPartitionDetails(int nPartitions, int[] nInputPartitions, int[] nOutputPartitions) {
+        this.nPartitions = nPartitions;
+        this.nInputPartitions = nInputPartitions;
+        this.nOutputPartitions = nOutputPartitions;
+    }
+
+    public int getPartitionCount() {
+        return nPartitions;
+    }
+
+    public int[] getInputPartitionCounts() {
+        return nInputPartitions;
+    }
+
+    public int[] getOutputPartitionCounts() {
+        return nOutputPartitions;
+    }
+
+    @Override
+    public String toString() {
+        return nPartitions + ":" + Arrays.toString(nInputPartitions) + ":" + Arrays.toString(nOutputPartitions);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultActivityClusterStateMachine.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultActivityClusterStateMachine.java
index 1e68f76..cdd6a65 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultActivityClusterStateMachine.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultActivityClusterStateMachine.java
@@ -16,17 +16,21 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Executor;
+import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.constraints.expressions.LValueConstraintExpression;
 import edu.uci.ics.hyracks.api.constraints.expressions.PartitionLocationExpression;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
@@ -36,10 +40,11 @@
 import edu.uci.ics.hyracks.control.cc.NodeControllerState;
 import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
+import edu.uci.ics.hyracks.control.cc.job.Task;
 import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
 import edu.uci.ics.hyracks.control.cc.job.TaskCluster;
 import edu.uci.ics.hyracks.control.cc.job.TaskClusterAttempt;
-import edu.uci.ics.hyracks.control.cc.job.TaskState;
+import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
 
 public class DefaultActivityClusterStateMachine implements IActivityClusterStateMachine {
     private static final Logger LOGGER = Logger.getLogger(DefaultActivityClusterStateMachine.class.getName());
@@ -50,11 +55,14 @@
 
     private final ActivityCluster ac;
 
+    private final Set<TaskCluster> inProgressTaskClusters;
+
     public DefaultActivityClusterStateMachine(ClusterControllerService ccs, DefaultJobRunStateMachine jsm,
             ActivityCluster ac) {
         this.ccs = ccs;
         this.jsm = jsm;
         this.ac = ac;
+        inProgressTaskClusters = new HashSet<TaskCluster>();
     }
 
     @Override
@@ -62,67 +70,78 @@
         startRunnableTaskClusters();
     }
 
-    private void assignTaskLocations(TaskCluster tc, Map<String, List<TaskAttemptId>> taskAttemptMap)
+    private void assignTaskLocations(TaskCluster tc, Map<String, List<TaskAttemptDescriptor>> taskAttemptMap)
             throws HyracksException {
-        TaskState[] tasks = tc.getTasks();
+        Task[] tasks = tc.getTasks();
         List<TaskClusterAttempt> tcAttempts = tc.getAttempts();
         int attempts = tcAttempts.size();
+        TaskClusterAttempt tcAttempt = new TaskClusterAttempt(tc, attempts);
         TaskAttempt[] taskAttempts = new TaskAttempt[tasks.length];
         Map<TaskId, LValueConstraintExpression> locationMap = new HashMap<TaskId, LValueConstraintExpression>();
         for (int i = 0; i < tasks.length; ++i) {
-            TaskState ts = tasks[i];
+            Task ts = tasks[i];
             TaskId tid = ts.getTaskId();
-            TaskAttempt taskAttempt = new TaskAttempt(new TaskAttemptId(new TaskId(tid.getActivityId(),
+            TaskAttempt taskAttempt = new TaskAttempt(tcAttempt, new TaskAttemptId(new TaskId(tid.getActivityId(),
                     tid.getPartition()), attempts), ts);
+            taskAttempt.setStatus(TaskAttempt.TaskStatus.INITIALIZED, null);
             locationMap.put(tid,
                     new PartitionLocationExpression(tid.getActivityId().getOperatorDescriptorId(), tid.getPartition()));
             taskAttempts[i] = taskAttempt;
         }
+        tcAttempt.setTaskAttempts(taskAttempts);
         PartitionConstraintSolver solver = jsm.getSolver();
         solver.solve(locationMap.values());
+        Map<OperatorDescriptorId, String> operatorLocationAssignmentMap = jsm.getOperatorLocationAssignmentMap();
         for (int i = 0; i < tasks.length; ++i) {
-            TaskState ts = tasks[i];
+            Task ts = tasks[i];
             TaskId tid = ts.getTaskId();
             TaskAttempt taskAttempt = taskAttempts[i];
-            LValueConstraintExpression pLocationExpr = locationMap.get(tid);
-            Object location = solver.getValue(pLocationExpr);
-            String nodeId = null;
-            Set<String> liveNodes = ccs.getNodeMap().keySet();
-            if (location == null) {
-                // pick any
-                nodeId = liveNodes.toArray(new String[liveNodes.size()])[Math.abs(new Random().nextInt())
-                        % liveNodes.size()];
-            } else if (location instanceof String) {
-                nodeId = (String) location;
-                if (!liveNodes.contains(nodeId)) {
-                    throw new HyracksException("Node " + nodeId + " not live");
-                }
-            } else if (location instanceof String[]) {
-                for (String choice : (String[]) location) {
-                    if (liveNodes.contains(choice)) {
-                        nodeId = choice;
-                        break;
+            String nodeId = operatorLocationAssignmentMap.get(tid.getActivityId().getOperatorDescriptorId());
+            if (nodeId == null) {
+                LValueConstraintExpression pLocationExpr = locationMap.get(tid);
+                Object location = solver.getValue(pLocationExpr);
+                Set<String> liveNodes = ccs.getNodeMap().keySet();
+                if (location == null) {
+                    // pick any live node; nextInt(bound) is always in [0, bound),
+                    // unlike Math.abs(nextInt()) % n which is negative for Integer.MIN_VALUE
+                    nodeId = liveNodes.toArray(new String[liveNodes.size()])[new Random().nextInt(liveNodes.size())];
+                } else if (location instanceof String) {
+                    nodeId = (String) location;
+                    if (!liveNodes.contains(nodeId)) {
+                        throw new HyracksException("Node " + nodeId + " not live");
                     }
+                } else if (location instanceof String[]) {
+                    for (String choice : (String[]) location) {
+                        if (liveNodes.contains(choice)) {
+                            nodeId = choice;
+                            break;
+                        }
+                    }
+                    if (nodeId == null) {
+                        throw new HyracksException("No satisfiable location found for "
+                                + taskAttempt.getTaskAttemptId());
+                    }
+                } else {
+                    throw new HyracksException("Unknown type of value for " + pLocationExpr + ": " + location + "("
+                            + location.getClass() + ")");
                 }
-                if (nodeId == null) {
-                    throw new HyracksException("No satisfiable location found for " + taskAttempt.getTaskAttemptId());
-                }
-            } else {
-                throw new HyracksException("Unknown type of value for " + pLocationExpr + ": " + location + "("
-                        + location.getClass() + ")");
+                operatorLocationAssignmentMap.put(tid.getActivityId().getOperatorDescriptorId(), nodeId);
             }
             taskAttempt.setNodeId(nodeId);
-            List<TaskAttemptId> taIds = taskAttemptMap.get(nodeId);
-            if (taIds == null) {
-                taIds = new ArrayList<TaskAttemptId>();
-                taskAttemptMap.put(nodeId, taIds);
+            taskAttempt.setStatus(TaskAttempt.TaskStatus.RUNNING, null);
+            List<TaskAttemptDescriptor> tads = taskAttemptMap.get(nodeId);
+            if (tads == null) {
+                tads = new ArrayList<TaskAttemptDescriptor>();
+                taskAttemptMap.put(nodeId, tads);
             }
-            taIds.add(taskAttempt.getTaskAttemptId());
+            ActivityPartitionDetails apd = ts.getActivityPartitionDetails();
+            tads.add(new TaskAttemptDescriptor(taskAttempt.getTaskAttemptId(), apd.getPartitionCount(), apd
+                    .getInputPartitionCounts(), apd.getOutputPartitionCounts()));
         }
-        TaskClusterAttempt tcAttempt = new TaskClusterAttempt(taskAttempts);
         tcAttempt.initializePendingTaskCounter();
         tcAttempts.add(tcAttempt);
-
+        tcAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.RUNNING);
+        inProgressTaskClusters.add(tc);
     }
 
     @Override
@@ -138,6 +157,7 @@
                 ta.setStatus(TaskAttempt.TaskStatus.COMPLETED, null);
                 if (tcAttempt.decrementPendingTasksCounter() == 0) {
                     tcAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.COMPLETED);
+                    inProgressTaskClusters.remove(tc);
                     startRunnableTaskClusters();
                 }
             } else {
@@ -151,9 +171,17 @@
     private void startRunnableTaskClusters() throws HyracksException {
         TaskCluster[] taskClusters = ac.getTaskClusters();
 
-        Map<String, List<TaskAttemptId>> taskAttemptMap = new HashMap<String, List<TaskAttemptId>>();
+        Map<String, List<TaskAttemptDescriptor>> taskAttemptMap = new HashMap<String, List<TaskAttemptDescriptor>>();
         for (TaskCluster tc : taskClusters) {
             Set<TaskCluster> dependencies = tc.getDependencies();
+            List<TaskClusterAttempt> attempts = tc.getAttempts();
+            if (!attempts.isEmpty()) {
+                TaskClusterAttempt lastAttempt = attempts.get(attempts.size() - 1);
+                if (lastAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED
+                        || lastAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING) {
+                    continue;
+                }
+            }
             boolean runnable = true;
             for (TaskCluster depTC : dependencies) {
                 List<TaskClusterAttempt> tcAttempts = depTC.getAttempts();
@@ -168,30 +196,45 @@
                 }
             }
             if (runnable) {
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Found runnable TC: " + Arrays.toString(tc.getTasks()));
+                    LOGGER.info("Attempts so far:" + attempts.size());
+                    for (TaskClusterAttempt tcAttempt : attempts) {
+                        LOGGER.info("Status: " + tcAttempt.getStatus());
+                    }
+                }
                 assignTaskLocations(tc, taskAttemptMap);
             }
         }
 
+        if (taskAttemptMap.isEmpty()) {
+            if (inProgressTaskClusters.isEmpty()) {
+                ac.notifyActivityClusterComplete();
+            }
+            return;
+        }
+
         startTasks(taskAttemptMap);
     }
 
-    private void startTasks(Map<String, List<TaskAttemptId>> taskAttemptMap) {
+    private void startTasks(Map<String, List<TaskAttemptDescriptor>> taskAttemptMap) {
         Executor executor = ccs.getExecutor();
         JobRun jobRun = ac.getJobRun();
         final UUID jobId = jobRun.getJobId();
         final JobActivityGraph jag = jobRun.getJobActivityGraph();
         final String appName = jag.getApplicationName();
-        for (Map.Entry<String, List<TaskAttemptId>> e : taskAttemptMap.entrySet()) {
+        for (Map.Entry<String, List<TaskAttemptDescriptor>> e : taskAttemptMap.entrySet()) {
             String nodeId = e.getKey();
-            final List<TaskAttemptId> taskIds = e.getValue();
+            final List<TaskAttemptDescriptor> taskDescriptors = e.getValue();
             final NodeControllerState node = ccs.getNodeMap().get(nodeId);
             if (node != null) {
+                jobRun.getParticipatingNodeIds().add(nodeId);
                 executor.execute(new Runnable() {
                     @Override
                     public void run() {
                         try {
                             node.getNodeController().startTasks(appName, jobId, JavaSerializationUtils.serialize(jag),
-                                    taskIds, null);
+                                    taskDescriptors);
                         } catch (IOException e) {
                             e.printStackTrace();
                         } catch (Exception e) {
@@ -203,6 +246,41 @@
         }
     }
 
+    private void abortTaskCluster(TaskClusterAttempt tcAttempt) {
+        Map<String, List<TaskAttemptId>> abortTaskAttemptMap = new HashMap<String, List<TaskAttemptId>>();
+        for (TaskAttempt ta2 : tcAttempt.getTaskAttempts()) {
+            if (ta2.getStatus() == TaskAttempt.TaskStatus.RUNNING) {
+                ta2.setStatus(TaskAttempt.TaskStatus.ABORTED, null);
+                List<TaskAttemptId> abortTaskAttempts = abortTaskAttemptMap.get(ta2.getNodeId());
+                if (abortTaskAttempts == null) {
+                    abortTaskAttempts = new ArrayList<TaskAttemptId>();
+                    abortTaskAttemptMap.put(ta2.getNodeId(), abortTaskAttempts);
+                }
+                abortTaskAttempts.add(ta2.getTaskAttemptId());
+            }
+        }
+        JobRun jobRun = ac.getJobRun();
+        final UUID jobId = jobRun.getJobId();
+        for (Map.Entry<String, List<TaskAttemptId>> e : abortTaskAttemptMap.entrySet()) {
+            final NodeControllerState node = ccs.getNodeMap().get(e.getKey());
+            final List<TaskAttemptId> abortTaskAttempts = e.getValue();
+            if (node != null) {
+                LOGGER.info("Aborting: " + abortTaskAttempts + " at " + e.getKey());
+                ccs.getExecutor().execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            node.getNodeController().abortTasks(jobId, abortTaskAttempts);
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        }
+                    }
+                });
+            }
+        }
+
+    }
+
     @Override
     public void notifyTaskFailure(TaskAttempt ta, Exception exception) throws HyracksException {
         TaskAttemptId taId = ta.getTaskAttemptId();
@@ -212,43 +290,11 @@
         if (taId.getAttempt() == lastAttempt) {
             TaskClusterAttempt tcAttempt = tcAttempts.get(lastAttempt);
             TaskAttempt.TaskStatus taStatus = ta.getStatus();
-            Map<String, List<TaskAttemptId>> abortTaskAttemptMap = new HashMap<String, List<TaskAttemptId>>();
             if (taStatus == TaskAttempt.TaskStatus.RUNNING) {
                 ta.setStatus(TaskAttempt.TaskStatus.FAILED, exception);
-                for (TaskAttempt ta2 : tcAttempt.getTaskAttempts()) {
-                    if (ta2.getStatus() == TaskAttempt.TaskStatus.RUNNING
-                            || ta2.getStatus() == TaskAttempt.TaskStatus.INITIALIZED) {
-                        ta2.setStatus(TaskAttempt.TaskStatus.ABORTED, null);
-                        List<TaskAttemptId> abortTaskAttempts = abortTaskAttemptMap.get(ta2.getNodeId());
-                        if (abortTaskAttempts == null) {
-                            abortTaskAttempts = new ArrayList<TaskAttemptId>();
-                            abortTaskAttemptMap.put(ta2.getNodeId(), abortTaskAttempts);
-                        }
-                        abortTaskAttempts.add(ta2.getTaskAttemptId());
-                    }
-                }
-                JobRun jobRun = ac.getJobRun();
-                final UUID jobId = jobRun.getJobId();
-                for (Map.Entry<String, List<TaskAttemptId>> e : abortTaskAttemptMap.entrySet()) {
-                    final NodeControllerState node = ccs.getNodeMap().get(e.getKey());
-                    final List<TaskAttemptId> abortTaskAttempts = e.getValue();
-                    if (node != null) {
-                        ccs.getExecutor().execute(new Runnable() {
-                            @Override
-                            public void run() {
-                                try {
-                                    node.getNodeController().abortTasks(jobId, abortTaskAttempts);
-                                } catch (Exception e) {
-                                    e.printStackTrace();
-                                }
-                            }
-                        });
-                    }
-                }
+                abortTaskCluster(tcAttempt);
                 tcAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.FAILED);
-                Map<String, List<TaskAttemptId>> taskAttemptMap = new HashMap<String, List<TaskAttemptId>>();
-                assignTaskLocations(tc, taskAttemptMap);
-                startTasks(taskAttemptMap);
+                ac.notifyTaskClusterFailure(tcAttempt, exception);
             } else {
                 LOGGER.warning("Spurious task complete notification: " + taId + " Current state = " + taStatus);
             }
@@ -256,4 +302,32 @@
             LOGGER.warning("Ignoring task complete notification: " + taId + " -- Current last attempt = " + lastAttempt);
         }
     }
+
+    @Override
+    public void abort() {
+        TaskCluster[] taskClusters = ac.getTaskClusters();
+        for (TaskCluster tc : taskClusters) {
+            List<TaskClusterAttempt> tcAttempts = tc.getAttempts();
+            if (!tcAttempts.isEmpty()) {
+                TaskClusterAttempt tcAttempt = tcAttempts.get(tcAttempts.size() - 1);
+                if (tcAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING) {
+                    abortTaskCluster(tcAttempt);
+                    tcAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.ABORTED);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void notifyTaskClusterFailure(TaskClusterAttempt tcAttempt, Exception exception) throws HyracksException {
+        TaskCluster tc = tcAttempt.getTaskCluster();
+        if (tcAttempt.getAttempt() >= ac.getMaxTaskClusterAttempts()) {
+            abort();
+            ac.getJobRun().getStateMachine().notifyActivityClusterFailure(ac, exception);
+            return;
+        }
+        Map<String, List<TaskAttemptDescriptor>> taskAttemptMap = new HashMap<String, List<TaskAttemptDescriptor>>();
+        assignTaskLocations(tc, taskAttemptMap);
+        startTasks(taskAttemptMap);
+    }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobRunStateMachine.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobRunStateMachine.java
index 9e70b42..b20c467 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobRunStateMachine.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobRunStateMachine.java
@@ -15,6 +15,7 @@
 package edu.uci.ics.hyracks.control.cc.scheduler;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.BitSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -34,7 +35,6 @@
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
-import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.JobActivityGraph;
@@ -47,11 +47,11 @@
 import edu.uci.ics.hyracks.control.cc.job.IOperatorDescriptorVisitor;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
 import edu.uci.ics.hyracks.control.cc.job.PlanUtils;
-import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
+import edu.uci.ics.hyracks.control.cc.job.Task;
 import edu.uci.ics.hyracks.control.cc.job.TaskCluster;
-import edu.uci.ics.hyracks.control.cc.job.TaskState;
 import edu.uci.ics.hyracks.control.cc.job.manager.events.JobCleanupEvent;
 import edu.uci.ics.hyracks.control.common.job.dataflow.IConnectorPolicy;
+import edu.uci.ics.hyracks.control.common.job.dataflow.PipelinedConnectorPolicy;
 
 public class DefaultJobRunStateMachine implements IJobRunStateMachine {
     private static final Logger LOGGER = Logger.getLogger(DefaultJobRunStateMachine.class.getName());
@@ -60,6 +60,8 @@
 
     private final JobRun jobRun;
 
+    private final Map<OperatorDescriptorId, String> operatorLocationAssignmentMap;
+
     private final Set<ActivityCluster> completedClusters;
 
     private final Set<ActivityCluster> inProgressClusters;
@@ -71,10 +73,15 @@
     public DefaultJobRunStateMachine(ClusterControllerService ccs, JobRun jobRun) {
         this.ccs = ccs;
         this.jobRun = jobRun;
+        this.operatorLocationAssignmentMap = new HashMap<OperatorDescriptorId, String>();
         completedClusters = new HashSet<ActivityCluster>();
         inProgressClusters = new HashSet<ActivityCluster>();
     }
 
+    public Map<OperatorDescriptorId, String> getOperatorLocationAssignmentMap() {
+        return operatorLocationAssignmentMap;
+    }
+
     public PartitionConstraintSolver getSolver() {
         return solver;
     }
@@ -161,6 +168,7 @@
                 s.addDependent(bs);
             }
         }
+        jobRun.getActivityClusterMap().putAll(stageMap);
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Inferred " + (stages.size() + 1) + " stages");
             for (ActivityCluster s : stages) {
@@ -193,7 +201,8 @@
     }
 
     private void findRunnableClusters(Set<ActivityCluster> frontier, ActivityCluster candidate) {
-        if (completedClusters.contains(candidate) || frontier.contains(candidate)) {
+        if (completedClusters.contains(candidate) || frontier.contains(candidate)
+                || inProgressClusters.contains(candidate)) {
             return;
         }
         boolean runnable = true;
@@ -241,26 +250,32 @@
             solver.addConstraints(contributedConstraints);
 
             rootActivityCluster = inferStages(jag);
-            Set<ActivityCluster> runnableClusters = new HashSet<ActivityCluster>();
-            findRunnableClusters(runnableClusters);
-            if (runnableClusters.isEmpty() && inProgressClusters.isEmpty()) {
-                ccs.getJobQueue().schedule(new JobCleanupEvent(ccs, jobRun.getJobId(), JobStatus.TERMINATED, null));
-                return;
-            }
-            for (ActivityCluster ac : runnableClusters) {
-                inProgressClusters.add(ac);
-                buildTaskClusters(ac);
-                IActivityClusterStateMachine acsm = new DefaultActivityClusterStateMachine(ccs, this, ac);
-                ac.setStateMachine(acsm);
-                acsm.schedule();
-            }
+            startRunnableActivityClusters();
         } catch (Exception e) {
+            e.printStackTrace();
             ccs.getJobQueue().schedule(new JobCleanupEvent(ccs, jobRun.getJobId(), JobStatus.FAILURE, e));
             throw new HyracksException(e);
         }
     }
 
-    private Map<ActivityNodeId, Integer> computePartitionCounts(ActivityCluster ac) throws HyracksException {
+    private void startRunnableActivityClusters() throws HyracksException {
+        Set<ActivityCluster> runnableClusters = new HashSet<ActivityCluster>();
+        findRunnableClusters(runnableClusters);
+        if (runnableClusters.isEmpty() && inProgressClusters.isEmpty()) {
+            ccs.getJobQueue().schedule(new JobCleanupEvent(ccs, jobRun.getJobId(), JobStatus.TERMINATED, null));
+            return;
+        }
+        for (ActivityCluster ac : runnableClusters) {
+            inProgressClusters.add(ac);
+            buildTaskClusters(ac);
+            IActivityClusterStateMachine acsm = new DefaultActivityClusterStateMachine(ccs, this, ac);
+            ac.setStateMachine(acsm);
+            acsm.schedule();
+        }
+    }
+
+    private Map<ActivityNodeId, ActivityPartitionDetails> computePartitionCounts(ActivityCluster ac)
+            throws HyracksException {
         Set<LValueConstraintExpression> lValues = new HashSet<LValueConstraintExpression>();
         for (ActivityNodeId anId : ac.getActivities()) {
             lValues.add(new PartitionCountExpression(anId.getOperatorDescriptorId()));
@@ -282,22 +297,43 @@
             }
             nPartMap.put(((PartitionCountExpression) lv).getOperatorDescriptorId(), Integer.valueOf(nParts));
         }
-        Map<ActivityNodeId, Integer> activityPartsMap = new HashMap<ActivityNodeId, Integer>();
+        Map<ActivityNodeId, ActivityPartitionDetails> activityPartsMap = new HashMap<ActivityNodeId, ActivityPartitionDetails>();
         for (ActivityNodeId anId : ac.getActivities()) {
-            activityPartsMap.put(anId, nPartMap.get(anId.getOperatorDescriptorId()));
+            int nParts = nPartMap.get(anId.getOperatorDescriptorId());
+            int[] nInputPartitions = null;
+            List<IConnectorDescriptor> inputs = jobRun.getJobActivityGraph().getActivityInputConnectorDescriptors(anId);
+            if (inputs != null) {
+                nInputPartitions = new int[inputs.size()];
+                for (int i = 0; i < nInputPartitions.length; ++i) {
+                    nInputPartitions[i] = nPartMap.get(jobRun.getJobActivityGraph()
+                            .getProducerActivity(inputs.get(i).getConnectorId()).getOperatorDescriptorId());
+                }
+            }
+            int[] nOutputPartitions = null;
+            List<IConnectorDescriptor> outputs = jobRun.getJobActivityGraph().getActivityOutputConnectorDescriptors(
+                    anId);
+            if (outputs != null) {
+                nOutputPartitions = new int[outputs.size()];
+                for (int i = 0; i < nOutputPartitions.length; ++i) {
+                    nOutputPartitions[i] = nPartMap.get(jobRun.getJobActivityGraph()
+                            .getConsumerActivity(outputs.get(i).getConnectorId()).getOperatorDescriptorId());
+                }
+            }
+            ActivityPartitionDetails apd = new ActivityPartitionDetails(nParts, nInputPartitions, nOutputPartitions);
+            activityPartsMap.put(anId, apd);
         }
         return activityPartsMap;
     }
 
     private void buildTaskClusters(ActivityCluster ac) throws HyracksException {
-        Map<ActivityNodeId, Integer> pcMap = computePartitionCounts(ac);
-        Map<ActivityNodeId, TaskState[]> taskStateMap = ac.getTaskStateMap();
+        Map<ActivityNodeId, ActivityPartitionDetails> pcMap = computePartitionCounts(ac);
+        Map<ActivityNodeId, Task[]> taskStateMap = ac.getTaskStateMap();
 
         for (ActivityNodeId anId : ac.getActivities()) {
-            int nParts = pcMap.get(anId);
-            TaskState[] taskStates = new TaskState[nParts];
-            for (int i = 0; i < nParts; ++i) {
-                taskStates[i] = new TaskState(new TaskId(anId, i));
+            ActivityPartitionDetails apd = pcMap.get(anId);
+            Task[] taskStates = new Task[apd.getPartitionCount()];
+            for (int i = 0; i < taskStates.length; ++i) {
+                taskStates[i] = new Task(new TaskId(anId, i), apd);
             }
             taskStateMap.put(anId, taskStates);
         }
@@ -308,8 +344,8 @@
 
         Map<TaskId, Set<TaskId>> taskClusterMap = new HashMap<TaskId, Set<TaskId>>();
         for (ActivityNodeId anId : activities) {
-            TaskState[] taskStates = taskStateMap.get(anId);
-            for (TaskState ts : taskStates) {
+            Task[] taskStates = taskStateMap.get(anId);
+            for (Task ts : taskStates) {
                 Set<TaskId> cluster = new HashSet<TaskId>();
                 cluster.add(ts.getTaskId());
                 taskClusterMap.put(ts.getTaskId(), cluster);
@@ -320,15 +356,18 @@
         JobActivityGraph jag = jobRun.getJobActivityGraph();
         BitSet targetBitmap = new BitSet();
         for (ActivityNodeId ac1 : activities) {
-            TaskState[] ac1TaskStates = taskStateMap.get(ac1);
+            Task[] ac1TaskStates = taskStateMap.get(ac1);
             int nProducers = ac1TaskStates.length;
             List<IConnectorDescriptor> outputConns = jag.getActivityOutputConnectorDescriptors(ac1);
             if (outputConns != null) {
                 for (IConnectorDescriptor c : outputConns) {
                     ConnectorDescriptorId cdId = c.getConnectorId();
                     IConnectorPolicy cPolicy = connectorPolicies.get(cdId);
+                    if (cPolicy == null) {
+                        cPolicy = new PipelinedConnectorPolicy();
+                    }
                     ActivityNodeId ac2 = jag.getConsumerActivity(cdId);
-                    TaskState[] ac2TaskStates = taskStateMap.get(ac2);
+                    Task[] ac2TaskStates = taskStateMap.get(ac2);
                     int nConsumers = ac2TaskStates.length;
                     for (int i = 0; i < nProducers; ++i) {
                         c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
@@ -339,7 +378,7 @@
                             connectionInfo.put(ac1TaskStates[i].getTaskId(), cInfoList);
                         }
                         Set<TaskId> cluster = taskClusterMap.get(ac1TaskStates[i].getTaskId());
-                        for (int j = targetBitmap.nextSetBit(0); j >= 0; j = targetBitmap.nextSetBit(j)) {
+                        for (int j = targetBitmap.nextSetBit(0); j >= 0; j = targetBitmap.nextSetBit(j + 1)) {
                             cInfoList.add(new Pair<TaskId, IConnectorPolicy>(ac2TaskStates[j].getTaskId(), cPolicy));
                             if (cPolicy.requiresProducerConsumerCoscheduling()) {
                                 cluster.add(ac2TaskStates[j].getTaskId());
@@ -378,11 +417,11 @@
         Set<Set<TaskId>> clusters = new HashSet<Set<TaskId>>(taskClusterMap.values());
         Set<TaskCluster> tcSet = new HashSet<TaskCluster>();
         for (Set<TaskId> cluster : clusters) {
-            Set<TaskState> taskStates = new HashSet<TaskState>();
+            Set<Task> taskStates = new HashSet<Task>();
             for (TaskId tid : cluster) {
                 taskStates.add(taskStateMap.get(tid.getActivityId())[tid.getPartition()]);
             }
-            TaskCluster tc = new TaskCluster(ac, taskStates.toArray(new TaskState[taskStates.size()]));
+            TaskCluster tc = new TaskCluster(ac, taskStates.toArray(new Task[taskStates.size()]));
             tcSet.add(tc);
             for (TaskId tid : cluster) {
                 taskStateMap.get(tid.getActivityId())[tid.getPartition()].setTaskCluster(tc);
@@ -391,12 +430,12 @@
         ac.setTaskClusters(tcSet.toArray(new TaskCluster[tcSet.size()]));
 
         for (TaskCluster tc : tcSet) {
-            for (TaskState ts : tc.getTasks()) {
+            for (Task ts : tc.getTasks()) {
                 TaskId tid = ts.getTaskId();
                 List<Pair<TaskId, IConnectorPolicy>> cInfoList = connectionInfo.get(tid);
                 if (cInfoList != null) {
                     for (Pair<TaskId, IConnectorPolicy> p : cInfoList) {
-                        TaskState targetTS = taskStateMap.get(p.first.getActivityId())[p.first.getPartition()];
+                        Task targetTS = taskStateMap.get(p.first.getActivityId())[p.first.getPartition()];
                         TaskCluster targetTC = targetTS.getTaskCluster();
                         if (targetTC != tc) {
                             targetTC.getDependencies().add(tc);
@@ -411,6 +450,14 @@
 
         computeBlockerClosure(tcSet);
         computeDependencyClosure(tcSet);
+
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Plan for " + ac);
+            LOGGER.info("Built " + tcSet.size() + " Task Clusters");
+            for (TaskCluster tc : tcSet) {
+                LOGGER.info("Tasks: " + Arrays.toString(tc.getTasks()));
+            }
+        }
     }
 
     private void computeDependencyClosure(Set<TaskCluster> tcSet) {
@@ -454,4 +501,23 @@
             }
         }
     }
+
+    @Override
+    public void notifyActivityClusterFailure(ActivityCluster ac, Exception exception) throws HyracksException {
+        for (ActivityCluster ac2 : inProgressClusters) {
+            abortActivityCluster(ac2);
+        }
+        jobRun.setStatus(JobStatus.FAILURE, exception);
+    }
+
+    private void abortActivityCluster(ActivityCluster ac) throws HyracksException {
+        ac.getStateMachine().abort();
+    }
+
+    @Override
+    public void notifyActivityClusterComplete(ActivityCluster ac) throws HyracksException {
+        completedClusters.add(ac);
+        inProgressClusters.remove(ac);
+        startRunnableActivityClusters();
+    }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IActivityClusterStateMachine.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IActivityClusterStateMachine.java
index 2c4a4ae..02adfa5 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IActivityClusterStateMachine.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IActivityClusterStateMachine.java
@@ -16,11 +16,16 @@
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
+import edu.uci.ics.hyracks.control.cc.job.TaskClusterAttempt;
 
 public interface IActivityClusterStateMachine {
     public void schedule() throws HyracksException;
 
+    public void abort() throws HyracksException;
+
     public void notifyTaskComplete(TaskAttempt ta) throws HyracksException;
 
     public void notifyTaskFailure(TaskAttempt ta, Exception exception) throws HyracksException;
+
+    public void notifyTaskClusterFailure(TaskClusterAttempt tcAttempt, Exception exception) throws HyracksException;
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IJobRunStateMachine.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IJobRunStateMachine.java
index fcf0a79..97ca9fe 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IJobRunStateMachine.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IJobRunStateMachine.java
@@ -15,7 +15,12 @@
 package edu.uci.ics.hyracks.control.cc.scheduler;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
 
 public interface IJobRunStateMachine {
     public void schedule() throws HyracksException;
+
+    public void notifyActivityClusterFailure(ActivityCluster ac, Exception exception) throws HyracksException;
+
+    public void notifyActivityClusterComplete(ActivityCluster activityCluster) throws HyracksException;
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/AdminConsoleHandler.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/AdminConsoleHandler.java
index 1a26e60..a0510f9 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/AdminConsoleHandler.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/AdminConsoleHandler.java
@@ -27,7 +27,7 @@
 
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.NodeControllerState;
-import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableRunnable;
+import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableEvent;
 
 public class AdminConsoleHandler extends AbstractHandler {
     private ClusterControllerService ccs;
@@ -51,7 +51,7 @@
         writer.println("<h2>Node Controllers</h2>");
         writer.println("<table><tr><td>Node Id</td><td>Host</td></tr>");
         try {
-            ccs.getJobQueue().scheduleAndSync(new SynchronizableRunnable() {
+            ccs.getJobQueue().scheduleAndSync(new SynchronizableEvent() {
                 @Override
                 protected void doRun() throws Exception {
                     for (Map.Entry<String, NodeControllerState> e : ccs.getNodeMap().entrySet()) {
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/ApplicationInstallationHandler.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/ApplicationInstallationHandler.java
index 65c156c..e047212 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/ApplicationInstallationHandler.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/ApplicationInstallationHandler.java
@@ -28,7 +28,7 @@
 import org.eclipse.jetty.server.handler.AbstractHandler;
 
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableRunnable;
+import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableEvent;
 import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
 
 public class ApplicationInstallationHandler extends AbstractHandler {
@@ -54,7 +54,7 @@
             }
             final String appName = parts[0];
             if (HttpMethods.PUT.equals(request.getMethod())) {
-                class OutputStreamGetter extends SynchronizableRunnable {
+                class OutputStreamGetter extends SynchronizableEvent {
                     private OutputStream os;
 
                     @Override
@@ -78,7 +78,7 @@
                     r.os.close();
                 }
             } else if (HttpMethods.GET.equals(request.getMethod())) {
-                class InputStreamGetter extends SynchronizableRunnable {
+                class InputStreamGetter extends SynchronizableEvent {
                     private InputStream is;
 
                     @Override
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
index f0ceed6..4043baa 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
@@ -16,13 +16,12 @@
 
 import java.rmi.Remote;
 import java.util.List;
-import java.util.Map;
 import java.util.UUID;
 
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
-import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
 
 public interface INodeController extends Remote {
     public String getId() throws Exception;
@@ -31,8 +30,8 @@
 
     public NodeCapability getNodeCapability() throws Exception;
 
-    public void startTasks(String appName, UUID jobId, byte[] planBytes, List<TaskAttemptId> tasks,
-            Map<OperatorDescriptorId, Integer> opNumPartitions) throws Exception;
+    public void startTasks(String appName, UUID jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors)
+            throws Exception;
 
     public void abortTasks(UUID jobId, List<TaskAttemptId> tasks) throws Exception;
 
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/TaskAttemptDescriptor.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/TaskAttemptDescriptor.java
new file mode 100644
index 0000000..15ba75c
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/TaskAttemptDescriptor.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.job;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+
+public class TaskAttemptDescriptor implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final TaskAttemptId taId;
+
+    private final int nPartitions;
+
+    private final int[] nInputPartitions;
+
+    private final int[] nOutputPartitions;
+
+    public TaskAttemptDescriptor(TaskAttemptId taId, int nPartitions, int[] nInputPartitions, int[] nOutputPartitions) {
+        this.taId = taId;
+        this.nPartitions = nPartitions;
+        this.nInputPartitions = nInputPartitions;
+        this.nOutputPartitions = nOutputPartitions;
+    }
+
+    public TaskAttemptId getTaskAttemptId() {
+        return taId;
+    }
+
+    public int getPartitionCount() {
+        return nPartitions;
+    }
+
+    public int[] getInputPartitionCounts() {
+        return nInputPartitions;
+    }
+
+    public int[] getOutputPartitionCounts() {
+        return nOutputPartitions;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 6599eb0..072c2ec 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -90,13 +90,13 @@
         }
         Map<Integer, IOperatorEnvironment> opEnvMap = envMap.get(hod.getOperatorId());
         if (!opEnvMap.containsKey(partition)) {
-            opEnvMap.put(partition, new OperatorEnvironmentImpl());
+            opEnvMap.put(partition, new OperatorEnvironmentImpl(nodeController.getId()));
         }
         return opEnvMap.get(partition);
     }
 
     public void addTask(Task task) {
-        taskMap.put(task.getTaskId(), task);
+        taskMap.put(task.getTaskAttemptId(), task);
     }
 
     public Map<TaskAttemptId, Task> getTaskMap() {
@@ -104,9 +104,11 @@
     }
 
     private static final class OperatorEnvironmentImpl implements IOperatorEnvironment {
+        private final String nodeId;
         private final Map<String, Object> map;
 
-        public OperatorEnvironmentImpl() {
+        public OperatorEnvironmentImpl(String nodeId) {
+            this.nodeId = nodeId;
             map = new HashMap<String, Object>();
         }
 
@@ -119,6 +121,10 @@
         public void set(String name, Object value) {
             map.put(name, value);
         }
+
+        public String toString() {
+            return super.toString() + "@" + nodeId;
+        }
     }
 
     public Executor getExecutor() {
@@ -127,14 +133,14 @@
 
     public synchronized void notifyTaskComplete(Task task) throws Exception {
         taskMap.remove(task);
-        TaskProfile taskProfile = new TaskProfile(task.getTaskId());
+        TaskProfile taskProfile = new TaskProfile(task.getTaskAttemptId());
         task.dumpProfile(taskProfile);
-        nodeController.notifyTaskComplete(jobId, task.getTaskId(), taskProfile);
+        nodeController.notifyTaskComplete(jobId, task.getTaskAttemptId(), taskProfile);
     }
 
-    public synchronized void notifyStageletFailed(Task task, Exception exception) throws Exception {
+    public synchronized void notifyTaskFailed(Task task, Exception exception) {
         taskMap.remove(task);
-        nodeController.notifyTaskFailed(jobId, task.getTaskId(), exception);
+        nodeController.notifyTaskFailed(jobId, task.getTaskAttemptId(), exception);
     }
 
     public NodeControllerService getNodeController() {
@@ -147,9 +153,9 @@
             counters.put(e.getKey(), e.getValue().get());
         }
         for (Task task : taskMap.values()) {
-            TaskProfile taskProfile = new TaskProfile(task.getTaskId());
+            TaskProfile taskProfile = new TaskProfile(task.getTaskAttemptId());
             task.dumpProfile(taskProfile);
-            jProfile.getTaskProfiles().put(task.getTaskId(), taskProfile);
+            jProfile.getTaskProfiles().put(task.getTaskAttemptId(), taskProfile);
         }
     }
 
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index bdd30a9..d05db7d 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -58,6 +58,7 @@
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -73,6 +74,7 @@
 import edu.uci.ics.hyracks.control.common.base.NodeCapability;
 import edu.uci.ics.hyracks.control.common.base.NodeParameters;
 import edu.uci.ics.hyracks.control.common.context.ServerContext;
+import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.JobletProfile;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
@@ -81,6 +83,7 @@
 import edu.uci.ics.hyracks.control.nc.net.ConnectionManager;
 import edu.uci.ics.hyracks.control.nc.net.NetworkInputChannel;
 import edu.uci.ics.hyracks.control.nc.partitions.PartitionManager;
+import edu.uci.ics.hyracks.control.nc.partitions.PipelinedPartition;
 import edu.uci.ics.hyracks.control.nc.runtime.RootHyracksContext;
 
 public class NodeControllerService extends AbstractRemoteService implements INodeController {
@@ -215,8 +218,8 @@
         return InetAddress.getByAddress(ipBytes);
     }
 
-    public void startTasks(String appName, UUID jobId, byte[] jagBytes, List<TaskAttemptId> tasks,
-            Map<OperatorDescriptorId, Integer> opNumPartitions) throws Exception {
+    public void startTasks(String appName, final UUID jobId, byte[] jagBytes,
+            List<TaskAttemptDescriptor> taskDescriptors) throws Exception {
         try {
             NCApplicationContext appCtx = applications.get(appName);
             final JobActivityGraph plan = (JobActivityGraph) appCtx.deserialize(jagBytes);
@@ -235,56 +238,53 @@
 
             final Joblet joblet = getOrCreateLocalJoblet(jobId, appCtx);
 
-            for (TaskAttemptId tid : tasks) {
-                IActivityNode han = plan.getActivityNodeMap().get(tid.getTaskId().getActivityId());
-                if (LOGGER.isLoggable(Level.FINEST)) {
-                    LOGGER.finest("Initializing " + tid + " -> " + han);
+            for (TaskAttemptDescriptor td : taskDescriptors) {
+                TaskAttemptId taId = td.getTaskAttemptId();
+                TaskId tid = taId.getTaskId();
+                IActivityNode han = plan.getActivityNodeMap().get(tid.getActivityId());
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Initializing " + taId + " -> " + han);
                 }
                 IOperatorDescriptor op = han.getOwner();
-                int partition = tid.getTaskId().getPartition();
-                Task task = new Task(joblet, tid);
+                final int partition = tid.getPartition();
+                Task task = new Task(joblet, taId, han.getClass().getName());
                 IOperatorNodePushable operator = han.createPushRuntime(task, joblet.getEnvironment(op, partition), rdp,
-                        partition, opNumPartitions.get(op.getOperatorId()));
+                        partition, td.getPartitionCount());
 
                 IPartitionCollector collector = null;
 
-                List<IConnectorDescriptor> inputs = plan.getActivityInputConnectorDescriptors(tid.getTaskId()
-                        .getActivityId());
+                List<IConnectorDescriptor> inputs = plan.getActivityInputConnectorDescriptors(tid.getActivityId());
                 if (inputs != null) {
                     for (int i = 0; i < inputs.size(); ++i) {
                         if (i >= 1) {
                             throw new HyracksException("Multiple inputs to an activity not currently supported");
                         }
-                        if (!inputs.isEmpty()) {
-                            IConnectorDescriptor conn = inputs.get(0);
-                            OperatorDescriptorId producerOpId = plan.getJobSpecification().getProducer(conn)
-                                    .getOperatorId();
-                            OperatorDescriptorId consumerOpId = plan.getJobSpecification().getConsumer(conn)
-                                    .getOperatorId();
-                            RecordDescriptor recordDesc = plan.getJobSpecification().getConnectorRecordDescriptor(conn);
-                            collector = conn.createPartitionCollector(task, recordDesc, partition,
-                                    opNumPartitions.get(producerOpId), opNumPartitions.get(consumerOpId));
+                        IConnectorDescriptor conn = inputs.get(i);
+                        if (LOGGER.isLoggable(Level.INFO)) {
+                            LOGGER.info("input: " + i + ": " + conn.getConnectorId());
                         }
+                        RecordDescriptor recordDesc = plan.getJobSpecification().getConnectorRecordDescriptor(conn);
+                        collector = conn.createPartitionCollector(task, recordDesc, partition,
+                                td.getInputPartitionCounts()[i], td.getPartitionCount());
                     }
                 }
-                List<IConnectorDescriptor> outputs = plan.getActivityOutputConnectorDescriptors(tid.getTaskId()
-                        .getActivityId());
+                List<IConnectorDescriptor> outputs = plan.getActivityOutputConnectorDescriptors(tid.getActivityId());
                 if (outputs != null) {
                     for (int i = 0; i < outputs.size(); ++i) {
-                        IConnectorDescriptor conn = outputs.get(i);
-                        OperatorDescriptorId producerOpId = plan.getJobSpecification().getProducer(conn)
-                                .getOperatorId();
-                        OperatorDescriptorId consumerOpId = plan.getJobSpecification().getConsumer(conn)
-                                .getOperatorId();
+                        final IConnectorDescriptor conn = outputs.get(i);
                         RecordDescriptor recordDesc = plan.getJobSpecification().getConnectorRecordDescriptor(conn);
                         IPartitionWriterFactory pwFactory = new IPartitionWriterFactory() {
                             @Override
                             public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
-                                return null;
+                                return new PipelinedPartition(partitionManager, new PartitionId(jobId,
+                                        conn.getConnectorId(), partition, receiverIndex));
                             }
                         };
+                        if (LOGGER.isLoggable(Level.INFO)) {
+                            LOGGER.info("output: " + i + ": " + conn.getConnectorId());
+                        }
                         IFrameWriter writer = conn.createPartitioner(task, recordDesc, pwFactory, partition,
-                                opNumPartitions.get(producerOpId), opNumPartitions.get(consumerOpId));
+                                td.getPartitionCount(), td.getOutputPartitionCounts()[i]);
                         operator.setOutputFrameWriter(i, writer, recordDesc);
                     }
                 }
@@ -334,12 +334,11 @@
         }
     }
 
-    public void notifyTaskFailed(UUID jobId, TaskAttemptId taskId, Exception exception) throws Exception {
+    public void notifyTaskFailed(UUID jobId, TaskAttemptId taskId, Exception exception) {
         try {
             ccs.notifyTaskFailure(jobId, taskId, id, exception);
         } catch (Exception e) {
             e.printStackTrace();
-            throw e;
         }
     }
 
@@ -467,8 +466,10 @@
     @Override
     public void reportPartitionAvailability(PartitionId pid, NetworkAddress networkAddress) throws Exception {
         Joblet ji = jobletMap.get(pid.getJobId());
-        PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(ctx, connectionManager,
-                new InetSocketAddress(networkAddress.getIpAddress(), networkAddress.getPort()), pid, 1));
-        ji.reportPartitionAvailability(channel);
+        if (ji != null) {
+            PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(ctx, connectionManager,
+                    new InetSocketAddress(networkAddress.getIpAddress(), networkAddress.getPort()), pid, 1));
+            ji.reportPartitionAvailability(channel);
+        }
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index a39d1b7..8f30e39 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -42,7 +42,9 @@
 public class Task implements IHyracksTaskContext, ICounterContext, Runnable {
     private final Joblet joblet;
 
-    private final TaskAttemptId taskId;
+    private final TaskAttemptId taskAttemptId;
+
+    private final String displayName;
 
     private final IWorkspaceFileFactory fileFactory;
 
@@ -56,9 +58,10 @@
 
     private volatile boolean aborted;
 
-    public Task(Joblet joblet, TaskAttemptId taskId) {
+    public Task(Joblet joblet, TaskAttemptId taskId, String displayName) {
         this.joblet = joblet;
-        this.taskId = taskId;
+        this.taskAttemptId = taskId;
+        this.displayName = displayName;
         fileFactory = new WorkspaceFileFactory(this, (IOManager) joblet.getIOManager());
         deallocatableRegistry = new DefaultDeallocatableRegistry();
         counterMap = new HashMap<String, Counter>();
@@ -109,8 +112,8 @@
     }
 
     @Override
-    public TaskAttemptId getTaskId() {
-        return taskId;
+    public TaskAttemptId getTaskAttemptId() {
+        return taskAttemptId;
     }
 
     @Override
@@ -142,12 +145,17 @@
 
     public void abort() {
         aborted = true;
-        collector.abort();
+        if (collector != null) {
+            collector.abort();
+        }
     }
 
     @Override
     public void run() {
+        Thread ct = Thread.currentThread();
+        String threadName = ct.getName();
         try {
+            ct.setName(displayName + ": " + taskAttemptId);
             operator.initialize();
             try {
                 if (collector != null) {
@@ -185,8 +193,12 @@
             } finally {
                 operator.deinitialize();
             }
+            joblet.notifyTaskComplete(this);
         } catch (Exception e) {
-
+            e.printStackTrace();
+            joblet.notifyTaskFailed(this, e);
+        } finally {
+            ct.setName(threadName);
         }
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/ConnectionManager.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/ConnectionManager.java
index fbddabd..c3a4e20 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/ConnectionManager.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/ConnectionManager.java
@@ -18,7 +18,6 @@
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
-import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.AsynchronousCloseException;
 import java.nio.channels.SelectionKey;
@@ -26,8 +25,10 @@
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -42,7 +43,7 @@
 public class ConnectionManager {
     private static final Logger LOGGER = Logger.getLogger(ConnectionManager.class.getName());
 
-    private static final int INITIAL_MESSAGE_SIZE = 40;
+    static final int INITIAL_MESSAGE_SIZE = 40;
 
     private final IHyracksRootContext ctx;
 
@@ -95,6 +96,7 @@
     private final class ConnectionListenerThread extends Thread {
         public ConnectionListenerThread() {
             super("Hyracks NC Connection Listener");
+            setDaemon(true);
         }
 
         @Override
@@ -105,7 +107,9 @@
                     dataListener.addIncomingConnection(sc);
                 } catch (AsynchronousCloseException e) {
                     // do nothing
-                    e.printStackTrace();
+                    if (!stopped) {
+                        e.printStackTrace();
+                    }
                 } catch (IOException e) {
                     e.printStackTrace();
                 }
@@ -117,19 +121,20 @@
         private Selector selector;
 
         private final List<SocketChannel> pendingIncomingConnections;
-        private final List<SocketChannel> pendingNegotiations;
+        private final Set<SocketChannel> pendingNegotiations;
         private final List<INetworkChannel> pendingOutgoingConnections;
         private final List<INetworkChannel> pendingAbortConnections;
 
         public DataListenerThread() {
             super("Hyracks Data Listener Thread");
+            setDaemon(true);
             try {
                 selector = Selector.open();
             } catch (IOException e) {
                 throw new RuntimeException(e);
             }
             pendingIncomingConnections = new ArrayList<SocketChannel>();
-            pendingNegotiations = new ArrayList<SocketChannel>();
+            pendingNegotiations = new HashSet<SocketChannel>();
             pendingOutgoingConnections = new ArrayList<INetworkChannel>();
             pendingAbortConnections = new ArrayList<INetworkChannel>();
         }
@@ -170,16 +175,12 @@
                         }
                         if (!pendingOutgoingConnections.isEmpty()) {
                             for (INetworkChannel nc : pendingOutgoingConnections) {
-                                SocketAddress rAddr = nc.getRemoteAddress();
                                 SocketChannel sc = SocketChannel.open();
                                 sc.configureBlocking(false);
-                                int interestOps = SelectionKey.OP_READ;
-                                if (!sc.connect(rAddr)) {
-                                    interestOps |= SelectionKey.OP_CONNECT;
-                                }
-                                SelectionKey scKey = sc.register(selector, interestOps);
+                                SelectionKey scKey = sc.register(selector, 0);
                                 scKey.attach(nc);
                                 nc.setSelectionKey(scKey);
+                                nc.notifyConnectionManagerRegistration();
                             }
                             pendingOutgoingConnections.clear();
                         }
@@ -207,6 +208,7 @@
                                         buffer.flip();
                                         if (buffer.remaining() >= INITIAL_MESSAGE_SIZE) {
                                             PartitionId pid = readInitialMessage(buffer);
+                                            pendingNegotiations.remove(sc);
                                             key.interestOps(0);
                                             NetworkOutputChannel channel = new NetworkOutputChannel(ctx, 5);
                                             channel.setSelectionKey(key);
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/INetworkChannel.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/INetworkChannel.java
index 98215ee..61cd91f 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/INetworkChannel.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/INetworkChannel.java
@@ -14,4 +14,6 @@
     public SocketAddress getRemoteAddress();
 
     public void abort();
+
+    public void notifyConnectionManagerRegistration() throws IOException;
 }
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
index 24a6412..dcf530e 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
@@ -21,11 +21,13 @@
 import java.nio.channels.SocketChannel;
 import java.util.ArrayDeque;
 import java.util.Queue;
+import java.util.UUID;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.channels.IInputChannel;
 import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
 import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
@@ -58,6 +60,8 @@
 
     private Object attachment;
 
+    private ByteBuffer writeBuffer;
+
     public NetworkInputChannel(IHyracksRootContext ctx, ConnectionManager connectionManager,
             SocketAddress remoteAddress, PartitionId partitionId, int nBuffers) {
         this.connectionManager = connectionManager;
@@ -94,11 +98,16 @@
 
     @Override
     public synchronized void recycleBuffer(ByteBuffer buffer) {
+        buffer.clear();
         emptyQueue.add(buffer);
         if (!eos && !aborted) {
             int ops = key.interestOps();
             if ((ops & SelectionKey.OP_READ) == 0) {
                 key.interestOps(ops | SelectionKey.OP_READ);
+                key.selector().wakeup();
+                if (currentBuffer == null) {
+                    currentBuffer = emptyQueue.poll();
+                }
             }
         }
     }
@@ -125,7 +134,17 @@
             monitor.notifyEndOfStream(this);
             return true;
         }
-        if (key.isReadable()) {
+        if (key.isConnectable()) {
+            if (socketChannel.finishConnect()) {
+                key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
+                prepareForWrite();
+            }
+        } else if (key.isWritable()) {
+            socketChannel.write(writeBuffer);
+            if (writeBuffer.remaining() == 0) {
+                key.interestOps(SelectionKey.OP_READ);
+            }
+        } else if (key.isReadable()) {
             if (LOGGER.isLoggable(Level.FINER)) {
                 LOGGER.finer("Before read: " + currentBuffer.position() + " " + currentBuffer.limit());
             }
@@ -144,6 +163,11 @@
                 if (LOGGER.isLoggable(Level.FINEST)) {
                     LOGGER.finest("NetworkInputChannel: frame received: sender = " + partitionId.getSenderIndex());
                 }
+                if (currentBuffer.getInt(FrameHelper.getTupleCountOffset(currentBuffer.capacity())) == 0) {
+                    eos = true;
+                    monitor.notifyEndOfStream(this);
+                    return true;
+                }
                 fullQueue.add(currentBuffer);
                 currentBuffer = emptyQueue.poll();
                 if (currentBuffer == null && key.isValid()) {
@@ -158,6 +182,26 @@
         return false;
     }
 
+    private void prepareForConnect() {
+        key.interestOps(SelectionKey.OP_CONNECT);
+    }
+
+    private void prepareForWrite() {
+        writeBuffer = ByteBuffer.allocate(ConnectionManager.INITIAL_MESSAGE_SIZE);
+        writeUUID(writeBuffer, partitionId.getJobId());
+        writeUUID(writeBuffer, partitionId.getConnectorDescriptorId().getId());
+        writeBuffer.putInt(partitionId.getSenderIndex());
+        writeBuffer.putInt(partitionId.getReceiverIndex());
+        writeBuffer.flip();
+
+        key.interestOps(SelectionKey.OP_WRITE);
+    }
+
+    private void writeUUID(ByteBuffer buffer, UUID uuid) {
+        buffer.putLong(uuid.getMostSignificantBits());
+        buffer.putLong(uuid.getLeastSignificantBits());
+    }
+
     @Override
     public void setSelectionKey(SelectionKey key) {
         this.key = key;
@@ -185,4 +229,13 @@
     public boolean aborted() {
         return aborted;
     }
+
+    @Override
+    public void notifyConnectionManagerRegistration() throws IOException {
+        if (socketChannel.connect(remoteAddress)) {
+            prepareForWrite();
+        } else {
+            prepareForConnect();
+        }
+    }
 }
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
index eb1ec89..74542b3 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
@@ -149,10 +149,8 @@
         synchronized (this) {
             fullQueue.add(destBuffer);
         }
-        int interestOps = key.interestOps();
-        if ((interestOps & SelectionKey.OP_WRITE) == 0) {
-            key.interestOps(interestOps | SelectionKey.OP_WRITE);
-        }
+        key.interestOps(SelectionKey.OP_WRITE);
+        key.selector().wakeup();
     }
 
     @Override
@@ -163,5 +161,11 @@
     @Override
     public synchronized void close() throws HyracksDataException {
         eos = true;
+        key.interestOps(SelectionKey.OP_WRITE);
+        key.selector().wakeup();
+    }
+
+    @Override
+    public void notifyConnectionManagerRegistration() throws IOException {
     }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/SortMergePartitionCollector.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/SortMergePartitionCollector.java
deleted file mode 100644
index 0977002..0000000
--- a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/SortMergePartitionCollector.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.common.comm;
-
-import java.util.Collection;
-
-import edu.uci.ics.hyracks.api.channels.IInputChannel;
-import edu.uci.ics.hyracks.api.comm.IFrameReader;
-import edu.uci.ics.hyracks.api.comm.PartitionChannel;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.partitions.PartitionId;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
-
-public class SortMergePartitionCollector extends AbstractPartitionCollector {
-    private final FrameTuplePairComparator tpc;
-
-    private final FrameTupleAppender appender;
-
-    private final RecordDescriptor recordDescriptor;
-
-    private final int maxFramesLimit;
-
-    private IInputChannel[] channels;
-
-    public SortMergePartitionCollector(IHyracksTaskContext ctx, ConnectorDescriptorId connectorId, int receiverIndex,
-            int[] sortFields, IBinaryComparator[] comparators, RecordDescriptor recordDescriptor, int maxFramesLimit,
-            int nSenders) {
-        super(ctx, connectorId, receiverIndex);
-        tpc = new FrameTuplePairComparator(sortFields, sortFields, comparators);
-        appender = new FrameTupleAppender(ctx.getFrameSize());
-        this.recordDescriptor = recordDescriptor;
-        this.maxFramesLimit = maxFramesLimit;
-        channels = new IInputChannel[nSenders];
-    }
-
-    @Override
-    public void open() throws HyracksException {
-    }
-
-    @Override
-    public void addPartitions(Collection<PartitionChannel> partitions) throws HyracksException {
-
-    }
-
-    @Override
-    public IFrameReader getReader() throws HyracksException {
-        return null;
-    }
-
-    @Override
-    public void close() throws HyracksException {
-
-    }
-
-    @Override
-    public Collection<PartitionId> getRequiredPartitionIds() throws HyracksException {
-        return null;
-    }
-
-    @Override
-    public void abort() {
-
-    }
-}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/AbstractPartitionCollector.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/AbstractPartitionCollector.java
similarity index 87%
rename from hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/AbstractPartitionCollector.java
rename to hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/AbstractPartitionCollector.java
index 4326a9f..fbc6fc4 100644
--- a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/AbstractPartitionCollector.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/AbstractPartitionCollector.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.dataflow.common.comm;
+package edu.uci.ics.hyracks.dataflow.std.collectors;
 
 import java.util.UUID;
 
@@ -21,21 +21,21 @@
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
 
 public abstract class AbstractPartitionCollector implements IPartitionCollector {
-    protected final IHyracksTaskContext stageletContext;
+    protected final IHyracksTaskContext ctx;
 
     protected final ConnectorDescriptorId connectorId;
 
     protected final int receiverIndex;
 
     public AbstractPartitionCollector(IHyracksTaskContext ctx, ConnectorDescriptorId connectorId, int receiverIndex) {
-        this.stageletContext = ctx;
+        this.ctx = ctx;
         this.connectorId = connectorId;
         this.receiverIndex = receiverIndex;
     }
 
     @Override
     public UUID getJobId() {
-        return stageletContext.getJobletContext().getJobId();
+        return ctx.getJobletContext().getJobId();
     }
 
     @Override
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/NonDeterministicPartitionCollector.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionCollector.java
similarity index 81%
rename from hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/NonDeterministicPartitionCollector.java
rename to hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionCollector.java
index fb6a883..b640142 100644
--- a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/NonDeterministicPartitionCollector.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionCollector.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.dataflow.common.comm;
+package edu.uci.ics.hyracks.dataflow.std.collectors;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -35,31 +35,33 @@
 
     private final BitSet expectedPartitions;
 
-    private IInputChannel[] channels;
+    private final int nSenderPartitions;
 
-    private BitSet frameAvailability;
+    private final IInputChannel[] channels;
 
-    private int[] availableFrameCounts;
+    private final BitSet frameAvailability;
 
-    private BitSet eosSenders;
+    private final int[] availableFrameCounts;
+
+    private final BitSet eosSenders;
 
     private BitSet closedSenders;
 
     private int lastReadSender;
 
     public NonDeterministicPartitionCollector(IHyracksTaskContext ctx, ConnectorDescriptorId connectorId,
-            int receiverIndex, BitSet expectedPartitions) {
+            int receiverIndex, int nSenderPartitions, BitSet expectedPartitions) {
         super(ctx, connectorId, receiverIndex);
         this.expectedPartitions = expectedPartitions;
-        int nSenders = expectedPartitions.size();
+        this.nSenderPartitions = nSenderPartitions;
         reader = new FrameReader();
-        channels = new IInputChannel[nSenders];
-        eosSenders = new BitSet(nSenders);
-        closedSenders = new BitSet(nSenders);
+        channels = new IInputChannel[nSenderPartitions];
+        eosSenders = new BitSet(nSenderPartitions);
+        closedSenders = new BitSet(nSenderPartitions);
         closedSenders.or(expectedPartitions);
-        closedSenders.flip(0, nSenders);
-        frameAvailability = new BitSet(nSenders);
-        availableFrameCounts = new int[nSenders];
+        closedSenders.flip(0, nSenderPartitions);
+        frameAvailability = new BitSet(nSenderPartitions);
+        availableFrameCounts = new int[nSenderPartitions];
     }
 
     @Override
@@ -68,13 +70,15 @@
     }
 
     @Override
-    public synchronized void addPartitions(Collection<PartitionChannel> partitions) throws HyracksException {
+    public void addPartitions(Collection<PartitionChannel> partitions) throws HyracksException {
         for (PartitionChannel pc : partitions) {
             PartitionId pid = pc.getPartitionId();
             IInputChannel channel = pc.getInputChannel();
             channel.setAttachment(pid);
             channel.registerMonitor(reader);
-            channels[pid.getSenderIndex()] = channel;
+            synchronized (this) {
+                channels[pid.getSenderIndex()] = channel;
+            }
             channel.open();
         }
     }
@@ -99,7 +103,7 @@
                 while (true) {
                     switch (lastReadSender) {
                         default:
-                            lastReadSender = frameAvailability.nextSetBit(lastReadSender);
+                            lastReadSender = frameAvailability.nextSetBit(lastReadSender + 1);
                             if (lastReadSender >= 0) {
                                 break;
                             }
@@ -121,13 +125,14 @@
                         eosSenders.clear(i);
                         closedSenders.set(i);
                     }
-                    if (closedSenders.nextClearBit(0) < 0) {
+                    int nextClosedBitIndex = closedSenders.nextClearBit(0);
+                    if (nextClosedBitIndex < 0 || nextClosedBitIndex >= nSenderPartitions) {
                         return false;
                     }
                     try {
                         NonDeterministicPartitionCollector.this.wait();
                     } catch (InterruptedException e) {
-                        e.printStackTrace();
+                        throw new HyracksDataException(e);
                     }
                 }
             }
@@ -136,9 +141,11 @@
         @Override
         public void close() throws HyracksDataException {
             synchronized (NonDeterministicPartitionCollector.this) {
-                for (int i = closedSenders.nextClearBit(0); i >= 0; i = closedSenders.nextClearBit(i)) {
+                for (int i = closedSenders.nextClearBit(0); i >= 0 && i < nSenderPartitions; i = closedSenders
+                        .nextClearBit(i + 1)) {
                     if (channels[i] != null) {
                         channels[i].close();
+                        channels[i] = null;
                     }
                 }
             }
@@ -169,7 +176,7 @@
     @Override
     public Collection<PartitionId> getRequiredPartitionIds() throws HyracksException {
         Collection<PartitionId> c = new ArrayList<PartitionId>(expectedPartitions.cardinality());
-        for (int i = expectedPartitions.nextSetBit(0); i >= 0; i = expectedPartitions.nextSetBit(i)) {
+        for (int i = expectedPartitions.nextSetBit(0); i >= 0; i = expectedPartitions.nextSetBit(i + 1)) {
             c.add(new PartitionId(getJobId(), getConnectorId(), i, getReceiverIndex()));
         }
         return c;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergePartitionCollector.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergePartitionCollector.java
new file mode 100644
index 0000000..70355c7
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergePartitionCollector.java
@@ -0,0 +1,260 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.collectors;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.channels.IInputChannel;
+import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.PartitionChannel;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.sort.RunMergingFrameReader;
+
+public class SortMergePartitionCollector extends AbstractPartitionCollector {
+    private final int[] sortFields;
+
+    private final IBinaryComparator[] comparators;
+
+    private final RecordDescriptor recordDescriptor;
+
+    private final int maxConcurrentMerges;
+
+    private final IInputChannel[] channels;
+
+    private final int nSenders;
+
+    private final boolean stable;
+
+    private final FrameReader frameReader;
+
+    private final PartitionBatchManager pbm;
+
+    public SortMergePartitionCollector(IHyracksTaskContext ctx, ConnectorDescriptorId connectorId, int receiverIndex,
+            int[] sortFields, IBinaryComparator[] comparators, RecordDescriptor recordDescriptor,
+            int maxConcurrentMerges, int nSenders, boolean stable) {
+        super(ctx, connectorId, receiverIndex);
+        this.sortFields = sortFields;
+        this.comparators = comparators;
+        this.recordDescriptor = recordDescriptor;
+        this.maxConcurrentMerges = maxConcurrentMerges;
+        channels = new IInputChannel[nSenders];
+        this.nSenders = nSenders;
+        this.stable = stable;
+        this.frameReader = new FrameReader();
+        pbm = new NonDeterministicPartitionBatchManager();
+    }
+
+    @Override
+    public void open() throws HyracksException {
+    }
+
+    @Override
+    public void addPartitions(Collection<PartitionChannel> partitions) throws HyracksException {
+        for (PartitionChannel pc : partitions) {
+            PartitionId pid = pc.getPartitionId();
+            IInputChannel channel = pc.getInputChannel();
+            InputChannelFrameReader channelReader = new InputChannelFrameReader(channel);
+            channel.registerMonitor(channelReader);
+            channel.setAttachment(channelReader);
+            synchronized (this) {
+                channels[pid.getSenderIndex()] = channel;
+            }
+            pbm.addPartition(pid.getSenderIndex());
+            channel.open();
+        }
+    }
+
+    @Override
+    public IFrameReader getReader() throws HyracksException {
+        return frameReader;
+    }
+
+    @Override
+    public void close() throws HyracksException {
+
+    }
+
+    @Override
+    public Collection<PartitionId> getRequiredPartitionIds() throws HyracksException {
+        Collection<PartitionId> requiredPartitionIds = new ArrayList<PartitionId>();
+        for (int i = 0; i < nSenders; ++i) {
+            requiredPartitionIds.add(new PartitionId(getJobId(), getConnectorId(), i, receiverIndex));
+        }
+        return requiredPartitionIds;
+    }
+
+    @Override
+    public void abort() {
+
+    }
+
+    private abstract class PartitionBatchManager {
+        protected abstract void addPartition(int index);
+
+        protected abstract void getNextBatch(List<IFrameReader> batch, int size) throws HyracksDataException;
+    }
+
+    private class NonDeterministicPartitionBatchManager extends PartitionBatchManager {
+        private List<IFrameReader> partitions;
+
+        private List<IFrameReader> batch;
+
+        private int requiredSize;
+
+        public NonDeterministicPartitionBatchManager() {
+            partitions = new ArrayList<IFrameReader>();
+        }
+
+        @Override
+        protected void addPartition(int index) {
+            synchronized (SortMergePartitionCollector.this) {
+                if (batch != null && batch.size() < requiredSize) {
+                    batch.add((IFrameReader) channels[index].getAttachment());
+                    if (batch.size() == requiredSize) {
+                        SortMergePartitionCollector.this.notifyAll();
+                    }
+                } else {
+                    partitions.add((IFrameReader) channels[index].getAttachment());
+                }
+            }
+        }
+
+        @Override
+        protected void getNextBatch(List<IFrameReader> batch, int size) throws HyracksDataException {
+            synchronized (SortMergePartitionCollector.this) {
+                if (partitions.size() <= size) {
+                    batch.addAll(partitions);
+                    partitions.clear();
+                } else if (partitions.size() > size) {
+                    List<IFrameReader> sublist = partitions.subList(0, size);
+                    batch.addAll(sublist);
+                    sublist.clear();
+                }
+                if (batch.size() == size) {
+                    return;
+                }
+                this.batch = batch;
+                this.requiredSize = size;
+                while (batch.size() < size) {
+                    try {
+                        SortMergePartitionCollector.this.wait();
+                    } catch (InterruptedException e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+                this.batch = null;
+            }
+        }
+    }
+
+    private static class InputChannelFrameReader implements IFrameReader, IInputChannelMonitor {
+        private final IInputChannel channel;
+
+        private int availableFrames;
+
+        private boolean eos;
+
+        public InputChannelFrameReader(IInputChannel channel) {
+            this.channel = channel;
+            availableFrames = 0;
+            eos = false;
+        }
+
+        @Override
+        public void open() throws HyracksDataException {
+        }
+
+        @Override
+        public synchronized boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
+            while (!eos && availableFrames <= 0) {
+                try {
+                    wait();
+                } catch (InterruptedException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+            if (eos) {
+                return false;
+            }
+            ByteBuffer srcBuffer = channel.getNextBuffer();
+            --availableFrames;
+            FrameUtils.copy(srcBuffer, buffer);
+            channel.recycleBuffer(srcBuffer);
+            return true;
+        }
+
+        @Override
+        public void close() throws HyracksDataException {
+
+        }
+
+        @Override
+        public synchronized void notifyDataAvailability(IInputChannel channel, int nFrames) {
+            availableFrames += nFrames;
+            notifyAll();
+        }
+
+        @Override
+        public synchronized void notifyEndOfStream(IInputChannel channel) {
+            eos = true;
+            notifyAll();
+        }
+    }
+
+    private class FrameReader implements IFrameReader {
+        private RunMergingFrameReader merger;
+
+        @Override
+        public void open() throws HyracksDataException {
+            if (maxConcurrentMerges >= nSenders) {
+                List<ByteBuffer> inFrames = new ArrayList<ByteBuffer>();
+                for (int i = 0; i < nSenders; ++i) {
+                    inFrames.add(ByteBuffer.allocate(ctx.getFrameSize()));
+                }
+                List<IFrameReader> batch = new ArrayList<IFrameReader>();
+                pbm.getNextBatch(batch, nSenders);
+                merger = new RunMergingFrameReader(ctx, batch.toArray(new IFrameReader[nSenders]), inFrames,
+                        sortFields, comparators, recordDescriptor);
+            } else {
+                // TODO: implement multi-level merge for the case maxConcurrentMerges < nSenders
+                throw new HyracksDataException("Not yet supported");
+            }
+            merger.open();
+        }
+
+        @Override
+        public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
+            buffer.position(buffer.capacity());
+            buffer.limit(buffer.capacity());
+            return merger.nextFrame(buffer);
+        }
+
+        @Override
+        public void close() throws HyracksDataException {
+            merger.close();
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
index 6037393..12935af 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
@@ -24,8 +24,8 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.NonDeterministicPartitionCollector;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractMToNConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicPartitionCollector;
 
 public class MToNPartitioningConnectorDescriptor extends AbstractMToNConnectorDescriptor {
     private static final long serialVersionUID = 1L;
@@ -50,6 +50,7 @@
             int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
         BitSet expectedPartitions = new BitSet(nProducerPartitions);
         expectedPartitions.set(0, nProducerPartitions);
-        return new NonDeterministicPartitionCollector(ctx, getConnectorId(), index, expectedPartitions);
+        return new NonDeterministicPartitionCollector(ctx, getConnectorId(), index, nProducerPartitions,
+                expectedPartitions);
     }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java
index a8bcfa5..54979f0 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java
@@ -24,8 +24,8 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.SortMergePartitionCollector;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractMToNConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.collectors.SortMergePartitionCollector;
 
 public class MToNPartitioningMergingConnectorDescriptor extends AbstractMToNConnectorDescriptor {
     private static final long serialVersionUID = 1L;
@@ -33,13 +33,20 @@
     private final ITuplePartitionComputerFactory tpcf;
     private final int[] sortFields;
     private final IBinaryComparatorFactory[] comparatorFactories;
+    private final boolean stable;
 
     public MToNPartitioningMergingConnectorDescriptor(JobSpecification spec, ITuplePartitionComputerFactory tpcf,
             int[] sortFields, IBinaryComparatorFactory[] comparatorFactories) {
+        this(spec, tpcf, sortFields, comparatorFactories, false);
+    }
+
+    public MToNPartitioningMergingConnectorDescriptor(JobSpecification spec, ITuplePartitionComputerFactory tpcf,
+            int[] sortFields, IBinaryComparatorFactory[] comparatorFactories, boolean stable) {
         super(spec);
         this.tpcf = tpcf;
         this.sortFields = sortFields;
         this.comparatorFactories = comparatorFactories;
+        this.stable = stable;
     }
 
     @Override
@@ -59,6 +66,6 @@
             comparators[i] = comparatorFactories[i].createBinaryComparator();
         }
         return new SortMergePartitionCollector(ctx, getConnectorId(), index, sortFields, comparators, recordDesc,
-                nProducerPartitions, nProducerPartitions);
+                nProducerPartitions, nProducerPartitions, stable);
     }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
index 957734c..5297a99 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
@@ -24,8 +24,8 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.NonDeterministicPartitionCollector;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractMToNConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicPartitionCollector;
 
 public class MToNReplicatingConnectorDescriptor extends AbstractMToNConnectorDescriptor {
     public MToNReplicatingConnectorDescriptor(JobSpecification spec) {
@@ -82,6 +82,7 @@
             int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
         BitSet expectedPartitions = new BitSet(nProducerPartitions);
         expectedPartitions.set(0, nProducerPartitions);
-        return new NonDeterministicPartitionCollector(ctx, getConnectorId(), index, expectedPartitions);
+        return new NonDeterministicPartitionCollector(ctx, getConnectorId(), index, nProducerPartitions,
+                expectedPartitions);
     }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
index a15a64f..556466f 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
@@ -28,8 +28,8 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.JobActivityGraph;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.NonDeterministicPartitionCollector;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicPartitionCollector;
 
 public class OneToOneConnectorDescriptor extends AbstractConnectorDescriptor {
     private static final long serialVersionUID = 1L;
@@ -50,7 +50,8 @@
             int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
         BitSet expectedPartitions = new BitSet(nProducerPartitions);
         expectedPartitions.set(index);
-        return new NonDeterministicPartitionCollector(ctx, getConnectorId(), index, expectedPartitions);
+        return new NonDeterministicPartitionCollector(ctx, getConnectorId(), index, nProducerPartitions,
+                expectedPartitions);
     }
 
     @Override
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index 6f46932..a2f366b 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -110,6 +110,7 @@
 
                 @Override
                 public void close() throws HyracksDataException {
+                    System.err.println("close%%%%%%%%%%%%%%%%%%%%%%%%%" + joiner + " " + env);
                     env.set(JOINER, joiner);
                 }
 
@@ -138,6 +139,7 @@
                 @Override
                 public void open() throws HyracksDataException {
                     joiner = (InMemoryHashJoin) env.get(JOINER);
+                    System.err.println("%%%%%%%%%%%%%%%%%%%%%%%%%" + joiner + " " + env);
                     writer.open();
                 }
 
@@ -148,6 +150,7 @@
 
                 @Override
                 public void close() throws HyracksDataException {
+                    System.err.println("^^^^^^^^^^^^^^^^^^^^^^^^" + joiner);
                     joiner.closeJoin(writer);
                     writer.close();
                     env.set(JOINER, null);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
index cf46c0c..34a8e5e 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
@@ -46,6 +46,7 @@
         appender.reset(writeBuffer, true);
         if (fieldSlots != null && tupleData != null && tupleSize > 0)
             appender.append(fieldSlots, tupleData, 0, tupleSize);
+        writer.open();
         FrameUtils.flushFrame(writeBuffer, writer);
         writer.close();
     }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
index c860b2b..6871452 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
@@ -17,10 +17,12 @@
 import java.nio.ByteBuffer;
 import java.util.List;
 
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -28,7 +30,6 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
@@ -131,10 +132,14 @@
             IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
                 @Override
                 public void initialize() throws HyracksDataException {
-                    List<RunFileReader> runs = (List<RunFileReader>) env.get(RUNS);
+                    List<IFrameReader> runs = (List<IFrameReader>) env.get(RUNS);
                     FrameSorter frameSorter = (FrameSorter) env.get(FRAMESORTER);
+                    IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+                    for (int i = 0; i < comparatorFactories.length; ++i) {
+                        comparators[i] = comparatorFactories[i].createBinaryComparator();
+                    }
                     ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, frameSorter, runs, sortFields,
-                            comparatorFactories, recordDescriptors[0], framesLimit, writer);
+                            comparators, recordDescriptors[0], framesLimit, writer);
                     merger.process();
                     env.set(FRAMESORTER, null);
                     env.set(RUNS, null);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
index 742a5f9..eee6f49 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
@@ -18,6 +18,7 @@
 import java.util.LinkedList;
 import java.util.List;
 
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -25,13 +26,12 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
 import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
 
 public class ExternalSortRunGenerator implements IFrameWriter {
     private final IHyracksTaskContext ctx;
     private final FrameSorter frameSorter;
-    private final List<RunFileReader> runs;
+    private final List<IFrameReader> runs;
     private final int maxSortFrames;
 
     public ExternalSortRunGenerator(IHyracksTaskContext ctx, int[] sortFields,
@@ -39,7 +39,7 @@
             RecordDescriptor recordDesc, int framesLimit) {
         this.ctx = ctx;
         frameSorter = new FrameSorter(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDesc);
-        runs = new LinkedList<RunFileReader>();
+        runs = new LinkedList<IFrameReader>();
         maxSortFrames = framesLimit - 1;
     }
 
@@ -91,7 +91,7 @@
         return frameSorter;
     }
 
-    public List<RunFileReader> getRuns() {
+    public List<IFrameReader> getRuns() {
         return runs;
     }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
index 096adaa..8b8429c 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
@@ -16,29 +16,24 @@
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Comparator;
 import java.util.List;
 
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
 import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
-import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
-import edu.uci.ics.hyracks.dataflow.std.util.ReferencedPriorityQueue;
 
 public class ExternalSortRunMerger {
     private final IHyracksTaskContext ctx;
     private final FrameSorter frameSorter;
-    private final List<RunFileReader> runs;
+    private final List<IFrameReader> runs;
     private final int[] sortFields;
     private final IBinaryComparator[] comparators;
     private final RecordDescriptor recordDesc;
@@ -48,26 +43,21 @@
     private ByteBuffer outFrame;
     private FrameTupleAppender outFrameAppender;
 
-    public ExternalSortRunMerger(IHyracksTaskContext ctx, FrameSorter frameSorter, List<RunFileReader> runs,
-            int[] sortFields, IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDesc,
-            int framesLimit, IFrameWriter writer) {
+    public ExternalSortRunMerger(IHyracksTaskContext ctx, FrameSorter frameSorter, List<IFrameReader> runs,
+            int[] sortFields, IBinaryComparator[] comparators, RecordDescriptor recordDesc, int framesLimit,
+            IFrameWriter writer) {
         this.ctx = ctx;
         this.frameSorter = frameSorter;
         this.runs = runs;
         this.sortFields = sortFields;
-        comparators = new IBinaryComparator[comparatorFactories.length];
-        for (int i = 0; i < comparatorFactories.length; ++i) {
-            comparators[i] = comparatorFactories[i].createBinaryComparator();
-        }
+        this.comparators = comparators;
         this.recordDesc = recordDesc;
         this.framesLimit = framesLimit;
         this.writer = writer;
     }
 
-    public void process(boolean doFinalPass) throws HyracksDataException {
-        if (doFinalPass) {
-            writer.open();
-        }
+    public void process() throws HyracksDataException {
+        writer.open();
         try {
             if (runs.size() <= 0) {
                 if (frameSorter != null && frameSorter.getFrameCount() > 0) {
@@ -81,36 +71,25 @@
                 for (int i = 0; i < framesLimit - 1; ++i) {
                     inFrames.add(ctx.allocateFrame());
                 }
-                int passCount = 0;
                 while (runs.size() > 0) {
-                    passCount++;
                     try {
-                        doPass(runs, passCount, doFinalPass);
+                        doPass(runs);
                     } catch (Exception e) {
                         throw new HyracksDataException(e);
                     }
                 }
             }
         } finally {
-            if (doFinalPass) {
-                writer.close();
-            }
+            writer.close();
         }
     }
 
-    public void process() throws HyracksDataException {
-        process(true);
-    }
-
     // creates a new run from runs that can fit in memory.
-    private void doPass(List<RunFileReader> runs, int passCount, boolean doFinalPass) throws HyracksDataException {
+    private void doPass(List<IFrameReader> runs) throws HyracksDataException {
         FileReference newRun = null;
         IFrameWriter writer = this.writer;
         boolean finalPass = false;
         if (runs.size() + 1 <= framesLimit) { // + 1 outFrame
-            if (!doFinalPass) {
-                return;
-            }
             finalPass = true;
             for (int i = inFrames.size() - 1; i >= runs.size(); i--) {
                 inFrames.remove(i);
@@ -121,46 +100,19 @@
             writer.open();
         }
         try {
-            RunFileReader[] runCursors = new RunFileReader[inFrames.size()];
-            FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames.size()];
-            Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
-            ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(ctx.getFrameSize(), recordDesc,
-                    inFrames.size(), comparator);
-            int[] tupleIndexes = new int[inFrames.size()];
+            IFrameReader[] runCursors = new RunFileReader[inFrames.size()];
             for (int i = 0; i < inFrames.size(); i++) {
-                tupleIndexes[i] = 0;
-                int runIndex = topTuples.peek().getRunid();
-                runCursors[runIndex] = runs.get(runIndex);
-                runCursors[runIndex].open();
-                if (runCursors[runIndex].nextFrame(inFrames.get(runIndex))) {
-                    tupleAccessors[runIndex] = new FrameTupleAccessor(ctx.getFrameSize(), recordDesc);
-                    tupleAccessors[runIndex].reset(inFrames.get(runIndex));
-                    setNextTopTuple(runIndex, tupleIndexes, runCursors, tupleAccessors, topTuples);
-                } else {
-                    closeRun(runIndex, runCursors, tupleAccessors);
-                }
+                runCursors[i] = runs.get(i);
             }
-
-            while (!topTuples.areRunsExhausted()) {
-                ReferenceEntry top = topTuples.peek();
-                int runIndex = top.getRunid();
-                FrameTupleAccessor fta = top.getAccessor();
-                int tupleIndex = top.getTupleIndex();
-
-                if (!outFrameAppender.append(fta, tupleIndex)) {
+            RunMergingFrameReader merger = new RunMergingFrameReader(ctx, runCursors, inFrames, sortFields,
+                    comparators, recordDesc);
+            merger.open();
+            try {
+                while (merger.nextFrame(outFrame)) {
                     FrameUtils.flushFrame(outFrame, writer);
-                    outFrameAppender.reset(outFrame, true);
-                    if (!outFrameAppender.append(fta, tupleIndex)) {
-                        throw new IllegalStateException();
-                    }
                 }
-
-                ++tupleIndexes[runIndex];
-                setNextTopTuple(runIndex, tupleIndexes, runCursors, tupleAccessors, topTuples);
-            }
-            if (outFrameAppender.getTupleCount() > 0) {
-                FrameUtils.flushFrame(outFrame, writer);
-                outFrameAppender.reset(outFrame, true);
+            } finally {
+                merger.close();
             }
             runs.subList(0, inFrames.size()).clear();
             if (!finalPass) {
@@ -172,66 +124,4 @@
             }
         }
     }
-
-    private void setNextTopTuple(int runIndex, int[] tupleIndexes, RunFileReader[] runCursors,
-            FrameTupleAccessor[] tupleAccessors, ReferencedPriorityQueue topTuples) throws HyracksDataException {
-        boolean exists = hasNextTuple(runIndex, tupleIndexes, runCursors, tupleAccessors);
-        if (exists) {
-            topTuples.popAndReplace(tupleAccessors[runIndex], tupleIndexes[runIndex]);
-        } else {
-            topTuples.pop();
-            closeRun(runIndex, runCursors, tupleAccessors);
-        }
-    }
-
-    private boolean hasNextTuple(int runIndex, int[] tupleIndexes, RunFileReader[] runCursors,
-            FrameTupleAccessor[] tupleAccessors) throws HyracksDataException {
-        if (tupleAccessors[runIndex] == null || runCursors[runIndex] == null) {
-            return false;
-        } else if (tupleIndexes[runIndex] >= tupleAccessors[runIndex].getTupleCount()) {
-            ByteBuffer buf = tupleAccessors[runIndex].getBuffer(); // same-as-inFrames.get(runIndex)
-            if (runCursors[runIndex].nextFrame(buf)) {
-                tupleIndexes[runIndex] = 0;
-                return hasNextTuple(runIndex, tupleIndexes, runCursors, tupleAccessors);
-            } else {
-                return false;
-            }
-        } else {
-            return true;
-        }
-    }
-
-    private void closeRun(int index, RunFileReader[] runCursors, IFrameTupleAccessor[] tupleAccessor)
-            throws HyracksDataException {
-        runCursors[index].close();
-        runCursors[index] = null;
-        tupleAccessor[index] = null;
-    }
-
-    private Comparator<ReferenceEntry> createEntryComparator(final IBinaryComparator[] comparators) {
-        return new Comparator<ReferenceEntry>() {
-            public int compare(ReferenceEntry tp1, ReferenceEntry tp2) {
-                FrameTupleAccessor fta1 = (FrameTupleAccessor) tp1.getAccessor();
-                FrameTupleAccessor fta2 = (FrameTupleAccessor) tp2.getAccessor();
-                int j1 = tp1.getTupleIndex();
-                int j2 = tp2.getTupleIndex();
-                byte[] b1 = fta1.getBuffer().array();
-                byte[] b2 = fta2.getBuffer().array();
-                for (int f = 0; f < sortFields.length; ++f) {
-                    int fIdx = sortFields[f];
-                    int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
-                            + fta1.getFieldStartOffset(j1, fIdx);
-                    int l1 = fta1.getFieldEndOffset(j1, fIdx) - fta1.getFieldStartOffset(j1, fIdx);
-                    int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
-                            + fta2.getFieldStartOffset(j2, fIdx);
-                    int l2 = fta2.getFieldEndOffset(j2, fIdx) - fta2.getFieldStartOffset(j2, fIdx);
-                    int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
-                    if (c != 0) {
-                        return c;
-                    }
-                }
-                return 0;
-            }
-        };
-    }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
index 603b194..c583d04 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
@@ -136,7 +136,6 @@
         FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
         FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
         ByteBuffer outFrame = ctx.allocateFrame();
-        writer.open();
         appender.reset(outFrame, true);
         int n = tPointers.length / 4;
         for (int ptr = 0; ptr < n; ++ptr) {
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java
new file mode 100644
index 0000000..ddd24d8
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java
@@ -0,0 +1,177 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
+import edu.uci.ics.hyracks.dataflow.std.util.ReferencedPriorityQueue;
+
+public class RunMergingFrameReader implements IFrameReader {
+    private final IHyracksTaskContext ctx;
+
+    private final IFrameReader[] runCursors;
+
+    private final List<ByteBuffer> inFrames;
+
+    private final int[] sortFields;
+
+    private final IBinaryComparator[] comparators;
+
+    private final RecordDescriptor recordDesc;
+
+    private final FrameTupleAppender outFrameAppender;
+
+    private ReferencedPriorityQueue topTuples;
+
+    private int[] tupleIndexes;
+
+    private FrameTupleAccessor[] tupleAccessors;
+
+    public RunMergingFrameReader(IHyracksTaskContext ctx, IFrameReader[] runCursors, List<ByteBuffer> inFrames,
+            int[] sortFields, IBinaryComparator[] comparators, RecordDescriptor recordDesc) {
+        this.ctx = ctx;
+        this.runCursors = runCursors;
+        this.inFrames = inFrames;
+        this.sortFields = sortFields;
+        this.comparators = comparators;
+        this.recordDesc = recordDesc;
+        outFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        tupleAccessors = new FrameTupleAccessor[inFrames.size()];
+        Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
+        topTuples = new ReferencedPriorityQueue(ctx.getFrameSize(), recordDesc, inFrames.size(), comparator);
+        tupleIndexes = new int[inFrames.size()];
+        for (int i = 0; i < inFrames.size(); i++) {
+            tupleIndexes[i] = 0;
+            int runIndex = topTuples.peek().getRunid();
+            runCursors[runIndex].open();
+            if (runCursors[runIndex].nextFrame(inFrames.get(runIndex))) {
+                tupleAccessors[runIndex] = new FrameTupleAccessor(ctx.getFrameSize(), recordDesc);
+                tupleAccessors[runIndex].reset(inFrames.get(runIndex));
+                setNextTopTuple(runIndex, tupleIndexes, runCursors, tupleAccessors, topTuples);
+            } else {
+                closeRun(runIndex, runCursors, tupleAccessors);
+            }
+        }
+    }
+
+    @Override
+    public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        outFrameAppender.reset(buffer, true);
+        while (!topTuples.areRunsExhausted()) {
+            ReferenceEntry top = topTuples.peek();
+            int runIndex = top.getRunid();
+            FrameTupleAccessor fta = top.getAccessor();
+            int tupleIndex = top.getTupleIndex();
+
+            if (!outFrameAppender.append(fta, tupleIndex)) {
+                return true;
+            }
+
+            ++tupleIndexes[runIndex];
+            setNextTopTuple(runIndex, tupleIndexes, runCursors, tupleAccessors, topTuples);
+        }
+
+        if (outFrameAppender.getTupleCount() > 0) {
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        for (int i = 0; i < inFrames.size(); ++i) {
+            closeRun(i, runCursors, tupleAccessors);
+        }
+    }
+
+    private void setNextTopTuple(int runIndex, int[] tupleIndexes, IFrameReader[] runCursors,
+            FrameTupleAccessor[] tupleAccessors, ReferencedPriorityQueue topTuples) throws HyracksDataException {
+        boolean exists = hasNextTuple(runIndex, tupleIndexes, runCursors, tupleAccessors);
+        if (exists) {
+            topTuples.popAndReplace(tupleAccessors[runIndex], tupleIndexes[runIndex]);
+        } else {
+            topTuples.pop();
+            closeRun(runIndex, runCursors, tupleAccessors);
+        }
+    }
+
+    private boolean hasNextTuple(int runIndex, int[] tupleIndexes, IFrameReader[] runCursors,
+            FrameTupleAccessor[] tupleAccessors) throws HyracksDataException {
+        if (tupleAccessors[runIndex] == null || runCursors[runIndex] == null) {
+            return false;
+        } else if (tupleIndexes[runIndex] >= tupleAccessors[runIndex].getTupleCount()) {
+            ByteBuffer buf = tupleAccessors[runIndex].getBuffer(); // same buffer as inFrames.get(runIndex), so the run refills the in-frame in place
+            if (runCursors[runIndex].nextFrame(buf)) {
+                tupleIndexes[runIndex] = 0;
+                return hasNextTuple(runIndex, tupleIndexes, runCursors, tupleAccessors);
+            } else {
+                return false;
+            }
+        } else {
+            return true;
+        }
+    }
+
+    private void closeRun(int index, IFrameReader[] runCursors, IFrameTupleAccessor[] tupleAccessors)
+            throws HyracksDataException {
+        if (runCursors[index] != null) {
+            runCursors[index].close();
+            runCursors[index] = null;
+            tupleAccessors[index] = null;
+        }
+    }
+
+    private Comparator<ReferenceEntry> createEntryComparator(final IBinaryComparator[] comparators) {
+        return new Comparator<ReferenceEntry>() {
+            public int compare(ReferenceEntry tp1, ReferenceEntry tp2) {
+                FrameTupleAccessor fta1 = (FrameTupleAccessor) tp1.getAccessor();
+                FrameTupleAccessor fta2 = (FrameTupleAccessor) tp2.getAccessor();
+                int j1 = tp1.getTupleIndex();
+                int j2 = tp2.getTupleIndex();
+                byte[] b1 = fta1.getBuffer().array();
+                byte[] b2 = fta2.getBuffer().array();
+                for (int f = 0; f < sortFields.length; ++f) {
+                    int fIdx = sortFields[f];
+                    int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
+                            + fta1.getFieldStartOffset(j1, fIdx);
+                    int l1 = fta1.getFieldEndOffset(j1, fIdx) - fta1.getFieldStartOffset(j1, fIdx);
+                    int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
+                            + fta2.getFieldStartOffset(j2, fIdx);
+                    int l2 = fta2.getFieldEndOffset(j2, fIdx) - fta2.getFieldStartOffset(j2, fIdx);
+                    int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+                    if (c != 0) {
+                        return c;
+                    }
+                }
+                return 0;
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
index b2b75b9..a84d793 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -42,7 +42,7 @@
     public static void init() throws Exception {
         CCConfig ccConfig = new CCConfig();
         ccConfig.port = 39001;
-        ccConfig.profileDumpPeriod = 1000;
+        ccConfig.profileDumpPeriod = 10000;
         cc = new ClusterControllerService(ccConfig);
         cc.start();
 
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertUpdateDeleteOperatorNodePushable.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertUpdateDeleteOperatorNodePushable.java
index ec6da88..ade333e 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertUpdateDeleteOperatorNodePushable.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertUpdateDeleteOperatorNodePushable.java
@@ -51,6 +51,7 @@
         RecordDescriptor inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getOperatorId(), 0);
         accessor = new FrameTupleAccessor(btreeOpHelper.getHyracksStageletContext().getFrameSize(), inputRecDesc);
         writeBuffer = btreeOpHelper.getHyracksStageletContext().allocateFrame();
+        writer.open();
         try {
             btreeOpHelper.init();
             btreeOpHelper.getBTree().open(btreeOpHelper.getBTreeFileId());
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
index 819dea6..84be67a 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
@@ -87,9 +87,10 @@
         cursorFrame = opDesc.getLeafFactory().getFrame();
         cursor = new RangeSearchCursor(cursorFrame);
 
+        writer.open();
         try {
-
             btreeOpHelper.init();
+
             btree = btreeOpHelper.getBTree();
 
             // construct range predicate
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
index c6f6e5b..37e7bd6 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
@@ -62,6 +62,7 @@
         builderDos = builder.getDataOutput();
         appender = new FrameTupleAppender(ctx.getFrameSize());
         appender.reset(writeBuffer, true);
+        writer.open();
     }
 
     @Override
diff --git a/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
index c2693ab..a9bba5a 100644
--- a/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
@@ -279,6 +279,39 @@
         }
     }
 
+    private String dumpState() {
+        StringBuilder buffer = new StringBuilder();
+        buffer.append("Buffer cache state\n");
+        buffer.append("Page Size: ").append(pageSize).append('\n');
+        buffer.append("Number of physical pages: ").append(numPages).append('\n');
+        buffer.append("Hash table size: ").append(pageMap.length).append('\n');
+        buffer.append("Page Map:\n");
+        int nCachedPages = 0;
+        for (int i = 0; i < pageMap.length; ++i) {
+            CacheBucket cb = pageMap[i];
+            cb.bucketLock.lock();
+            try {
+                CachedPage cp = cb.cachedPage;
+                if (cp != null) {
+                    buffer.append("   ").append(i).append('\n');
+                    while (cp != null) {
+                        buffer.append("      ").append(cp.cpid).append(" -> [")
+                                .append(BufferedFileHandle.getFileId(cp.dpid)).append(':')
+                                .append(BufferedFileHandle.getPageId(cp.dpid)).append(", ").append(cp.pinCount.get())
+                                .append(", ").append(cp.valid ? "valid" : "invalid").append(", ")
+                                .append(cp.dirty.get() ? "dirty" : "clean").append("]\n");
+                        cp = cp.next;
+                        ++nCachedPages;
+                    }
+                }
+            } finally {
+                cb.bucketLock.unlock();
+            }
+        }
+        buffer.append("Number of cached pages: ").append(nCachedPages).append('\n');
+        return buffer.toString();
+    }
+
     private void read(CachedPage cPage) throws HyracksDataException {
         BufferedFileHandle fInfo = getFileInfo(cPage);
         cPage.buffer.clear();
@@ -568,6 +601,7 @@
     public void closeFile(int fileId) throws HyracksDataException {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Closing file: " + fileId + " in cache: " + this);
+            LOGGER.info(dumpState());
         }
         synchronized (fileInfoMap) {
             BufferedFileHandle fInfo = fileInfoMap.get(fileId);
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
index 589352a..cbab080 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
@@ -85,7 +85,7 @@
     }
 
     @Override
-    public TaskAttemptId getTaskId() {
+    public TaskAttemptId getTaskAttemptId() {
         return taskId;
     }
 }
\ No newline at end of file