Cleaned up JobActivityGraph

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_scheduling@1635 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
index e978ade..5aaf196 100644
--- a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -119,7 +119,7 @@
                     RecordDescriptor pipelineOutputRecordDescriptor = outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0]
                             : null;
                     RecordDescriptor pipelineInputRecordDescriptor = recordDescProvider.getInputRecordDescriptor(
-                            AlgebricksMetaOperatorDescriptor.this.getOperatorId(), 0);
+                            AlgebricksMetaOperatorDescriptor.this.getActivityId(), 0);
                     PipelineAssembler pa = new PipelineAssembler(pipeline, inputArity, outputArity,
                             pipelineInputRecordDescriptor, pipelineOutputRecordDescriptor);
                     try {
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityGraphBuilder.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityGraphBuilder.java
index 56870b2..bbda23c 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityGraphBuilder.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityGraphBuilder.java
@@ -15,7 +15,7 @@
 package edu.uci.ics.hyracks.api.dataflow;
 
 public interface IActivityGraphBuilder {
-    public void addActivity(IActivity task);
+    public void addActivity(IOperatorDescriptor op, IActivity task);
 
     public void addBlockingEdge(IActivity blocker, IActivity blocked);
 
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IRecordDescriptorProvider.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IRecordDescriptorProvider.java
index 96d43a7..c480446 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IRecordDescriptorProvider.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IRecordDescriptorProvider.java
@@ -14,10 +14,10 @@
  */
 package edu.uci.ics.hyracks.api.dataflow.value;
 
-import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 
 public interface IRecordDescriptorProvider {
-    public RecordDescriptor getInputRecordDescriptor(OperatorDescriptorId opId, int inputIndex);
+    public RecordDescriptor getInputRecordDescriptor(ActivityId aid, int inputIndex);
 
-    public RecordDescriptor getOutputRecordDescriptor(OperatorDescriptorId opId, int outputIndex);
+    public RecordDescriptor getOutputRecordDescriptor(ActivityId aid, int outputIndex);
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobActivityGraph.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobActivityGraph.java
index 2b6d361..3c6a547 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobActivityGraph.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobActivityGraph.java
@@ -15,7 +15,6 @@
 package edu.uci.ics.hyracks.api.job;
 
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
@@ -31,8 +30,7 @@
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.IActivity;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 
 public class JobActivityGraph implements Serializable {
@@ -40,54 +38,63 @@
 
     private final String appName;
 
-    private final JobSpecification jobSpec;
-
     private final EnumSet<JobFlag> jobFlags;
 
-    private final Map<ActivityId, IActivity> activityNodes;
+    private final Map<ActivityId, IActivity> activityMap;
+
+    private final Map<ConnectorDescriptorId, IConnectorDescriptor> connectorMap;
+
+    private final Map<ConnectorDescriptorId, RecordDescriptor> connectorRecordDescriptorMap;
+
+    private final Map<ActivityId, List<IConnectorDescriptor>> activityInputMap;
+
+    private final Map<ActivityId, List<IConnectorDescriptor>> activityOutputMap;
+
+    private final Map<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> connectorActivityMap;
 
     private final Map<ActivityId, Set<ActivityId>> blocker2blockedMap;
 
     private final Map<ActivityId, Set<ActivityId>> blocked2blockerMap;
 
-    private final Map<OperatorDescriptorId, Set<ActivityId>> operatorActivityMap;
+    private IConnectorPolicyAssignmentPolicy connectorPolicyAssignmentPolicy;
 
-    private final Map<ActivityId, List<Integer>> activityInputMap;
+    private int maxReattempts;
 
-    private final Map<ActivityId, List<Integer>> activityOutputMap;
+    private IJobletEventListenerFactory jobletEventListenerFactory;
 
-    private final Map<OperatorDescriptorId, List<ActivityId>> operatorInputMap;
+    private IGlobalJobDataFactory globalJobDataFactory;
 
-    private final Map<OperatorDescriptorId, List<ActivityId>> operatorOutputMap;
-
-    public JobActivityGraph(String appName, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) {
+    public JobActivityGraph(String appName, EnumSet<JobFlag> jobFlags) {
         this.appName = appName;
-        this.jobSpec = jobSpec;
         this.jobFlags = jobFlags;
-        activityNodes = new HashMap<ActivityId, IActivity>();
+        activityMap = new HashMap<ActivityId, IActivity>();
+        connectorMap = new HashMap<ConnectorDescriptorId, IConnectorDescriptor>();
+        connectorRecordDescriptorMap = new HashMap<ConnectorDescriptorId, RecordDescriptor>();
+        activityInputMap = new HashMap<ActivityId, List<IConnectorDescriptor>>();
+        activityOutputMap = new HashMap<ActivityId, List<IConnectorDescriptor>>();
+        connectorActivityMap = new HashMap<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>>();
         blocker2blockedMap = new HashMap<ActivityId, Set<ActivityId>>();
         blocked2blockerMap = new HashMap<ActivityId, Set<ActivityId>>();
-        operatorActivityMap = new HashMap<OperatorDescriptorId, Set<ActivityId>>();
-        activityInputMap = new HashMap<ActivityId, List<Integer>>();
-        activityOutputMap = new HashMap<ActivityId, List<Integer>>();
-        operatorInputMap = new HashMap<OperatorDescriptorId, List<ActivityId>>();
-        operatorOutputMap = new HashMap<OperatorDescriptorId, List<ActivityId>>();
     }
 
     public String getApplicationName() {
         return appName;
     }
 
-    public JobSpecification getJobSpecification() {
-        return jobSpec;
-    }
-
     public EnumSet<JobFlag> getJobFlags() {
         return jobFlags;
     }
 
-    public Map<ActivityId, IActivity> getActivityNodeMap() {
-        return activityNodes;
+    public Map<ActivityId, IActivity> getActivityMap() {
+        return activityMap;
+    }
+
+    public Map<ConnectorDescriptorId, IConnectorDescriptor> getConnectorMap() {
+        return connectorMap;
+    }
+
+    public Map<ConnectorDescriptorId, RecordDescriptor> getConnectorRecordDescriptorMap() {
+        return connectorRecordDescriptorMap;
     }
 
     public Map<ActivityId, Set<ActivityId>> getBlocker2BlockedMap() {
@@ -98,106 +105,64 @@
         return blocked2blockerMap;
     }
 
-    public Map<OperatorDescriptorId, Set<ActivityId>> getOperatorActivityMap() {
-        return operatorActivityMap;
-    }
-
-    public Map<ActivityId, List<Integer>> getActivityInputMap() {
+    public Map<ActivityId, List<IConnectorDescriptor>> getActivityInputMap() {
         return activityInputMap;
     }
 
-    public Map<ActivityId, List<Integer>> getActivityOutputMap() {
+    public Map<ActivityId, List<IConnectorDescriptor>> getActivityOutputMap() {
         return activityOutputMap;
     }
 
-    public Map<OperatorDescriptorId, List<ActivityId>> getOperatorInputMap() {
-        return operatorInputMap;
-    }
-
-    public Map<OperatorDescriptorId, List<ActivityId>> getOperatorOutputMap() {
-        return operatorOutputMap;
-    }
-
-    public List<IConnectorDescriptor> getActivityInputConnectorDescriptors(ActivityId hanId) {
-        List<Integer> inputIndexes = activityInputMap.get(hanId);
-        if (inputIndexes == null) {
-            return null;
-        }
-        OperatorDescriptorId ownerId = hanId.getOperatorDescriptorId();
-        List<IConnectorDescriptor> inputs = new ArrayList<IConnectorDescriptor>();
-        for (Integer i : inputIndexes) {
-            inputs.add(jobSpec.getInputConnectorDescriptor(ownerId, i));
-        }
-        return inputs;
-    }
-
-    public List<IConnectorDescriptor> getActivityOutputConnectorDescriptors(ActivityId hanId) {
-        List<Integer> outputIndexes = activityOutputMap.get(hanId);
-        if (outputIndexes == null) {
-            return null;
-        }
-        OperatorDescriptorId ownerId = hanId.getOperatorDescriptorId();
-        List<IConnectorDescriptor> outputs = new ArrayList<IConnectorDescriptor>();
-        for (Integer i : outputIndexes) {
-            outputs.add(jobSpec.getOutputConnectorDescriptor(ownerId, i));
-        }
-        return outputs;
+    public Map<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> getConnectorActivityMap() {
+        return connectorActivityMap;
     }
 
     public ActivityId getConsumerActivity(ConnectorDescriptorId cdId) {
-        Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connEdge = jobSpec
-                .getConnectorOperatorMap().get(cdId);
-
-        OperatorDescriptorId consumerOpId = connEdge.getRight().getLeft().getOperatorId();
-        int consumerInputIdx = connEdge.getRight().getRight();
-
-        for (ActivityId anId : operatorActivityMap.get(consumerOpId)) {
-            List<Integer> anInputs = activityInputMap.get(anId);
-            if (anInputs != null) {
-                for (Integer idx : anInputs) {
-                    if (idx.intValue() == consumerInputIdx) {
-                        return anId;
-                    }
-                }
-            }
-        }
-        return null;
+        Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>> connEdge = connectorActivityMap.get(cdId);
+        return connEdge.getRight().getLeft().getActivityId();
     }
 
     public ActivityId getProducerActivity(ConnectorDescriptorId cdId) {
-        Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connEdge = jobSpec
-                .getConnectorOperatorMap().get(cdId);
-
-        OperatorDescriptorId producerOpId = connEdge.getLeft().getLeft().getOperatorId();
-        int producerInputIdx = connEdge.getLeft().getRight();
-
-        for (ActivityId anId : operatorActivityMap.get(producerOpId)) {
-            List<Integer> anOutputs = activityOutputMap.get(anId);
-            if (anOutputs != null) {
-                for (Integer idx : anOutputs) {
-                    if (idx.intValue() == producerInputIdx) {
-                        return anId;
-                    }
-                }
-            }
-        }
-        return null;
+        Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>> connEdge = connectorActivityMap.get(cdId);
+        return connEdge.getLeft().getLeft().getActivityId();
     }
 
-    public RecordDescriptor getActivityInputRecordDescriptor(ActivityId hanId, int inputIndex) {
-        int opInputIndex = getActivityInputMap().get(hanId).get(inputIndex);
-        return jobSpec.getOperatorInputRecordDescriptor(hanId.getOperatorDescriptorId(), opInputIndex);
+    public IConnectorPolicyAssignmentPolicy getConnectorPolicyAssignmentPolicy() {
+        return connectorPolicyAssignmentPolicy;
     }
 
-    public RecordDescriptor getActivityOutputRecordDescriptor(ActivityId hanId, int outputIndex) {
-        int opOutputIndex = getActivityOutputMap().get(hanId).get(outputIndex);
-        return jobSpec.getOperatorOutputRecordDescriptor(hanId.getOperatorDescriptorId(), opOutputIndex);
+    public void setConnectorPolicyAssignmentPolicy(IConnectorPolicyAssignmentPolicy connectorPolicyAssignmentPolicy) {
+        this.connectorPolicyAssignmentPolicy = connectorPolicyAssignmentPolicy;
+    }
+
+    public void setMaxReattempts(int maxReattempts) {
+        this.maxReattempts = maxReattempts;
+    }
+
+    public int getMaxReattempts() {
+        return maxReattempts;
+    }
+
+    public IJobletEventListenerFactory getJobletEventListenerFactory() {
+        return jobletEventListenerFactory;
+    }
+
+    public void setJobletEventListenerFactory(IJobletEventListenerFactory jobletEventListenerFactory) {
+        this.jobletEventListenerFactory = jobletEventListenerFactory;
+    }
+
+    public IGlobalJobDataFactory getGlobalJobDataFactory() {
+        return globalJobDataFactory;
+    }
+
+    public void setGlobalJobDataFactory(IGlobalJobDataFactory globalJobDataFactory) {
+        this.globalJobDataFactory = globalJobDataFactory;
     }
 
     @Override
     public String toString() {
         StringBuilder buffer = new StringBuilder();
-        buffer.append("ActivityNodes: " + activityNodes);
+        buffer.append("ActivityNodes: " + activityMap);
         buffer.append('\n');
         buffer.append("Blocker->Blocked: " + blocker2blockedMap);
         buffer.append('\n');
@@ -212,13 +177,12 @@
         jplan.put("flags", jobFlags.toString());
 
         JSONArray jans = new JSONArray();
-        for (IActivity an : activityNodes.values()) {
+        for (IActivity an : activityMap.values()) {
             JSONObject jan = new JSONObject();
             jan.put("id", an.getActivityId().toString());
             jan.put("java-class", an.getClass().getName());
-            jan.put("operator-id", an.getActivityId().getOperatorDescriptorId().toString());
 
-            List<IConnectorDescriptor> inputs = getActivityInputConnectorDescriptors(an.getActivityId());
+            List<IConnectorDescriptor> inputs = activityInputMap.get(an.getActivityId());
             if (inputs != null) {
                 JSONArray jInputs = new JSONArray();
                 for (int i = 0; i < inputs.size(); ++i) {
@@ -230,7 +194,7 @@
                 jan.put("inputs", jInputs);
             }
 
-            List<IConnectorDescriptor> outputs = getActivityOutputConnectorDescriptors(an.getActivityId());
+            List<IConnectorDescriptor> outputs = activityOutputMap.get(an.getActivityId());
             if (outputs != null) {
                 JSONArray jOutputs = new JSONArray();
                 for (int i = 0; i < outputs.size(); ++i) {
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/adminconsole/pages/JobDetailsPage.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/adminconsole/pages/JobDetailsPage.java
index 1d96a71..a486c33 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/adminconsole/pages/JobDetailsPage.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/adminconsole/pages/JobDetailsPage.java
@@ -35,7 +35,6 @@
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.work.GetJobActivityGraphJSONWork;
 import edu.uci.ics.hyracks.control.cc.work.GetJobRunJSONWork;
-import edu.uci.ics.hyracks.control.cc.work.GetJobSpecificationJSONWork;
 
 public class JobDetailsPage extends AbstractPage {
     private static final long serialVersionUID = 1L;
@@ -49,12 +48,6 @@
 
         JobId jobId = JobId.parse(jobIdStr.toString());
 
-        GetJobSpecificationJSONWork gjsw = new GetJobSpecificationJSONWork(ccs, jobId);
-        ccs.getWorkQueue().scheduleAndSync(gjsw);
-        Label jobspec = new Label("job-specification", gjsw.getJSON().toString());
-        jobspec.setEscapeModelStrings(false);
-        add(jobspec);
-
         GetJobActivityGraphJSONWork gjagw = new GetJobActivityGraphJSONWork(ccs, jobId);
         ccs.getWorkQueue().scheduleAndSync(gjagw);
         Label jag = new Label("job-activity-graph", gjagw.getJSON().toString());
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
index f5b74cf..d351d4a 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
@@ -71,4 +71,9 @@
     public void setPlan(ActivityClusterPlan acp) {
         this.acp = acp;
     }
+
+    @Override
+    public String toString() {
+        return String.valueOf(activities);
+    }
 }
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobActivityGraphBuilder.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobActivityGraphBuilder.java
index 6effd84..a7b60e4 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobActivityGraphBuilder.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobActivityGraphBuilder.java
@@ -2,6 +2,7 @@
 
 import java.util.ArrayList;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -9,9 +10,14 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.commons.lang3.tuple.Pair;
+
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.IActivity;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.job.JobActivityGraph;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
@@ -19,7 +25,32 @@
 public class JobActivityGraphBuilder implements IActivityGraphBuilder {
     private static final Logger LOGGER = Logger.getLogger(JobActivityGraphBuilder.class.getName());
 
-    private JobActivityGraph jag;
+    private final Map<ActivityId, IOperatorDescriptor> activityOperatorMap;
+
+    private final JobActivityGraph jag;
+
+    private final JobSpecification jobSpec;
+
+    private final Map<ConnectorDescriptorId, Pair<IActivity, Integer>> connectorProducerMap;
+
+    private final Map<ConnectorDescriptorId, Pair<IActivity, Integer>> connectorConsumerMap;
+
+    public JobActivityGraphBuilder(String appName, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) {
+        activityOperatorMap = new HashMap<ActivityId, IOperatorDescriptor>();
+        jag = new JobActivityGraph(appName, jobFlags);
+        this.jobSpec = jobSpec;
+        jag.setConnectorPolicyAssignmentPolicy(jobSpec.getConnectorPolicyAssignmentPolicy());
+        jag.setGlobalJobDataFactory(jobSpec.getGlobalJobDataFactory());
+        jag.setJobletEventListenerFactory(jobSpec.getJobletEventListenerFactory());
+        jag.setMaxReattempts(jobSpec.getMaxReattempts());
+        connectorProducerMap = new HashMap<ConnectorDescriptorId, Pair<IActivity, Integer>>();
+        connectorConsumerMap = new HashMap<ConnectorDescriptorId, Pair<IActivity, Integer>>();
+    }
+
+    public void addConnector(IConnectorDescriptor conn) {
+        jag.getConnectorMap().put(conn.getConnectorId(), conn);
+        jag.getConnectorRecordDescriptorMap().put(conn.getConnectorId(), jobSpec.getConnectorRecordDescriptor(conn));
+    }
 
     @Override
     public void addBlockingEdge(IActivity blocker, IActivity blocked) {
@@ -33,9 +64,10 @@
             LOGGER.finest("Adding source edge: " + task.getActivityId().getOperatorDescriptorId() + ":"
                     + operatorInputIndex + " -> " + task.getActivityId() + ":" + taskInputIndex);
         }
-        insertIntoIndexedMap(jag.getActivityInputMap(), task.getActivityId(), taskInputIndex, operatorInputIndex);
-        insertIntoIndexedMap(jag.getOperatorInputMap(), task.getActivityId().getOperatorDescriptorId(),
-                operatorInputIndex, task.getActivityId());
+        IOperatorDescriptor op = activityOperatorMap.get(task.getActivityId());
+        IConnectorDescriptor conn = jobSpec.getInputConnectorDescriptor(op, operatorInputIndex);
+        insertIntoIndexedMap(jag.getActivityInputMap(), task.getActivityId(), taskInputIndex, conn);
+        connectorConsumerMap.put(conn.getConnectorId(), Pair.of(task, taskInputIndex));
     }
 
     @Override
@@ -44,16 +76,28 @@
             LOGGER.finest("Adding target edge: " + task.getActivityId().getOperatorDescriptorId() + ":"
                     + operatorOutputIndex + " -> " + task.getActivityId() + ":" + taskOutputIndex);
         }
-        insertIntoIndexedMap(jag.getActivityOutputMap(), task.getActivityId(), taskOutputIndex, operatorOutputIndex);
-        insertIntoIndexedMap(jag.getOperatorOutputMap(), task.getActivityId().getOperatorDescriptorId(),
-                operatorOutputIndex, task.getActivityId());
+        IOperatorDescriptor op = activityOperatorMap.get(task.getActivityId());
+        IConnectorDescriptor conn = jobSpec.getOutputConnectorDescriptor(op, operatorOutputIndex);
+        insertIntoIndexedMap(jag.getActivityOutputMap(), task.getActivityId(), taskOutputIndex, conn);
+        connectorProducerMap.put(conn.getConnectorId(), Pair.of(task, taskOutputIndex));
     }
 
     @Override
-    public void addActivity(IActivity task) {
+    public void addActivity(IOperatorDescriptor op, IActivity task) {
+        activityOperatorMap.put(task.getActivityId(), op);
         ActivityId activityId = task.getActivityId();
-        jag.getActivityNodeMap().put(activityId, task);
-        addToValueSet(jag.getOperatorActivityMap(), activityId.getOperatorDescriptorId(), activityId);
+        jag.getActivityMap().put(activityId, task);
+    }
+
+    public void finish() {
+        Map<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> caMap = jag
+                .getConnectorActivityMap();
+        for (Map.Entry<ConnectorDescriptorId, Pair<IActivity, Integer>> e : connectorProducerMap.entrySet()) {
+            ConnectorDescriptorId cdId = e.getKey();
+            Pair<IActivity, Integer> producer = e.getValue();
+            Pair<IActivity, Integer> consumer = connectorConsumerMap.get(cdId);
+            caMap.put(cdId, Pair.of(producer, consumer));
+        }
     }
 
     private <K, V> void addToValueSet(Map<K, Set<V>> map, K n1, V n2) {
@@ -72,10 +116,6 @@
         }
     }
 
-    public void init(String appName, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) {
-        jag = new JobActivityGraph(appName, jobSpec, jobFlags);
-    }
-
     private <K, V> void insertIntoIndexedMap(Map<K, List<V>> map, K key, int index, V value) {
         List<V> vList = map.get(key);
         if (vList == null) {
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterGraphBuilder.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterGraphBuilder.java
index 9d6aa71..9acc4b8 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterGraphBuilder.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterGraphBuilder.java
@@ -25,11 +25,8 @@
 import org.apache.commons.lang3.tuple.Pair;
 
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
-import edu.uci.ics.hyracks.api.dataflow.IActivity;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.job.JobActivityGraph;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
 import edu.uci.ics.hyracks.control.cc.job.ActivityClusterId;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
@@ -43,33 +40,22 @@
         this.jobRun = jobRun;
     }
 
-    private static Pair<ActivityId, ActivityId> findMergePair(JobActivityGraph jag, JobSpecification spec,
-            Set<ActivityCluster> eqSets) {
-        Map<ActivityId, IActivity> activityNodeMap = jag.getActivityNodeMap();
+    private static Pair<ActivityId, ActivityId> findMergePair(JobActivityGraph jag, Set<ActivityCluster> eqSets) {
         for (ActivityCluster eqSet : eqSets) {
             for (ActivityId t : eqSet.getActivities()) {
-                IActivity activity = activityNodeMap.get(t);
-                List<Integer> inputList = jag.getActivityInputMap().get(t);
+                List<IConnectorDescriptor> inputList = jag.getActivityInputMap().get(t);
                 if (inputList != null) {
-                    for (Integer idx : inputList) {
-                        IConnectorDescriptor conn = spec.getInputConnectorDescriptor(activity.getActivityId()
-                                .getOperatorDescriptorId(), idx);
-                        OperatorDescriptorId producerId = spec.getProducer(conn).getOperatorId();
-                        int producerOutputIndex = spec.getProducerOutputIndex(conn);
-                        ActivityId inTask = jag.getOperatorOutputMap().get(producerId).get(producerOutputIndex);
+                    for (IConnectorDescriptor conn : inputList) {
+                        ActivityId inTask = jag.getProducerActivity(conn.getConnectorId());
                         if (!eqSet.getActivities().contains(inTask)) {
                             return Pair.<ActivityId, ActivityId> of(t, inTask);
                         }
                     }
                 }
-                List<Integer> outputList = jag.getActivityOutputMap().get(t);
+                List<IConnectorDescriptor> outputList = jag.getActivityOutputMap().get(t);
                 if (outputList != null) {
-                    for (Integer idx : outputList) {
-                        IConnectorDescriptor conn = spec.getOutputConnectorDescriptor(activity.getActivityId()
-                                .getOperatorDescriptorId(), idx);
-                        OperatorDescriptorId consumerId = spec.getConsumer(conn).getOperatorId();
-                        int consumerInputIndex = spec.getConsumerInputIndex(conn);
-                        ActivityId outTask = jag.getOperatorInputMap().get(consumerId).get(consumerInputIndex);
+                    for (IConnectorDescriptor conn : outputList) {
+                        ActivityId outTask = jag.getConsumerActivity(conn.getConnectorId());
                         if (!eqSet.getActivities().contains(outTask)) {
                             return Pair.<ActivityId, ActivityId> of(t, outTask);
                         }
@@ -81,27 +67,23 @@
     }
 
     public Set<ActivityCluster> inferActivityClusters(JobActivityGraph jag) {
-        JobSpecification spec = jag.getJobSpecification();
-
         /*
          * Build initial equivalence sets map. We create a map such that for each IOperatorTask, t -> { t }
          */
         Map<ActivityId, ActivityCluster> stageMap = new HashMap<ActivityId, ActivityCluster>();
         Set<ActivityCluster> stages = new HashSet<ActivityCluster>();
-        for (Set<ActivityId> taskIds : jag.getOperatorActivityMap().values()) {
-            for (ActivityId taskId : taskIds) {
-                Set<ActivityId> eqSet = new HashSet<ActivityId>();
-                eqSet.add(taskId);
-                ActivityCluster stage = new ActivityCluster(eqSet);
-                stageMap.put(taskId, stage);
-                stages.add(stage);
-            }
+        for (ActivityId taskId : jag.getActivityMap().keySet()) {
+            Set<ActivityId> eqSet = new HashSet<ActivityId>();
+            eqSet.add(taskId);
+            ActivityCluster stage = new ActivityCluster(eqSet);
+            stageMap.put(taskId, stage);
+            stages.add(stage);
         }
 
         boolean changed = true;
         while (changed) {
             changed = false;
-            Pair<ActivityId, ActivityId> pair = findMergePair(jag, spec, stages);
+            Pair<ActivityId, ActivityId> pair = findMergePair(jag, stages);
             if (pair != null) {
                 merge(stageMap, stages, pair.getLeft(), pair.getRight());
                 changed = true;
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
index d04bf81..23f53f4 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
@@ -184,7 +184,7 @@
         for (ActivityId ac1 : activities) {
             Task[] ac1TaskStates = activityPlanMap.get(ac1).getTasks();
             int nProducers = ac1TaskStates.length;
-            List<IConnectorDescriptor> outputConns = jag.getActivityOutputConnectorDescriptors(ac1);
+            List<IConnectorDescriptor> outputConns = jag.getActivityOutputMap().get(ac1);
             if (outputConns != null) {
                 for (IConnectorDescriptor c : outputConns) {
                     ConnectorDescriptorId cdId = c.getConnectorId();
@@ -334,7 +334,7 @@
         for (ActivityId a1 : activities) {
             Task[] ac1TaskStates = taskMap.get(a1).getTasks();
             int nProducers = ac1TaskStates.length;
-            List<IConnectorDescriptor> outputConns = jag.getActivityOutputConnectorDescriptors(a1);
+            List<IConnectorDescriptor> outputConns = jag.getActivityOutputMap().get(a1);
             if (outputConns != null) {
                 for (IConnectorDescriptor c : outputConns) {
                     ConnectorDescriptorId cdId = c.getConnectorId();
@@ -356,7 +356,7 @@
     }
 
     private IConnectorPolicy assignConnectorPolicy(IConnectorDescriptor c, int nProducers, int nConsumers, int[] fanouts) {
-        IConnectorPolicyAssignmentPolicy cpap = scheduler.getJobRun().getJobActivityGraph().getJobSpecification()
+        IConnectorPolicyAssignmentPolicy cpap = scheduler.getJobRun().getJobActivityGraph()
                 .getConnectorPolicyAssignmentPolicy();
         if (cpap != null) {
             return cpap.getConnectorPolicyAssignment(c, nProducers, nConsumers, fanouts);
@@ -368,6 +368,7 @@
             throws HyracksException {
         PartitionConstraintSolver solver = scheduler.getSolver();
         JobRun jobRun = scheduler.getJobRun();
+        JobActivityGraph jag = jobRun.getJobActivityGraph();
         Set<LValueConstraintExpression> lValues = new HashSet<LValueConstraintExpression>();
         for (ActivityId anId : ac.getActivities()) {
             lValues.add(new PartitionCountExpression(anId.getOperatorDescriptorId()));
@@ -393,22 +394,25 @@
         for (ActivityId anId : ac.getActivities()) {
             int nParts = nPartMap.get(anId.getOperatorDescriptorId());
             int[] nInputPartitions = null;
-            List<IConnectorDescriptor> inputs = jobRun.getJobActivityGraph().getActivityInputConnectorDescriptors(anId);
+            List<IConnectorDescriptor> inputs = jag.getActivityInputMap().get(anId);
             if (inputs != null) {
                 nInputPartitions = new int[inputs.size()];
                 for (int i = 0; i < nInputPartitions.length; ++i) {
-                    nInputPartitions[i] = nPartMap.get(jobRun.getJobActivityGraph()
-                            .getProducerActivity(inputs.get(i).getConnectorId()).getOperatorDescriptorId());
+                    ConnectorDescriptorId cdId = inputs.get(i).getConnectorId();
+                    ActivityId aid = jag.getProducerActivity(cdId);
+                    Integer nPartInt = nPartMap.get(aid.getOperatorDescriptorId());
+                    nInputPartitions[i] = nPartInt;
                 }
             }
             int[] nOutputPartitions = null;
-            List<IConnectorDescriptor> outputs = jobRun.getJobActivityGraph().getActivityOutputConnectorDescriptors(
-                    anId);
+            List<IConnectorDescriptor> outputs = jag.getActivityOutputMap().get(anId);
             if (outputs != null) {
                 nOutputPartitions = new int[outputs.size()];
                 for (int i = 0; i < nOutputPartitions.length; ++i) {
-                    nOutputPartitions[i] = nPartMap.get(jobRun.getJobActivityGraph()
-                            .getConsumerActivity(outputs.get(i).getConnectorId()).getOperatorDescriptorId());
+                    ConnectorDescriptorId cdId = outputs.get(i).getConnectorId();
+                    ActivityId aid = jag.getConsumerActivity(cdId);
+                    Integer nPartInt = nPartMap.get(aid.getOperatorDescriptorId());
+                    nOutputPartitions[i] = nPartInt;
                 }
             }
             ActivityPartitionDetails apd = new ActivityPartitionDetails(nParts, nInputPartitions, nOutputPartitions);
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index 4a315ab..b3a76ad 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -15,6 +15,7 @@
 package edu.uci.ics.hyracks.control.cc.scheduler;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -25,32 +26,24 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
 import edu.uci.ics.hyracks.api.constraints.Constraint;
-import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
 import edu.uci.ics.hyracks.api.constraints.expressions.LValueConstraintExpression;
 import edu.uci.ics.hyracks.api.constraints.expressions.PartitionLocationExpression;
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
-import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.JobActivityGraph;
 import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
 import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.NodeControllerState;
 import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
-import edu.uci.ics.hyracks.control.cc.job.IConnectorDescriptorVisitor;
-import edu.uci.ics.hyracks.control.cc.job.IOperatorDescriptorVisitor;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
-import edu.uci.ics.hyracks.control.cc.job.PlanUtils;
 import edu.uci.ics.hyracks.control.cc.job.Task;
 import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
 import edu.uci.ics.hyracks.control.cc.job.TaskCluster;
@@ -75,12 +68,15 @@
 
     private Set<ActivityCluster> rootActivityClusters;
 
-    public JobScheduler(ClusterControllerService ccs, JobRun jobRun) {
+    public JobScheduler(ClusterControllerService ccs, JobRun jobRun, Collection<Constraint> constraints) {
         this.ccs = ccs;
         this.jobRun = jobRun;
         solver = new PartitionConstraintSolver();
         partitionProducingTaskClusterMap = new HashMap<PartitionId, TaskCluster>();
         inProgressTaskClusters = new HashSet<TaskCluster>();
+        solver.addConstraints(constraints);
+        ActivityClusterGraphBuilder acgb = new ActivityClusterGraphBuilder(jobRun);
+        rootActivityClusters = acgb.inferActivityClusters(jobRun.getJobActivityGraph());
     }
 
     public JobRun getJobRun() {
@@ -92,43 +88,9 @@
     }
 
     public void startJob() throws HyracksException {
-        if (LOGGER.isLoggable(Level.FINE)) {
-            LOGGER.fine("Starting Job: " + jobRun.getJobActivityGraph().getJobSpecification());
-        }
-        analyze();
         startRunnableActivityClusters();
     }
 
-    private void analyze() throws HyracksException {
-        final JobActivityGraph jag = jobRun.getJobActivityGraph();
-        final ICCApplicationContext appCtx = ccs.getApplicationMap().get(jag.getApplicationName());
-        JobSpecification spec = jag.getJobSpecification();
-        final Set<Constraint> contributedConstraints = new HashSet<Constraint>();
-        final IConstraintAcceptor acceptor = new IConstraintAcceptor() {
-            @Override
-            public void addConstraint(Constraint constraint) {
-                contributedConstraints.add(constraint);
-            }
-        };
-        PlanUtils.visit(spec, new IOperatorDescriptorVisitor() {
-            @Override
-            public void visit(IOperatorDescriptor op) {
-                op.contributeSchedulingConstraints(acceptor, jag, appCtx);
-            }
-        });
-        PlanUtils.visit(spec, new IConnectorDescriptorVisitor() {
-            @Override
-            public void visit(IConnectorDescriptor conn) {
-                conn.contributeSchedulingConstraints(acceptor, jag, appCtx);
-            }
-        });
-        contributedConstraints.addAll(spec.getUserConstraints());
-        solver.addConstraints(contributedConstraints);
-
-        ActivityClusterGraphBuilder acgb = new ActivityClusterGraphBuilder(jobRun);
-        rootActivityClusters = acgb.inferActivityClusters(jag);
-    }
-
     private void findRunnableTaskClusterRoots(Set<TaskCluster> frontier, Set<ActivityCluster> roots)
             throws HyracksException {
         for (ActivityCluster root : roots) {
@@ -644,7 +606,7 @@
                 lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.FAILED);
                 lastAttempt.setEndTime(System.currentTimeMillis());
                 abortDoomedTaskClusters();
-                if (lastAttempt.getAttempt() >= jobRun.getJobActivityGraph().getJobSpecification().getMaxReattempts()) {
+                if (lastAttempt.getAttempt() >= jobRun.getJobActivityGraph().getMaxReattempts()) {
                     abortJob(new HyracksException(details));
                     return;
                 }
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/JobsRESTAPIFunction.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/JobsRESTAPIFunction.java
index 4135b18..c964e4e 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/JobsRESTAPIFunction.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/JobsRESTAPIFunction.java
@@ -21,7 +21,6 @@
 import edu.uci.ics.hyracks.control.cc.web.util.IJSONOutputFunction;
 import edu.uci.ics.hyracks.control.cc.work.GetJobActivityGraphJSONWork;
 import edu.uci.ics.hyracks.control.cc.work.GetJobRunJSONWork;
-import edu.uci.ics.hyracks.control.cc.work.GetJobSpecificationJSONWork;
 import edu.uci.ics.hyracks.control.cc.work.GetJobSummariesJSONWork;
 
 public class JobsRESTAPIFunction implements IJSONOutputFunction {
@@ -49,11 +48,7 @@
             case 2: {
                 JobId jobId = JobId.parse(arguments[0]);
 
-                if ("job-specification".equalsIgnoreCase(arguments[1])) {
-                    GetJobSpecificationJSONWork gjse = new GetJobSpecificationJSONWork(ccs, jobId);
-                    ccs.getWorkQueue().scheduleAndSync(gjse);
-                    result.put("result", gjse.getJSON());
-                } else if ("job-activity-graph".equalsIgnoreCase(arguments[1])) {
+                if ("job-activity-graph".equalsIgnoreCase(arguments[1])) {
                     GetJobActivityGraphJSONWork gjage = new GetJobActivityGraphJSONWork(ccs, jobId);
                     ccs.getWorkQueue().scheduleAndSync(gjage);
                     result.put("result", gjage.getJSON());
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobSpecificationJSONWork.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobSpecificationJSONWork.java
deleted file mode 100644
index 05b7cae..0000000
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobSpecificationJSONWork.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.work;
-
-import org.json.JSONObject;
-
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.job.JobRun;
-import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
-
-public class GetJobSpecificationJSONWork extends SynchronizableWork {
-    private final ClusterControllerService ccs;
-    private final JobId jobId;
-    private JSONObject json;
-
-    public GetJobSpecificationJSONWork(ClusterControllerService ccs, JobId jobId) {
-        this.ccs = ccs;
-        this.jobId = jobId;
-    }
-
-    @Override
-    protected void doRun() throws Exception {
-        JobRun run = ccs.getActiveRunMap().get(jobId);
-        if (run == null) {
-            run = ccs.getRunMapArchive().get(jobId);
-            if (run == null) {
-                json = new JSONObject();
-                return;
-            }
-        }
-        json = run.getJobActivityGraph().getJobSpecification().toJSON();
-    }
-
-    public JSONObject getJSON() {
-        return json;
-    }
-}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
index 07088bb..8d8e85d 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
@@ -17,10 +17,16 @@
 import java.util.Set;
 import java.util.logging.Logger;
 
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobActivityGraph;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.NodeControllerState;
+import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
 import edu.uci.ics.hyracks.control.common.work.AbstractWork;
 
@@ -53,13 +59,44 @@
         Set<String> targetNodes = run.getParticipatingNodeIds();
         run.getCleanupPendingNodeIds().addAll(targetNodes);
         run.setPendingStatus(status, exception);
-        for (String n : targetNodes) {
-            NodeControllerState ncs = ccs.getNodeMap().get(n);
+        if (targetNodes != null && !targetNodes.isEmpty()) {
+            for (String n : targetNodes) {
+                NodeControllerState ncs = ccs.getNodeMap().get(n);
+                try {
+                    ncs.getNodeController().cleanUpJoblet(jobId, status);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        } else {
+            CCApplicationContext appCtx = ccs.getApplicationMap().get(run.getJobActivityGraph().getApplicationName());
+            if (appCtx != null) {
+                try {
+                    appCtx.notifyJobFinish(jobId);
+                } catch (HyracksException e) {
+                    e.printStackTrace();
+                }
+            }
+            run.setStatus(run.getPendingStatus(), run.getPendingException());
+            ccs.getActiveRunMap().remove(jobId);
+            ccs.getRunMapArchive().put(jobId, run);
             try {
-                ncs.getNodeController().cleanUpJoblet(jobId, status);
+                ccs.getJobLogFile().log(createJobLogObject(run));
             } catch (Exception e) {
-                e.printStackTrace();
+                throw new RuntimeException(e);
             }
         }
     }
+
+    private JSONObject createJobLogObject(final JobRun run) {
+        JSONObject jobLogObject = new JSONObject();
+        try {
+            JobActivityGraph jag = run.getJobActivityGraph();
+            jobLogObject.put("job-activity-graph", jag.toJSON());
+            jobLogObject.put("job-run", run.toJSON());
+        } catch (JSONException e) {
+            throw new RuntimeException(e);
+        }
+        return jobLogObject;
+    }
 }
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCreateWork.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCreateWork.java
index b7cf629..ea8cef3 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCreateWork.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCreateWork.java
@@ -15,7 +15,12 @@
 package edu.uci.ics.hyracks.control.cc.work;
 
 import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Set;
 
+import edu.uci.ics.hyracks.api.constraints.Constraint;
+import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.JobActivityGraph;
@@ -25,6 +30,7 @@
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
+import edu.uci.ics.hyracks.control.cc.job.IConnectorDescriptorVisitor;
 import edu.uci.ics.hyracks.control.cc.job.IOperatorDescriptorVisitor;
 import edu.uci.ics.hyracks.control.cc.job.JobActivityGraphBuilder;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
@@ -54,20 +60,26 @@
     @Override
     protected void doRun() throws Exception {
         try {
-            CCApplicationContext appCtx = ccs.getApplicationMap().get(appName);
+            final CCApplicationContext appCtx = ccs.getApplicationMap().get(appName);
             if (appCtx == null) {
                 throw new HyracksException("No application with id " + appName + " found");
             }
             JobSpecification spec = appCtx.createJobSpecification(jobSpec);
 
-            final JobActivityGraphBuilder builder = new JobActivityGraphBuilder();
-            builder.init(appName, spec, jobFlags);
+            final JobActivityGraphBuilder builder = new JobActivityGraphBuilder(appName, spec, jobFlags);
+            PlanUtils.visit(spec, new IConnectorDescriptorVisitor() {
+                @Override
+                public void visit(IConnectorDescriptor conn) throws HyracksException {
+                    builder.addConnector(conn);
+                }
+            });
             PlanUtils.visit(spec, new IOperatorDescriptorVisitor() {
                 @Override
                 public void visit(IOperatorDescriptor op) {
                     op.contributeActivities(builder);
                 }
             });
+            builder.finish();
             final JobActivityGraph jag = builder.getActivityGraph();
 
             JobRun run = new JobRun(jobId, jag);
@@ -75,7 +87,28 @@
             run.setStatus(JobStatus.INITIALIZED, null);
 
             ccs.getActiveRunMap().put(jobId, run);
-            JobScheduler jrs = new JobScheduler(ccs, run);
+            final Set<Constraint> contributedConstraints = new HashSet<Constraint>();
+            final IConstraintAcceptor acceptor = new IConstraintAcceptor() {
+                @Override
+                public void addConstraint(Constraint constraint) {
+                    contributedConstraints.add(constraint);
+                }
+            };
+            PlanUtils.visit(spec, new IOperatorDescriptorVisitor() {
+                @Override
+                public void visit(IOperatorDescriptor op) {
+                    op.contributeSchedulingConstraints(acceptor, jag, appCtx);
+                }
+            });
+            PlanUtils.visit(spec, new IConnectorDescriptorVisitor() {
+                @Override
+                public void visit(IConnectorDescriptor conn) {
+                    conn.contributeSchedulingConstraints(acceptor, jag, appCtx);
+                }
+            });
+            contributedConstraints.addAll(spec.getUserConstraints());
+
+            JobScheduler jrs = new JobScheduler(ccs, run, contributedConstraints);
             run.setScheduler(jrs);
             appCtx.notifyJobCreation(jobId, spec);
             callback.setValue(jobId);
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
index 80ab5ec..dea4a8d 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
@@ -83,7 +83,6 @@
         JSONObject jobLogObject = new JSONObject();
         try {
             JobActivityGraph jag = run.getJobActivityGraph();
-            jobLogObject.put("job-specification", jag.getJobSpecification().toJSON());
             jobLogObject.put("job-activity-graph", jag.toJSON());
             jobLogObject.put("job-run", run.toJSON());
         } catch (JSONException e) {
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index ec7cf89..10c9c3d 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -97,7 +97,7 @@
         deallocatableRegistry = new DefaultDeallocatableRegistry();
         fileFactory = new WorkspaceFileFactory(this, (IOManager) appCtx.getRootContext().getIOManager());
         cleanupPending = false;
-        IJobletEventListenerFactory jelf = jag.getJobSpecification().getJobletEventListenerFactory();
+        IJobletEventListenerFactory jelf = jag.getJobletEventListenerFactory();
         if (jelf != null) {
             IJobletEventListener listener = jelf.createListener(this);
             this.jobletEventListener = listener;
@@ -105,7 +105,7 @@
         } else {
             jobletEventListener = null;
         }
-        IGlobalJobDataFactory gjdf = jag.getJobSpecification().getGlobalJobDataFactory();
+        IGlobalJobDataFactory gjdf = jag.getGlobalJobDataFactory();
         globalJobData = gjdf != null ? gjdf.createGlobalJobData(this) : null;
     }
 
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
index 78101bf..8d1b2a6 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
@@ -26,11 +26,11 @@
 import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
 import edu.uci.ics.hyracks.api.comm.IPartitionWriterFactory;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.IActivity;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
-import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
@@ -90,20 +90,22 @@
 
             IRecordDescriptorProvider rdp = new IRecordDescriptorProvider() {
                 @Override
-                public RecordDescriptor getOutputRecordDescriptor(OperatorDescriptorId opId, int outputIndex) {
-                    return jag.getJobSpecification().getOperatorOutputRecordDescriptor(opId, outputIndex);
+                public RecordDescriptor getOutputRecordDescriptor(ActivityId aid, int outputIndex) {
+                    IConnectorDescriptor conn = jag.getActivityOutputMap().get(aid).get(outputIndex);
+                    return jag.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
                 }
 
                 @Override
-                public RecordDescriptor getInputRecordDescriptor(OperatorDescriptorId opId, int inputIndex) {
-                    return jag.getJobSpecification().getOperatorInputRecordDescriptor(opId, inputIndex);
+                public RecordDescriptor getInputRecordDescriptor(ActivityId aid, int inputIndex) {
+                    IConnectorDescriptor conn = jag.getActivityInputMap().get(aid).get(inputIndex);
+                    return jag.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
                 }
             };
 
             for (TaskAttemptDescriptor td : taskDescriptors) {
                 TaskAttemptId taId = td.getTaskAttemptId();
                 TaskId tid = taId.getTaskId();
-                IActivity han = jag.getActivityNodeMap().get(tid.getActivityId());
+                IActivity han = jag.getActivityMap().get(tid.getActivityId());
                 if (LOGGER.isLoggable(Level.INFO)) {
                     LOGGER.info("Initializing " + taId + " -> " + han);
                 }
@@ -113,7 +115,7 @@
 
                 List<IPartitionCollector> collectors = new ArrayList<IPartitionCollector>();
 
-                List<IConnectorDescriptor> inputs = jag.getActivityInputConnectorDescriptors(tid.getActivityId());
+                List<IConnectorDescriptor> inputs = jag.getActivityInputMap().get(tid.getActivityId());
                 if (inputs != null) {
                     for (int i = 0; i < inputs.size(); ++i) {
                         IConnectorDescriptor conn = inputs.get(i);
@@ -121,17 +123,17 @@
                         if (LOGGER.isLoggable(Level.INFO)) {
                             LOGGER.info("input: " + i + ": " + conn.getConnectorId());
                         }
-                        RecordDescriptor recordDesc = jag.getJobSpecification().getConnectorRecordDescriptor(conn);
+                        RecordDescriptor recordDesc = jag.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
                         IPartitionCollector collector = createPartitionCollector(td, partition, task, i, conn,
                                 recordDesc, cPolicy);
                         collectors.add(collector);
                     }
                 }
-                List<IConnectorDescriptor> outputs = jag.getActivityOutputConnectorDescriptors(tid.getActivityId());
+                List<IConnectorDescriptor> outputs = jag.getActivityOutputMap().get(tid.getActivityId());
                 if (outputs != null) {
                     for (int i = 0; i < outputs.size(); ++i) {
                         final IConnectorDescriptor conn = outputs.get(i);
-                        RecordDescriptor recordDesc = jag.getJobSpecification().getConnectorRecordDescriptor(conn);
+                        RecordDescriptor recordDesc = jag.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
                         IConnectorPolicy cPolicy = connectorPoliciesMap.get(conn.getConnectorId());
 
                         IPartitionWriterFactory pwFactory = createPartitionWriterFactory(task, cPolicy, jobId, conn,
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
index c4156f4..34f3e9d 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
@@ -230,9 +230,7 @@
                             data[1] = value;
                             writer.writeData(data);
                         }
-                    };
-                    ;
-                    ;
+                    };;;
 
                     OutputCommitter outputCommitter = new org.apache.hadoop.mapreduce.lib.output.NullOutputFormat()
                             .getOutputCommitter(new TaskAttemptContext(conf, new TaskAttemptID()));
@@ -254,9 +252,7 @@
                         public Counter getCounter(Enum<?> arg0) {
                             return null;
                         }
-                    };
-                    ;
-                    ;
+                    };;;
                     context = new org.apache.hadoop.mapreduce.Mapper().new Context(conf, new TaskAttemptID(),
                             newReader, recordWriter, outputCommitter, statusReporter,
                             (org.apache.hadoop.mapreduce.InputSplit) inputSplit);
@@ -308,8 +304,8 @@
         inputSplitsProxy = new InputSplitsProxy(jobConf, splits);
     }
 
-    public HadoopMapperOperatorDescriptor(IOperatorDescriptorRegistry spec, JobConf jobConf, IHadoopClassFactory hadoopClassFactory)
-            throws IOException {
+    public HadoopMapperOperatorDescriptor(IOperatorDescriptorRegistry spec, JobConf jobConf,
+            IHadoopClassFactory hadoopClassFactory) throws IOException {
         super(spec, 1, getRecordDescriptor(jobConf, hadoopClassFactory), jobConf, hadoopClassFactory);
     }
 
@@ -412,7 +408,7 @@
                 return createSelfReadingMapper(ctx, recordDescriptor, partition);
             } else {
                 return new DeserializedOperatorNodePushable(ctx, new MapperOperator(partition),
-                        recordDescProvider.getInputRecordDescriptor(this.odId, 0));
+                        recordDescProvider.getInputRecordDescriptor(this.activityNodeId, 0));
             }
         } catch (Exception e) {
             throw new HyracksDataException(e);
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
index 35df8de..86a4c3c 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
@@ -311,8 +311,8 @@
     private IComparatorFactory comparatorFactory;
     private boolean useAsCombiner = false;
 
-    public HadoopReducerOperatorDescriptor(IOperatorDescriptorRegistry spec, JobConf conf, IComparatorFactory comparatorFactory,
-            IHadoopClassFactory classFactory, boolean useAsCombiner) {
+    public HadoopReducerOperatorDescriptor(IOperatorDescriptorRegistry spec, JobConf conf,
+            IComparatorFactory comparatorFactory, IHadoopClassFactory classFactory, boolean useAsCombiner) {
         super(spec, 1, getRecordDescriptor(conf, classFactory), conf, classFactory);
         this.comparatorFactory = comparatorFactory;
         this.useAsCombiner = useAsCombiner;
@@ -371,7 +371,7 @@
             IOpenableDataWriterOperator op = new DeserializedPreclusteredGroupOperator(new int[] { 0 },
                     new IComparator[] { comparatorFactory.createComparator() }, new ReducerAggregator(createReducer()));
             return new DeserializedOperatorNodePushable(ctx, op, recordDescProvider.getInputRecordDescriptor(
-                    getOperatorId(), 0));
+                    getActivityId(), 0));
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractSingleActivityOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractSingleActivityOperatorDescriptor.java
index dd9e6ad..9048670 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractSingleActivityOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractSingleActivityOperatorDescriptor.java
@@ -36,7 +36,7 @@
 
     @Override
     public final void contributeActivities(IActivityGraphBuilder builder) {
-        builder.addActivity(this);
+        builder.addActivity(this, this);
         for (int i = 0; i < getInputArity(); ++i) {
             builder.addSourceEdge(i, this, i);
         }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
index 1d6b69d..a69fd01 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
@@ -24,12 +24,11 @@
 import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
 import edu.uci.ics.hyracks.api.constraints.expressions.PartitionCountExpression;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.IConnectorDescriptorRegistry;
 import edu.uci.ics.hyracks.api.job.JobActivityGraph;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicChannelReader;
 import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicFrameReader;
@@ -63,12 +62,11 @@
     @Override
     public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, JobActivityGraph plan,
             ICCApplicationContext appCtx) {
-        JobSpecification jobSpec = plan.getJobSpecification();
-        IOperatorDescriptor consumer = jobSpec.getConsumer(this);
-        IOperatorDescriptor producer = jobSpec.getProducer(this);
+        OperatorDescriptorId consumer = plan.getConsumerActivity(getConnectorId()).getOperatorDescriptorId();
+        OperatorDescriptorId producer = plan.getProducerActivity(getConnectorId()).getOperatorDescriptorId();
 
-        constraintAcceptor.addConstraint(new Constraint(new PartitionCountExpression(consumer.getOperatorId()),
-                new PartitionCountExpression(producer.getOperatorId())));
+        constraintAcceptor.addConstraint(new Constraint(new PartitionCountExpression(consumer),
+                new PartitionCountExpression(producer)));
     }
 
     @Override
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/AbstractFileWriteOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/AbstractFileWriteOperatorDescriptor.java
index 584d398..e05d80d 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/AbstractFileWriteOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/AbstractFileWriteOperatorDescriptor.java
@@ -91,6 +91,6 @@
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
         return new DeserializedOperatorNodePushable(ctx, new FileWriteOperator(partition),
-                recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
+                recordDescProvider.getInputRecordDescriptor(getActivityId(), 0));
     }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java
index 2a205cc..6fb0189 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java
@@ -50,7 +50,8 @@
      * @param inputArity
      * @param outputArity
      */
-    public PlainFileWriterOperatorDescriptor(IOperatorDescriptorRegistry spec, IFileSplitProvider fileSplitProvider, String delim) {
+    public PlainFileWriterOperatorDescriptor(IOperatorDescriptorRegistry spec, IFileSplitProvider fileSplitProvider,
+            String delim) {
         super(spec, 1, 0);
         this.fileSplitProvider = fileSplitProvider;
         this.delim = delim;
@@ -74,9 +75,9 @@
         final FileSplit[] splits = fileSplitProvider.getFileSplits();
         // Frame accessor
         final FrameTupleAccessor frameTupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
-                recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
+                recordDescProvider.getInputRecordDescriptor(getActivityId(), 0));
         // Record descriptor
-        final RecordDescriptor recordDescriptor = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
+        final RecordDescriptor recordDescriptor = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
         return new AbstractUnaryInputSinkOperatorNodePushable() {
             private BufferedWriter out;
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java
index 9e64ebb..9fd45e9 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java
@@ -91,10 +91,10 @@
         AggregateActivity aggregateAct = new AggregateActivity(new ActivityId(getOperatorId(), AGGREGATE_ACTIVITY_ID));
         MergeActivity mergeAct = new MergeActivity(new ActivityId(odId, MERGE_ACTIVITY_ID));
 
-        builder.addActivity(aggregateAct);
+        builder.addActivity(this, aggregateAct);
         builder.addSourceEdge(0, aggregateAct, 0);
 
-        builder.addActivity(mergeAct);
+        builder.addActivity(this, mergeAct);
         builder.addTargetEdge(0, mergeAct, 0);
 
         builder.addBlockingEdge(aggregateAct, mergeAct);
@@ -113,7 +113,7 @@
                 throws HyracksDataException {
             return new ExternalGroupBuildOperatorNodePushable(ctx, new TaskId(getActivityId(), partition), keyFields,
                     framesLimit, comparatorFactories, firstNormalizerFactory, aggregatorFactory,
-                    recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0), recordDescriptors[0],
+                    recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), recordDescriptors[0],
                     spillableTableFactory);
         }
     }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/HashGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/HashGroupOperatorDescriptor.java
index 91f80a7..034e13f 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/HashGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/HashGroupOperatorDescriptor.java
@@ -47,9 +47,9 @@
 
     private final int tableSize;
 
-    public HashGroupOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys, ITuplePartitionComputerFactory tpcf,
-            IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory aggregatorFactory,
-            RecordDescriptor outRecordDescriptor, int tableSize) {
+    public HashGroupOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys,
+            ITuplePartitionComputerFactory tpcf, IBinaryComparatorFactory[] comparatorFactories,
+            IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor outRecordDescriptor, int tableSize) {
         super(spec, 1, 1);
         this.keys = keys;
         this.tpcf = tpcf;
@@ -69,10 +69,10 @@
     @Override
     public void contributeActivities(IActivityGraphBuilder builder) {
         HashBuildActivity ha = new HashBuildActivity(new ActivityId(odId, HASH_BUILD_ACTIVITY_ID));
-        builder.addActivity(ha);
+        builder.addActivity(this, ha);
 
         OutputActivity oa = new OutputActivity(new ActivityId(odId, OUTPUT_ACTIVITY_ID));
-        builder.addActivity(oa);
+        builder.addActivity(this, oa);
 
         builder.addSourceEdge(0, ha, 0);
         builder.addTargetEdge(0, oa, 0);
@@ -91,7 +91,7 @@
                 final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
             return new HashGroupBuildOperatorNodePushable(ctx, new TaskId(getActivityId(), partition), keys, tpcf,
                     comparatorFactories, aggregatorFactory, tableSize, recordDescProvider.getInputRecordDescriptor(
-                            getOperatorId(), 0), recordDescriptors[0]);
+                            getActivityId(), 0), recordDescriptors[0]);
         }
     }
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorDescriptor.java
index 9df5e44..d37b818 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorDescriptor.java
@@ -14,7 +14,6 @@
  */
 package edu.uci.ics.hyracks.dataflow.std.group.preclustered;
 
-
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -47,6 +46,6 @@
             final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
             throws HyracksDataException {
         return new PreclusteredGroupOperatorNodePushable(ctx, groupFields, comparatorFactories, aggregatorFactory,
-                recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0), recordDescriptors[0]);
+                recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), recordDescriptors[0]);
     }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
index ef5cb94..6c58d6f 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
@@ -45,9 +45,10 @@
     private final boolean isLeftOuter;
     private final INullWriterFactory[] nullWriterFactories1;
 
-    public GraceHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0, int recordsPerFrame,
-            double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories,
-            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor) {
+    public GraceHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
+            int recordsPerFrame, double factor, int[] keys0, int[] keys1,
+            IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDescriptor) {
         super(spec, 2, 1);
         this.memsize = memsize;
         this.inputsize0 = inputsize0;
@@ -62,10 +63,10 @@
         recordDescriptors[0] = recordDescriptor;
     }
 
-    public GraceHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0, int recordsPerFrame,
-            double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories,
-            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, boolean isLeftOuter,
-            INullWriterFactory[] nullWriterFactories1) {
+    public GraceHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
+            int recordsPerFrame, double factor, int[] keys0, int[] keys1,
+            IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDescriptor, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1) {
         super(spec, 2, 1);
         this.memsize = memsize;
         this.inputsize0 = inputsize0;
@@ -82,19 +83,19 @@
 
     @Override
     public void contributeActivities(IActivityGraphBuilder builder) {
-        HashPartitionActivityNode rpart = new HashPartitionActivityNode(new ActivityId(odId, RPARTITION_ACTIVITY_ID),
-                keys0, 0);
-        HashPartitionActivityNode spart = new HashPartitionActivityNode(new ActivityId(odId, SPARTITION_ACTIVITY_ID),
-                keys1, 1);
-        JoinActivityNode join = new JoinActivityNode(new ActivityId(odId, JOIN_ACTIVITY_ID));
+        ActivityId rpartAid = new ActivityId(odId, RPARTITION_ACTIVITY_ID);
+        HashPartitionActivityNode rpart = new HashPartitionActivityNode(rpartAid, keys0);
+        ActivityId spartAid = new ActivityId(odId, SPARTITION_ACTIVITY_ID);
+        HashPartitionActivityNode spart = new HashPartitionActivityNode(spartAid, keys1);
+        JoinActivityNode join = new JoinActivityNode(new ActivityId(odId, JOIN_ACTIVITY_ID), rpartAid, spartAid);
 
-        builder.addActivity(rpart);
+        builder.addActivity(this, rpart);
         builder.addSourceEdge(0, rpart, 0);
 
-        builder.addActivity(spart);
+        builder.addActivity(this, spart);
         builder.addSourceEdge(1, spart, 0);
 
-        builder.addActivity(join);
+        builder.addActivity(this, join);
         builder.addBlockingEdge(rpart, spart);
         builder.addBlockingEdge(spart, join);
 
@@ -107,13 +108,11 @@
 
     private class HashPartitionActivityNode extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
-        private int operatorInputIndex;
         private int keys[];
 
-        public HashPartitionActivityNode(ActivityId id, int keys[], int operatorInputIndex) {
+        public HashPartitionActivityNode(ActivityId id, int keys[]) {
             super(id);
             this.keys = keys;
-            this.operatorInputIndex = operatorInputIndex;
         }
 
         @Override
@@ -121,23 +120,28 @@
                 IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
             return new GraceHashJoinPartitionBuildOperatorNodePushable(ctx, new TaskId(getActivityId(), partition),
                     keys, hashFunctionFactories, comparatorFactories, (int) Math.ceil(Math.sqrt(inputsize0 * factor
-                            / nPartitions)), recordDescProvider.getInputRecordDescriptor(getOperatorId(),
-                            operatorInputIndex));
+                            / nPartitions)), recordDescProvider.getInputRecordDescriptor(getActivityId(), 0));
         }
     }
 
     private class JoinActivityNode extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
 
-        public JoinActivityNode(ActivityId id) {
+        private final ActivityId rpartAid;
+
+        private final ActivityId spartAid;
+
+        public JoinActivityNode(ActivityId id, ActivityId rpartAid, ActivityId spartAid) {
             super(id);
+            this.rpartAid = rpartAid;
+            this.spartAid = spartAid;
         }
 
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
                 final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
-            final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
-            final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
+            final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(rpartAid, 0);
+            final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(spartAid, 0);
             int numPartitions = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));
 
             return new GraceHashJoinOperatorNodePushable(ctx, new TaskId(new ActivityId(getOperatorId(),
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index 1eaf3bf..4f9b987 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -84,10 +84,10 @@
      * @param recordDescriptor
      * @throws HyracksDataException
      */
-    public HybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0, int recordsPerFrame,
-            double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories,
-            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor)
-            throws HyracksDataException {
+    public HybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
+            int recordsPerFrame, double factor, int[] keys0, int[] keys1,
+            IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDescriptor) throws HyracksDataException {
         super(spec, 2, 1);
         this.memsize = memsize;
         this.inputsize0 = inputsize0;
@@ -102,10 +102,11 @@
         recordDescriptors[0] = recordDescriptor;
     }
 
-    public HybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0, int recordsPerFrame,
-            double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories,
-            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, boolean isLeftOuter,
-            INullWriterFactory[] nullWriterFactories1) throws HyracksDataException {
+    public HybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
+            int recordsPerFrame, double factor, int[] keys0, int[] keys1,
+            IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDescriptor, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1)
+            throws HyracksDataException {
         super(spec, 2, 1);
         this.memsize = memsize;
         this.inputsize0 = inputsize0;
@@ -122,15 +123,15 @@
 
     @Override
     public void contributeActivities(IActivityGraphBuilder builder) {
-        BuildAndPartitionActivityNode phase1 = new BuildAndPartitionActivityNode(new ActivityId(odId,
-                BUILD_AND_PARTITION_ACTIVITY_ID));
-        PartitionAndJoinActivityNode phase2 = new PartitionAndJoinActivityNode(new ActivityId(odId,
-                PARTITION_AND_JOIN_ACTIVITY_ID));
+        ActivityId p1Aid = new ActivityId(odId, BUILD_AND_PARTITION_ACTIVITY_ID);
+        ActivityId p2Aid = new ActivityId(odId, PARTITION_AND_JOIN_ACTIVITY_ID);
+        BuildAndPartitionActivityNode phase1 = new BuildAndPartitionActivityNode(p1Aid, p2Aid);
+        PartitionAndJoinActivityNode phase2 = new PartitionAndJoinActivityNode(p2Aid, p1Aid);
 
-        builder.addActivity(phase1);
+        builder.addActivity(this, phase1);
         builder.addSourceEdge(1, phase1, 0);
 
-        builder.addActivity(phase2);
+        builder.addActivity(this, phase2);
         builder.addSourceEdge(0, phase2, 0);
 
         builder.addBlockingEdge(phase1, phase2);
@@ -166,15 +167,18 @@
     private class BuildAndPartitionActivityNode extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
 
-        public BuildAndPartitionActivityNode(ActivityId id) {
+        private final ActivityId joinAid;
+
+        public BuildAndPartitionActivityNode(ActivityId id, ActivityId joinAid) {
             super(id);
+            this.joinAid = joinAid;
         }
 
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
                 IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
-            final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
-            final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
+            final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(joinAid, 0);
+            final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
             final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
             for (int i = 0; i < comparatorFactories.length; ++i) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
@@ -351,15 +355,18 @@
     private class PartitionAndJoinActivityNode extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
 
-        public PartitionAndJoinActivityNode(ActivityId id) {
+        private final ActivityId buildAid;
+
+        public PartitionAndJoinActivityNode(ActivityId id, ActivityId buildAid) {
             super(id);
+            this.buildAid = buildAid;
         }
 
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
                 IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
-            final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
-            final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
+            final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+            final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(buildAid, 0);
             final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
             for (int i = 0; i < comparatorFactories.length; ++i) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index ddbe417..e0a5613 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -48,9 +48,6 @@
 import edu.uci.ics.hyracks.dataflow.std.structures.SerializableHashTable;
 
 public class InMemoryHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
-    private static final int BUILD_ACTIVITY_ID = 0;
-    private static final int PROBE_ACTIVITY_ID = 1;
-
     private static final long serialVersionUID = 1L;
     private final int[] keys0;
     private final int[] keys1;
@@ -91,13 +88,15 @@
 
     @Override
     public void contributeActivities(IActivityGraphBuilder builder) {
-        HashBuildActivityNode hba = new HashBuildActivityNode(new ActivityId(odId, 0));
-        HashProbeActivityNode hpa = new HashProbeActivityNode(new ActivityId(odId, 1));
+        ActivityId hbaId = new ActivityId(odId, 0);
+        ActivityId hpaId = new ActivityId(odId, 1);
+        HashBuildActivityNode hba = new HashBuildActivityNode(hbaId, hpaId);
+        HashProbeActivityNode hpa = new HashProbeActivityNode(hpaId);
 
-        builder.addActivity(hba);
+        builder.addActivity(this, hba);
         builder.addSourceEdge(1, hba, 0);
 
-        builder.addActivity(hpa);
+        builder.addActivity(this, hpa);
         builder.addSourceEdge(0, hpa, 0);
 
         builder.addTargetEdge(0, hpa, 0);
@@ -129,17 +128,18 @@
     private class HashBuildActivityNode extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
 
-        public HashBuildActivityNode(ActivityId id) {
+        private final ActivityId hpaId;
+
+        public HashBuildActivityNode(ActivityId id, ActivityId hpaId) {
             super(id);
+            this.hpaId = hpaId;
         }
 
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
                 IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
-            final RecordDescriptor rd0 = recordDescProvider
-                    .getInputRecordDescriptor(getOperatorId(), BUILD_ACTIVITY_ID);
-            final RecordDescriptor rd1 = recordDescProvider
-                    .getInputRecordDescriptor(getOperatorId(), PROBE_ACTIVITY_ID);
+            final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(hpaId, 0);
+            final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
             final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
             for (int i = 0; i < comparatorFactories.length; ++i) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
@@ -204,8 +204,8 @@
 
                 @Override
                 public void open() throws HyracksDataException {
-                    state = (HashBuildTaskState) ctx.getStateObject(new TaskId(new ActivityId(getOperatorId(),
-                            BUILD_ACTIVITY_ID), partition));
+                    state = (HashBuildTaskState) ctx.getStateObject(new TaskId(new ActivityId(getOperatorId(), 0),
+                            partition));
                     writer.open();
                 }
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
index 8d4f57a..a699703 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
@@ -58,14 +58,15 @@
 
     @Override
     public void contributeActivities(IActivityGraphBuilder builder) {
-        JoinCacheActivityNode jc = new JoinCacheActivityNode(new ActivityId(getOperatorId(), JOIN_CACHE_ACTIVITY_ID));
-        NestedLoopJoinActivityNode nlj = new NestedLoopJoinActivityNode(new ActivityId(getOperatorId(),
-                NL_JOIN_ACTIVITY_ID));
+        ActivityId jcaId = new ActivityId(getOperatorId(), JOIN_CACHE_ACTIVITY_ID);
+        ActivityId nljAid = new ActivityId(getOperatorId(), NL_JOIN_ACTIVITY_ID);
+        JoinCacheActivityNode jc = new JoinCacheActivityNode(jcaId, nljAid);
+        NestedLoopJoinActivityNode nlj = new NestedLoopJoinActivityNode(nljAid);
 
-        builder.addActivity(jc);
+        builder.addActivity(this, jc);
         builder.addSourceEdge(1, jc, 0);
 
-        builder.addActivity(nlj);
+        builder.addActivity(this, nlj);
         builder.addSourceEdge(0, nlj, 0);
 
         builder.addTargetEdge(0, nlj, 0);
@@ -96,15 +97,18 @@
     private class JoinCacheActivityNode extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
 
-        public JoinCacheActivityNode(ActivityId id) {
+        private final ActivityId nljAid;
+
+        public JoinCacheActivityNode(ActivityId id, ActivityId nljAid) {
             super(id);
+            this.nljAid = nljAid;
         }
 
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
                 IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
-            final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
-            final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
+            final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(nljAid, 0);
+            final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
             final ITuplePairComparator comparator = comparatorFactory.createTuplePairComparator(ctx);
 
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index da02f3d..3a7ee2c 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -93,7 +93,6 @@
  */
 
 public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
-
     private static final int BUILD_AND_PARTITION_ACTIVITY_ID = 0;
     private static final int PARTITION_AND_JOIN_ACTIVITY_ID = 1;
 
@@ -162,15 +161,15 @@
 
     @Override
     public void contributeActivities(IActivityGraphBuilder builder) {
-        PartitionAndBuildActivityNode phase1 = new PartitionAndBuildActivityNode(new ActivityId(odId,
-                BUILD_AND_PARTITION_ACTIVITY_ID));
-        ProbeAndJoinActivityNode phase2 = new ProbeAndJoinActivityNode(new ActivityId(odId,
-                PARTITION_AND_JOIN_ACTIVITY_ID));
+        ActivityId buildAid = new ActivityId(odId, BUILD_AND_PARTITION_ACTIVITY_ID);
+        ActivityId probeAid = new ActivityId(odId, PARTITION_AND_JOIN_ACTIVITY_ID);
+        PartitionAndBuildActivityNode phase1 = new PartitionAndBuildActivityNode(buildAid, probeAid);
+        ProbeAndJoinActivityNode phase2 = new ProbeAndJoinActivityNode(probeAid, buildAid);
 
-        builder.addActivity(phase1);
+        builder.addActivity(this, phase1);
         builder.addSourceEdge(0, phase1, 0);
 
-        builder.addActivity(phase2);
+        builder.addActivity(this, phase2);
         builder.addSourceEdge(1, phase2, 0);
 
         builder.addBlockingEdge(phase1, phase2);
@@ -236,16 +235,19 @@
     private class PartitionAndBuildActivityNode extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
 
-        public PartitionAndBuildActivityNode(ActivityId id) {
+        private final ActivityId probeAid;
+
+        public PartitionAndBuildActivityNode(ActivityId id, ActivityId probeAid) {
             super(id);
+            this.probeAid = probeAid;
         }
 
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
                 IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
 
-            final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
-            final RecordDescriptor buildRd = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
+            final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+            final RecordDescriptor buildRd = recordDescProvider.getInputRecordDescriptor(probeAid, 0);
 
             final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
             for (int i = 0; i < comparatorFactories.length; i++) {
@@ -314,16 +316,19 @@
 
         private static final long serialVersionUID = 1L;
 
-        public ProbeAndJoinActivityNode(ActivityId id) {
+        private final ActivityId buildAid;
+
+        public ProbeAndJoinActivityNode(ActivityId id, ActivityId buildAid) {
             super(id);
+            this.buildAid = buildAid;
         }
 
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
                 IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
 
-            final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
-            final RecordDescriptor buildRd = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
+            final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(buildAid, 0);
+            final RecordDescriptor buildRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
             final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
             final ITuplePairComparator nljComparator0 = tuplePairComparatorFactory0.createTuplePairComparator(ctx);
             final ITuplePairComparator nljComparator1 = tuplePairComparatorFactory1.createTuplePairComparator(ctx);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/map/DeserializedMapperOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/map/DeserializedMapperOperatorDescriptor.java
index 7de8f95..30aae7f 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/map/DeserializedMapperOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/map/DeserializedMapperOperatorDescriptor.java
@@ -64,8 +64,8 @@
 
     private final IDeserializedMapperFactory mapperFactory;
 
-    public DeserializedMapperOperatorDescriptor(IOperatorDescriptorRegistry spec, IDeserializedMapperFactory mapperFactory,
-            RecordDescriptor recordDescriptor) {
+    public DeserializedMapperOperatorDescriptor(IOperatorDescriptorRegistry spec,
+            IDeserializedMapperFactory mapperFactory, RecordDescriptor recordDescriptor) {
         super(spec, 1, 1);
         this.mapperFactory = mapperFactory;
         recordDescriptors[0] = recordDescriptor;
@@ -75,6 +75,6 @@
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
         return new DeserializedOperatorNodePushable(ctx, new MapperOperator(),
-                recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
+                recordDescProvider.getInputRecordDescriptor(getActivityId(), 0));
     }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
index d38b0a4..ba8c00c 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
@@ -54,10 +54,10 @@
         MaterializerActivityNode ma = new MaterializerActivityNode(new ActivityId(odId, MATERIALIZER_ACTIVITY_ID));
         ReaderActivityNode ra = new ReaderActivityNode(new ActivityId(odId, READER_ACTIVITY_ID));
 
-        builder.addActivity(ma);
+        builder.addActivity(this, ma);
         builder.addSourceEdge(0, ma, 0);
 
-        builder.addActivity(ra);
+        builder.addActivity(this, ra);
         builder.addTargetEdge(0, ra, 0);
 
         builder.addBlockingEdge(ma, ra);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java
index 56a4f79..47a8616 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java
@@ -64,6 +64,6 @@
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
         return new DeserializedOperatorNodePushable(ctx, new PrinterOperator(),
-                recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
+                recordDescProvider.getInputRecordDescriptor(getActivityId(), 0));
     }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java
index c8c8fb8..aad0e66 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java
@@ -105,7 +105,7 @@
                 }
             };
             return new DeserializedOperatorNodePushable(ctx, op, recordDescProvider.getInputRecordDescriptor(
-                    getOperatorId(), 0));
+                    getActivityId(), 0));
         }
     }
 
@@ -158,8 +158,8 @@
                     writer.fail();
                 }
             };
-            return new DeserializedOperatorNodePushable(ctx, op, recordDescProvider.getInputRecordDescriptor(
-                    getOperatorId(), 0));
+            return new DeserializedOperatorNodePushable(ctx, op, recordDescProvider.getOutputRecordDescriptor(
+                    getActivityId(), 0));
         }
     }
 
@@ -178,10 +178,10 @@
         CollectActivity ca = new CollectActivity(new ActivityId(odId, COLLECT_ACTIVITY_ID));
         SplitActivity sa = new SplitActivity(new ActivityId(odId, SPLIT_ACTIVITY_ID));
 
-        builder.addActivity(ca);
+        builder.addActivity(this, ca);
         builder.addSourceEdge(0, ca, 0);
 
-        builder.addActivity(sa);
+        builder.addActivity(this, sa);
         builder.addTargetEdge(0, sa, 0);
 
         builder.addBlockingEdge(ca, sa);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
index 708992b..be71b44 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
@@ -75,10 +75,10 @@
         SortActivity sa = new SortActivity(new ActivityId(odId, SORT_ACTIVITY_ID));
         MergeActivity ma = new MergeActivity(new ActivityId(odId, MERGE_ACTIVITY_ID));
 
-        builder.addActivity(sa);
+        builder.addActivity(this, sa);
         builder.addSourceEdge(0, sa, 0);
 
-        builder.addActivity(ma);
+        builder.addActivity(this, ma);
         builder.addTargetEdge(0, ma, 0);
 
         builder.addBlockingEdge(sa, ma);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
index be1e067..3b3c28d 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
@@ -67,10 +67,10 @@
         SortActivity sa = new SortActivity(new ActivityId(odId, SORT_ACTIVITY_ID));
         MergeActivity ma = new MergeActivity(new ActivityId(odId, MERGE_ACTIVITY_ID));
 
-        builder.addActivity(sa);
+        builder.addActivity(this, sa);
         builder.addSourceEdge(0, sa, 0);
 
-        builder.addActivity(ma);
+        builder.addActivity(this, ma);
         builder.addTargetEdge(0, ma, 0);
 
         builder.addBlockingEdge(sa, ma);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
index 23169b5..d9c16d8 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
@@ -98,10 +98,10 @@
         OptimizedSortActivity osa = new OptimizedSortActivity(new ActivityId(odId, SORT_ACTIVITY_ID));
         OptimizedMergeActivity oma = new OptimizedMergeActivity(new ActivityId(odId, MERGE_ACTIVITY_ID));
 
-        builder.addActivity(osa);
+        builder.addActivity(this, osa);
         builder.addSourceEdge(0, osa, 0);
 
-        builder.addActivity(oma);
+        builder.addActivity(this, oma);
         builder.addTargetEdge(0, oma, 0);
 
         builder.addBlockingEdge(osa, oma);
@@ -191,8 +191,8 @@
             IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
                 @Override
                 public void initialize() throws HyracksDataException {
-                    OptimizedSortTaskState state = (OptimizedSortTaskState) ctx.getStateObject(new TaskId(new ActivityId(
-                            getOperatorId(), SORT_ACTIVITY_ID), partition));
+                    OptimizedSortTaskState state = (OptimizedSortTaskState) ctx.getStateObject(new TaskId(
+                            new ActivityId(getOperatorId(), SORT_ACTIVITY_ID), partition));
 
                     List<IFrameReader> runs = state.runs;
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
index 6c93b61..7ac9f58 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
@@ -40,7 +40,7 @@
     @Override
     public void contributeActivities(IActivityGraphBuilder builder) {
         UnionActivityNode uba = new UnionActivityNode(new ActivityId(getOperatorId(), 0));
-        builder.addActivity(uba);
+        builder.addActivity(this, uba);
         for (int i = 0; i < inputArity; ++i) {
             builder.addSourceEdge(i, uba, i);
         }
@@ -58,7 +58,7 @@
         public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
                 IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
                 throws HyracksDataException {
-            RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
+            RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
             return new UnionOperator(ctx, inRecordDesc);
         }
     }
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/LocalityAwareConnectorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/LocalityAwareConnectorTest.java
index d556285..0d5a627 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/LocalityAwareConnectorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/LocalityAwareConnectorTest.java
@@ -87,8 +87,7 @@
 
     /**
      * Test of aggregations using locality aware connector. The output two files should be the
-     * same, each of which is the aggregation of two copies of the lineitem.tbl. 
-     * 
+     * same, each of which is the aggregation of two copies of the lineitem.tbl.
      * Note that if the hashing connector is not working correctly, the two files may be different. This
      * also means that even the output files are the same, the hashing may have other problems.
      * 
@@ -99,66 +98,47 @@
 
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+                desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, "asterix-001", "asterix-002", "asterix-003", "asterix-004");
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, "asterix-001", "asterix-002",
+                "asterix-003", "asterix-004");
 
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        FloatSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 0 };
         int tableSize = 8;
 
-        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
+        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, keyFields,
+                new FieldHashPartitionComputerFactory(keyFields,
                         new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
                                 .of(UTF8StringPointable.FACTORY) }),
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
-                        .of(UTF8StringPointable.FACTORY) },
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, true),
-                                new IntSumFieldAggregatorFactory(3, true),
-                                new FloatSumFieldAggregatorFactory(5, true) }),
-                outputRec, tableSize);
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, true), new IntSumFieldAggregatorFactory(3, true),
+                        new FloatSumFieldAggregatorFactory(5, true) }), outputRec, tableSize);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                "asterix-005", "asterix-006");
-        
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, "asterix-005", "asterix-006");
+
         BitSet nodemap = new BitSet(8);
-        
+
         nodemap.set(0);
         nodemap.set(2);
         nodemap.set(5);
         nodemap.set(7);
 
-        IConnectorDescriptor conn1 = new LocalityAwareMToNPartitioningConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
-                        keyFields,
+        IConnectorDescriptor conn1 = new LocalityAwareMToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields,
                         new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
                                 .of(UTF8StringPointable.FACTORY) }), new HashtableLocalityMap(nodemap));
-                
-                new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                                .of(UTF8StringPointable.FACTORY) }));
+
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "localityAwareSumInmemGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "localityAwareSumInmemGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                "asterix-005", "asterix-006");
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, "asterix-005", "asterix-006");
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -166,11 +146,11 @@
         spec.addRoot(printer);
         runTest(spec);
     }
-    
+
     /**
      * Test for locality aware connector, using the global hashing node mapper. This should have
      * the exactly the same result as using {@link MToNPartitioningConnectorDescriptor}.
-     *  
+     * 
      * @throws Exception
      */
     @Test
@@ -178,59 +158,40 @@
 
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+                desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, "asterix-001", "asterix-002", "asterix-003", "asterix-004");
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, "asterix-001", "asterix-002",
+                "asterix-003", "asterix-004");
 
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        FloatSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 0 };
         int tableSize = 8;
 
-        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
+        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, keyFields,
+                new FieldHashPartitionComputerFactory(keyFields,
                         new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
                                 .of(UTF8StringPointable.FACTORY) }),
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
-                        .of(UTF8StringPointable.FACTORY) },
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, true),
-                                new IntSumFieldAggregatorFactory(3, true),
-                                new FloatSumFieldAggregatorFactory(5, true) }),
-                outputRec, tableSize);
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, true), new IntSumFieldAggregatorFactory(3, true),
+                        new FloatSumFieldAggregatorFactory(5, true) }), outputRec, tableSize);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                "asterix-005", "asterix-006");
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, "asterix-005", "asterix-006");
 
-        IConnectorDescriptor conn1 = new LocalityAwareMToNPartitioningConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
-                        keyFields,
+        IConnectorDescriptor conn1 = new LocalityAwareMToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields,
                         new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
                                 .of(UTF8StringPointable.FACTORY) }), new GlobalHashingLocalityMap());
-                
-                new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                                .of(UTF8StringPointable.FACTORY) }));
+
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "localityAwareSumInmemGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "localityAwareSumInmemGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                "asterix-005", "asterix-006");
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, "asterix-005", "asterix-006");
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -238,16 +199,14 @@
         spec.addRoot(printer);
         runTest(spec);
     }
-    
-    private AbstractSingleActivityOperatorDescriptor getPrinter(
-            IOperatorDescriptorRegistry spec, String prefix) throws IOException {
 
-        AbstractSingleActivityOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(
-                spec, new ConstantFileSplitProvider(new FileSplit[] {
-                        new FileSplit("asterix-005", createTempFile()
-                                .getAbsolutePath()),
-                        new FileSplit("asterix-006", createTempFile()
-                                .getAbsolutePath()) }), "\t");
+    private AbstractSingleActivityOperatorDescriptor getPrinter(IOperatorDescriptorRegistry spec, String prefix)
+            throws IOException {
+
+        AbstractSingleActivityOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec,
+                new ConstantFileSplitProvider(new FileSplit[] {
+                        new FileSplit("asterix-005", createTempFile().getAbsolutePath()),
+                        new FileSplit("asterix-006", createTempFile().getAbsolutePath()) }), "\t");
 
         return printer;
     }
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
index 9dda175..77a7a1e 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
@@ -38,7 +38,7 @@
         super(opDesc, ctx, partition, recordDescProvider);
         this.lowKeyInclusive = lowKeyInclusive;
         this.highKeyInclusive = highKeyInclusive;
-        this.recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getOperatorId(), 0);
+        this.recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
         if (lowKeyFields != null && lowKeyFields.length > 0) {
             lowKey = new PermutingFrameTupleReference();
             lowKey.setFieldPermutation(lowKeyFields);
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorNodePushable.java
index d9f3a5a..a2d78a4 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorNodePushable.java
@@ -49,7 +49,7 @@
     public void open() throws HyracksDataException {
         AbstractTreeIndexOperatorDescriptor opDesc = (AbstractTreeIndexOperatorDescriptor) treeIndexHelper
                 .getOperatorDescriptor();
-        RecordDescriptor recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getOperatorId(), 0);
+        RecordDescriptor recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
         accessor = new FrameTupleAccessor(treeIndexHelper.getHyracksTaskContext().getFrameSize(), recDesc);
         try {
             treeIndexHelper.init(false);
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
index fb9c965..e05568f 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
@@ -55,7 +55,7 @@
     public void open() throws HyracksDataException {
         AbstractTreeIndexOperatorDescriptor opDesc = (AbstractTreeIndexOperatorDescriptor) treeIndexHelper
                 .getOperatorDescriptor();
-        RecordDescriptor inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getOperatorId(), 0);
+        RecordDescriptor inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
         accessor = new FrameTupleAccessor(treeIndexHelper.getHyracksTaskContext().getFrameSize(), inputRecDesc);
         writeBuffer = treeIndexHelper.getHyracksTaskContext().allocateFrame();
         writer.open();
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexSearchOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexSearchOperatorNodePushable.java
index 0bd970a..a4c8e7a 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexSearchOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexSearchOperatorNodePushable.java
@@ -54,7 +54,7 @@
             int partition, IRecordDescriptorProvider recordDescProvider) {
         treeIndexHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
                 opDesc, ctx, partition);
-        this.recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getOperatorId(), 0);
+        this.recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
     }
 
     protected abstract ISearchPredicate createSearchPredicate();
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/BinaryTokenizerOperatorDescriptor.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/BinaryTokenizerOperatorDescriptor.java
index 94409de..a146479 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/BinaryTokenizerOperatorDescriptor.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/BinaryTokenizerOperatorDescriptor.java
@@ -46,9 +46,9 @@
     }
 
     @Override
-    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
-            int partition, int nPartitions) throws HyracksDataException {
-        return new BinaryTokenizerOperatorNodePushable(ctx, recordDescProvider.getInputRecordDescriptor(odId, 0),
-                recordDescriptors[0], tokenizerFactory.createTokenizer(), tokenFields, keyFields);
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        return new BinaryTokenizerOperatorNodePushable(ctx, recordDescProvider.getInputRecordDescriptor(
+                getActivityId(), 0), recordDescriptors[0], tokenizerFactory.createTokenizer(), tokenFields, keyFields);
     }
 }
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexBulkLoadOperatorNodePushable.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexBulkLoadOperatorNodePushable.java
index 62daff2..c6fa56d 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexBulkLoadOperatorNodePushable.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexBulkLoadOperatorNodePushable.java
@@ -52,7 +52,7 @@
     public void open() throws HyracksDataException {
         AbstractInvertedIndexOperatorDescriptor opDesc = (AbstractInvertedIndexOperatorDescriptor) btreeDataflowHelper
                 .getOperatorDescriptor();
-        RecordDescriptor recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getOperatorId(), 0);
+        RecordDescriptor recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
         accessor = new FrameTupleAccessor(btreeDataflowHelper.getHyracksTaskContext().getFrameSize(), recDesc);
 
         // BTree.
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexSearchOperatorNodePushable.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexSearchOperatorNodePushable.java
index 8536a07..b9a43ea 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexSearchOperatorNodePushable.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexSearchOperatorNodePushable.java
@@ -71,7 +71,7 @@
 
     @Override
     public void open() throws HyracksDataException {
-        RecordDescriptor recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getOperatorId(), 0);
+        RecordDescriptor recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
         accessor = new FrameTupleAccessor(btreeDataflowHelper.getHyracksTaskContext().getFrameSize(), recDesc);
         tuple = new FrameTupleReference();
         // BTree.