Added more connector policies. Cleaned up Activity API.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_scheduling@431 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityNodeId.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java
similarity index 73%
rename from hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityNodeId.java
rename to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java
index 12f76fe..5a58b7b 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityNodeId.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java
@@ -15,14 +15,13 @@
 package edu.uci.ics.hyracks.api.dataflow;
 
 import java.io.Serializable;
-import java.util.UUID;
 
-public final class ActivityNodeId implements Serializable {
+public final class ActivityId implements Serializable {
     private static final long serialVersionUID = 1L;
     private final OperatorDescriptorId odId;
-    private final UUID id;
+    private final long id;
 
-    public ActivityNodeId(OperatorDescriptorId odId, UUID id) {
+    public ActivityId(OperatorDescriptorId odId, long id) {
         this.odId = odId;
         this.id = id;
     }
@@ -31,13 +30,13 @@
         return odId;
     }
 
-    public UUID getLocalId() {
+    public long getLocalId() {
         return id;
     }
 
     @Override
     public int hashCode() {
-        return odId.hashCode() + id.hashCode();
+        return (int) (odId.hashCode() + id);
     }
 
     @Override
@@ -45,14 +44,14 @@
         if (this == o) {
             return true;
         }
-        if (!(o instanceof ActivityNodeId)) {
+        if (!(o instanceof ActivityId)) {
             return false;
         }
-        ActivityNodeId other = (ActivityNodeId) o;
-        return other.odId.equals(odId) && other.id.equals(id);
+        ActivityId other = (ActivityId) o;
+        return other.odId.equals(odId) && other.id == id;
     }
 
     public String toString() {
-        return "ANID:" + id;
+        return "ANID:[" + odId + ":" + id + "]";
     }
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityNode.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivity.java
similarity index 88%
rename from hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityNode.java
rename to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivity.java
index 9a37661..6b519d6 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityNode.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivity.java
@@ -21,10 +21,8 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
 
-public interface IActivityNode extends Serializable {
-    public ActivityNodeId getActivityNodeId();
-
-    public IOperatorDescriptor getOwner();
+public interface IActivity extends Serializable {
+    public ActivityId getActivityId();
 
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, IOperatorEnvironment env,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException;
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityGraphBuilder.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityGraphBuilder.java
index c6bc5e3..56870b2 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityGraphBuilder.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityGraphBuilder.java
@@ -15,11 +15,11 @@
 package edu.uci.ics.hyracks.api.dataflow;
 
 public interface IActivityGraphBuilder {
-    public void addTask(IActivityNode task);
+    public void addActivity(IActivity task);
 
-    public void addBlockingEdge(IActivityNode blocker, IActivityNode blocked);
+    public void addBlockingEdge(IActivity blocker, IActivity blocked);
 
-    public void addSourceEdge(int operatorInputIndex, IActivityNode task, int taskInputIndex);
+    public void addSourceEdge(int operatorInputIndex, IActivity task, int taskInputIndex);
 
-    public void addTargetEdge(int operatorOutputIndex, IActivityNode task, int taskOutputIndex);
+    public void addTargetEdge(int operatorOutputIndex, IActivity task, int taskOutputIndex);
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
index b838a80..3731459 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
@@ -63,7 +63,7 @@
      * @param builder
      *            - graph builder
      */
-    public void contributeTaskGraph(IActivityGraphBuilder builder);
+    public void contributeActivities(IActivityGraphBuilder builder);
 
     /**
      * Contributes any scheduling constraints imposed by this operator.
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/PortInstanceId.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/PortInstanceId.java
deleted file mode 100644
index 844266d..0000000
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/PortInstanceId.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.api.dataflow;
-
-import java.io.Serializable;
-
-public final class PortInstanceId implements Serializable {
-    private static final long serialVersionUID = 1L;
-
-    private OperatorDescriptorId odId;
-    private Direction direction;
-    private int portIndex;
-    private int partition;
-
-    public PortInstanceId(OperatorDescriptorId odId, Direction direction, int portIndex, int partition) {
-        this.odId = odId;
-        this.direction = direction;
-        this.portIndex = portIndex;
-        this.partition = partition;
-    }
-
-    public OperatorDescriptorId getOperatorId() {
-        return odId;
-    }
-
-    public Direction getDirection() {
-        return direction;
-    }
-
-    public int getPortIndex() {
-        return portIndex;
-    }
-
-    public int getPartition() {
-        return partition;
-    }
-
-    @Override
-    public int hashCode() {
-        final int prime = 31;
-        int result = 1;
-        result = prime * result + ((direction == null) ? 0 : direction.hashCode());
-        result = prime * result + ((odId == null) ? 0 : odId.hashCode());
-        result = prime * result + partition;
-        result = prime * result + portIndex;
-        return result;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj)
-            return true;
-        if (obj == null)
-            return false;
-        if (getClass() != obj.getClass())
-            return false;
-        PortInstanceId other = (PortInstanceId) obj;
-        if (direction == null) {
-            if (other.direction != null)
-                return false;
-        } else if (!direction.equals(other.direction))
-            return false;
-        if (odId == null) {
-            if (other.odId != null)
-                return false;
-        } else if (!odId.equals(other.odId))
-            return false;
-        if (partition != other.partition)
-            return false;
-        if (portIndex != other.portIndex)
-            return false;
-        return true;
-    }
-
-    @Override
-    public String toString() {
-        return odId + ":" + direction + ":" + partition + ":" + portIndex;
-    }
-}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java
index 9283b8e..4420279 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java
@@ -19,16 +19,16 @@
 public final class TaskId implements Serializable {
     private static final long serialVersionUID = 1L;
 
-    private final ActivityNodeId activityId;
+    private final ActivityId activityId;
 
     private final int partition;
 
-    public TaskId(ActivityNodeId activityId, int partition) {
+    public TaskId(ActivityId activityId, int partition) {
         this.activityId = activityId;
         this.partition = partition;
     }
 
-    public ActivityNodeId getActivityId() {
+    public ActivityId getActivityId() {
         return activityId;
     }
 
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/dataflow/IConnectorPolicy.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/IConnectorPolicy.java
similarity index 84%
rename from hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/dataflow/IConnectorPolicy.java
rename to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/IConnectorPolicy.java
index da659ce..d6d6fee 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/dataflow/IConnectorPolicy.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/IConnectorPolicy.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.control.common.job.dataflow;
+package edu.uci.ics.hyracks.api.dataflow.connectors;
 
 import java.io.Serializable;
 
@@ -20,4 +20,8 @@
     public boolean requiresProducerConsumerCoscheduling();
 
     public boolean consumerWaitsForProducerToFinish();
+
+    public boolean materializeOnSendSide();
+
+    public boolean materializeOnReceiveSide();
 }
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/dataflow/IConnectorPolicy.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/IConnectorPolicyAssignmentPolicy.java
similarity index 66%
copy from hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/dataflow/IConnectorPolicy.java
copy to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/IConnectorPolicyAssignmentPolicy.java
index da659ce..ca684e4 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/dataflow/IConnectorPolicy.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/IConnectorPolicyAssignmentPolicy.java
@@ -12,12 +12,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.control.common.job.dataflow;
+package edu.uci.ics.hyracks.api.dataflow.connectors;
 
 import java.io.Serializable;
 
-public interface IConnectorPolicy extends Serializable {
-    public boolean requiresProducerConsumerCoscheduling();
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
 
-    public boolean consumerWaitsForProducerToFinish();
+public interface IConnectorPolicyAssignmentPolicy extends Serializable {
+    public IConnectorPolicy getConnectorPolicyAssignment(IConnectorDescriptor c, int nProducers, int nConsumers,
+            int[] fanouts);
 }
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/dataflow/PipelinedConnectorPolicy.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/PipelinedConnectorPolicy.java
similarity index 80%
rename from hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/dataflow/PipelinedConnectorPolicy.java
rename to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/PipelinedConnectorPolicy.java
index bc98e93..ba9d3a5 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/dataflow/PipelinedConnectorPolicy.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/PipelinedConnectorPolicy.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.control.common.job.dataflow;
+package edu.uci.ics.hyracks.api.dataflow.connectors;
 
 public final class PipelinedConnectorPolicy implements IConnectorPolicy {
     private static final long serialVersionUID = 1L;
@@ -26,4 +26,14 @@
     public boolean consumerWaitsForProducerToFinish() {
         return true;
     }
+
+    @Override
+    public boolean materializeOnSendSide() {
+        return false;
+    }
+
+    @Override
+    public boolean materializeOnReceiveSide() {
+        return false;
+    }
 }
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/dataflow/SendSideMaterializedConnectorPolicy.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedConnectorPolicy.java
similarity index 80%
rename from hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/dataflow/SendSideMaterializedConnectorPolicy.java
rename to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedConnectorPolicy.java
index 235cd3c..ffd3722 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/dataflow/SendSideMaterializedConnectorPolicy.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedConnectorPolicy.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.control.common.job.dataflow;
+package edu.uci.ics.hyracks.api.dataflow.connectors;
 
 public final class SendSideMaterializedConnectorPolicy implements IConnectorPolicy {
     private static final long serialVersionUID = 1L;
@@ -26,4 +26,14 @@
     public boolean consumerWaitsForProducerToFinish() {
         return true;
     }
+
+    @Override
+    public boolean materializeOnSendSide() {
+        return true;
+    }
+
+    @Override
+    public boolean materializeOnReceiveSide() {
+        return false;
+    }
 }
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/dataflow/SendSideMaterializedConnectorPolicy.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedReceiveSideMaterializedConnectorPolicy.java
similarity index 72%
copy from hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/dataflow/SendSideMaterializedConnectorPolicy.java
copy to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedReceiveSideMaterializedConnectorPolicy.java
index 235cd3c..0a02e58 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/dataflow/SendSideMaterializedConnectorPolicy.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedReceiveSideMaterializedConnectorPolicy.java
@@ -12,9 +12,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.control.common.job.dataflow;
+package edu.uci.ics.hyracks.api.dataflow.connectors;
 
-public final class SendSideMaterializedConnectorPolicy implements IConnectorPolicy {
+public final class SendSideMaterializedReceiveSideMaterializedConnectorPolicy implements IConnectorPolicy {
     private static final long serialVersionUID = 1L;
 
     @Override
@@ -26,4 +26,14 @@
     public boolean consumerWaitsForProducerToFinish() {
         return true;
     }
+
+    @Override
+    public boolean materializeOnSendSide() {
+        return true;
+    }
+
+    @Override
+    public boolean materializeOnReceiveSide() {
+        return true;
+    }
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobActivityGraph.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobActivityGraph.java
index d2a4f0f..f369f95 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobActivityGraph.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobActivityGraph.java
@@ -22,9 +22,9 @@
 import java.util.Map;
 import java.util.Set;
 
-import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
-import edu.uci.ics.hyracks.api.dataflow.IActivityNode;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
@@ -40,34 +40,34 @@
 
     private final EnumSet<JobFlag> jobFlags;
 
-    private final Map<ActivityNodeId, IActivityNode> activityNodes;
+    private final Map<ActivityId, IActivity> activityNodes;
 
-    private final Map<ActivityNodeId, Set<ActivityNodeId>> blocker2blockedMap;
+    private final Map<ActivityId, Set<ActivityId>> blocker2blockedMap;
 
-    private final Map<ActivityNodeId, Set<ActivityNodeId>> blocked2blockerMap;
+    private final Map<ActivityId, Set<ActivityId>> blocked2blockerMap;
 
-    private final Map<OperatorDescriptorId, Set<ActivityNodeId>> operatorActivityMap;
+    private final Map<OperatorDescriptorId, Set<ActivityId>> operatorActivityMap;
 
-    private final Map<ActivityNodeId, List<Integer>> activityInputMap;
+    private final Map<ActivityId, List<Integer>> activityInputMap;
 
-    private final Map<ActivityNodeId, List<Integer>> activityOutputMap;
+    private final Map<ActivityId, List<Integer>> activityOutputMap;
 
-    private final Map<OperatorDescriptorId, List<ActivityNodeId>> operatorInputMap;
+    private final Map<OperatorDescriptorId, List<ActivityId>> operatorInputMap;
 
-    private final Map<OperatorDescriptorId, List<ActivityNodeId>> operatorOutputMap;
+    private final Map<OperatorDescriptorId, List<ActivityId>> operatorOutputMap;
 
     public JobActivityGraph(String appName, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) {
         this.appName = appName;
         this.jobSpec = jobSpec;
         this.jobFlags = jobFlags;
-        activityNodes = new HashMap<ActivityNodeId, IActivityNode>();
-        blocker2blockedMap = new HashMap<ActivityNodeId, Set<ActivityNodeId>>();
-        blocked2blockerMap = new HashMap<ActivityNodeId, Set<ActivityNodeId>>();
-        operatorActivityMap = new HashMap<OperatorDescriptorId, Set<ActivityNodeId>>();
-        activityInputMap = new HashMap<ActivityNodeId, List<Integer>>();
-        activityOutputMap = new HashMap<ActivityNodeId, List<Integer>>();
-        operatorInputMap = new HashMap<OperatorDescriptorId, List<ActivityNodeId>>();
-        operatorOutputMap = new HashMap<OperatorDescriptorId, List<ActivityNodeId>>();
+        activityNodes = new HashMap<ActivityId, IActivity>();
+        blocker2blockedMap = new HashMap<ActivityId, Set<ActivityId>>();
+        blocked2blockerMap = new HashMap<ActivityId, Set<ActivityId>>();
+        operatorActivityMap = new HashMap<OperatorDescriptorId, Set<ActivityId>>();
+        activityInputMap = new HashMap<ActivityId, List<Integer>>();
+        activityOutputMap = new HashMap<ActivityId, List<Integer>>();
+        operatorInputMap = new HashMap<OperatorDescriptorId, List<ActivityId>>();
+        operatorOutputMap = new HashMap<OperatorDescriptorId, List<ActivityId>>();
     }
 
     public String getApplicationName() {
@@ -82,39 +82,39 @@
         return jobFlags;
     }
 
-    public Map<ActivityNodeId, IActivityNode> getActivityNodeMap() {
+    public Map<ActivityId, IActivity> getActivityNodeMap() {
         return activityNodes;
     }
 
-    public Map<ActivityNodeId, Set<ActivityNodeId>> getBlocker2BlockedMap() {
+    public Map<ActivityId, Set<ActivityId>> getBlocker2BlockedMap() {
         return blocker2blockedMap;
     }
 
-    public Map<ActivityNodeId, Set<ActivityNodeId>> getBlocked2BlockerMap() {
+    public Map<ActivityId, Set<ActivityId>> getBlocked2BlockerMap() {
         return blocked2blockerMap;
     }
 
-    public Map<OperatorDescriptorId, Set<ActivityNodeId>> getOperatorActivityMap() {
+    public Map<OperatorDescriptorId, Set<ActivityId>> getOperatorActivityMap() {
         return operatorActivityMap;
     }
 
-    public Map<ActivityNodeId, List<Integer>> getActivityInputMap() {
+    public Map<ActivityId, List<Integer>> getActivityInputMap() {
         return activityInputMap;
     }
 
-    public Map<ActivityNodeId, List<Integer>> getActivityOutputMap() {
+    public Map<ActivityId, List<Integer>> getActivityOutputMap() {
         return activityOutputMap;
     }
 
-    public Map<OperatorDescriptorId, List<ActivityNodeId>> getOperatorInputMap() {
+    public Map<OperatorDescriptorId, List<ActivityId>> getOperatorInputMap() {
         return operatorInputMap;
     }
 
-    public Map<OperatorDescriptorId, List<ActivityNodeId>> getOperatorOutputMap() {
+    public Map<OperatorDescriptorId, List<ActivityId>> getOperatorOutputMap() {
         return operatorOutputMap;
     }
 
-    public List<IConnectorDescriptor> getActivityInputConnectorDescriptors(ActivityNodeId hanId) {
+    public List<IConnectorDescriptor> getActivityInputConnectorDescriptors(ActivityId hanId) {
         List<Integer> inputIndexes = activityInputMap.get(hanId);
         if (inputIndexes == null) {
             return null;
@@ -127,7 +127,7 @@
         return inputs;
     }
 
-    public List<IConnectorDescriptor> getActivityOutputConnectorDescriptors(ActivityNodeId hanId) {
+    public List<IConnectorDescriptor> getActivityOutputConnectorDescriptors(ActivityId hanId) {
         List<Integer> outputIndexes = activityOutputMap.get(hanId);
         if (outputIndexes == null) {
             return null;
@@ -140,14 +140,14 @@
         return outputs;
     }
 
-    public ActivityNodeId getConsumerActivity(ConnectorDescriptorId cdId) {
+    public ActivityId getConsumerActivity(ConnectorDescriptorId cdId) {
         Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connEdge = jobSpec
                 .getConnectorOperatorMap().get(cdId);
 
         OperatorDescriptorId consumerOpId = connEdge.second.first.getOperatorId();
         int consumerInputIdx = connEdge.second.second;
 
-        for (ActivityNodeId anId : operatorActivityMap.get(consumerOpId)) {
+        for (ActivityId anId : operatorActivityMap.get(consumerOpId)) {
             List<Integer> anInputs = activityInputMap.get(anId);
             if (anInputs != null) {
                 for (Integer idx : anInputs) {
@@ -160,14 +160,14 @@
         return null;
     }
 
-    public ActivityNodeId getProducerActivity(ConnectorDescriptorId cdId) {
+    public ActivityId getProducerActivity(ConnectorDescriptorId cdId) {
         Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connEdge = jobSpec
                 .getConnectorOperatorMap().get(cdId);
 
         OperatorDescriptorId producerOpId = connEdge.first.first.getOperatorId();
         int producerInputIdx = connEdge.first.second;
 
-        for (ActivityNodeId anId : operatorActivityMap.get(producerOpId)) {
+        for (ActivityId anId : operatorActivityMap.get(producerOpId)) {
             List<Integer> anOutputs = activityOutputMap.get(anId);
             if (anOutputs != null) {
                 for (Integer idx : anOutputs) {
@@ -180,12 +180,12 @@
         return null;
     }
 
-    public RecordDescriptor getActivityInputRecordDescriptor(ActivityNodeId hanId, int inputIndex) {
+    public RecordDescriptor getActivityInputRecordDescriptor(ActivityId hanId, int inputIndex) {
         int opInputIndex = getActivityInputMap().get(hanId).get(inputIndex);
         return jobSpec.getOperatorInputRecordDescriptor(hanId.getOperatorDescriptorId(), opInputIndex);
     }
 
-    public RecordDescriptor getActivityOutputRecordDescriptor(ActivityNodeId hanId, int outputIndex) {
+    public RecordDescriptor getActivityOutputRecordDescriptor(ActivityId hanId, int outputIndex) {
         int opOutputIndex = getActivityOutputMap().get(hanId).get(outputIndex);
         return jobSpec.getOperatorOutputRecordDescriptor(hanId.getOperatorDescriptorId(), opOutputIndex);
     }
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
index ab5397d..79d10ab 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
@@ -31,6 +31,7 @@
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.util.Pair;
 
@@ -53,6 +54,8 @@
 
     private final Set<Constraint> userConstraints;
 
+    private IConnectorPolicyAssignmentPolicy connectorPolicyAssignmentPolicy;
+
     private int maxAttempts;
 
     public JobSpecification() {
@@ -173,6 +176,14 @@
         return roots;
     }
 
+    public IConnectorPolicyAssignmentPolicy getConnectorPolicyAssignmentPolicy() {
+        return connectorPolicyAssignmentPolicy;
+    }
+
+    public void setConnectorPolicyAssignmentPolicy(IConnectorPolicyAssignmentPolicy connectorPolicyAssignmentPolicy) {
+        this.connectorPolicyAssignmentPolicy = connectorPolicyAssignmentPolicy;
+    }
+
     public void setMaxAttempts(int maxAttempts) {
         this.maxAttempts = maxAttempts;
     }
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/Direction.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/things/ISliver.java
similarity index 63%
copy from hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/Direction.java
copy to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/things/ISliver.java
index c884998..135a77c 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/Direction.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/things/ISliver.java
@@ -12,9 +12,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.api.dataflow;
+package edu.uci.ics.hyracks.api.things;
 
-public enum Direction {
-    INPUT,
-    OUTPUT,
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface ISliver {
+    public void open() throws HyracksDataException;
+
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException;
+
+    public void commit() throws HyracksDataException;
+
+    public void abort();
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/Direction.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/things/IThingDescriptor.java
similarity index 78%
copy from hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/Direction.java
copy to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/things/IThingDescriptor.java
index c884998..1269e35 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/Direction.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/things/IThingDescriptor.java
@@ -12,9 +12,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.api.dataflow;
+package edu.uci.ics.hyracks.api.things;
 
-public enum Direction {
-    INPUT,
-    OUTPUT,
+import java.io.Serializable;
+
+public interface IThingDescriptor extends Serializable {
+    public ThingDescriptorId getThingId();
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/Direction.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/things/IThingPartition.java
similarity index 84%
rename from hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/Direction.java
rename to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/things/IThingPartition.java
index c884998..cb79b6f 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/Direction.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/things/IThingPartition.java
@@ -12,9 +12,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.api.dataflow;
+package edu.uci.ics.hyracks.api.things;
 
-public enum Direction {
-    INPUT,
-    OUTPUT,
+public interface IThingPartition {
+    public ISliver createSliver();
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/things/ThingDescriptorId.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/things/ThingDescriptorId.java
new file mode 100644
index 0000000..9adf482
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/things/ThingDescriptorId.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.things;
+
+import java.io.Serializable;
+
+public final class ThingDescriptorId implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final long id;
+
+    public ThingDescriptorId(long id) {
+        this.id = id;
+    }
+
+    public long getId() {
+        return id;
+    }
+
+    @Override
+    public String toString() {
+        return "TID: " + id;
+    }
+
+    public int hashCode() {
+        return (int) (id & 0xffffffff);
+    }
+
+    public boolean equals(Object o) {
+        if (!(o instanceof ThingDescriptorId)) {
+            return false;
+        }
+        return id == ((ThingDescriptorId) o).id;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
index bf4dde1..c802eb8 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
@@ -19,38 +19,42 @@
 import java.util.Map;
 import java.util.Set;
 
-import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
 import edu.uci.ics.hyracks.control.cc.scheduler.IActivityClusterStateMachine;
-import edu.uci.ics.hyracks.control.common.job.dataflow.IConnectorPolicy;
 
 public class ActivityCluster {
     private final JobRun jobRun;
 
-    private final Set<ActivityNodeId> activities;
+    private final Set<ActivityId> activities;
 
     private final Set<ActivityCluster> dependencies;
 
     private final Set<ActivityCluster> dependents;
 
-    private final Map<ActivityNodeId, Task[]> taskStateMap;
+    private final Map<ActivityId, Task[]> taskStateMap;
 
     private TaskCluster[] taskClusters;
 
+    private Map<PartitionId, TaskCluster> partitionProducingTaskClusterMap;
+
     private IActivityClusterStateMachine acsm;
 
     private Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies;
 
-    public ActivityCluster(JobRun jobRun, Set<ActivityNodeId> activities) {
+    public ActivityCluster(JobRun jobRun, Set<ActivityId> activities) {
         this.jobRun = jobRun;
         this.activities = activities;
         dependencies = new HashSet<ActivityCluster>();
         dependents = new HashSet<ActivityCluster>();
-        taskStateMap = new HashMap<ActivityNodeId, Task[]>();
+        taskStateMap = new HashMap<ActivityId, Task[]>();
+        partitionProducingTaskClusterMap = new HashMap<PartitionId, TaskCluster>();
     }
 
-    public Set<ActivityNodeId> getActivities() {
+    public Set<ActivityId> getActivities() {
         return activities;
     }
 
@@ -66,7 +70,7 @@
         return dependencies;
     }
 
-    public Map<ActivityNodeId, Task[]> getTaskMap() {
+    public Map<ActivityId, Task[]> getTaskMap() {
         return taskStateMap;
     }
 
@@ -78,6 +82,10 @@
         this.taskClusters = taskClusters;
     }
 
+    public Map<PartitionId, TaskCluster> getPartitionProducingTaskClusterMap() {
+        return partitionProducingTaskClusterMap;
+    }
+
     public IActivityClusterStateMachine getStateMachine() {
         return acsm;
     }
@@ -91,7 +99,7 @@
     }
 
     public int getMaxTaskClusterAttempts() {
-        return 1;
+        return jobRun.getJobActivityGraph().getJobSpecification().getMaxAttempts();
     }
 
     public void notifyTaskClusterFailure(TaskClusterAttempt tcAttempt, Exception exception) throws HyracksException {
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobActivityGraphBuilder.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobActivityGraphBuilder.java
index 9f9a2e3..6effd84 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobActivityGraphBuilder.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobActivityGraphBuilder.java
@@ -9,8 +9,9 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
-import edu.uci.ics.hyracks.api.dataflow.IActivityNode;
 import edu.uci.ics.hyracks.api.job.JobActivityGraph;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
@@ -21,37 +22,38 @@
     private JobActivityGraph jag;
 
     @Override
-    public void addBlockingEdge(IActivityNode blocker, IActivityNode blocked) {
-        addToValueSet(jag.getBlocker2BlockedMap(), blocker.getActivityNodeId(), blocked.getActivityNodeId());
-        addToValueSet(jag.getBlocked2BlockerMap(), blocked.getActivityNodeId(), blocker.getActivityNodeId());
+    public void addBlockingEdge(IActivity blocker, IActivity blocked) {
+        addToValueSet(jag.getBlocker2BlockedMap(), blocker.getActivityId(), blocked.getActivityId());
+        addToValueSet(jag.getBlocked2BlockerMap(), blocked.getActivityId(), blocker.getActivityId());
     }
 
     @Override
-    public void addSourceEdge(int operatorInputIndex, IActivityNode task, int taskInputIndex) {
+    public void addSourceEdge(int operatorInputIndex, IActivity task, int taskInputIndex) {
         if (LOGGER.isLoggable(Level.FINEST)) {
-            LOGGER.finest("Adding source edge: " + task.getOwner().getOperatorId() + ":" + operatorInputIndex + " -> "
-                    + task.getActivityNodeId() + ":" + taskInputIndex);
+            LOGGER.finest("Adding source edge: " + task.getActivityId().getOperatorDescriptorId() + ":"
+                    + operatorInputIndex + " -> " + task.getActivityId() + ":" + taskInputIndex);
         }
-        insertIntoIndexedMap(jag.getActivityInputMap(), task.getActivityNodeId(), taskInputIndex, operatorInputIndex);
-        insertIntoIndexedMap(jag.getOperatorInputMap(), task.getOwner().getOperatorId(), operatorInputIndex,
-                task.getActivityNodeId());
+        insertIntoIndexedMap(jag.getActivityInputMap(), task.getActivityId(), taskInputIndex, operatorInputIndex);
+        insertIntoIndexedMap(jag.getOperatorInputMap(), task.getActivityId().getOperatorDescriptorId(),
+                operatorInputIndex, task.getActivityId());
     }
 
     @Override
-    public void addTargetEdge(int operatorOutputIndex, IActivityNode task, int taskOutputIndex) {
+    public void addTargetEdge(int operatorOutputIndex, IActivity task, int taskOutputIndex) {
         if (LOGGER.isLoggable(Level.FINEST)) {
-            LOGGER.finest("Adding target edge: " + task.getOwner().getOperatorId() + ":" + operatorOutputIndex + " -> "
-                    + task.getActivityNodeId() + ":" + taskOutputIndex);
+            LOGGER.finest("Adding target edge: " + task.getActivityId().getOperatorDescriptorId() + ":"
+                    + operatorOutputIndex + " -> " + task.getActivityId() + ":" + taskOutputIndex);
         }
-        insertIntoIndexedMap(jag.getActivityOutputMap(), task.getActivityNodeId(), taskOutputIndex, operatorOutputIndex);
-        insertIntoIndexedMap(jag.getOperatorOutputMap(), task.getOwner().getOperatorId(), operatorOutputIndex,
-                task.getActivityNodeId());
+        insertIntoIndexedMap(jag.getActivityOutputMap(), task.getActivityId(), taskOutputIndex, operatorOutputIndex);
+        insertIntoIndexedMap(jag.getOperatorOutputMap(), task.getActivityId().getOperatorDescriptorId(),
+                operatorOutputIndex, task.getActivityId());
     }
 
     @Override
-    public void addTask(IActivityNode task) {
-        jag.getActivityNodeMap().put(task.getActivityNodeId(), task);
-        addToValueSet(jag.getOperatorActivityMap(), task.getOwner().getOperatorId(), task.getActivityNodeId());
+    public void addActivity(IActivity task) {
+        ActivityId activityId = task.getActivityId();
+        jag.getActivityNodeMap().put(activityId, task);
+        addToValueSet(jag.getOperatorActivityMap(), activityId.getOperatorDescriptorId(), activityId);
     }
 
     private <K, V> void addToValueSet(Map<K, Set<V>> map, K n1, V n2) {
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
index fb7dbf8..359bf2e 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
@@ -21,7 +21,7 @@
 import java.util.UUID;
 
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
-import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.JobActivityGraph;
 import edu.uci.ics.hyracks.api.job.JobStatus;
@@ -42,7 +42,7 @@
 
     private final JobProfile profile;
 
-    private final Map<ActivityNodeId, ActivityCluster> activityClusterMap;
+    private final Map<ActivityId, ActivityCluster> activityClusterMap;
 
     private IJobRunStateMachine jsm;
 
@@ -57,7 +57,7 @@
         partitionRequestorMap = new HashMap<PartitionId, String>();
         participatingNodeIds = new HashSet<String>();
         profile = new JobProfile(jobId);
-        activityClusterMap = new HashMap<ActivityNodeId, ActivityCluster>();
+        activityClusterMap = new HashMap<ActivityId, ActivityCluster>();
     }
 
     public UUID getJobId() {
@@ -116,7 +116,7 @@
         return partitionRequestorMap;
     }
 
-    public Map<ActivityNodeId, ActivityCluster> getActivityClusterMap() {
+    public Map<ActivityId, ActivityCluster> getActivityClusterMap() {
         return activityClusterMap;
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java
index 068ec5f..b82dafd 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java
@@ -20,12 +20,17 @@
 import java.util.Set;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
 
 public class TaskCluster {
     private final ActivityCluster activityCluster;
 
     private final Task[] tasks;
 
+    private final Set<PartitionId> producedPartitions;
+
+    private final Set<PartitionId> requiredPartitions;
+
     private final Set<TaskCluster> blockers;
 
     private final Set<TaskCluster> dependencies;
@@ -35,6 +40,8 @@
     public TaskCluster(ActivityCluster activityCluster, Task[] tasks) {
         this.activityCluster = activityCluster;
         this.tasks = tasks;
+        this.producedPartitions = new HashSet<PartitionId>();
+        this.requiredPartitions = new HashSet<PartitionId>();
         this.blockers = new HashSet<TaskCluster>();
         this.dependencies = new HashSet<TaskCluster>();
         taskClusterAttempts = new ArrayList<TaskClusterAttempt>();
@@ -48,6 +55,14 @@
         return dependencies;
     }
 
+    public Set<PartitionId> getProducedPartitions() {
+        return producedPartitions;
+    }
+
+    public Set<PartitionId> getRequiredPartitions() {
+        return requiredPartitions;
+    }
+
     public Set<TaskCluster> getBlockers() {
         return blockers;
     }
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/AbstractTaskLifecycleEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/AbstractTaskLifecycleEvent.java
index 1fb4bc3..643cc18 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/AbstractTaskLifecycleEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/AbstractTaskLifecycleEvent.java
@@ -18,7 +18,7 @@
 import java.util.Map;
 import java.util.UUID;
 
-import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
@@ -48,10 +48,10 @@
         JobRun run = ccs.getRunMap().get(jobId);
         if (run != null) {
             TaskId tid = taId.getTaskId();
-            Map<ActivityNodeId, ActivityCluster> activityClusterMap = run.getActivityClusterMap();
+            Map<ActivityId, ActivityCluster> activityClusterMap = run.getActivityClusterMap();
             ActivityCluster ac = activityClusterMap.get(tid.getActivityId());
             if (ac != null) {
-                Map<ActivityNodeId, Task[]> taskStateMap = ac.getTaskMap();
+                Map<ActivityId, Task[]> taskStateMap = ac.getTaskMap();
                 Task[] taskStates = taskStateMap.get(tid.getActivityId());
                 if (taskStates != null && taskStates.length > tid.getPartition()) {
                     Task ts = taskStates[tid.getPartition()];
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCreateEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCreateEvent.java
index 9e47d8a..603623b 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCreateEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCreateEvent.java
@@ -60,7 +60,7 @@
         PlanUtils.visit(spec, new IOperatorDescriptorVisitor() {
             @Override
             public void visit(IOperatorDescriptor op) {
-                op.contributeTaskGraph(builder);
+                op.contributeActivities(builder);
             }
         });
         final JobActivityGraph jag = builder.getActivityGraph();
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionAvailibilityEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionAvailibilityEvent.java
index e9861d8..530d00a 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionAvailibilityEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionAvailibilityEvent.java
@@ -46,7 +46,7 @@
         partitionAvailabilityMap.put(pid, networkAddress);
 
         Map<PartitionId, String> partitionRequestorMap = run.getPartitionRequestorMap();
-        String requestor = partitionRequestorMap.get(pid);
+        String requestor = partitionRequestorMap.remove(pid);
         if (requestor != null) {
             NodeControllerState ncs = ccs.getNodeMap().get(requestor);
             if (ncs != null) {
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionRequestEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionRequestEvent.java
index e7c92e7..0055946 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionRequestEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionRequestEvent.java
@@ -46,8 +46,6 @@
                 if (run == null) {
                     return;
                 }
-                Map<PartitionId, String> partitionRequestorMap = run.getPartitionRequestorMap();
-                partitionRequestorMap.put(pid, nodeId);
 
                 Map<PartitionId, NetworkAddress> partitionAvailabilityMap = run.getPartitionAvailabilityMap();
                 NetworkAddress networkAddress = partitionAvailabilityMap.get(pid);
@@ -58,6 +56,9 @@
                     } catch (Exception e) {
                         e.printStackTrace();
                     }
+                } else {
+                    Map<PartitionId, String> partitionRequestorMap = run.getPartitionRequestorMap();
+                    partitionRequestorMap.put(pid, nodeId);
                 }
             }
         }
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/PortMapMergingAccumulator.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/PortMapMergingAccumulator.java
deleted file mode 100644
index 8cffd08..0000000
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/PortMapMergingAccumulator.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.remote.ops;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import edu.uci.ics.hyracks.api.comm.Endpoint;
-import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
-import edu.uci.ics.hyracks.control.cc.remote.Accumulator;
-
-public class PortMapMergingAccumulator implements
-        Accumulator<Map<PortInstanceId, Endpoint>, Map<PortInstanceId, Endpoint>> {
-    Map<PortInstanceId, Endpoint> portMap = new HashMap<PortInstanceId, Endpoint>();
-
-    @Override
-    public void accumulate(Map<PortInstanceId, Endpoint> o) {
-        portMap.putAll(o);
-    }
-
-    @Override
-    public Map<PortInstanceId, Endpoint> getResult() {
-        return portMap;
-    }
-}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultActivityClusterStateMachine.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultActivityClusterStateMachine.java
index c325433..15c2db7 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultActivityClusterStateMachine.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultActivityClusterStateMachine.java
@@ -34,6 +34,7 @@
 import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.JobActivityGraph;
 import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
@@ -46,7 +47,6 @@
 import edu.uci.ics.hyracks.control.cc.job.TaskCluster;
 import edu.uci.ics.hyracks.control.cc.job.TaskClusterAttempt;
 import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
-import edu.uci.ics.hyracks.control.common.job.dataflow.IConnectorPolicy;
 
 public class DefaultActivityClusterStateMachine implements IActivityClusterStateMachine {
     private static final Logger LOGGER = Logger.getLogger(DefaultActivityClusterStateMachine.class.getName());
@@ -146,19 +146,25 @@
         inProgressTaskClusters.add(tc);
     }
 
+    private TaskClusterAttempt findLastTaskClusterAttempt(TaskCluster tc) {
+        List<TaskClusterAttempt> attempts = tc.getAttempts();
+        if (!attempts.isEmpty()) {
+            return attempts.get(attempts.size() - 1);
+        }
+        return null;
+    }
+
     @Override
     public void notifyTaskComplete(TaskAttempt ta) throws HyracksException {
         TaskAttemptId taId = ta.getTaskAttemptId();
         TaskCluster tc = ta.getTaskState().getTaskCluster();
-        List<TaskClusterAttempt> tcAttempts = tc.getAttempts();
-        int lastAttempt = tcAttempts.size() - 1;
-        if (taId.getAttempt() == lastAttempt) {
-            TaskClusterAttempt tcAttempt = tcAttempts.get(lastAttempt);
+        TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
+        if (lastAttempt != null && taId.getAttempt() == lastAttempt.getAttempt()) {
             TaskAttempt.TaskStatus taStatus = ta.getStatus();
             if (taStatus == TaskAttempt.TaskStatus.RUNNING) {
                 ta.setStatus(TaskAttempt.TaskStatus.COMPLETED, null);
-                if (tcAttempt.decrementPendingTasksCounter() == 0) {
-                    tcAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.COMPLETED);
+                if (lastAttempt.decrementPendingTasksCounter() == 0) {
+                    lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.COMPLETED);
                     inProgressTaskClusters.remove(tc);
                     startRunnableTaskClusters();
                 }
@@ -171,42 +177,19 @@
     }
 
     private void startRunnableTaskClusters() throws HyracksException {
-        TaskCluster[] taskClusters = ac.getTaskClusters();
-
+        Set<TaskCluster> runnableTaskClusters = new HashSet<TaskCluster>();
+        findRunnableTaskClusters(runnableTaskClusters);
         Map<String, List<TaskAttemptDescriptor>> taskAttemptMap = new HashMap<String, List<TaskAttemptDescriptor>>();
-        for (TaskCluster tc : taskClusters) {
-            Set<TaskCluster> dependencies = tc.getDependencies();
-            List<TaskClusterAttempt> attempts = tc.getAttempts();
-            if (!attempts.isEmpty()) {
-                TaskClusterAttempt lastAttempt = attempts.get(attempts.size() - 1);
-                if (lastAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED
-                        || lastAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING) {
-                    continue;
+        for (TaskCluster tc : runnableTaskClusters) {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Found runnable TC: " + Arrays.toString(tc.getTasks()));
+                List<TaskClusterAttempt> attempts = tc.getAttempts();
+                LOGGER.info("Attempts so far:" + attempts.size());
+                for (TaskClusterAttempt tcAttempt : attempts) {
+                    LOGGER.info("Status: " + tcAttempt.getStatus());
                 }
             }
-            boolean runnable = true;
-            for (TaskCluster depTC : dependencies) {
-                List<TaskClusterAttempt> tcAttempts = depTC.getAttempts();
-                if (tcAttempts.isEmpty()) {
-                    runnable = false;
-                    break;
-                }
-                TaskClusterAttempt tcAttempt = tcAttempts.get(tcAttempts.size() - 1);
-                if (tcAttempt.getStatus() != TaskClusterAttempt.TaskClusterStatus.COMPLETED) {
-                    runnable = false;
-                    break;
-                }
-            }
-            if (runnable) {
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Found runnable TC: " + Arrays.toString(tc.getTasks()));
-                    LOGGER.info("Attempts so far:" + attempts.size());
-                    for (TaskClusterAttempt tcAttempt : attempts) {
-                        LOGGER.info("Status: " + tcAttempt.getStatus());
-                    }
-                }
-                assignTaskLocations(tc, taskAttemptMap);
-            }
+            assignTaskLocations(tc, taskAttemptMap);
         }
 
         if (taskAttemptMap.isEmpty()) {
@@ -219,7 +202,63 @@
         startTasks(taskAttemptMap);
     }
 
-    private void startTasks(Map<String, List<TaskAttemptDescriptor>> taskAttemptMap) {
+    private void findRunnableTaskClusters(Set<TaskCluster> runnableTaskClusters) {
+        TaskCluster[] taskClusters = ac.getTaskClusters();
+
+        for (TaskCluster tc : taskClusters) {
+            Set<TaskCluster> blockers = tc.getBlockers();
+            TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
+            if (lastAttempt != null
+                    && (lastAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED || lastAttempt
+                            .getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING)) {
+                continue;
+            }
+            boolean runnable = true;
+            for (TaskCluster blockerTC : blockers) {
+                List<TaskClusterAttempt> tcAttempts = blockerTC.getAttempts();
+                if (tcAttempts.isEmpty()) {
+                    runnable = false;
+                    break;
+                }
+                TaskClusterAttempt tcAttempt = tcAttempts.get(tcAttempts.size() - 1);
+                if (tcAttempt.getStatus() != TaskClusterAttempt.TaskClusterStatus.COMPLETED) {
+                    runnable = false;
+                    break;
+                }
+            }
+            if (runnable) {
+                runnableTaskClusters.add(tc);
+            }
+        }
+    }
+
+    private void findCascadingAbortTaskClusterAttempts(TaskClusterAttempt abortedTCAttempt,
+            Set<TaskClusterAttempt> cascadingAbortTaskClusterAttempts) {
+        boolean changed = true;
+        cascadingAbortTaskClusterAttempts.add(abortedTCAttempt);
+        while (changed) {
+            changed = false;
+            for (TaskCluster tc : ac.getTaskClusters()) {
+                TaskClusterAttempt tca = findLastTaskClusterAttempt(tc);
+                if (tca != null && tca.getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING) {
+                    boolean abort = false;
+                    for (TaskClusterAttempt catca : cascadingAbortTaskClusterAttempts) {
+                        TaskCluster catc = catca.getTaskCluster();
+                        if (tc.getDependencies().contains(catc)) {
+                            abort = true;
+                            break;
+                        }
+                    }
+                    if (abort) {
+                        changed = cascadingAbortTaskClusterAttempts.add(tca) || changed;
+                    }
+                }
+            }
+        }
+        cascadingAbortTaskClusterAttempts.remove(abortedTCAttempt);
+    }
+
+    private void startTasks(Map<String, List<TaskAttemptDescriptor>> taskAttemptMap) throws HyracksException {
         Executor executor = ccs.getExecutor();
         JobRun jobRun = ac.getJobRun();
         final UUID jobId = jobRun.getJobId();
@@ -249,7 +288,7 @@
         }
     }
 
-    private void abortTaskCluster(TaskClusterAttempt tcAttempt) {
+    private void abortTaskCluster(TaskClusterAttempt tcAttempt) throws HyracksException {
         Map<String, List<TaskAttemptId>> abortTaskAttemptMap = new HashMap<String, List<TaskAttemptId>>();
         for (TaskAttempt ta2 : tcAttempt.getTaskAttempts()) {
             if (ta2.getStatus() == TaskAttempt.TaskStatus.RUNNING) {
@@ -281,23 +320,26 @@
                 });
             }
         }
-
     }
 
     @Override
     public void notifyTaskFailure(TaskAttempt ta, Exception exception) throws HyracksException {
         TaskAttemptId taId = ta.getTaskAttemptId();
         TaskCluster tc = ta.getTaskState().getTaskCluster();
-        List<TaskClusterAttempt> tcAttempts = tc.getAttempts();
-        int lastAttempt = tcAttempts.size() - 1;
-        if (taId.getAttempt() == lastAttempt) {
-            TaskClusterAttempt tcAttempt = tcAttempts.get(lastAttempt);
+        TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
+        if (lastAttempt != null && taId.getAttempt() == lastAttempt.getAttempt()) {
             TaskAttempt.TaskStatus taStatus = ta.getStatus();
             if (taStatus == TaskAttempt.TaskStatus.RUNNING) {
                 ta.setStatus(TaskAttempt.TaskStatus.FAILED, exception);
-                abortTaskCluster(tcAttempt);
-                tcAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.FAILED);
-                ac.notifyTaskClusterFailure(tcAttempt, exception);
+                abortTaskCluster(lastAttempt);
+                lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.FAILED);
+                Set<TaskClusterAttempt> cascadingAbortTaskClusterAttempts = new HashSet<TaskClusterAttempt>();
+                findCascadingAbortTaskClusterAttempts(lastAttempt, cascadingAbortTaskClusterAttempts);
+                for (TaskClusterAttempt tca : cascadingAbortTaskClusterAttempts) {
+                    abortTaskCluster(tca);
+                    tca.setStatus(TaskClusterAttempt.TaskClusterStatus.ABORTED);
+                }
+                ac.notifyTaskClusterFailure(lastAttempt, exception);
             } else {
                 LOGGER.warning("Spurious task complete notification: " + taId + " Current state = " + taStatus);
             }
@@ -307,7 +349,7 @@
     }
 
     @Override
-    public void abort() {
+    public void abort() throws HyracksException {
         TaskCluster[] taskClusters = ac.getTaskClusters();
         for (TaskCluster tc : taskClusters) {
             List<TaskClusterAttempt> tcAttempts = tc.getAttempts();
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobRunStateMachine.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobRunStateMachine.java
index 5737743..a15469a 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobRunStateMachine.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobRunStateMachine.java
@@ -29,17 +29,21 @@
 import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
 import edu.uci.ics.hyracks.api.constraints.expressions.LValueConstraintExpression;
 import edu.uci.ics.hyracks.api.constraints.expressions.PartitionCountExpression;
-import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
-import edu.uci.ics.hyracks.api.dataflow.IActivityNode;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.PipelinedConnectorPolicy;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.JobActivityGraph;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
 import edu.uci.ics.hyracks.api.util.Pair;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
@@ -50,9 +54,6 @@
 import edu.uci.ics.hyracks.control.cc.job.Task;
 import edu.uci.ics.hyracks.control.cc.job.TaskCluster;
 import edu.uci.ics.hyracks.control.cc.job.manager.events.JobCleanupEvent;
-import edu.uci.ics.hyracks.control.common.job.dataflow.IConnectorPolicy;
-import edu.uci.ics.hyracks.control.common.job.dataflow.PipelinedConnectorPolicy;
-import edu.uci.ics.hyracks.control.common.job.dataflow.SendSideMaterializedConnectorPolicy;
 
 public class DefaultJobRunStateMachine implements IJobRunStateMachine {
     private static final Logger LOGGER = Logger.getLogger(DefaultJobRunStateMachine.class.getName());
@@ -87,33 +88,35 @@
         return solver;
     }
 
-    private static Pair<ActivityNodeId, ActivityNodeId> findMergePair(JobActivityGraph jag, JobSpecification spec,
+    private static Pair<ActivityId, ActivityId> findMergePair(JobActivityGraph jag, JobSpecification spec,
             Set<ActivityCluster> eqSets) {
-        Map<ActivityNodeId, IActivityNode> activityNodeMap = jag.getActivityNodeMap();
+        Map<ActivityId, IActivity> activityNodeMap = jag.getActivityNodeMap();
         for (ActivityCluster eqSet : eqSets) {
-            for (ActivityNodeId t : eqSet.getActivities()) {
-                IOperatorDescriptor owner = activityNodeMap.get(t).getOwner();
+                IActivity activity = activityNodeMap.get(t); // activity replaces the removed getOwner() lookup
                 List<Integer> inputList = jag.getActivityInputMap().get(t);
                 if (inputList != null) {
                     for (Integer idx : inputList) {
-                        IConnectorDescriptor conn = spec.getInputConnectorDescriptor(owner, idx);
+                        IConnectorDescriptor conn = spec.getInputConnectorDescriptor(activity.getActivityId()
+                                .getOperatorDescriptorId(), idx); // operator id now derived from the activity's own id
                         OperatorDescriptorId producerId = spec.getProducer(conn).getOperatorId();
                         int producerOutputIndex = spec.getProducerOutputIndex(conn);
-                        ActivityNodeId inTask = jag.getOperatorOutputMap().get(producerId).get(producerOutputIndex);
+                        ActivityId inTask = jag.getOperatorOutputMap().get(producerId).get(producerOutputIndex);
                         if (!eqSet.getActivities().contains(inTask)) {
-                            return new Pair<ActivityNodeId, ActivityNodeId>(t, inTask);
+                            return new Pair<ActivityId, ActivityId>(t, inTask); // cross-cluster input edge found
                         }
                     }
                 }
                 List<Integer> outputList = jag.getActivityOutputMap().get(t);
                 if (outputList != null) {
                     for (Integer idx : outputList) {
-                        IConnectorDescriptor conn = spec.getOutputConnectorDescriptor(owner, idx);
+                        IConnectorDescriptor conn = spec.getOutputConnectorDescriptor(activity.getActivityId()
+                                .getOperatorDescriptorId(), idx); // symmetric to the input path above
                         OperatorDescriptorId consumerId = spec.getConsumer(conn).getOperatorId();
                         int consumerInputIndex = spec.getConsumerInputIndex(conn);
-                        ActivityNodeId outTask = jag.getOperatorInputMap().get(consumerId).get(consumerInputIndex);
+                        ActivityId outTask = jag.getOperatorInputMap().get(consumerId).get(consumerInputIndex);
                         if (!eqSet.getActivities().contains(outTask)) {
-                            return new Pair<ActivityNodeId, ActivityNodeId>(t, outTask);
+                            return new Pair<ActivityId, ActivityId>(t, outTask); // cross-cluster output edge found
                         }
                     }
                 }
@@ -128,11 +131,11 @@
         /*
          * Build initial equivalence sets map. We create a map such that for each IOperatorTask, t -> { t }
          */
-        Map<ActivityNodeId, ActivityCluster> stageMap = new HashMap<ActivityNodeId, ActivityCluster>();
+        Map<ActivityId, ActivityCluster> stageMap = new HashMap<ActivityId, ActivityCluster>();
         Set<ActivityCluster> stages = new HashSet<ActivityCluster>();
-        for (Set<ActivityNodeId> taskIds : jag.getOperatorActivityMap().values()) {
-            for (ActivityNodeId taskId : taskIds) {
-                Set<ActivityNodeId> eqSet = new HashSet<ActivityNodeId>();
+        for (Set<ActivityId> taskIds : jag.getOperatorActivityMap().values()) {
+            for (ActivityId taskId : taskIds) {
+                Set<ActivityId> eqSet = new HashSet<ActivityId>();
                 eqSet.add(taskId);
                 ActivityCluster stage = new ActivityCluster(jobRun, eqSet);
                 stageMap.put(taskId, stage);
@@ -143,23 +146,23 @@
         boolean changed = true;
         while (changed) {
             changed = false;
-            Pair<ActivityNodeId, ActivityNodeId> pair = findMergePair(jag, spec, stages);
+            Pair<ActivityId, ActivityId> pair = findMergePair(jag, spec, stages);
             if (pair != null) {
                 merge(stageMap, stages, pair.first, pair.second);
                 changed = true;
             }
         }
 
-        ActivityCluster endStage = new ActivityCluster(jobRun, new HashSet<ActivityNodeId>());
-        Map<ActivityNodeId, Set<ActivityNodeId>> blocker2BlockedMap = jag.getBlocker2BlockedMap();
+        ActivityCluster endStage = new ActivityCluster(jobRun, new HashSet<ActivityId>());
+        Map<ActivityId, Set<ActivityId>> blocker2BlockedMap = jag.getBlocker2BlockedMap();
         for (ActivityCluster s : stages) {
             endStage.addDependency(s);
             s.addDependent(endStage);
             Set<ActivityCluster> blockedStages = new HashSet<ActivityCluster>();
-            for (ActivityNodeId t : s.getActivities()) {
-                Set<ActivityNodeId> blockedTasks = blocker2BlockedMap.get(t);
+            for (ActivityId t : s.getActivities()) {
+                Set<ActivityId> blockedTasks = blocker2BlockedMap.get(t);
                 if (blockedTasks != null) {
-                    for (ActivityNodeId bt : blockedTasks) {
+                    for (ActivityId bt : blockedTasks) {
                         blockedStages.add(stageMap.get(bt));
                     }
                 }
@@ -180,14 +183,14 @@
         return endStage;
     }
 
-    private void merge(Map<ActivityNodeId, ActivityCluster> eqSetMap, Set<ActivityCluster> eqSets, ActivityNodeId t1,
-            ActivityNodeId t2) {
+    private void merge(Map<ActivityId, ActivityCluster> eqSetMap, Set<ActivityCluster> eqSets, ActivityId t1,
+            ActivityId t2) { // ids renamed from ActivityNodeId (class rename in this change)
         ActivityCluster stage1 = eqSetMap.get(t1);
-        Set<ActivityNodeId> s1 = stage1.getActivities();
+        Set<ActivityId> s1 = stage1.getActivities();
         ActivityCluster stage2 = eqSetMap.get(t2);
-        Set<ActivityNodeId> s2 = stage2.getActivities();
+        Set<ActivityId> s2 = stage2.getActivities();
 
-        Set<ActivityNodeId> mergedSet = new HashSet<ActivityNodeId>();
+        Set<ActivityId> mergedSet = new HashSet<ActivityId>(); // union of both clusters' activities
         mergedSet.addAll(s1);
         mergedSet.addAll(s2);
 
@@ -196,7 +199,7 @@
         ActivityCluster mergedStage = new ActivityCluster(jobRun, mergedSet);
         eqSets.add(mergedStage);
 
-        for (ActivityNodeId t : mergedSet) {
+        for (ActivityId t : mergedSet) { // repoint every member at the merged cluster
             eqSetMap.put(t, mergedStage);
         }
     }
@@ -275,10 +278,10 @@
         }
     }
 
-    private Map<ActivityNodeId, ActivityPartitionDetails> computePartitionCounts(ActivityCluster ac)
+    private Map<ActivityId, ActivityPartitionDetails> computePartitionCounts(ActivityCluster ac)
             throws HyracksException {
         Set<LValueConstraintExpression> lValues = new HashSet<LValueConstraintExpression>();
-        for (ActivityNodeId anId : ac.getActivities()) {
+        for (ActivityId anId : ac.getActivities()) {
             lValues.add(new PartitionCountExpression(anId.getOperatorDescriptorId()));
         }
         solver.solve(lValues);
@@ -298,8 +301,8 @@
             }
             nPartMap.put(((PartitionCountExpression) lv).getOperatorDescriptorId(), Integer.valueOf(nParts));
         }
-        Map<ActivityNodeId, ActivityPartitionDetails> activityPartsMap = new HashMap<ActivityNodeId, ActivityPartitionDetails>();
-        for (ActivityNodeId anId : ac.getActivities()) {
+        Map<ActivityId, ActivityPartitionDetails> activityPartsMap = new HashMap<ActivityId, ActivityPartitionDetails>();
+        for (ActivityId anId : ac.getActivities()) {
             int nParts = nPartMap.get(anId.getOperatorDescriptorId());
             int[] nInputPartitions = null;
             List<IConnectorDescriptor> inputs = jobRun.getJobActivityGraph().getActivityInputConnectorDescriptors(anId);
@@ -327,61 +330,54 @@
     }
 
     private void buildTaskClusters(ActivityCluster ac) throws HyracksException {
-        Map<ActivityNodeId, ActivityPartitionDetails> pcMap = computePartitionCounts(ac);
+        Map<ActivityId, ActivityPartitionDetails> pcMap = computePartitionCounts(ac);
 
-        Map<ActivityNodeId, Task[]> taskStateMap = ac.getTaskMap();
+        Map<ActivityId, Task[]> taskStateMap = ac.getTaskMap();
+        Set<ActivityId> activities = ac.getActivities(); // cached once; used by both loops below
 
-        for (ActivityNodeId anId : ac.getActivities()) {
+        Map<TaskId, Set<TaskId>> taskClusterMap = new HashMap<TaskId, Set<TaskId>>();
+
+        for (ActivityId anId : activities) {
             ActivityPartitionDetails apd = pcMap.get(anId);
             Task[] taskStates = new Task[apd.getPartitionCount()];
             for (int i = 0; i < taskStates.length; ++i) {
-                taskStates[i] = new Task(new TaskId(anId, i), apd);
+                TaskId tid = new TaskId(anId, i);
+                taskStates[i] = new Task(tid, apd);
+                Set<TaskId> cluster = new HashSet<TaskId>(); // seed: each task starts in its own singleton cluster
+                cluster.add(tid);
+                taskClusterMap.put(tid, cluster);
             }
             taskStateMap.put(anId, taskStates);
         }
+
         Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = assignConnectorPolicy(ac, pcMap);
         ac.setConnectorPolicyMap(connectorPolicies);
-        
-        Set<ActivityNodeId> activities = ac.getActivities();
 
-        Map<TaskId, Set<TaskId>> taskClusterMap = new HashMap<TaskId, Set<TaskId>>();
-        for (ActivityNodeId anId : activities) {
-            Task[] taskStates = taskStateMap.get(anId);
-            for (Task ts : taskStates) {
-                Set<TaskId> cluster = new HashSet<TaskId>();
-                cluster.add(ts.getTaskId());
-                taskClusterMap.put(ts.getTaskId(), cluster);
-            }
-        }
-
-        Map<TaskId, List<Pair<TaskId, IConnectorPolicy>>> connectionInfo = new HashMap<TaskId, List<Pair<TaskId, IConnectorPolicy>>>();
+        Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity = new HashMap<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>>(); // stores connector ids; policies looked up on demand
         JobActivityGraph jag = jobRun.getJobActivityGraph();
         BitSet targetBitmap = new BitSet();
-        for (ActivityNodeId ac1 : activities) {
+        for (ActivityId ac1 : activities) {
             Task[] ac1TaskStates = taskStateMap.get(ac1);
             int nProducers = ac1TaskStates.length;
             List<IConnectorDescriptor> outputConns = jag.getActivityOutputConnectorDescriptors(ac1);
             if (outputConns != null) {
                 for (IConnectorDescriptor c : outputConns) {
                     ConnectorDescriptorId cdId = c.getConnectorId();
-                    IConnectorPolicy cPolicy = connectorPolicies.get(cdId);
-                    if (cPolicy == null) {
-                        cPolicy = new SendSideMaterializedConnectorPolicy();
-                    }
-                    ActivityNodeId ac2 = jag.getConsumerActivity(cdId);
+                    ActivityId ac2 = jag.getConsumerActivity(cdId);
                     Task[] ac2TaskStates = taskStateMap.get(ac2);
                     int nConsumers = ac2TaskStates.length;
                     for (int i = 0; i < nProducers; ++i) {
                         c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
-                        List<Pair<TaskId, IConnectorPolicy>> cInfoList = connectionInfo.get(ac1TaskStates[i]
+                        List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = taskConnectivity.get(ac1TaskStates[i]
                                 .getTaskId());
                         if (cInfoList == null) {
-                            cInfoList = new ArrayList<Pair<TaskId, IConnectorPolicy>>();
-                            connectionInfo.put(ac1TaskStates[i].getTaskId(), cInfoList);
+                            cInfoList = new ArrayList<Pair<TaskId, ConnectorDescriptorId>>();
+                            taskConnectivity.put(ac1TaskStates[i].getTaskId(), cInfoList);
                         }
                         Set<TaskId> cluster = taskClusterMap.get(ac1TaskStates[i].getTaskId());
                         for (int j = targetBitmap.nextSetBit(0); j >= 0; j = targetBitmap.nextSetBit(j + 1)) {
-                            cInfoList.add(new Pair<TaskId, IConnectorPolicy>(ac2TaskStates[j].getTaskId(), cPolicy));
+                            cInfoList.add(new Pair<TaskId, ConnectorDescriptorId>(ac2TaskStates[j].getTaskId(), cdId));
+                            IConnectorPolicy cPolicy = connectorPolicies.get(cdId); // NOTE(review): old null->SendSideMaterialized fallback was dropped; NPE on next line if a connector has no policy -- confirm assignConnectorPolicy covers all
                             if (cPolicy.requiresProducerConsumerCoscheduling()) {
                                 cluster.add(ac2TaskStates[j].getTaskId());
                             }
@@ -434,17 +430,25 @@
         }
         ac.setTaskClusters(tcSet.toArray(new TaskCluster[tcSet.size()]));
 
-        for (TaskCluster tc : tcSet) {
+        Map<PartitionId, TaskCluster> partitionProducingTaskClusterMap = ac.getPartitionProducingTaskClusterMap(); // pid -> producing cluster
+        for (TaskCluster tc : ac.getTaskClusters()) {
             for (Task ts : tc.getTasks()) {
                 TaskId tid = ts.getTaskId();
-                List<Pair<TaskId, IConnectorPolicy>> cInfoList = connectionInfo.get(tid);
+                List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = taskConnectivity.get(tid);
                 if (cInfoList != null) {
-                    for (Pair<TaskId, IConnectorPolicy> p : cInfoList) {
+                    for (Pair<TaskId, ConnectorDescriptorId> p : cInfoList) {
                         Task targetTS = taskStateMap.get(p.first.getActivityId())[p.first.getPartition()];
                         TaskCluster targetTC = targetTS.getTaskCluster();
                         if (targetTC != tc) {
+                            ConnectorDescriptorId cdId = p.second; // one PartitionId per cross-cluster producer/consumer edge
+                            PartitionId pid = new PartitionId(jobRun.getJobId(), cdId, tid.getPartition(),
+                                    p.first.getPartition());
+                            tc.getProducedPartitions().add(pid);
+                            targetTC.getRequiredPartitions().add(pid);
+                            partitionProducingTaskClusterMap.put(pid, tc);
+                            IConnectorPolicy cPolicy = connectorPolicies.get(cdId); // NOTE(review): same null-policy assumption as above
                             targetTC.getDependencies().add(tc);
-                            if (p.second.consumerWaitsForProducerToFinish()) {
+                            if (cPolicy.consumerWaitsForProducerToFinish()) {
                                 targetTC.getBlockers().add(tc);
                             }
                         }
@@ -466,20 +470,20 @@
     }
 
     private Map<ConnectorDescriptorId, IConnectorPolicy> assignConnectorPolicy(ActivityCluster ac,
-            Map<ActivityNodeId, ActivityPartitionDetails> pcMap) {
+            Map<ActivityId, ActivityPartitionDetails> pcMap) {
         JobActivityGraph jag = jobRun.getJobActivityGraph();
         Map<ConnectorDescriptorId, IConnectorPolicy> cPolicyMap = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
-        Set<ActivityNodeId> activities = ac.getActivities();
-        Map<ActivityNodeId, Task[]> taskStateMap = ac.getTaskMap();
+        Set<ActivityId> activities = ac.getActivities();
+        Map<ActivityId, Task[]> taskStateMap = ac.getTaskMap();
         BitSet targetBitmap = new BitSet();
-        for (ActivityNodeId ac1 : activities) {
+        for (ActivityId ac1 : activities) {
             Task[] ac1TaskStates = taskStateMap.get(ac1);
             int nProducers = ac1TaskStates.length;
             List<IConnectorDescriptor> outputConns = jag.getActivityOutputConnectorDescriptors(ac1);
             if (outputConns != null) {
                 for (IConnectorDescriptor c : outputConns) {
                     ConnectorDescriptorId cdId = c.getConnectorId();
-                    ActivityNodeId ac2 = jag.getConsumerActivity(cdId);
+                    ActivityId ac2 = jag.getConsumerActivity(cdId);
                     Task[] ac2TaskStates = taskStateMap.get(ac2);
                     int nConsumers = ac2TaskStates.length;
 
@@ -497,6 +501,11 @@
     }
 
     private IConnectorPolicy assignConnectorPolicy(IConnectorDescriptor c, int nProducers, int nConsumers, int[] fanouts) {
+        IConnectorPolicyAssignmentPolicy cpap = jobRun.getJobActivityGraph().getJobSpecification()
+                .getConnectorPolicyAssignmentPolicy(); // job-level override, if the spec supplies one
+        if (cpap != null) {
+            return cpap.getConnectorPolicyAssignment(c, nProducers, nConsumers, fanouts); // spec-provided policy wins
+        }
         return new PipelinedConnectorPolicy();
     }
 
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
index 54b31f8..eade061 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
@@ -22,9 +22,9 @@
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
 import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
-import edu.uci.ics.hyracks.control.common.job.dataflow.IConnectorPolicy;
 
 public interface INodeController extends Remote {
     public String getId() throws Exception;
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 072c2ec..30434b8 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -26,7 +26,6 @@
 import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
 import edu.uci.ics.hyracks.api.comm.PartitionChannel;
 import edu.uci.ics.hyracks.api.context.IHyracksJobletContext;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -84,11 +83,11 @@
         return jobId;
     }
 
-    public IOperatorEnvironment getEnvironment(IOperatorDescriptor hod, int partition) {
-        if (!envMap.containsKey(hod.getOperatorId())) {
-            envMap.put(hod.getOperatorId(), new HashMap<Integer, IOperatorEnvironment>());
+    public IOperatorEnvironment getEnvironment(OperatorDescriptorId opId, int partition) { // narrowed param: id only, callers need not hold the descriptor
+        if (!envMap.containsKey(opId)) {
+            envMap.put(opId, new HashMap<Integer, IOperatorEnvironment>()); // lazily create per-operator map
         }
-        Map<Integer, IOperatorEnvironment> opEnvMap = envMap.get(hod.getOperatorId());
+        Map<Integer, IOperatorEnvironment> opEnvMap = envMap.get(opId);
         if (!opEnvMap.containsKey(partition)) {
             opEnvMap.put(partition, new OperatorEnvironmentImpl(nodeController.getId()));
         }
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index c7636f8..c4c396f 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -53,13 +53,13 @@
 import edu.uci.ics.hyracks.api.comm.PartitionChannel;
 import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
-import edu.uci.ics.hyracks.api.dataflow.IActivityNode;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -76,9 +76,6 @@
 import edu.uci.ics.hyracks.control.common.base.NodeParameters;
 import edu.uci.ics.hyracks.control.common.context.ServerContext;
 import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
-import edu.uci.ics.hyracks.control.common.job.dataflow.IConnectorPolicy;
-import edu.uci.ics.hyracks.control.common.job.dataflow.PipelinedConnectorPolicy;
-import edu.uci.ics.hyracks.control.common.job.dataflow.SendSideMaterializedConnectorPolicy;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.JobletProfile;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
@@ -89,6 +86,7 @@
 import edu.uci.ics.hyracks.control.nc.partitions.MaterializedPartitionWriter;
 import edu.uci.ics.hyracks.control.nc.partitions.PartitionManager;
 import edu.uci.ics.hyracks.control.nc.partitions.PipelinedPartition;
+import edu.uci.ics.hyracks.control.nc.partitions.ReceiveSideMaterializingCollector;
 import edu.uci.ics.hyracks.control.nc.runtime.RootHyracksContext;
 
 public class NodeControllerService extends AbstractRemoteService implements INodeController {
@@ -253,14 +251,14 @@
             for (TaskAttemptDescriptor td : taskDescriptors) {
                 TaskAttemptId taId = td.getTaskAttemptId();
                 TaskId tid = taId.getTaskId();
-                IActivityNode han = plan.getActivityNodeMap().get(tid.getActivityId());
+                IActivity han = plan.getActivityNodeMap().get(tid.getActivityId());
                 if (LOGGER.isLoggable(Level.INFO)) {
                     LOGGER.info("Initializing " + taId + " -> " + han);
                 }
-                IOperatorDescriptor op = han.getOwner();
                 final int partition = tid.getPartition();
                 Task task = new Task(joblet, taId, han.getClass().getName());
-                IOperatorNodePushable operator = han.createPushRuntime(task, joblet.getEnvironment(op, partition), rdp,
+                IOperatorNodePushable operator = han.createPushRuntime(task,
+                        joblet.getEnvironment(han.getActivityId().getOperatorDescriptorId(), partition), rdp, // environment keyed by operator id (activity no longer exposes its owner)
                         partition, td.getPartitionCount());
 
                 IPartitionCollector collector = null;
@@ -272,12 +270,12 @@
                             throw new HyracksException("Multiple inputs to an activity not currently supported");
                         }
                         IConnectorDescriptor conn = inputs.get(i);
+                        IConnectorPolicy cPolicy = connectorPoliciesMap.get(conn.getConnectorId()); // NOTE(review): assumes a policy exists for every input connector -- confirm
                         if (LOGGER.isLoggable(Level.INFO)) {
                             LOGGER.info("input: " + i + ": " + conn.getConnectorId());
                         }
                         RecordDescriptor recordDesc = plan.getJobSpecification().getConnectorRecordDescriptor(conn);
-                        collector = conn.createPartitionCollector(task, recordDesc, partition,
-                                td.getInputPartitionCounts()[i], td.getPartitionCount());
+                        collector = createPartitionCollector(td, partition, task, i, conn, recordDesc, cPolicy); // policy-aware: may wrap with receive-side materialization
                 List<IConnectorDescriptor> outputs = plan.getActivityOutputConnectorDescriptors(tid.getActivityId());
@@ -310,17 +308,21 @@
         }
     }
 
+    private IPartitionCollector createPartitionCollector(TaskAttemptDescriptor td, final int partition, Task task,
+            int i, IConnectorDescriptor conn, RecordDescriptor recordDesc, IConnectorPolicy cPolicy)
+            throws HyracksDataException {
+        IPartitionCollector collector = conn.createPartitionCollector(task, recordDesc, partition,
+                td.getInputPartitionCounts()[i], td.getPartitionCount()); // base collector from the connector
+        if (cPolicy.materializeOnReceiveSide()) { // policy asks for local materialization before consumption
+            return new ReceiveSideMaterializingCollector(ctx, partitionManager, collector, executor);
+        } else {
+            return collector;
+        }
+    }
+
     private IPartitionWriterFactory createPartitionWriterFactory(IConnectorPolicy cPolicy, final UUID jobId,
             final IConnectorDescriptor conn, final int senderIndex) {
-        if (cPolicy instanceof PipelinedConnectorPolicy) {
-            return new IPartitionWriterFactory() {
-                @Override
-                public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
-                    return new PipelinedPartition(partitionManager, new PartitionId(jobId, conn.getConnectorId(),
-                            senderIndex, receiverIndex));
-                }
-            };
-        } else if (cPolicy instanceof SendSideMaterializedConnectorPolicy) {
+        if (cPolicy.materializeOnSendSide()) { // dispatch on policy capabilities, not concrete classes
             return new IPartitionWriterFactory() {
                 @Override
                 public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
@@ -328,8 +330,15 @@
                             conn.getConnectorId(), senderIndex, receiverIndex), executor);
                 }
             };
+        } else { // default: pipeline frames directly to the consumer
+            return new IPartitionWriterFactory() {
+                @Override
+                public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
+                    return new PipelinedPartition(partitionManager, new PartitionId(jobId, conn.getConnectorId(),
+                            senderIndex, receiverIndex));
+                }
+            };
         }
-        throw new IllegalArgumentException("Unknown connector policy: " + cPolicy);
     }
 
     private synchronized Joblet getOrCreateLocalJoblet(UUID jobId, INCApplicationContext appCtx) throws Exception {
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
new file mode 100644
index 0000000..971f2b6
--- /dev/null
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
@@ -0,0 +1,132 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.partitions;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+import edu.uci.ics.hyracks.api.channels.IInputChannel;
+import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+
+public class MaterializedPartitionInputChannel implements IInputChannel { // replays a locally materialized partition as an IInputChannel
+    private final Queue<ByteBuffer> emptyQueue; // frames available for the writer to fill; reader recycles into this
+
+    private final Queue<ByteBuffer> fullQueue; // filled frames awaiting getNextBuffer()
+
+    private final PartitionId pid;
+
+    private final PartitionManager manager;
+
+    private final FrameWriter writer;
+
+    private IInputChannelMonitor monitor;
+
+    private Object attachment;
+
+    public MaterializedPartitionInputChannel(IHyracksRootContext ctx, int nBuffers, PartitionId pid,
+            PartitionManager manager) {
+        this.emptyQueue = new ArrayDeque<ByteBuffer>(nBuffers);
+        for (int i = 0; i < nBuffers; ++i) {
+            emptyQueue.add(ctx.allocateFrame()); // preallocate the fixed frame budget
+        }
+        fullQueue = new ArrayDeque<ByteBuffer>(nBuffers);
+        this.pid = pid;
+        this.manager = manager;
+        writer = new FrameWriter();
+    }
+
+    @Override
+    public void registerMonitor(IInputChannelMonitor monitor) throws HyracksException {
+        this.monitor = monitor;
+    }
+
+    @Override
+    public void setAttachment(Object attachment) {
+        this.attachment = attachment;
+    }
+
+    @Override
+    public Object getAttachment() {
+        return attachment;
+    }
+
+    @Override
+    public ByteBuffer getNextBuffer() {
+        return fullQueue.poll(); // NOTE(review): unsynchronized read of fullQueue (ArrayDeque is not thread-safe); verify happens-before via the monitor callback path
+    }
+
+    @Override
+    public void recycleBuffer(ByteBuffer buffer) {
+        buffer.clear();
+        synchronized (this) {
+            emptyQueue.add(buffer);
+            notifyAll(); // wake the writer blocked in nextFrame()
+        }
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        MaterializedPartition partition = (MaterializedPartition) manager.getPartition(pid); // assumes manager returns a MaterializedPartition for pid -- TODO confirm
+        partition.writeTo(writer); // replay stored frames through FrameWriter below
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+
+    }
+
+    private class FrameWriter implements IFrameWriter { // receives replayed frames and hands copies to the reader
+        @Override
+        public void open() throws HyracksDataException {
+
+        }
+
+        @Override
+        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+            synchronized (MaterializedPartitionInputChannel.this) {
+                while (emptyQueue.isEmpty()) { // wait for the reader to recycle a frame
+                    try {
+                        MaterializedPartitionInputChannel.this.wait();
+                    } catch (InterruptedException e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+                ByteBuffer destFrame = emptyQueue.poll();
+                buffer.position(0);
+                buffer.limit(buffer.capacity());
+                destFrame.clear();
+                destFrame.put(buffer); // copy into a channel-owned frame
+                fullQueue.add(destFrame);
+                monitor.notifyDataAvailability(MaterializedPartitionInputChannel.this, 1); // signal one more frame ready
+            }
+        }
+
+        @Override
+        public void flush() throws HyracksDataException {
+
+        }
+
+        @Override
+        public void close() throws HyracksDataException {
+            monitor.notifyEndOfStream(MaterializedPartitionInputChannel.this);
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
new file mode 100644
index 0000000..5f3a3f4
--- /dev/null
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
@@ -0,0 +1,164 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.partitions;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+
+import edu.uci.ics.hyracks.api.channels.IInputChannel;
+import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
+import edu.uci.ics.hyracks.api.comm.PartitionChannel;
+import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+
+/**
+ * An {@link IPartitionCollector} decorator that drains each incoming partition to
+ * local storage first, and only then hands the wrapped collector a replayable
+ * {@link MaterializedPartitionInputChannel} over the materialized data.
+ */
+public class ReceiveSideMaterializingCollector implements IPartitionCollector {
+    private final IHyracksRootContext ctx;
+
+    private final PartitionManager manager;
+
+    private final IPartitionCollector delegate;
+
+    private final Executor executor;
+
+    public ReceiveSideMaterializingCollector(IHyracksRootContext ctx, PartitionManager manager,
+            IPartitionCollector collector, Executor executor) {
+        this.ctx = ctx;
+        this.manager = manager;
+        this.delegate = collector;
+        this.executor = executor;
+    }
+
+    @Override
+    public UUID getJobId() {
+        return delegate.getJobId();
+    }
+
+    @Override
+    public ConnectorDescriptorId getConnectorId() {
+        return delegate.getConnectorId();
+    }
+
+    @Override
+    public int getReceiverIndex() {
+        return delegate.getReceiverIndex();
+    }
+
+    @Override
+    public void open() throws HyracksException {
+        delegate.open();
+    }
+
+    @Override
+    public void addPartitions(Collection<PartitionChannel> partitions) throws HyracksException {
+        for (final PartitionChannel pc : partitions) {
+            PartitionWriter writer = new PartitionWriter(pc);
+            executor.execute(writer);
+        }
+    }
+
+    private class PartitionWriter implements Runnable, IInputChannelMonitor {
+        private final PartitionChannel pc;
+
+        private int nAvailableFrames;
+
+        private boolean eos;
+
+        public PartitionWriter(PartitionChannel pc) {
+            this.pc = pc;
+            nAvailableFrames = 0;
+            eos = false;
+        }
+
+        @Override
+        public synchronized void notifyDataAvailability(IInputChannel channel, int nFrames) {
+            nAvailableFrames += nFrames;
+            notifyAll();
+        }
+
+        @Override
+        public synchronized void notifyEndOfStream(IInputChannel channel) {
+            eos = true;
+            notifyAll();
+        }
+
+        @Override
+        public synchronized void run() {
+            PartitionId pid = pc.getPartitionId();
+            MaterializedPartitionWriter mpw = new MaterializedPartitionWriter(ctx, manager, pid, executor);
+            IInputChannel channel = pc.getInputChannel();
+            try {
+                channel.registerMonitor(this);
+                channel.open();
+                mpw.open();
+                while (true) {
+                    if (nAvailableFrames > 0) {
+                        ByteBuffer buffer = channel.getNextBuffer();
+                        --nAvailableFrames;
+                        mpw.nextFrame(buffer);
+                        channel.recycleBuffer(buffer);
+                    } else if (eos) {
+                        break;
+                    } else {
+                        try {
+                            wait();
+                        } catch (InterruptedException e) {
+                            // wait() clears the interrupt status; log and keep draining the channel.
+                            e.printStackTrace();
+                        }
+                    }
+                }
+                mpw.close();
+                channel.close();
+                delegate.addPartitions(Collections.singleton(new PartitionChannel(pid,
+                        new MaterializedPartitionInputChannel(ctx, 5, pid, manager))));
+            } catch (HyracksException e) {
+                // TODO: report the failure through the collector's abort path instead of swallowing it.
+                e.printStackTrace();
+            }
+        }
+    }
+
+    @Override
+    public IFrameReader getReader() throws HyracksException {
+        return delegate.getReader();
+    }
+
+    @Override
+    public void close() throws HyracksException {
+        delegate.close();
+    }
+
+    @Override
+    public Collection<PartitionId> getRequiredPartitionIds() throws HyracksException {
+        return delegate.getRequiredPartitionIds();
+    }
+
+    @Override
+    public void abort() {
+        delegate.abort();
+    }
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractActivityNode.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractActivityNode.java
index 4976230..11a02b3 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractActivityNode.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractActivityNode.java
@@ -14,22 +14,20 @@
  */
 package edu.uci.ics.hyracks.dataflow.std.base;
 
-import java.util.UUID;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
 
-import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
-import edu.uci.ics.hyracks.api.dataflow.IActivityNode;
-
-public abstract class AbstractActivityNode implements IActivityNode {
+public abstract class AbstractActivityNode implements IActivity {
     private static final long serialVersionUID = 1L;
 
-    protected final ActivityNodeId id;
+    protected final ActivityId id;
 
-    public AbstractActivityNode() {
-        this.id = new ActivityNodeId(getOwner().getOperatorId(), UUID.randomUUID());
+    public AbstractActivityNode(ActivityId id) {
+        this.id = id;
     }
 
     @Override
-    public ActivityNodeId getActivityNodeId() {
+    public ActivityId getActivityId() {
         return id;
     }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractSingleActivityOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractSingleActivityOperatorDescriptor.java
index 018b7b7..c4c3326 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractSingleActivityOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractSingleActivityOperatorDescriptor.java
@@ -14,38 +14,29 @@
  */
 package edu.uci.ics.hyracks.dataflow.std.base;
 
-import java.util.UUID;
-
-import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
-import edu.uci.ics.hyracks.api.dataflow.IActivityNode;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 
-public abstract class AbstractSingleActivityOperatorDescriptor extends AbstractOperatorDescriptor implements
-        IActivityNode {
+public abstract class AbstractSingleActivityOperatorDescriptor extends AbstractOperatorDescriptor implements IActivity {
     private static final long serialVersionUID = 1L;
 
-    protected final ActivityNodeId activityNodeId;
+    protected final ActivityId activityNodeId;
 
     public AbstractSingleActivityOperatorDescriptor(JobSpecification spec, int inputArity, int outputArity) {
         super(spec, inputArity, outputArity);
-        activityNodeId = new ActivityNodeId(odId, UUID.randomUUID());
+        activityNodeId = new ActivityId(odId, 0);
     }
 
     @Override
-    public ActivityNodeId getActivityNodeId() {
+    public ActivityId getActivityId() {
         return activityNodeId;
     }
 
     @Override
-    public final IOperatorDescriptor getOwner() {
-        return this;
-    }
-
-    @Override
-    public final void contributeTaskGraph(IActivityGraphBuilder builder) {
-        builder.addTask(this);
+    public final void contributeActivities(IActivityGraphBuilder builder) {
+        builder.addActivity(this);
         for (int i = 0; i < getInputArity(); ++i) {
             builder.addSourceEdge(i, this, i);
         }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalHashGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalHashGroupOperatorDescriptor.java
index 4257083..6d3d891 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalHashGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalHashGroupOperatorDescriptor.java
@@ -25,8 +25,8 @@
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -175,14 +175,14 @@
      * (edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder)
      */
     @Override
-    public void contributeTaskGraph(IActivityGraphBuilder builder) {
-        PartialAggregateActivity partialAggAct = new PartialAggregateActivity();
-        MergeActivity mergeAct = new MergeActivity();
+    public void contributeActivities(IActivityGraphBuilder builder) {
+        PartialAggregateActivity partialAggAct = new PartialAggregateActivity(new ActivityId(odId, 0));
+        MergeActivity mergeAct = new MergeActivity(new ActivityId(odId, 1));
 
-        builder.addTask(partialAggAct);
+        builder.addActivity(partialAggAct);
         builder.addSourceEdge(0, partialAggAct, 0);
 
-        builder.addTask(mergeAct);
+        builder.addActivity(mergeAct);
         builder.addTargetEdge(0, mergeAct, 0);
 
         // FIXME Block or not?
@@ -191,12 +191,12 @@
     }
 
     private class PartialAggregateActivity extends AbstractActivityNode {
-
-        /**
-         * 
-         */
         private static final long serialVersionUID = 1L;
 
+        public PartialAggregateActivity(ActivityId id) {
+            super(id);
+        }
+
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
                 final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
@@ -302,21 +302,15 @@
 
             return op;
         }
-
-        @Override
-        public IOperatorDescriptor getOwner() {
-            return ExternalHashGroupOperatorDescriptor.this;
-        }
-
     }
 
     private class MergeActivity extends AbstractActivityNode {
-
-        /**
-         * 
-         */
         private static final long serialVersionUID = 1L;
 
+        public MergeActivity(ActivityId id) {
+            super(id);
+        }
+
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
                 IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
@@ -654,11 +648,6 @@
             return op;
         }
 
-        @Override
-        public IOperatorDescriptor getOwner() {
-            return ExternalHashGroupOperatorDescriptor.this;
-        }
-
         private Comparator<ReferenceEntry> createEntryComparator(final IBinaryComparator[] comparators) {
             return new Comparator<ReferenceEntry>() {
                 public int compare(ReferenceEntry tp1, ReferenceEntry tp2) {
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
index 8d92c54..cf954f6 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
@@ -17,8 +17,8 @@
 import java.nio.ByteBuffer;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -57,12 +57,12 @@
     }
 
     @Override
-    public void contributeTaskGraph(IActivityGraphBuilder builder) {
-        HashBuildActivity ha = new HashBuildActivity();
-        builder.addTask(ha);
+    public void contributeActivities(IActivityGraphBuilder builder) {
+        HashBuildActivity ha = new HashBuildActivity(new ActivityId(odId, 0));
+        builder.addActivity(ha);
 
-        OutputActivity oa = new OutputActivity();
-        builder.addTask(oa);
+        OutputActivity oa = new OutputActivity(new ActivityId(odId, 1));
+        builder.addActivity(oa);
 
         builder.addSourceEdge(0, ha, 0);
         builder.addTargetEdge(0, oa, 0);
@@ -72,6 +72,10 @@
     private class HashBuildActivity extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
 
+        public HashBuildActivity(ActivityId id) {
+            super(id);
+        }
+
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
                 final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
@@ -106,16 +110,15 @@
                 }
             };
         }
-
-        @Override
-        public IOperatorDescriptor getOwner() {
-            return HashGroupOperatorDescriptor.this;
-        }
     }
 
     private class OutputActivity extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
 
+        public OutputActivity(ActivityId id) {
+            super(id);
+        }
+
         @Override
         public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, final IOperatorEnvironment env,
                 IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
@@ -130,10 +133,5 @@
                 }
             };
         }
-
-        @Override
-        public IOperatorDescriptor getOwner() {
-            return HashGroupOperatorDescriptor.this;
-        }
     }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
index 1e77547..27ae2db 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
@@ -17,8 +17,8 @@
 import java.nio.ByteBuffer;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -73,18 +73,20 @@
     }
 
     @Override
-    public void contributeTaskGraph(IActivityGraphBuilder builder) {
-        HashPartitionActivityNode rpart = new HashPartitionActivityNode(SMALLRELATION, keys0, 0);
-        HashPartitionActivityNode spart = new HashPartitionActivityNode(LARGERELATION, keys1, 1);
-        JoinActivityNode join = new JoinActivityNode();
+    public void contributeActivities(IActivityGraphBuilder builder) {
+        HashPartitionActivityNode rpart = new HashPartitionActivityNode(new ActivityId(odId, 0), SMALLRELATION, keys0,
+                0);
+        HashPartitionActivityNode spart = new HashPartitionActivityNode(new ActivityId(odId, 1), LARGERELATION, keys1,
+                1);
+        JoinActivityNode join = new JoinActivityNode(new ActivityId(odId, 2));
 
-        builder.addTask(rpart);
+        builder.addActivity(rpart);
         builder.addSourceEdge(0, rpart, 0);
 
-        builder.addTask(spart);
+        builder.addActivity(spart);
         builder.addSourceEdge(1, spart, 0);
 
-        builder.addTask(join);
+        builder.addActivity(join);
         builder.addBlockingEdge(rpart, spart);
         builder.addBlockingEdge(spart, join);
 
@@ -101,7 +103,8 @@
         private int operatorInputIndex;
         private int keys[];
 
-        public HashPartitionActivityNode(String partitionsKey, int keys[], int operatorInputIndex) {
+        public HashPartitionActivityNode(ActivityId id, String partitionsKey, int keys[], int operatorInputIndex) {
+            super(id);
             this.partitionsKey = partitionsKey;
             this.keys = keys;
             this.operatorInputIndex = operatorInputIndex;
@@ -196,16 +199,15 @@
             };
             return op;
         }
-
-        @Override
-        public IOperatorDescriptor getOwner() {
-            return GraceHashJoinOperatorDescriptor.this;
-        }
     }
 
     private class JoinActivityNode extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
 
+        public JoinActivityNode(ActivityId id) {
+            super(id);
+        }
+
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
                 final IRecordDescriptorProvider recordDescProvider, int partition, final int nPartitions) {
@@ -280,10 +282,5 @@
             };
             return op;
         }
-
-        @Override
-        public IOperatorDescriptor getOwner() {
-            return GraceHashJoinOperatorDescriptor.this;
-        }
     }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index faa7afb..df7440b 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -17,8 +17,8 @@
 import java.nio.ByteBuffer;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -92,14 +92,14 @@
     }
 
     @Override
-    public void contributeTaskGraph(IActivityGraphBuilder builder) {
-        BuildAndPartitionActivityNode phase1 = new BuildAndPartitionActivityNode(SMALLRELATION);
-        PartitionAndJoinActivityNode phase2 = new PartitionAndJoinActivityNode(LARGERELATION);
+    public void contributeActivities(IActivityGraphBuilder builder) {
+        BuildAndPartitionActivityNode phase1 = new BuildAndPartitionActivityNode(new ActivityId(odId, 0), SMALLRELATION);
+        PartitionAndJoinActivityNode phase2 = new PartitionAndJoinActivityNode(new ActivityId(odId, 1), LARGERELATION);
 
-        builder.addTask(phase1);
+        builder.addActivity(phase1);
         builder.addSourceEdge(0, phase1, 0);
 
-        builder.addTask(phase2);
+        builder.addActivity(phase2);
         builder.addSourceEdge(1, phase2, 0);
 
         builder.addBlockingEdge(phase1, phase2);
@@ -111,10 +111,9 @@
         private static final long serialVersionUID = 1L;
         private String relationName;
 
-        public BuildAndPartitionActivityNode(String relationName) {
-            super();
+        public BuildAndPartitionActivityNode(ActivityId id, String relationName) {
+            super(id);
             this.relationName = relationName;
-
         }
 
         @Override
@@ -288,19 +287,14 @@
             };
             return op;
         }
-
-        @Override
-        public IOperatorDescriptor getOwner() {
-            return HybridHashJoinOperatorDescriptor.this;
-        }
     }
 
     private class PartitionAndJoinActivityNode extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
         private String largeRelation;
 
-        public PartitionAndJoinActivityNode(String relationName) {
-            super();
+        public PartitionAndJoinActivityNode(ActivityId id, String relationName) {
+            super(id);
             this.largeRelation = relationName;
         }
 
@@ -492,10 +486,5 @@
             };
             return op;
         }
-
-        @Override
-        public IOperatorDescriptor getOwner() {
-            return HybridHashJoinOperatorDescriptor.this;
-        }
     }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index 6f46932..b4797e8 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -17,8 +17,8 @@
 import java.nio.ByteBuffer;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -61,14 +61,14 @@
     }
 
     @Override
-    public void contributeTaskGraph(IActivityGraphBuilder builder) {
-        HashBuildActivityNode hba = new HashBuildActivityNode();
-        HashProbeActivityNode hpa = new HashProbeActivityNode();
+    public void contributeActivities(IActivityGraphBuilder builder) {
+        HashBuildActivityNode hba = new HashBuildActivityNode(new ActivityId(odId, 0));
+        HashProbeActivityNode hpa = new HashProbeActivityNode(new ActivityId(odId, 1));
 
-        builder.addTask(hba);
+        builder.addActivity(hba);
         builder.addSourceEdge(0, hba, 0);
 
-        builder.addTask(hpa);
+        builder.addActivity(hpa);
         builder.addSourceEdge(1, hpa, 0);
         builder.addTargetEdge(0, hpa, 0);
 
@@ -78,6 +78,10 @@
     private class HashBuildActivityNode extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
 
+        public HashBuildActivityNode(ActivityId id) {
+            super(id);
+        }
+
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
                 IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
@@ -119,16 +123,15 @@
             };
             return op;
         }
-
-        @Override
-        public IOperatorDescriptor getOwner() {
-            return InMemoryHashJoinOperatorDescriptor.this;
-        }
     }
 
     private class HashProbeActivityNode extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
 
+        public HashProbeActivityNode(ActivityId id) {
+            super(id);
+        }
+
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
                 IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
@@ -160,10 +163,5 @@
             };
             return op;
         }
-
-        @Override
-        public IOperatorDescriptor getOwner() {
-            return InMemoryHashJoinOperatorDescriptor.this;
-        }
     }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
index f4fb240..e8a6eae 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
@@ -17,8 +17,8 @@
 import java.nio.ByteBuffer;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -43,14 +43,14 @@
     }
 
     @Override
-    public void contributeTaskGraph(IActivityGraphBuilder builder) {
-        MaterializerActivityNode ma = new MaterializerActivityNode();
-        ReaderActivityNode ra = new ReaderActivityNode();
+    public void contributeActivities(IActivityGraphBuilder builder) {
+        MaterializerActivityNode ma = new MaterializerActivityNode(new ActivityId(odId, 0));
+        ReaderActivityNode ra = new ReaderActivityNode(new ActivityId(odId, 1));
 
-        builder.addTask(ma);
+        builder.addActivity(ma);
         builder.addSourceEdge(0, ma, 0);
 
-        builder.addTask(ra);
+        builder.addActivity(ra);
         builder.addTargetEdge(0, ra, 0);
 
         builder.addBlockingEdge(ma, ra);
@@ -59,6 +59,10 @@
     private final class MaterializerActivityNode extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
 
+        public MaterializerActivityNode(ActivityId id) {
+            super(id);
+        }
+
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
                 IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
@@ -89,16 +93,15 @@
                 }
             };
         }
-
-        @Override
-        public IOperatorDescriptor getOwner() {
-            return MaterializingOperatorDescriptor.this;
-        }
     }
 
     private final class ReaderActivityNode extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
 
+        public ReaderActivityNode(ActivityId id) {
+            super(id);
+        }
+
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
                 IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
@@ -125,10 +128,5 @@
                 }
             };
         }
-
-        @Override
-        public IOperatorDescriptor getOwner() {
-            return MaterializingOperatorDescriptor.this;
-        }
     }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java
index a12e49e..8a3addb 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java
@@ -18,10 +18,9 @@
 import java.util.List;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
 import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -37,14 +36,13 @@
     private class CollectActivity extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
 
-        @Override
-        public ActivityNodeId getActivityNodeId() {
-            return id;
+        public CollectActivity(ActivityId id) {
+            super(id);
         }
 
         @Override
-        public IOperatorDescriptor getOwner() {
-            return SplitVectorOperatorDescriptor.this;
+        public ActivityId getActivityId() {
+            return id;
         }
 
         @Override
@@ -82,9 +80,8 @@
     private class SplitActivity extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
 
-        @Override
-        public IOperatorDescriptor getOwner() {
-            return SplitVectorOperatorDescriptor.this;
+        public SplitActivity(ActivityId id) {
+            super(id);
         }
 
         @Override
@@ -139,14 +136,14 @@
     }
 
     @Override
-    public void contributeTaskGraph(IActivityGraphBuilder builder) {
-        CollectActivity ca = new CollectActivity();
-        SplitActivity sa = new SplitActivity();
+    public void contributeActivities(IActivityGraphBuilder builder) {
+        CollectActivity ca = new CollectActivity(new ActivityId(odId, 0));
+        SplitActivity sa = new SplitActivity(new ActivityId(odId, 1));
 
-        builder.addTask(ca);
+        builder.addActivity(ca);
         builder.addSourceEdge(0, ca, 0);
 
-        builder.addTask(sa);
+        builder.addActivity(sa);
         builder.addTargetEdge(0, sa, 0);
 
         builder.addBlockingEdge(ca, sa);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
index 6871452..3bde4c4 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
@@ -19,8 +19,8 @@
 
 import edu.uci.ics.hyracks.api.comm.IFrameReader;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -65,14 +65,14 @@
     }
 
     @Override
-    public void contributeTaskGraph(IActivityGraphBuilder builder) {
-        SortActivity sa = new SortActivity();
-        MergeActivity ma = new MergeActivity();
+    public void contributeActivities(IActivityGraphBuilder builder) {
+        SortActivity sa = new SortActivity(new ActivityId(odId, 0));
+        MergeActivity ma = new MergeActivity(new ActivityId(odId, 1));
 
-        builder.addTask(sa);
+        builder.addActivity(sa);
         builder.addSourceEdge(0, sa, 0);
 
-        builder.addTask(ma);
+        builder.addActivity(ma);
         builder.addTargetEdge(0, ma, 0);
 
         builder.addBlockingEdge(sa, ma);
@@ -81,9 +81,8 @@
     private class SortActivity extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
 
-        @Override
-        public IOperatorDescriptor getOwner() {
-            return ExternalSortOperatorDescriptor.this;
+        public SortActivity(ActivityId id) {
+            super(id);
         }
 
         @Override
@@ -121,9 +120,8 @@
     private class MergeActivity extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
 
-        @Override
-        public IOperatorDescriptor getOwner() {
-            return ExternalSortOperatorDescriptor.this;
+        public MergeActivity(ActivityId id) {
+            super(id);
         }
 
         @Override
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
index afa27ba..d3015b7 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
@@ -17,8 +17,8 @@
 import java.nio.ByteBuffer;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
@@ -56,14 +56,14 @@
     }
 
     @Override
-    public void contributeTaskGraph(IActivityGraphBuilder builder) {
-        SortActivity sa = new SortActivity();
-        MergeActivity ma = new MergeActivity();
+    public void contributeActivities(IActivityGraphBuilder builder) {
+        SortActivity sa = new SortActivity(new ActivityId(odId, 0));
+        MergeActivity ma = new MergeActivity(new ActivityId(odId, 1));
 
-        builder.addTask(sa);
+        builder.addActivity(sa);
         builder.addSourceEdge(0, sa, 0);
 
-        builder.addTask(ma);
+        builder.addActivity(ma);
         builder.addTargetEdge(0, ma, 0);
 
         builder.addBlockingEdge(sa, ma);
@@ -72,9 +72,8 @@
     private class SortActivity extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
 
-        @Override
-        public IOperatorDescriptor getOwner() {
-            return InMemorySortOperatorDescriptor.this;
+        public SortActivity(ActivityId id) {
+            super(id);
         }
 
         @Override
@@ -110,9 +109,8 @@
     private class MergeActivity extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
 
-        @Override
-        public IOperatorDescriptor getOwner() {
-            return InMemorySortOperatorDescriptor.this;
+        public MergeActivity(ActivityId id) {
+            super(id);
         }
 
         @Override
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestUtils.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestUtils.java
index 71c4a24..dde223b 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestUtils.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestUtils.java
@@ -20,7 +20,7 @@
 import edu.uci.ics.hyracks.api.context.IHyracksJobletContext;
 import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
@@ -32,8 +32,8 @@
             IHyracksRootContext rootCtx = new TestRootContext(frameSize);
             INCApplicationContext appCtx = new TestNCApplicationContext(rootCtx);
             IHyracksJobletContext jobletCtx = new TestJobletContext(appCtx, UUID.randomUUID());
-            TaskAttemptId tid = new TaskAttemptId(new TaskId(new ActivityNodeId(new OperatorDescriptorId(
-                    UUID.randomUUID()), UUID.randomUUID()), 0), 0);
+            TaskAttemptId tid = new TaskAttemptId(new TaskId(new ActivityId(
+                    new OperatorDescriptorId(UUID.randomUUID()), 0), 0), 0);
             IHyracksTaskContext taskCtx = new TestTaskContext(jobletCtx, tid);
             return taskCtx;
         } catch (HyracksException e) {