merged hyracks_asterix_stabilization r1652:1654

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1658 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index d86bf0f..899b633 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -42,7 +42,8 @@
      */
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(IDataSource<S> dataSource,
             List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
-            IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec) throws AlgebricksException;
+            IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec)
+            throws AlgebricksException;
 
     public boolean scannerOperatorIsLeaf(IDataSource<S> dataSource);
 
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
index db74c41..e34e60d 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -23,6 +23,7 @@
 public class HyracksClientInterfaceFunctions {
     public enum FunctionId {
         GET_CLUSTER_CONTROLLER_INFO,
+        GET_CLUSTER_TOPOLOGY,
         CREATE_APPLICATION,
         START_APPLICATION,
         DESTROY_APPLICATION,
@@ -128,12 +129,12 @@
         private static final long serialVersionUID = 1L;
 
         private final String appName;
-        private final byte[] jobSpec;
+        private final byte[] acggfBytes;
         private final EnumSet<JobFlag> jobFlags;
 
-        public StartJobFunction(String appName, byte[] jobSpec, EnumSet<JobFlag> jobFlags) {
+        public StartJobFunction(String appName, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) {
             this.appName = appName;
-            this.jobSpec = jobSpec;
+            this.acggfBytes = acggfBytes;
             this.jobFlags = jobFlags;
         }
 
@@ -146,8 +147,8 @@
             return appName;
         }
 
-        public byte[] getJobSpec() {
-            return jobSpec;
+        public byte[] getACGGFBytes() {
+            return acggfBytes;
         }
 
         public EnumSet<JobFlag> getJobFlags() {
@@ -182,4 +183,13 @@
             return FunctionId.GET_NODE_CONTROLLERS_INFO;
         }
     }
+
+    public static class GetClusterTopologyFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.GET_CLUSTER_TOPOLOGY;
+        }
+    }
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index 7413951..4c06d42 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -20,6 +20,7 @@
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.topology.ClusterTopology;
 import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
 import edu.uci.ics.hyracks.ipc.api.RPCInterface;
 
@@ -68,9 +69,9 @@
     }
 
     @Override
-    public JobId startJob(String appName, byte[] jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
+    public JobId startJob(String appName, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception {
         HyracksClientInterfaceFunctions.StartJobFunction sjf = new HyracksClientInterfaceFunctions.StartJobFunction(
-                appName, jobSpec, jobFlags);
+                appName, acggfBytes, jobFlags);
         return (JobId) rpci.call(ipcHandle, sjf);
     }
 
@@ -86,4 +87,10 @@
         HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction gncif = new HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction();
         return (Map<String, NodeControllerInfo>) rpci.call(ipcHandle, gncif);
     }
+
+    @Override
+    public ClusterTopology getClusterTopology() throws Exception {
+        HyracksClientInterfaceFunctions.GetClusterTopologyFunction gctf = new HyracksClientInterfaceFunctions.GetClusterTopologyFunction();
+        return (ClusterTopology) rpci.call(ipcHandle, gctf);
+    }
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
index 33d76ec..227524c 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
@@ -25,11 +25,14 @@
 import org.apache.http.entity.FileEntity;
 import org.apache.http.impl.client.DefaultHttpClient;
 
+import edu.uci.ics.hyracks.api.client.impl.JobSpecificationActivityClusterGraphGeneratorFactory;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.topology.ClusterTopology;
 import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
 import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
 import edu.uci.ics.hyracks.ipc.api.RPCInterface;
@@ -100,12 +103,19 @@
 
     @Override
     public JobId startJob(String appName, JobSpecification jobSpec) throws Exception {
-        return hci.startJob(appName, JavaSerializationUtils.serialize(jobSpec), EnumSet.noneOf(JobFlag.class));
+        return startJob(appName, jobSpec, EnumSet.noneOf(JobFlag.class));
     }
 
     @Override
     public JobId startJob(String appName, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
-        return hci.startJob(appName, JavaSerializationUtils.serialize(jobSpec), jobFlags);
+        JobSpecificationActivityClusterGraphGeneratorFactory jsacggf = new JobSpecificationActivityClusterGraphGeneratorFactory(
+                jobSpec);
+        return startJob(appName, jsacggf, jobFlags);
+    }
+
+    public JobId startJob(String appName, IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags)
+            throws Exception {
+        return hci.startJob(appName, JavaSerializationUtils.serialize(acggf), jobFlags);
     }
 
     @Override
@@ -117,4 +127,9 @@
     public Map<String, NodeControllerInfo> getNodeControllerInfos() throws Exception {
         return hci.getNodeControllersInfo();
     }
+
+    @Override
+    public ClusterTopology getClusterTopology() throws Exception {
+        return hci.getClusterTopology();
+    }
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
index 72a87c2..bdbb544 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
@@ -18,10 +18,12 @@
 import java.util.EnumSet;
 import java.util.Map;
 
+import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.topology.ClusterTopology;
 
 /**
  * Interface used by clients to communicate with the Hyracks Cluster Controller.
@@ -84,6 +86,20 @@
     public JobId startJob(String appName, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception;
 
     /**
+     * Start the specified Job.
+     * 
+     * @param appName
+     *            Name of the application
+     * @param acggf
+     *            Activity Cluster Graph Generator Factory
+     * @param jobFlags
+     *            Flags
+     * @throws Exception
+     */
+    public JobId startJob(String appName, IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags)
+            throws Exception;
+
+    /**
      * Waits until the specified job has completed, either successfully or has
      * encountered a permanent failure.
      * 
@@ -99,4 +115,12 @@
      * @return Map of node name to node information.
      */
     public Map<String, NodeControllerInfo> getNodeControllerInfos() throws Exception;
+
+    /**
+     * Get the cluster topology
+     * 
+     * @return the cluster topology
+     * @throws Exception
+     */
+    public ClusterTopology getClusterTopology() throws Exception;
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
index 43dee05..ef5906e 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
@@ -20,6 +20,7 @@
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.topology.ClusterTopology;
 
 public interface IHyracksClientInterface {
     public ClusterControllerInfo getClusterControllerInfo() throws Exception;
@@ -32,9 +33,11 @@
 
     public JobStatus getJobStatus(JobId jobId) throws Exception;
 
-    public JobId startJob(String appName, byte[] jobSpec, EnumSet<JobFlag> jobFlags) throws Exception;
+    public JobId startJob(String appName, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception;
 
     public void waitForCompletion(JobId jobId) throws Exception;
 
     public Map<String, NodeControllerInfo> getNodeControllersInfo() throws Exception;
+
+    public ClusterTopology getClusterTopology() throws Exception;
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/ActivityClusterGraphBuilder.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/ActivityClusterGraphBuilder.java
new file mode 100644
index 0000000..6c6c211
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/ActivityClusterGraphBuilder.java
@@ -0,0 +1,176 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.client.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.json.JSONException;
+
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.job.ActivityCluster;
+import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
+import edu.uci.ics.hyracks.api.job.ActivityClusterId;
+import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.job.JobId;
+
+public class ActivityClusterGraphBuilder {
+    private static final Logger LOGGER = Logger.getLogger(ActivityClusterGraphBuilder.class.getName());
+
+    public ActivityClusterGraphBuilder() {
+    }
+
+    private static Pair<ActivityId, ActivityId> findMergePair(JobActivityGraph jag, Set<Set<ActivityId>> eqSets) {
+        for (Set<ActivityId> eqSet : eqSets) {
+            for (ActivityId t : eqSet) {
+                List<IConnectorDescriptor> inputList = jag.getActivityInputMap().get(t);
+                if (inputList != null) {
+                    for (IConnectorDescriptor conn : inputList) {
+                        ActivityId inTask = jag.getProducerActivity(conn.getConnectorId());
+                        if (!eqSet.contains(inTask)) {
+                            return Pair.<ActivityId, ActivityId> of(t, inTask);
+                        }
+                    }
+                }
+                List<IConnectorDescriptor> outputList = jag.getActivityOutputMap().get(t);
+                if (outputList != null) {
+                    for (IConnectorDescriptor conn : outputList) {
+                        ActivityId outTask = jag.getConsumerActivity(conn.getConnectorId());
+                        if (!eqSet.contains(outTask)) {
+                            return Pair.<ActivityId, ActivityId> of(t, outTask);
+                        }
+                    }
+                }
+            }
+        }
+        return null;
+    }
+
+    public ActivityClusterGraph inferActivityClusters(JobId jobId, JobActivityGraph jag) {
+        /*
+         * Build initial equivalence sets map. We create a map such that for each IOperatorTask, t -> { t }
+         */
+        Map<ActivityId, Set<ActivityId>> stageMap = new HashMap<ActivityId, Set<ActivityId>>();
+        Set<Set<ActivityId>> stages = new HashSet<Set<ActivityId>>();
+        for (ActivityId taskId : jag.getActivityMap().keySet()) {
+            Set<ActivityId> eqSet = new HashSet<ActivityId>();
+            eqSet.add(taskId);
+            stageMap.put(taskId, eqSet);
+            stages.add(eqSet);
+        }
+
+        boolean changed = true;
+        while (changed) {
+            changed = false;
+            Pair<ActivityId, ActivityId> pair = findMergePair(jag, stages);
+            if (pair != null) {
+                merge(stageMap, stages, pair.getLeft(), pair.getRight());
+                changed = true;
+            }
+        }
+
+        ActivityClusterGraph acg = new ActivityClusterGraph();
+        Map<ActivityId, ActivityCluster> acMap = new HashMap<ActivityId, ActivityCluster>();
+        int acCounter = 0;
+        Map<ActivityId, IActivity> activityNodeMap = jag.getActivityMap();
+        List<ActivityCluster> acList = new ArrayList<ActivityCluster>();
+        for (Set<ActivityId> stage : stages) {
+            ActivityCluster ac = new ActivityCluster(acg, new ActivityClusterId(jobId, acCounter++));
+            acList.add(ac);
+            for (ActivityId aid : stage) {
+                IActivity activity = activityNodeMap.get(aid);
+                ac.addActivity(activity);
+                acMap.put(aid, ac);
+            }
+        }
+
+        for (Set<ActivityId> stage : stages) {
+            for (ActivityId aid : stage) {
+                IActivity activity = activityNodeMap.get(aid);
+                ActivityCluster ac = acMap.get(aid);
+                List<IConnectorDescriptor> aOutputs = jag.getActivityOutputMap().get(aid);
+                if (aOutputs == null || aOutputs.isEmpty()) {
+                    ac.addRoot(activity);
+                } else {
+                    int nActivityOutputs = aOutputs.size();
+                    for (int i = 0; i < nActivityOutputs; ++i) {
+                        IConnectorDescriptor conn = aOutputs.get(i);
+                        ac.addConnector(conn);
+                        Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>> pcPair = jag.getConnectorActivityMap()
+                                .get(conn.getConnectorId());
+                        ac.connect(conn, activity, i, pcPair.getRight().getLeft(), pcPair.getRight().getRight(), jag
+                                .getConnectorRecordDescriptorMap().get(conn.getConnectorId()));
+                    }
+                }
+            }
+        }
+
+        Map<ActivityId, Set<ActivityId>> blocked2BlockerMap = jag.getBlocked2BlockerMap();
+        for (ActivityCluster s : acList) {
+            Map<ActivityId, Set<ActivityId>> acBlocked2BlockerMap = s.getBlocked2BlockerMap();
+            Set<ActivityCluster> blockerStages = new HashSet<ActivityCluster>();
+            for (ActivityId t : s.getActivityMap().keySet()) {
+                Set<ActivityId> blockerTasks = blocked2BlockerMap.get(t);
+                acBlocked2BlockerMap.put(t, blockerTasks);
+                if (blockerTasks != null) {
+                    for (ActivityId bt : blockerTasks) {
+                        blockerStages.add(acMap.get(bt));
+                    }
+                }
+            }
+            for (ActivityCluster bs : blockerStages) {
+                s.getDependencies().add(bs);
+            }
+        }
+        acg.addActivityClusters(acList);
+
+        if (LOGGER.isLoggable(Level.FINE)) {
+            try {
+                LOGGER.fine(acg.toJSON().toString(2));
+            } catch (JSONException e) {
+                e.printStackTrace();
+                throw new RuntimeException(e);
+            }
+        }
+        return acg;
+    }
+
+    private void merge(Map<ActivityId, Set<ActivityId>> eqSetMap, Set<Set<ActivityId>> eqSets, ActivityId t1,
+            ActivityId t2) {
+        Set<ActivityId> stage1 = eqSetMap.get(t1);
+        Set<ActivityId> stage2 = eqSetMap.get(t2);
+
+        Set<ActivityId> mergedSet = new HashSet<ActivityId>();
+        mergedSet.addAll(stage1);
+        mergedSet.addAll(stage2);
+
+        eqSets.remove(stage1);
+        eqSets.remove(stage2);
+        eqSets.add(mergedSet);
+
+        for (ActivityId t : mergedSet) {
+            eqSetMap.put(t, mergedSet);
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/IConnectorDescriptorVisitor.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/IConnectorDescriptorVisitor.java
similarity index 94%
rename from hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/IConnectorDescriptorVisitor.java
rename to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/IConnectorDescriptorVisitor.java
index 1c8446f..5b508b3 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/IConnectorDescriptorVisitor.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/IConnectorDescriptorVisitor.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.control.cc.job;
+package edu.uci.ics.hyracks.api.client.impl;
 
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/IOperatorDescriptorVisitor.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/IOperatorDescriptorVisitor.java
similarity index 94%
rename from hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/IOperatorDescriptorVisitor.java
rename to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/IOperatorDescriptorVisitor.java
index 840faef..fcdf9f0 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/IOperatorDescriptorVisitor.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/IOperatorDescriptorVisitor.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.control.cc.job;
+package edu.uci.ics.hyracks.api.client.impl;
 
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobActivityGraphBuilder.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobActivityGraphBuilder.java
similarity index 84%
rename from hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobActivityGraphBuilder.java
rename to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobActivityGraphBuilder.java
index a7b60e4..b4b9b3c 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobActivityGraphBuilder.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobActivityGraphBuilder.java
@@ -1,4 +1,4 @@
-package edu.uci.ics.hyracks.control.cc.job;
+package edu.uci.ics.hyracks.api.client.impl;
 
 import java.util.ArrayList;
 import java.util.EnumSet;
@@ -35,14 +35,10 @@
 
     private final Map<ConnectorDescriptorId, Pair<IActivity, Integer>> connectorConsumerMap;
 
-    public JobActivityGraphBuilder(String appName, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) {
+    public JobActivityGraphBuilder(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) {
         activityOperatorMap = new HashMap<ActivityId, IOperatorDescriptor>();
-        jag = new JobActivityGraph(appName, jobFlags);
+        jag = new JobActivityGraph();
         this.jobSpec = jobSpec;
-        jag.setConnectorPolicyAssignmentPolicy(jobSpec.getConnectorPolicyAssignmentPolicy());
-        jag.setGlobalJobDataFactory(jobSpec.getGlobalJobDataFactory());
-        jag.setJobletEventListenerFactory(jobSpec.getJobletEventListenerFactory());
-        jag.setMaxReattempts(jobSpec.getMaxReattempts());
         connectorProducerMap = new HashMap<ConnectorDescriptorId, Pair<IActivity, Integer>>();
         connectorConsumerMap = new HashMap<ConnectorDescriptorId, Pair<IActivity, Integer>>();
     }
@@ -54,15 +50,14 @@
 
     @Override
     public void addBlockingEdge(IActivity blocker, IActivity blocked) {
-        addToValueSet(jag.getBlocker2BlockedMap(), blocker.getActivityId(), blocked.getActivityId());
         addToValueSet(jag.getBlocked2BlockerMap(), blocked.getActivityId(), blocker.getActivityId());
     }
 
     @Override
     public void addSourceEdge(int operatorInputIndex, IActivity task, int taskInputIndex) {
         if (LOGGER.isLoggable(Level.FINEST)) {
-            LOGGER.finest("Adding source edge: " + task.getActivityId().getOperatorDescriptorId() + ":"
-                    + operatorInputIndex + " -> " + task.getActivityId() + ":" + taskInputIndex);
+            LOGGER.finest("Adding source edge: " + task.getActivityId() + ":" + operatorInputIndex + " -> "
+                    + task.getActivityId() + ":" + taskInputIndex);
         }
         IOperatorDescriptor op = activityOperatorMap.get(task.getActivityId());
         IConnectorDescriptor conn = jobSpec.getInputConnectorDescriptor(op, operatorInputIndex);
@@ -73,8 +68,8 @@
     @Override
     public void addTargetEdge(int operatorOutputIndex, IActivity task, int taskOutputIndex) {
         if (LOGGER.isLoggable(Level.FINEST)) {
-            LOGGER.finest("Adding target edge: " + task.getActivityId().getOperatorDescriptorId() + ":"
-                    + operatorOutputIndex + " -> " + task.getActivityId() + ":" + taskOutputIndex);
+            LOGGER.finest("Adding target edge: " + task.getActivityId() + ":" + operatorOutputIndex + " -> "
+                    + task.getActivityId() + ":" + taskOutputIndex);
         }
         IOperatorDescriptor op = activityOperatorMap.get(task.getActivityId());
         IConnectorDescriptor conn = jobSpec.getOutputConnectorDescriptor(op, operatorOutputIndex);
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
new file mode 100644
index 0000000..31c3d36
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
@@ -0,0 +1,87 @@
+package edu.uci.ics.hyracks.api.client.impl;
+
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Set;
+
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
+import edu.uci.ics.hyracks.api.constraints.Constraint;
+import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
+import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGenerator;
+import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
+import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class JobSpecificationActivityClusterGraphGeneratorFactory implements IActivityClusterGraphGeneratorFactory {
+    private static final long serialVersionUID = 1L;
+
+    private final JobSpecification spec;
+
+    public JobSpecificationActivityClusterGraphGeneratorFactory(JobSpecification jobSpec) {
+        this.spec = jobSpec;
+    }
+
+    @Override
+    public IActivityClusterGraphGenerator createActivityClusterGraphGenerator(String appName, JobId jobId,
+            final ICCApplicationContext ccAppCtx, EnumSet<JobFlag> jobFlags) throws HyracksException {
+        final JobActivityGraphBuilder builder = new JobActivityGraphBuilder(spec, jobFlags);
+        PlanUtils.visit(spec, new IConnectorDescriptorVisitor() {
+            @Override
+            public void visit(IConnectorDescriptor conn) throws HyracksException {
+                builder.addConnector(conn);
+            }
+        });
+        PlanUtils.visit(spec, new IOperatorDescriptorVisitor() {
+            @Override
+            public void visit(IOperatorDescriptor op) {
+                op.contributeActivities(builder);
+            }
+        });
+        builder.finish();
+        final JobActivityGraph jag = builder.getActivityGraph();
+        ActivityClusterGraphBuilder acgb = new ActivityClusterGraphBuilder();
+
+        final ActivityClusterGraph acg = acgb.inferActivityClusters(jobId, jag);
+        acg.setMaxReattempts(spec.getMaxReattempts());
+        acg.setJobletEventListenerFactory(spec.getJobletEventListenerFactory());
+        acg.setGlobalJobDataFactory(spec.getGlobalJobDataFactory());
+        final Set<Constraint> constraints = new HashSet<Constraint>();
+        final IConstraintAcceptor acceptor = new IConstraintAcceptor() {
+            @Override
+            public void addConstraint(Constraint constraint) {
+                constraints.add(constraint);
+            }
+        };
+        PlanUtils.visit(spec, new IOperatorDescriptorVisitor() {
+            @Override
+            public void visit(IOperatorDescriptor op) {
+                op.contributeSchedulingConstraints(acceptor, ccAppCtx);
+            }
+        });
+        PlanUtils.visit(spec, new IConnectorDescriptorVisitor() {
+            @Override
+            public void visit(IConnectorDescriptor conn) {
+                conn.contributeSchedulingConstraints(acceptor, acg.getConnectorMap().get(conn.getConnectorId()),
+                        ccAppCtx);
+            }
+        });
+        constraints.addAll(spec.getUserConstraints());
+        return new IActivityClusterGraphGenerator() {
+            @Override
+            public ActivityClusterGraph initialize() {
+                return acg;
+            }
+
+            @Override
+            public Set<Constraint> getConstraints() {
+                return constraints;
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/PlanUtils.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/PlanUtils.java
similarity index 97%
rename from hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/PlanUtils.java
rename to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/PlanUtils.java
index 754094a..30f26c7 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/PlanUtils.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/PlanUtils.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.control.cc.job;
+package edu.uci.ics.hyracks.api.client.impl;
 
 import java.util.HashSet;
 import java.util.Set;
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/ICCContext.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/ICCContext.java
index 7466276..1e49fe2 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/ICCContext.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/ICCContext.java
@@ -18,9 +18,12 @@
 import java.util.Set;
 
 import edu.uci.ics.hyracks.api.client.ClusterControllerInfo;
+import edu.uci.ics.hyracks.api.topology.ClusterTopology;
 
 public interface ICCContext {
     public ClusterControllerInfo getClusterControllerInfo();
 
     public void getIPAddressNodeMap(Map<String, Set<String>> map) throws Exception;
+
+    public ClusterTopology getClusterTopology();
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
index 0ac3658..3a6ee7a 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
@@ -28,7 +28,7 @@
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.job.ActivityCluster;
 
 /**
  * Connector that connects operators in a Job.
@@ -91,10 +91,10 @@
      * 
      * @param constraintAcceptor
      *            - Constraint Acceptor
-     * @param plan
-     *            - Job Plan
+     * @param ac
+     *            - Activity Cluster
      */
-    public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, JobActivityGraph plan,
+    public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, ActivityCluster ac,
             ICCApplicationContext appCtx);
 
     /**
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
index c37a530..43bf141 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
@@ -22,7 +22,6 @@
 import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
 import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.job.JobActivityGraph;
 
 /**
  * Descriptor for operators in Hyracks.
@@ -75,8 +74,7 @@
      * @param plan
      *            - Job Plan
      */
-    public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, JobActivityGraph plan,
-            ICCApplicationContext appCtx);
+    public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, ICCApplicationContext appCtx);
 
     /**
      * Gets the display name.
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityCluster.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityCluster.java
new file mode 100644
index 0000000..6698ff7
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityCluster.java
@@ -0,0 +1,232 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public final class ActivityCluster implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final ActivityClusterGraph acg;
+
+    private final ActivityClusterId id;
+
+    private final List<IActivity> roots;
+
+    private final Map<ActivityId, IActivity> activities;
+
+    private final Map<ConnectorDescriptorId, IConnectorDescriptor> connectors;
+
+    private final Map<ConnectorDescriptorId, RecordDescriptor> connectorRecordDescriptorMap;
+
+    private final Map<ActivityId, List<IConnectorDescriptor>> activityInputMap;
+
+    private final Map<ActivityId, List<IConnectorDescriptor>> activityOutputMap;
+
+    private final Map<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> connectorActivityMap;
+
+    private final Map<ActivityId, Set<ActivityId>> blocked2blockerMap;
+
+    private final List<ActivityCluster> dependencies;
+
+    private IConnectorPolicyAssignmentPolicy cpap;
+
+    public ActivityCluster(ActivityClusterGraph acg, ActivityClusterId id) {
+        this.acg = acg;
+        this.id = id;
+        roots = new ArrayList<IActivity>();
+        activities = new HashMap<ActivityId, IActivity>();
+        connectors = new HashMap<ConnectorDescriptorId, IConnectorDescriptor>();
+        connectorRecordDescriptorMap = new HashMap<ConnectorDescriptorId, RecordDescriptor>();
+        activityInputMap = new HashMap<ActivityId, List<IConnectorDescriptor>>();
+        activityOutputMap = new HashMap<ActivityId, List<IConnectorDescriptor>>();
+        connectorActivityMap = new HashMap<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>>();
+        blocked2blockerMap = new HashMap<ActivityId, Set<ActivityId>>();
+        dependencies = new ArrayList<ActivityCluster>();
+    }
+
+    public ActivityClusterGraph getActivityClusterGraph() {
+        return acg;
+    }
+
+    public ActivityClusterId getId() {
+        return id;
+    }
+
+    public void addRoot(IActivity activity) {
+        roots.add(activity);
+    }
+
+    public void addActivity(IActivity activity) {
+        activities.put(activity.getActivityId(), activity);
+    }
+
+    public void addConnector(IConnectorDescriptor connector) {
+        connectors.put(connector.getConnectorId(), connector);
+    }
+
+    public void connect(IConnectorDescriptor connector, IActivity producerActivity, int producerPort,
+            IActivity consumerActivity, int consumerPort, RecordDescriptor recordDescriptor) {
+        if (!activities.containsKey(producerActivity.getActivityId())
+                || !activities.containsKey(consumerActivity.getActivityId())) {
+            throw new IllegalStateException("Connected Activities belong to different Activity Clusters: "
+                    + producerActivity.getActivityId() + " and " + consumerActivity.getActivityId());
+        }
+        insertIntoIndexedMap(activityInputMap, consumerActivity.getActivityId(), consumerPort, connector);
+        insertIntoIndexedMap(activityOutputMap, producerActivity.getActivityId(), producerPort, connector);
+        connectorActivityMap.put(
+                connector.getConnectorId(),
+                Pair.<Pair<IActivity, Integer>, Pair<IActivity, Integer>> of(
+                        Pair.<IActivity, Integer> of(producerActivity, producerPort),
+                        Pair.<IActivity, Integer> of(consumerActivity, consumerPort)));
+        connectorRecordDescriptorMap.put(connector.getConnectorId(), recordDescriptor);
+    }
+
+    public List<IActivity> getRoots() {
+        return roots;
+    }
+
+    public Map<ActivityId, IActivity> getActivityMap() {
+        return activities;
+    }
+
+    public Map<ConnectorDescriptorId, IConnectorDescriptor> getConnectorMap() {
+        return connectors;
+    }
+
+    public Map<ConnectorDescriptorId, RecordDescriptor> getConnectorRecordDescriptorMap() {
+        return connectorRecordDescriptorMap;
+    }
+
+    public Map<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> getConnectorActivityMap() {
+        return connectorActivityMap;
+    }
+
+    public Map<ActivityId, List<IConnectorDescriptor>> getActivityInputMap() {
+        return activityInputMap;
+    }
+
+    public Map<ActivityId, List<IConnectorDescriptor>> getActivityOutputMap() {
+        return activityOutputMap;
+    }
+
+    public ActivityId getConsumerActivity(ConnectorDescriptorId cdId) {
+        Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>> connEdge = connectorActivityMap.get(cdId);
+        return connEdge.getRight().getLeft().getActivityId();
+    }
+
+    public ActivityId getProducerActivity(ConnectorDescriptorId cdId) {
+        Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>> connEdge = connectorActivityMap.get(cdId);
+        return connEdge.getLeft().getLeft().getActivityId();
+    }
+
+    public Map<ActivityId, Set<ActivityId>> getBlocked2BlockerMap() {
+        return blocked2blockerMap;
+    }
+
+    public List<ActivityCluster> getDependencies() {
+        return dependencies;
+    }
+
+    public IConnectorPolicyAssignmentPolicy getConnectorPolicyAssignmentPolicy() {
+        return cpap;
+    }
+
+    public void setConnectorPolicyAssignmentPolicy(IConnectorPolicyAssignmentPolicy cpap) {
+        this.cpap = cpap;
+    }
+
+    private <T> void extend(List<T> list, int index) {
+        int n = list.size();
+        for (int i = n; i <= index; ++i) {
+            list.add(null);
+        }
+    }
+
+    private <K, V> void insertIntoIndexedMap(Map<K, List<V>> map, K key, int index, V value) {
+        List<V> vList = map.get(key);
+        if (vList == null) {
+            vList = new ArrayList<V>();
+            map.put(key, vList);
+        }
+        extend(vList, index);
+        vList.set(index, value);
+    }
+
+    public JSONObject toJSON() throws JSONException {
+        JSONObject jac = new JSONObject();
+
+        JSONArray jans = new JSONArray();
+        for (IActivity an : activities.values()) {
+            JSONObject jan = new JSONObject();
+            jan.put("id", an.getActivityId().toString());
+            jan.put("java-class", an.getClass().getName());
+
+            List<IConnectorDescriptor> inputs = activityInputMap.get(an.getActivityId());
+            if (inputs != null) {
+                JSONArray jInputs = new JSONArray();
+                for (int i = 0; i < inputs.size(); ++i) {
+                    JSONObject jInput = new JSONObject();
+                    jInput.put("input-port", i);
+                    jInput.put("connector-id", inputs.get(i).getConnectorId().toString());
+                    jInputs.put(jInput);
+                }
+                jan.put("inputs", jInputs);
+            }
+
+            List<IConnectorDescriptor> outputs = activityOutputMap.get(an.getActivityId());
+            if (outputs != null) {
+                JSONArray jOutputs = new JSONArray();
+                for (int i = 0; i < outputs.size(); ++i) {
+                    JSONObject jOutput = new JSONObject();
+                    jOutput.put("output-port", i);
+                    jOutput.put("connector-id", outputs.get(i).getConnectorId().toString());
+                    jOutputs.put(jOutput);
+                }
+                jan.put("outputs", jOutputs);
+            }
+
+            Set<ActivityId> blockers = getBlocked2BlockerMap().get(an.getActivityId());
+            if (blockers != null) {
+                JSONArray jDeps = new JSONArray();
+                for (ActivityId blocker : blockers) {
+                    jDeps.put(blocker.toString());
+                }
+                jan.put("depends-on", jDeps);
+            }
+            jans.put(jan);
+        }
+        jac.put("activities", jans);
+
+        return jac;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java
new file mode 100644
index 0000000..7285956
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+
+public class ActivityClusterGraph implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private int version;
+
+    private final Map<ActivityClusterId, ActivityCluster> activityClusterMap;
+
+    private final Map<ActivityId, ActivityCluster> activityMap;
+
+    private final Map<ConnectorDescriptorId, ActivityCluster> connectorMap;
+
+    private int maxReattempts;
+
+    private IJobletEventListenerFactory jobletEventListenerFactory;
+
+    private IGlobalJobDataFactory globalJobDataFactory;
+
+    public ActivityClusterGraph() {
+        version = 0;
+        activityClusterMap = new HashMap<ActivityClusterId, ActivityCluster>();
+        activityMap = new HashMap<ActivityId, ActivityCluster>();
+        connectorMap = new HashMap<ConnectorDescriptorId, ActivityCluster>();
+    }
+
+    public Map<ActivityId, ActivityCluster> getActivityMap() {
+        return activityMap;
+    }
+
+    public Map<ConnectorDescriptorId, ActivityCluster> getConnectorMap() {
+        return connectorMap;
+    }
+
+    public Map<ActivityClusterId, ActivityCluster> getActivityClusterMap() {
+        return activityClusterMap;
+    }
+
+    public void addActivityClusters(Collection<ActivityCluster> newActivityClusters) {
+        for (ActivityCluster ac : newActivityClusters) {
+            activityClusterMap.put(ac.getId(), ac);
+            for (ActivityId aid : ac.getActivityMap().keySet()) {
+                activityMap.put(aid, ac);
+            }
+            for (ConnectorDescriptorId cid : ac.getConnectorMap().keySet()) {
+                connectorMap.put(cid, ac);
+            }
+        }
+        ++version;
+    }
+
+    public int getVersion() {
+        return version;
+    }
+
+    public void setMaxReattempts(int maxReattempts) {
+        this.maxReattempts = maxReattempts;
+    }
+
+    public int getMaxReattempts() {
+        return maxReattempts;
+    }
+
+    public IJobletEventListenerFactory getJobletEventListenerFactory() {
+        return jobletEventListenerFactory;
+    }
+
+    public void setJobletEventListenerFactory(IJobletEventListenerFactory jobletEventListenerFactory) {
+        this.jobletEventListenerFactory = jobletEventListenerFactory;
+    }
+
+    public IGlobalJobDataFactory getGlobalJobDataFactory() {
+        return globalJobDataFactory;
+    }
+
+    public void setGlobalJobDataFactory(IGlobalJobDataFactory globalJobDataFactory) {
+        this.globalJobDataFactory = globalJobDataFactory;
+    }
+
+    public JSONObject toJSON() throws JSONException {
+        JSONObject acgj = new JSONObject();
+
+        JSONArray acl = new JSONArray();
+        for (ActivityCluster ac : activityClusterMap.values()) {
+            acl.put(ac.toJSON());
+        }
+        acgj.put("version", version);
+        acgj.put("activity-clusters", acl);
+        return acgj;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityClusterId.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterId.java
similarity index 65%
rename from hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityClusterId.java
rename to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterId.java
index 46628a4..cba63959 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityClusterId.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterId.java
@@ -12,19 +12,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.control.cc.job;
+package edu.uci.ics.hyracks.api.job;
 
 import java.io.Serializable;
 
 public final class ActivityClusterId implements Serializable {
     private static final long serialVersionUID = 1L;
 
+    private final JobId jobId;
+
     private final int id;
 
-    public ActivityClusterId(int id) {
+    public ActivityClusterId(JobId jobId, int id) {
+        this.jobId = jobId;
         this.id = id;
     }
 
+    public JobId getJobId() {
+        return jobId;
+    }
+
     public int getId() {
         return id;
     }
@@ -34,25 +41,37 @@
         final int prime = 31;
         int result = 1;
         result = prime * result + id;
+        result = prime * result + ((jobId == null) ? 0 : jobId.hashCode());
         return result;
     }
 
     @Override
     public boolean equals(Object obj) {
-        if (this == obj)
+        if (this == obj) {
             return true;
-        if (obj == null)
+        }
+        if (obj == null) {
             return false;
-        if (getClass() != obj.getClass())
+        }
+        if (getClass() != obj.getClass()) {
             return false;
+        }
         ActivityClusterId other = (ActivityClusterId) obj;
-        if (id != other.id)
+        if (id != other.id) {
             return false;
+        }
+        if (jobId == null) {
+            if (other.jobId != null) {
+                return false;
+            }
+        } else if (!jobId.equals(other.jobId)) {
+            return false;
+        }
         return true;
     }
 
     @Override
     public String toString() {
-        return "AC:" + id;
+        return "ACID:" + jobId + ":" + id;
     }
 }
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/IOperatorDescriptorVisitor.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IActivityClusterGraphGenerator.java
similarity index 68%
copy from hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/IOperatorDescriptorVisitor.java
copy to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IActivityClusterGraphGenerator.java
index 840faef..b837066 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/IOperatorDescriptorVisitor.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IActivityClusterGraphGenerator.java
@@ -12,11 +12,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.control.cc.job;
+package edu.uci.ics.hyracks.api.job;
 
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import java.util.Set;
 
-public interface IOperatorDescriptorVisitor {
-    public void visit(IOperatorDescriptor op) throws HyracksException;
+import edu.uci.ics.hyracks.api.constraints.Constraint;
+
+public interface IActivityClusterGraphGenerator {
+    public Set<Constraint> getConstraints();
+
+    public ActivityClusterGraph initialize();
 }
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/IOperatorDescriptorVisitor.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java
similarity index 60%
copy from hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/IOperatorDescriptorVisitor.java
copy to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java
index 840faef..d801dd1 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/IOperatorDescriptorVisitor.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java
@@ -12,11 +12,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.control.cc.job;
+package edu.uci.ics.hyracks.api.job;
 
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import java.io.Serializable;
+import java.util.EnumSet;
+
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 
-public interface IOperatorDescriptorVisitor {
-    public void visit(IOperatorDescriptor op) throws HyracksException;
+public interface IActivityClusterGraphGeneratorFactory extends Serializable {
+    public IActivityClusterGraphGenerator createActivityClusterGraphGenerator(String appName, JobId jobId,
+            ICCApplicationContext ccAppCtx, EnumSet<JobFlag> jobFlags) throws HyracksException;
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobLifecycleListener.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobLifecycleListener.java
index c0d2ef8..2f0f025 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobLifecycleListener.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobLifecycleListener.java
@@ -17,7 +17,7 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 
 public interface IJobLifecycleListener {
-    public void notifyJobCreation(JobId jobId, IOperatorDescriptorRegistry jobSpec) throws HyracksException;
+    public void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf) throws HyracksException;
 
     public void notifyJobStart(JobId jobId) throws HyracksException;
 
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobActivityGraph.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobActivityGraph.java
index 3c6a547..5b62b5c 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobActivityGraph.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobActivityGraph.java
@@ -15,31 +15,22 @@
 package edu.uci.ics.hyracks.api.job;
 
 import java.io.Serializable;
-import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.lang3.tuple.Pair;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
 
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.IActivity;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 
 public class JobActivityGraph implements Serializable {
     private static final long serialVersionUID = 1L;
 
-    private final String appName;
-
-    private final EnumSet<JobFlag> jobFlags;
-
     private final Map<ActivityId, IActivity> activityMap;
 
     private final Map<ConnectorDescriptorId, IConnectorDescriptor> connectorMap;
@@ -52,39 +43,18 @@
 
     private final Map<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> connectorActivityMap;
 
-    private final Map<ActivityId, Set<ActivityId>> blocker2blockedMap;
-
     private final Map<ActivityId, Set<ActivityId>> blocked2blockerMap;
 
-    private IConnectorPolicyAssignmentPolicy connectorPolicyAssignmentPolicy;
-
-    private int maxReattempts;
-
-    private IJobletEventListenerFactory jobletEventListenerFactory;
-
-    private IGlobalJobDataFactory globalJobDataFactory;
-
-    public JobActivityGraph(String appName, EnumSet<JobFlag> jobFlags) {
-        this.appName = appName;
-        this.jobFlags = jobFlags;
+    public JobActivityGraph() {
         activityMap = new HashMap<ActivityId, IActivity>();
         connectorMap = new HashMap<ConnectorDescriptorId, IConnectorDescriptor>();
         connectorRecordDescriptorMap = new HashMap<ConnectorDescriptorId, RecordDescriptor>();
         activityInputMap = new HashMap<ActivityId, List<IConnectorDescriptor>>();
         activityOutputMap = new HashMap<ActivityId, List<IConnectorDescriptor>>();
         connectorActivityMap = new HashMap<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>>();
-        blocker2blockedMap = new HashMap<ActivityId, Set<ActivityId>>();
         blocked2blockerMap = new HashMap<ActivityId, Set<ActivityId>>();
     }
 
-    public String getApplicationName() {
-        return appName;
-    }
-
-    public EnumSet<JobFlag> getJobFlags() {
-        return jobFlags;
-    }
-
     public Map<ActivityId, IActivity> getActivityMap() {
         return activityMap;
     }
@@ -97,10 +67,6 @@
         return connectorRecordDescriptorMap;
     }
 
-    public Map<ActivityId, Set<ActivityId>> getBlocker2BlockedMap() {
-        return blocker2blockedMap;
-    }
-
     public Map<ActivityId, Set<ActivityId>> getBlocked2BlockerMap() {
         return blocked2blockerMap;
     }
@@ -127,97 +93,13 @@
         return connEdge.getLeft().getLeft().getActivityId();
     }
 
-    public IConnectorPolicyAssignmentPolicy getConnectorPolicyAssignmentPolicy() {
-        return connectorPolicyAssignmentPolicy;
-    }
-
-    public void setConnectorPolicyAssignmentPolicy(IConnectorPolicyAssignmentPolicy connectorPolicyAssignmentPolicy) {
-        this.connectorPolicyAssignmentPolicy = connectorPolicyAssignmentPolicy;
-    }
-
-    public void setMaxReattempts(int maxReattempts) {
-        this.maxReattempts = maxReattempts;
-    }
-
-    public int getMaxReattempts() {
-        return maxReattempts;
-    }
-
-    public IJobletEventListenerFactory getJobletEventListenerFactory() {
-        return jobletEventListenerFactory;
-    }
-
-    public void setJobletEventListenerFactory(IJobletEventListenerFactory jobletEventListenerFactory) {
-        this.jobletEventListenerFactory = jobletEventListenerFactory;
-    }
-
-    public IGlobalJobDataFactory getGlobalJobDataFactory() {
-        return globalJobDataFactory;
-    }
-
-    public void setGlobalJobDataFactory(IGlobalJobDataFactory globalJobDataFactory) {
-        this.globalJobDataFactory = globalJobDataFactory;
-    }
-
     @Override
     public String toString() {
         StringBuilder buffer = new StringBuilder();
         buffer.append("ActivityNodes: " + activityMap);
         buffer.append('\n');
-        buffer.append("Blocker->Blocked: " + blocker2blockedMap);
-        buffer.append('\n');
-        buffer.append("Blocked->Blocker: " + blocked2blockerMap);
+        buffer.append("Blocked->Blocker: " + blocked2blockerMap);
         buffer.append('\n');
         return buffer.toString();
     }
-
-    public JSONObject toJSON() throws JSONException {
-        JSONObject jplan = new JSONObject();
-
-        jplan.put("flags", jobFlags.toString());
-
-        JSONArray jans = new JSONArray();
-        for (IActivity an : activityMap.values()) {
-            JSONObject jan = new JSONObject();
-            jan.put("id", an.getActivityId().toString());
-            jan.put("java-class", an.getClass().getName());
-
-            List<IConnectorDescriptor> inputs = activityInputMap.get(an.getActivityId());
-            if (inputs != null) {
-                JSONArray jInputs = new JSONArray();
-                for (int i = 0; i < inputs.size(); ++i) {
-                    JSONObject jInput = new JSONObject();
-                    jInput.put("input-port", i);
-                    jInput.put("connector-id", inputs.get(i).getConnectorId().toString());
-                    jInputs.put(jInput);
-                }
-                jan.put("inputs", jInputs);
-            }
-
-            List<IConnectorDescriptor> outputs = activityOutputMap.get(an.getActivityId());
-            if (outputs != null) {
-                JSONArray jOutputs = new JSONArray();
-                for (int i = 0; i < outputs.size(); ++i) {
-                    JSONObject jOutput = new JSONObject();
-                    jOutput.put("output-port", i);
-                    jOutput.put("connector-id", outputs.get(i).getConnectorId().toString());
-                    jOutputs.put(jOutput);
-                }
-                jan.put("outputs", jOutputs);
-            }
-
-            Set<ActivityId> blockers = getBlocked2BlockerMap().get(an.getActivityId());
-            if (blockers != null) {
-                JSONArray jDeps = new JSONArray();
-                for (ActivityId blocker : blockers) {
-                    jDeps.put(blocker.toString());
-                }
-                jan.put("depends-on", jDeps);
-            }
-            jans.put(jan);
-        }
-        jplan.put("activities", jans);
-
-        return jplan;
-    }
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/ClusterTopology.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/ClusterTopology.java
new file mode 100644
index 0000000..6f566d1
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/ClusterTopology.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.topology;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class ClusterTopology implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final NetworkSwitch clusterSwitch;
+
+    public ClusterTopology(NetworkSwitch clusterSwitch) {
+        this.clusterSwitch = clusterSwitch;
+    }
+
+    public NetworkSwitch getClusterSwitch() {
+        return clusterSwitch;
+    }
+
+    public boolean lookupNetworkTerminal(String terminalName, List<Integer> path) {
+        return clusterSwitch.lookupNetworkTerminal(terminalName, path);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/NetworkEndpoint.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/NetworkEndpoint.java
new file mode 100644
index 0000000..0aaee2a
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/NetworkEndpoint.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.topology;
+
+import java.io.Serializable;
+import java.util.Map;
+
+public abstract class NetworkEndpoint implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    public enum EndpointType {
+        NETWORK_SWITCH,
+        NETWORK_TERMINAL,
+    }
+
+    protected final String name;
+
+    protected final Map<String, String> properties;
+
+    public NetworkEndpoint(String name, Map<String, String> properties) {
+        this.name = name;
+        this.properties = properties;
+    }
+
+    public abstract EndpointType getType();
+
+    public final String getName() {
+        return name;
+    }
+
+    public final Map<String, String> getProperties() {
+        return properties;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/NetworkSwitch.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/NetworkSwitch.java
new file mode 100644
index 0000000..3c89044
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/NetworkSwitch.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.topology;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class NetworkSwitch extends NetworkEndpoint {
+    private static final long serialVersionUID = 1L;
+
+    private final Port[] ports;
+
+    private final Map<String, Integer> terminalNamePortIndexMap;
+
+    public NetworkSwitch(String name, Map<String, String> properties, Port[] ports) {
+        super(name, properties);
+        this.ports = ports;
+        terminalNamePortIndexMap = new HashMap<String, Integer>();
+        for (int i = 0; i < ports.length; ++i) {
+            Port port = ports[i];
+            NetworkEndpoint endpoint = port.getEndpoint();
+            Integer portIndex = Integer.valueOf(i);
+            switch (endpoint.getType()) {
+                case NETWORK_SWITCH: {
+                    NetworkSwitch s = (NetworkSwitch) endpoint;
+                    for (String t : s.terminalNamePortIndexMap.keySet()) {
+                        terminalNamePortIndexMap.put(t, portIndex);
+                    }
+                    break;
+                }
+
+                case NETWORK_TERMINAL: {
+                    NetworkTerminal t = (NetworkTerminal) endpoint;
+                    terminalNamePortIndexMap.put(t.getName(), portIndex);
+                    break;
+                }
+            }
+        }
+    }
+
+    public Port[] getPorts() {
+        return ports;
+    }
+
+    @Override
+    public EndpointType getType() {
+        return EndpointType.NETWORK_SWITCH;
+    }
+
+    boolean lookupNetworkTerminal(String terminalName, List<Integer> path) {
+        if (terminalNamePortIndexMap.containsKey(terminalName)) {
+            Integer portIndex = terminalNamePortIndexMap.get(terminalName);
+            path.add(portIndex);
+            NetworkEndpoint endpoint = ports[portIndex.intValue()].getEndpoint();
+            if (endpoint.getType() == EndpointType.NETWORK_SWITCH) {
+                ((NetworkSwitch) endpoint).lookupNetworkTerminal(terminalName, path);
+            }
+            return true;
+        }
+        return false;
+    }
+
+    void getPortList(List<Integer> path, int stepIndex, List<Port> portList) {
+        if (stepIndex >= path.size()) {
+            return;
+        }
+        int portIndex = path.get(stepIndex);
+        Port port = ports[portIndex];
+        portList.add(port);
+        ++stepIndex;
+        if (stepIndex >= path.size()) {
+            return;
+        }
+        NetworkEndpoint endpoint = port.getEndpoint();
+        if (endpoint.getType() != EndpointType.NETWORK_SWITCH) {
+            throw new IllegalArgumentException("Path provided, " + path + ", longer than depth of topology tree");
+        }
+        ((NetworkSwitch) endpoint).getPortList(path, stepIndex, portList);
+    }
+
+    public static class Port implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        private final NetworkEndpoint endpoint;
+
+        public Port(NetworkEndpoint endpoint) {
+            this.endpoint = endpoint;
+        }
+
+        public NetworkEndpoint getEndpoint() {
+            return endpoint;
+        }
+    }
+}
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/IOperatorDescriptorVisitor.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/NetworkTerminal.java
similarity index 61%
copy from hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/IOperatorDescriptorVisitor.java
copy to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/NetworkTerminal.java
index 840faef..f36da76 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/IOperatorDescriptorVisitor.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/NetworkTerminal.java
@@ -12,11 +12,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.control.cc.job;
+package edu.uci.ics.hyracks.api.topology;
 
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import java.util.Map;
 
-public interface IOperatorDescriptorVisitor {
-    public void visit(IOperatorDescriptor op) throws HyracksException;
+public class NetworkTerminal extends NetworkEndpoint {
+    private static final long serialVersionUID = 1L;
+
+    public NetworkTerminal(String name, Map<String, String> properties) {
+        super(name, properties);
+    }
+
+    @Override
+    public EndpointType getType() {
+        return EndpointType.NETWORK_TERMINAL;
+    }
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/TopologyDefinitionParser.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/TopologyDefinitionParser.java
new file mode 100644
index 0000000..0ca6127
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/TopologyDefinitionParser.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.topology;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import org.xml.sax.Attributes;
+import org.xml.sax.InputSource;
+import org.xml.sax.SAXException;
+import org.xml.sax.XMLReader;
+import org.xml.sax.helpers.DefaultHandler;
+import org.xml.sax.helpers.XMLReaderFactory;
+
+import edu.uci.ics.hyracks.api.topology.NetworkEndpoint.EndpointType;
+
+public class TopologyDefinitionParser {
+    private final Stack<ElementStackEntry> stack;
+
+    private boolean inPropertyElement;
+
+    private TopologyDefinitionParser() {
+        stack = new Stack<ElementStackEntry>();
+        inPropertyElement = false;
+    }
+
+    public static ClusterTopology parse(InputSource in) throws IOException, SAXException {
+        TopologyDefinitionParser parser = new TopologyDefinitionParser();
+        return parser.parseInternal(in);
+    }
+
+    private ClusterTopology parseInternal(InputSource in) throws IOException, SAXException {
+        XMLReader parser;
+        parser = XMLReaderFactory.createXMLReader();
+        SAXContentHandler handler = new SAXContentHandler();
+        parser.setContentHandler(handler);
+        parser.parse(in);
+        if (stack.size() != 1) {
+            throw new IllegalStateException("Malformed topology definition");
+        }
+        ElementStackEntry e = stack.pop();
+        if (e.ports.size() != 1) {
+            throw new IllegalArgumentException("Malformed topology definition");
+        }
+        NetworkEndpoint endpoint = e.ports.get(0).getEndpoint();
+        if (endpoint.getType() != EndpointType.NETWORK_SWITCH) {
+            throw new IllegalArgumentException("Top level content in cluster-topology must be network-switch");
+        }
+        return new ClusterTopology((NetworkSwitch) endpoint);
+    }
+
+    private class SAXContentHandler extends DefaultHandler {
+        @Override
+        public void endElement(String uri, String localName, String qName) throws SAXException {
+            if ("network-switch".equals(localName) || "terminal".equals(localName)) {
+                ElementStackEntry e = stack.pop();
+                NetworkEndpoint endpoint = e.type == EndpointType.NETWORK_SWITCH ? new NetworkSwitch(e.name,
+                        e.properties, e.ports.toArray(new NetworkSwitch.Port[e.ports.size()])) : new NetworkTerminal(
+                        e.name, e.properties);
+                stack.peek().ports.add(new NetworkSwitch.Port(endpoint));
+            } else if ("property".equals(localName)) {
+                if (!inPropertyElement) {
+                    throw new IllegalStateException("Improperly nested property element encountered");
+                }
+                inPropertyElement = false;
+            }
+        }
+
+        @Override
+        public void startElement(String uri, String localName, String qName, Attributes atts) throws SAXException {
+            if ("cluster-topology".equals(localName)) {
+                if (!stack.isEmpty()) {
+                    throw new IllegalStateException("Encountered unexpected " + qName);
+                }
+                stack.push(new ElementStackEntry(null, ""));
+            } else if ("network-switch".equals(localName) || "terminal".equals(localName)) {
+                String name = atts.getValue("", "name");
+                if (name == null) {
+                    throw new IllegalStateException("Encountered " + localName + " element with no name attribute");
+                }
+                EndpointType type = "network-switch".equals(localName) ? EndpointType.NETWORK_SWITCH
+                        : EndpointType.NETWORK_TERMINAL;
+                ElementStackEntry e = new ElementStackEntry(type, name);
+                stack.push(e);
+            } else if ("property".equals(localName)) {
+                if (inPropertyElement) {
+                    throw new IllegalStateException("Improperly nested property element encountered");
+                }
+                String name = atts.getValue("", "name");
+                if (name == null) {
+                    throw new IllegalStateException("Encountered " + localName + " element with no name attribute");
+                }
+                String value = atts.getValue("", "value");
+                if (value == null) {
+                    throw new IllegalStateException("Encountered " + localName + " element with no value attribute");
+                }
+                stack.peek().properties.put(name, value);
+                inPropertyElement = true;
+            }
+        }
+    }
+
+    private static class ElementStackEntry {
+        private final EndpointType type;
+
+        private final String name;
+
+        private final Map<String, String> properties;
+
+        private final List<NetworkSwitch.Port> ports;
+
+        public ElementStackEntry(EndpointType type, String name) {
+            this.type = type;
+            this.name = name;
+            this.properties = new HashMap<String, String>();
+            ports = new ArrayList<NetworkSwitch.Port>();
+        }
+    }
+}
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index a9a69fa..32b031d 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -15,6 +15,7 @@
 package edu.uci.ics.hyracks.control.cc;
 
 import java.io.File;
+import java.io.FileReader;
 import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.Hashtable;
@@ -29,12 +30,16 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.xml.sax.InputSource;
+
 import edu.uci.ics.hyracks.api.client.ClusterControllerInfo;
 import edu.uci.ics.hyracks.api.client.HyracksClientInterfaceFunctions;
 import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
 import edu.uci.ics.hyracks.api.context.ICCContext;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.topology.ClusterTopology;
+import edu.uci.ics.hyracks.api.topology.TopologyDefinitionParser;
 import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
 import edu.uci.ics.hyracks.control.cc.web.WebServer;
@@ -137,6 +142,7 @@
         };
         workQueue = new WorkQueue();
         this.timer = new Timer(true);
+        final ClusterTopology topology = computeClusterTopology(ccConfig);
         ccContext = new ICCContext() {
             @Override
             public void getIPAddressNodeMap(Map<String, Set<String>> map) throws Exception {
@@ -148,11 +154,29 @@
             public ClusterControllerInfo getClusterControllerInfo() {
                 return info;
             }
+
+            @Override
+            public ClusterTopology getClusterTopology() {
+                return topology;
+            }
         };
         sweeper = new DeadNodeSweeper();
         jobCounter = 0;
     }
 
+    private static ClusterTopology computeClusterTopology(CCConfig ccConfig) throws Exception {
+        if (ccConfig.clusterTopologyDefinition == null) {
+            return null;
+        }
+        FileReader fr = new FileReader(ccConfig.clusterTopologyDefinition);
+        InputSource in = new InputSource(fr);
+        try {
+            return TopologyDefinitionParser.parse(in);
+        } finally {
+            fr.close();
+        }
+    }
+
     @Override
     public void start() throws Exception {
         LOGGER.log(Level.INFO, "Starting ClusterControllerService: " + this);
@@ -292,7 +316,7 @@
                     HyracksClientInterfaceFunctions.StartJobFunction sjf = (HyracksClientInterfaceFunctions.StartJobFunction) fn;
                     JobId jobId = createJobId();
                     workQueue.schedule(new JobStartWork(ClusterControllerService.this, sjf.getAppName(), sjf
-                            .getJobSpec(), sjf.getJobFlags(), jobId, new IPCResponder<JobId>(handle, mid)));
+                            .getACGGFBytes(), sjf.getJobFlags(), jobId, new IPCResponder<JobId>(handle, mid)));
                     return;
                 }
 
@@ -308,6 +332,15 @@
                             new IPCResponder<Map<String, NodeControllerInfo>>(handle, mid)));
                     return;
                 }
+
+                case GET_CLUSTER_TOPOLOGY: {
+                    try {
+                        handle.send(mid, ccContext.getClusterTopology(), null);
+                    } catch (IPCException e) {
+                        e.printStackTrace();
+                    }
+                    return;
+                }
             }
             try {
                 handle.send(mid, null, new IllegalArgumentException("Unknown function " + fn.getFunctionId()));
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/adminconsole/pages/JobDetailsPage.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/adminconsole/pages/JobDetailsPage.java
index a486c33..e165415 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/adminconsole/pages/JobDetailsPage.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/adminconsole/pages/JobDetailsPage.java
@@ -33,7 +33,7 @@
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.work.GetJobActivityGraphJSONWork;
+import edu.uci.ics.hyracks.control.cc.work.GetActivityClusterGraphJSONWork;
 import edu.uci.ics.hyracks.control.cc.work.GetJobRunJSONWork;
 
 public class JobDetailsPage extends AbstractPage {
@@ -48,13 +48,13 @@
 
         JobId jobId = JobId.parse(jobIdStr.toString());
 
-        GetJobActivityGraphJSONWork gjagw = new GetJobActivityGraphJSONWork(ccs, jobId);
-        ccs.getWorkQueue().scheduleAndSync(gjagw);
-        Label jag = new Label("job-activity-graph", gjagw.getJSON().toString());
+        GetActivityClusterGraphJSONWork gacgw = new GetActivityClusterGraphJSONWork(ccs, jobId);
+        ccs.getWorkQueue().scheduleAndSync(gacgw);
+        Label jag = new Label("activity-cluster-graph", gacgw.getJSON().toString());
         jag.setEscapeModelStrings(false);
         add(jag);
 
-        JSONObject jagO = gjagw.getJSON();
+        JSONObject jagO = gacgw.getJSON();
 
         Map<ActivityId, String> activityMap = new HashMap<ActivityId, String>();
         if (jagO.has("activities")) {
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
index 325ea84..24dfa7c 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
@@ -25,10 +25,9 @@
 import edu.uci.ics.hyracks.api.application.ICCBootstrap;
 import edu.uci.ics.hyracks.api.context.ICCContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import edu.uci.ics.hyracks.api.job.IJobLifecycleListener;
-import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
 import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
 import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
 import edu.uci.ics.hyracks.control.common.context.ServerContext;
@@ -63,9 +62,10 @@
         return ccContext;
     }
 
-    public JobSpecification createJobSpecification(byte[] bytes) throws HyracksException {
+    public IActivityClusterGraphGeneratorFactory createActivityClusterGraphGeneratorFactory(byte[] bytes)
+            throws HyracksException {
         try {
-            return (JobSpecification) JavaSerializationUtils.deserialize(bytes, getClassLoader());
+            return (IActivityClusterGraphGeneratorFactory) JavaSerializationUtils.deserialize(bytes, getClassLoader());
         } catch (IOException e) {
             throw new HyracksException(e);
         } catch (ClassNotFoundException e) {
@@ -102,10 +102,10 @@
         }
     }
 
-    public synchronized void notifyJobCreation(JobId jobId, IOperatorDescriptorRegistry specification)
+    public synchronized void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf)
             throws HyracksException {
         for (IJobLifecycleListener l : jobLifecycleListeners) {
-            l.notifyJobCreation(jobId, specification);
+            l.notifyJobCreation(jobId, acggf);
         }
     }
 
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
deleted file mode 100644
index d351d4a..0000000
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.job;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import edu.uci.ics.hyracks.api.dataflow.ActivityId;
-
-public class ActivityCluster {
-    private final Set<ActivityId> activities;
-
-    private final Set<ActivityCluster> dependencies;
-
-    private final Set<ActivityCluster> dependents;
-
-    private ActivityClusterId id;
-
-    private ActivityClusterPlan acp;
-
-    public ActivityCluster(Set<ActivityId> activities) {
-        this.activities = activities;
-        dependencies = new HashSet<ActivityCluster>();
-        dependents = new HashSet<ActivityCluster>();
-    }
-
-    public Set<ActivityId> getActivities() {
-        return activities;
-    }
-
-    public void addDependency(ActivityCluster stage) {
-        dependencies.add(stage);
-    }
-
-    public void addDependent(ActivityCluster stage) {
-        dependents.add(stage);
-    }
-
-    public Set<ActivityCluster> getDependencies() {
-        return dependencies;
-    }
-
-    public Set<ActivityCluster> getDependents() {
-        return dependents;
-    }
-
-    public ActivityClusterId getActivityClusterId() {
-        return id;
-    }
-
-    public void setActivityClusterId(ActivityClusterId id) {
-        this.id = id;
-    }
-
-    public ActivityClusterPlan getPlan() {
-        return acp;
-    }
-
-    public void setPlan(ActivityClusterPlan acp) {
-        this.acp = acp;
-    }
-
-    @Override
-    public String toString() {
-        return String.valueOf(activities);
-    }
-}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
index 707ef1f..6f26de2 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.hyracks.control.cc.job;
 
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -29,10 +30,15 @@
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.job.ActivityCluster;
+import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
+import edu.uci.ics.hyracks.api.job.ActivityClusterId;
+import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGenerator;
+import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.partitions.PartitionMatchMaker;
 import edu.uci.ics.hyracks.control.cc.scheduler.ActivityPartitionDetails;
 import edu.uci.ics.hyracks.control.cc.scheduler.JobScheduler;
@@ -41,7 +47,17 @@
 public class JobRun implements IJobStatusConditionVariable {
     private final JobId jobId;
 
-    private final JobActivityGraph jag;
+    private final String applicationName;
+
+    private final IActivityClusterGraphGenerator acgg;
+
+    private final ActivityClusterGraph acg;
+
+    private final JobScheduler scheduler;
+
+    private final EnumSet<JobFlag> jobFlags;
+
+    private final Map<ActivityClusterId, ActivityClusterPlan> activityClusterPlanMap;
 
     private final PartitionMatchMaker pmm;
 
@@ -51,14 +67,8 @@
 
     private final JobProfile profile;
 
-    private Set<ActivityCluster> activityClusters;
-
-    private final Map<ActivityId, ActivityCluster> activityClusterMap;
-
     private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicyMap;
 
-    private JobScheduler js;
-
     private long createTime;
 
     private long startTime;
@@ -73,14 +83,19 @@
 
     private Exception pendingException;
 
-    public JobRun(JobId jobId, JobActivityGraph plan) {
+    public JobRun(ClusterControllerService ccs, JobId jobId, String applicationName,
+            IActivityClusterGraphGenerator acgg, EnumSet<JobFlag> jobFlags) {
         this.jobId = jobId;
-        this.jag = plan;
+        this.applicationName = applicationName;
+        this.acgg = acgg;
+        this.acg = acgg.initialize();
+        this.scheduler = new JobScheduler(ccs, this, acgg.getConstraints());
+        this.jobFlags = jobFlags;
+        activityClusterPlanMap = new HashMap<ActivityClusterId, ActivityClusterPlan>();
         pmm = new PartitionMatchMaker();
         participatingNodeIds = new HashSet<String>();
         cleanupPendingNodeIds = new HashSet<String>();
         profile = new JobProfile(jobId);
-        activityClusterMap = new HashMap<ActivityId, ActivityCluster>();
         connectorPolicyMap = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
     }
 
@@ -88,8 +103,20 @@
         return jobId;
     }
 
-    public JobActivityGraph getJobActivityGraph() {
-        return jag;
+    public String getApplicationName() {
+        return applicationName;
+    }
+
+    public ActivityClusterGraph getActivityClusterGraph() {
+        return acg;
+    }
+
+    public EnumSet<JobFlag> getFlags() {
+        return jobFlags;
+    }
+
+    public Map<ActivityClusterId, ActivityClusterPlan> getActivityClusterPlanMap() {
+        return activityClusterPlanMap;
     }
 
     public PartitionMatchMaker getPartitionMatchMaker() {
@@ -169,24 +196,8 @@
         return profile;
     }
 
-    public void setScheduler(JobScheduler js) {
-        this.js = js;
-    }
-
     public JobScheduler getScheduler() {
-        return js;
-    }
-
-    public Map<ActivityId, ActivityCluster> getActivityClusterMap() {
-        return activityClusterMap;
-    }
-
-    public Set<ActivityCluster> getActivityClusters() {
-        return activityClusters;
-    }
-
-    public void setActivityClusters(Set<ActivityCluster> activityClusters) {
-        this.activityClusters = activityClusters;
+        return scheduler;
     }
 
     public Map<ConnectorDescriptorId, IConnectorPolicy> getConnectorPolicyMap() {
@@ -197,37 +208,31 @@
         JSONObject result = new JSONObject();
 
         result.put("job-id", jobId.toString());
-        result.put("application-name", jag.getApplicationName());
+        result.put("application-name", applicationName);
         result.put("status", getStatus());
         result.put("create-time", getCreateTime());
         result.put("start-time", getCreateTime());
         result.put("end-time", getCreateTime());
 
         JSONArray aClusters = new JSONArray();
-        for (ActivityCluster ac : activityClusters) {
+        for (ActivityCluster ac : acg.getActivityClusterMap().values()) {
             JSONObject acJSON = new JSONObject();
 
-            acJSON.put("activity-cluster-id", String.valueOf(ac.getActivityClusterId()));
+            acJSON.put("activity-cluster-id", String.valueOf(ac.getId()));
 
             JSONArray activitiesJSON = new JSONArray();
-            for (ActivityId aid : ac.getActivities()) {
+            for (ActivityId aid : ac.getActivityMap().keySet()) {
                 activitiesJSON.put(aid);
             }
             acJSON.put("activities", activitiesJSON);
 
-            JSONArray dependentsJSON = new JSONArray();
-            for (ActivityCluster dependent : ac.getDependents()) {
-                dependentsJSON.put(String.valueOf(dependent.getActivityClusterId()));
-            }
-            acJSON.put("dependents", dependentsJSON);
-
             JSONArray dependenciesJSON = new JSONArray();
             for (ActivityCluster dependency : ac.getDependencies()) {
-                dependenciesJSON.put(String.valueOf(dependency.getActivityClusterId()));
+                dependenciesJSON.put(String.valueOf(dependency.getId()));
             }
             acJSON.put("dependencies", dependenciesJSON);
 
-            ActivityClusterPlan acp = ac.getPlan();
+            ActivityClusterPlan acp = activityClusterPlanMap.get(ac.getId());
             if (acp == null) {
                 acJSON.put("plan", (Object) null);
             } else {
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java
index f41cda9..17beb2d 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java
@@ -20,6 +20,7 @@
 import java.util.List;
 import java.util.Set;
 
+import edu.uci.ics.hyracks.api.job.ActivityCluster;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
 
 public class TaskCluster {
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskClusterId.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskClusterId.java
index b1000dd..1218c14 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskClusterId.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskClusterId.java
@@ -16,6 +16,8 @@
 
 import java.io.Serializable;
 
+import edu.uci.ics.hyracks.api.job.ActivityClusterId;
+
 public final class TaskClusterId implements Serializable {
     private static final long serialVersionUID = 1L;
 
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterGraphBuilder.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterGraphBuilder.java
deleted file mode 100644
index 9acc4b8..0000000
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterGraphBuilder.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.scheduler;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.commons.lang3.tuple.Pair;
-
-import edu.uci.ics.hyracks.api.dataflow.ActivityId;
-import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
-import edu.uci.ics.hyracks.api.job.JobActivityGraph;
-import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
-import edu.uci.ics.hyracks.control.cc.job.ActivityClusterId;
-import edu.uci.ics.hyracks.control.cc.job.JobRun;
-
-public class ActivityClusterGraphBuilder {
-    private static final Logger LOGGER = Logger.getLogger(ActivityClusterGraphBuilder.class.getName());
-
-    private final JobRun jobRun;
-
-    public ActivityClusterGraphBuilder(JobRun jobRun) {
-        this.jobRun = jobRun;
-    }
-
-    private static Pair<ActivityId, ActivityId> findMergePair(JobActivityGraph jag, Set<ActivityCluster> eqSets) {
-        for (ActivityCluster eqSet : eqSets) {
-            for (ActivityId t : eqSet.getActivities()) {
-                List<IConnectorDescriptor> inputList = jag.getActivityInputMap().get(t);
-                if (inputList != null) {
-                    for (IConnectorDescriptor conn : inputList) {
-                        ActivityId inTask = jag.getProducerActivity(conn.getConnectorId());
-                        if (!eqSet.getActivities().contains(inTask)) {
-                            return Pair.<ActivityId, ActivityId> of(t, inTask);
-                        }
-                    }
-                }
-                List<IConnectorDescriptor> outputList = jag.getActivityOutputMap().get(t);
-                if (outputList != null) {
-                    for (IConnectorDescriptor conn : outputList) {
-                        ActivityId outTask = jag.getConsumerActivity(conn.getConnectorId());
-                        if (!eqSet.getActivities().contains(outTask)) {
-                            return Pair.<ActivityId, ActivityId> of(t, outTask);
-                        }
-                    }
-                }
-            }
-        }
-        return null;
-    }
-
-    public Set<ActivityCluster> inferActivityClusters(JobActivityGraph jag) {
-        /*
-         * Build initial equivalence sets map. We create a map such that for each IOperatorTask, t -> { t }
-         */
-        Map<ActivityId, ActivityCluster> stageMap = new HashMap<ActivityId, ActivityCluster>();
-        Set<ActivityCluster> stages = new HashSet<ActivityCluster>();
-        for (ActivityId taskId : jag.getActivityMap().keySet()) {
-            Set<ActivityId> eqSet = new HashSet<ActivityId>();
-            eqSet.add(taskId);
-            ActivityCluster stage = new ActivityCluster(eqSet);
-            stageMap.put(taskId, stage);
-            stages.add(stage);
-        }
-
-        boolean changed = true;
-        while (changed) {
-            changed = false;
-            Pair<ActivityId, ActivityId> pair = findMergePair(jag, stages);
-            if (pair != null) {
-                merge(stageMap, stages, pair.getLeft(), pair.getRight());
-                changed = true;
-            }
-        }
-
-        Map<ActivityId, Set<ActivityId>> blocker2BlockedMap = jag.getBlocker2BlockedMap();
-        for (ActivityCluster s : stages) {
-            Set<ActivityCluster> blockedStages = new HashSet<ActivityCluster>();
-            for (ActivityId t : s.getActivities()) {
-                Set<ActivityId> blockedTasks = blocker2BlockedMap.get(t);
-                if (blockedTasks != null) {
-                    for (ActivityId bt : blockedTasks) {
-                        blockedStages.add(stageMap.get(bt));
-                    }
-                }
-            }
-            for (ActivityCluster bs : blockedStages) {
-                bs.addDependency(s);
-                s.addDependent(bs);
-            }
-        }
-        Set<ActivityCluster> roots = new HashSet<ActivityCluster>();
-        int idCounter = 0;
-        for (ActivityCluster s : stages) {
-            s.setActivityClusterId(new ActivityClusterId(idCounter++));
-            if (s.getDependents().isEmpty()) {
-                roots.add(s);
-            }
-        }
-        jobRun.setActivityClusters(stages);
-        jobRun.getActivityClusterMap().putAll(stageMap);
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Inferred " + stages.size() + " stages");
-            for (ActivityCluster s : stages) {
-                LOGGER.info(s.toString());
-            }
-        }
-        return roots;
-    }
-
-    private void merge(Map<ActivityId, ActivityCluster> eqSetMap, Set<ActivityCluster> eqSets, ActivityId t1,
-            ActivityId t2) {
-        ActivityCluster stage1 = eqSetMap.get(t1);
-        Set<ActivityId> s1 = stage1.getActivities();
-        ActivityCluster stage2 = eqSetMap.get(t2);
-        Set<ActivityId> s2 = stage2.getActivities();
-
-        Set<ActivityId> mergedSet = new HashSet<ActivityId>();
-        mergedSet.addAll(s1);
-        mergedSet.addAll(s2);
-
-        eqSets.remove(stage1);
-        eqSets.remove(stage2);
-        ActivityCluster mergedStage = new ActivityCluster(mergedSet);
-        eqSets.add(mergedStage);
-
-        for (ActivityId t : mergedSet) {
-            eqSetMap.put(t, mergedStage);
-        }
-    }
-}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
index 23f53f4..6c52aac 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
@@ -38,9 +38,9 @@
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
 import edu.uci.ics.hyracks.api.dataflow.connectors.PipeliningConnectorPolicy;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.job.ActivityCluster;
+import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
-import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
 import edu.uci.ics.hyracks.control.cc.job.ActivityClusterPlan;
 import edu.uci.ics.hyracks.control.cc.job.ActivityPlan;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
@@ -62,7 +62,7 @@
         partitionProducingTaskClusterMap = new HashMap<PartitionId, TaskCluster>();
     }
 
-    public void planActivityCluster(ActivityCluster ac) throws HyracksException {
+    public ActivityClusterPlan planActivityCluster(ActivityCluster ac) throws HyracksException {
         JobRun jobRun = scheduler.getJobRun();
         Map<ActivityId, ActivityPartitionDetails> pcMap = computePartitionCounts(ac);
 
@@ -80,16 +80,16 @@
             }
         }
 
-        ac.setPlan(new ActivityClusterPlan(taskClusters, activityPlanMap));
+        return new ActivityClusterPlan(taskClusters, activityPlanMap);
     }
 
     private Map<ActivityId, ActivityPlan> buildActivityPlanMap(ActivityCluster ac, JobRun jobRun,
             Map<ActivityId, ActivityPartitionDetails> pcMap) {
         Map<ActivityId, ActivityPlan> activityPlanMap = new HashMap<ActivityId, ActivityPlan>();
         Set<ActivityId> depAnIds = new HashSet<ActivityId>();
-        for (ActivityId anId : ac.getActivities()) {
+        for (ActivityId anId : ac.getActivityMap().keySet()) {
             depAnIds.clear();
-            getDependencyActivityIds(depAnIds, anId);
+            getDependencyActivityIds(depAnIds, anId, ac);
             ActivityPartitionDetails apd = pcMap.get(anId);
             Task[] tasks = new Task[apd.getPartitionCount()];
             ActivityPlan activityPlan = new ActivityPlan(apd);
@@ -97,8 +97,8 @@
                 TaskId tid = new TaskId(anId, i);
                 tasks[i] = new Task(tid, activityPlan);
                 for (ActivityId danId : depAnIds) {
-                    ActivityCluster dAC = jobRun.getActivityClusterMap().get(danId);
-                    ActivityClusterPlan dACP = dAC.getPlan();
+                    ActivityCluster dAC = ac.getActivityClusterGraph().getActivityMap().get(danId);
+                    ActivityClusterPlan dACP = jobRun.getActivityClusterPlanMap().get(dAC.getId());
                     assert dACP != null : "IllegalStateEncountered: Dependent AC is being planned without a plan for dependency AC: Encountered no plan for ActivityID "
                             + danId;
                     Task[] dATasks = dACP.getActivityPlanMap().get(danId).getTasks();
@@ -120,7 +120,7 @@
 
     private TaskCluster[] computeTaskClusters(ActivityCluster ac, JobRun jobRun,
             Map<ActivityId, ActivityPlan> activityPlanMap) {
-        Set<ActivityId> activities = ac.getActivities();
+        Set<ActivityId> activities = ac.getActivityMap().keySet();
         Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity = computeTaskConnectivity(jobRun,
                 activityPlanMap, activities);
 
@@ -161,15 +161,15 @@
     private TaskCluster[] buildConnectorPolicyUnawareTaskClusters(ActivityCluster ac,
             Map<ActivityId, ActivityPlan> activityPlanMap) {
         List<Task> taskStates = new ArrayList<Task>();
-        for (ActivityId anId : ac.getActivities()) {
+        for (ActivityId anId : ac.getActivityMap().keySet()) {
             ActivityPlan ap = activityPlanMap.get(anId);
             Task[] tasks = ap.getTasks();
             for (Task t : tasks) {
                 taskStates.add(t);
             }
         }
-        TaskCluster tc = new TaskCluster(new TaskClusterId(ac.getActivityClusterId(), 0), ac,
-                taskStates.toArray(new Task[taskStates.size()]));
+        TaskCluster tc = new TaskCluster(new TaskClusterId(ac.getId(), 0), ac, taskStates.toArray(new Task[taskStates
+                .size()]));
         for (Task t : tc.getTasks()) {
             t.setTaskCluster(tc);
         }
@@ -179,16 +179,17 @@
     private Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> computeTaskConnectivity(JobRun jobRun,
             Map<ActivityId, ActivityPlan> activityPlanMap, Set<ActivityId> activities) {
         Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity = new HashMap<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>>();
-        JobActivityGraph jag = jobRun.getJobActivityGraph();
+        ActivityClusterGraph acg = jobRun.getActivityClusterGraph();
         BitSet targetBitmap = new BitSet();
         for (ActivityId ac1 : activities) {
+            ActivityCluster ac = acg.getActivityMap().get(ac1);
             Task[] ac1TaskStates = activityPlanMap.get(ac1).getTasks();
             int nProducers = ac1TaskStates.length;
-            List<IConnectorDescriptor> outputConns = jag.getActivityOutputMap().get(ac1);
+            List<IConnectorDescriptor> outputConns = ac.getActivityOutputMap().get(ac1);
             if (outputConns != null) {
                 for (IConnectorDescriptor c : outputConns) {
                     ConnectorDescriptorId cdId = c.getConnectorId();
-                    ActivityId ac2 = jag.getConsumerActivity(cdId);
+                    ActivityId ac2 = ac.getConsumerActivity(cdId);
                     Task[] ac2TaskStates = activityPlanMap.get(ac2).getTasks();
                     int nConsumers = ac2TaskStates.length;
                     for (int i = 0; i < nProducers; ++i) {
@@ -214,7 +215,7 @@
             Map<ActivityId, ActivityPlan> activityPlanMap,
             Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity) {
         Map<TaskId, Set<TaskId>> taskClusterMap = new HashMap<TaskId, Set<TaskId>>();
-        for (ActivityId anId : ac.getActivities()) {
+        for (ActivityId anId : ac.getActivityMap().keySet()) {
             ActivityPlan ap = activityPlanMap.get(anId);
             Task[] tasks = ap.getTasks();
             for (Task t : tasks) {
@@ -298,7 +299,7 @@
             for (TaskId tid : cluster) {
                 taskStates.add(activityPlanMap.get(tid.getActivityId()).getTasks()[tid.getPartition()]);
             }
-            TaskCluster tc = new TaskCluster(new TaskClusterId(ac.getActivityClusterId(), counter++), ac,
+            TaskCluster tc = new TaskCluster(new TaskClusterId(ac.getId(), counter++), ac,
                     taskStates.toArray(new Task[taskStates.size()]));
             tcSet.add(tc);
             for (TaskId tid : cluster) {
@@ -310,35 +311,34 @@
     }
 
     private TaskCluster getTaskCluster(TaskId tid) {
-        ActivityCluster ac = scheduler.getJobRun().getActivityClusterMap().get(tid.getActivityId());
-        ActivityClusterPlan acp = ac.getPlan();
+        JobRun run = scheduler.getJobRun();
+        ActivityCluster ac = run.getActivityClusterGraph().getActivityMap().get(tid.getActivityId());
+        ActivityClusterPlan acp = run.getActivityClusterPlanMap().get(ac.getId());
         Task[] tasks = acp.getActivityPlanMap().get(tid.getActivityId()).getTasks();
         Task task = tasks[tid.getPartition()];
         assert task.getTaskId().equals(tid);
         return task.getTaskCluster();
     }
 
-    private void getDependencyActivityIds(Set<ActivityId> depAnIds, ActivityId anId) {
-        JobActivityGraph jag = scheduler.getJobRun().getJobActivityGraph();
-        Set<ActivityId> blockers = jag.getBlocked2BlockerMap().get(anId);
+    private void getDependencyActivityIds(Set<ActivityId> depAnIds, ActivityId anId, ActivityCluster ac) {
+        Set<ActivityId> blockers = ac.getBlocked2BlockerMap().get(anId);
         if (blockers != null) {
             depAnIds.addAll(blockers);
         }
     }
 
     private void assignConnectorPolicy(ActivityCluster ac, Map<ActivityId, ActivityPlan> taskMap) {
-        JobActivityGraph jag = scheduler.getJobRun().getJobActivityGraph();
         Map<ConnectorDescriptorId, IConnectorPolicy> cPolicyMap = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
-        Set<ActivityId> activities = ac.getActivities();
+        Set<ActivityId> activities = ac.getActivityMap().keySet();
         BitSet targetBitmap = new BitSet();
         for (ActivityId a1 : activities) {
             Task[] ac1TaskStates = taskMap.get(a1).getTasks();
             int nProducers = ac1TaskStates.length;
-            List<IConnectorDescriptor> outputConns = jag.getActivityOutputMap().get(a1);
+            List<IConnectorDescriptor> outputConns = ac.getActivityOutputMap().get(a1);
             if (outputConns != null) {
                 for (IConnectorDescriptor c : outputConns) {
                     ConnectorDescriptorId cdId = c.getConnectorId();
-                    ActivityId a2 = jag.getConsumerActivity(cdId);
+                    ActivityId a2 = ac.getConsumerActivity(cdId);
                     Task[] ac2TaskStates = taskMap.get(a2).getTasks();
                     int nConsumers = ac2TaskStates.length;
 
@@ -347,7 +347,7 @@
                         c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
                         fanouts[i] = targetBitmap.cardinality();
                     }
-                    IConnectorPolicy cp = assignConnectorPolicy(c, nProducers, nConsumers, fanouts);
+                    IConnectorPolicy cp = assignConnectorPolicy(ac, c, nProducers, nConsumers, fanouts);
                     cPolicyMap.put(cdId, cp);
                 }
             }
@@ -355,9 +355,9 @@
         scheduler.getJobRun().getConnectorPolicyMap().putAll(cPolicyMap);
     }
 
-    private IConnectorPolicy assignConnectorPolicy(IConnectorDescriptor c, int nProducers, int nConsumers, int[] fanouts) {
-        IConnectorPolicyAssignmentPolicy cpap = scheduler.getJobRun().getJobActivityGraph()
-                .getConnectorPolicyAssignmentPolicy();
+    private IConnectorPolicy assignConnectorPolicy(ActivityCluster ac, IConnectorDescriptor c, int nProducers,
+            int nConsumers, int[] fanouts) {
+        IConnectorPolicyAssignmentPolicy cpap = ac.getConnectorPolicyAssignmentPolicy();
         if (cpap != null) {
             return cpap.getConnectorPolicyAssignment(c, nProducers, nConsumers, fanouts);
         }
@@ -367,10 +367,8 @@
     private Map<ActivityId, ActivityPartitionDetails> computePartitionCounts(ActivityCluster ac)
             throws HyracksException {
         PartitionConstraintSolver solver = scheduler.getSolver();
-        JobRun jobRun = scheduler.getJobRun();
-        JobActivityGraph jag = jobRun.getJobActivityGraph();
         Set<LValueConstraintExpression> lValues = new HashSet<LValueConstraintExpression>();
-        for (ActivityId anId : ac.getActivities()) {
+        for (ActivityId anId : ac.getActivityMap().keySet()) {
             lValues.add(new PartitionCountExpression(anId.getOperatorDescriptorId()));
         }
         solver.solve(lValues);
@@ -391,26 +389,26 @@
             nPartMap.put(((PartitionCountExpression) lv).getOperatorDescriptorId(), Integer.valueOf(nParts));
         }
         Map<ActivityId, ActivityPartitionDetails> activityPartsMap = new HashMap<ActivityId, ActivityPartitionDetails>();
-        for (ActivityId anId : ac.getActivities()) {
+        for (ActivityId anId : ac.getActivityMap().keySet()) {
             int nParts = nPartMap.get(anId.getOperatorDescriptorId());
             int[] nInputPartitions = null;
-            List<IConnectorDescriptor> inputs = jag.getActivityInputMap().get(anId);
+            List<IConnectorDescriptor> inputs = ac.getActivityInputMap().get(anId);
             if (inputs != null) {
                 nInputPartitions = new int[inputs.size()];
                 for (int i = 0; i < nInputPartitions.length; ++i) {
                     ConnectorDescriptorId cdId = inputs.get(i).getConnectorId();
-                    ActivityId aid = jag.getProducerActivity(cdId);
+                    ActivityId aid = ac.getProducerActivity(cdId);
                     Integer nPartInt = nPartMap.get(aid.getOperatorDescriptorId());
                     nInputPartitions[i] = nPartInt;
                 }
             }
             int[] nOutputPartitions = null;
-            List<IConnectorDescriptor> outputs = jag.getActivityOutputMap().get(anId);
+            List<IConnectorDescriptor> outputs = ac.getActivityOutputMap().get(anId);
             if (outputs != null) {
                 nOutputPartitions = new int[outputs.size()];
                 for (int i = 0; i < nOutputPartitions.length; ++i) {
                     ConnectorDescriptorId cdId = outputs.get(i).getConnectorId();
-                    ActivityId aid = jag.getConsumerActivity(cdId);
+                    ActivityId aid = ac.getConsumerActivity(cdId);
                     Integer nPartInt = nPartMap.get(aid.getOperatorDescriptorId());
                     nOutputPartitions[i] = nPartInt;
                 }
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index b3a76ad..b163db5 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -35,14 +35,15 @@
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.job.ActivityCluster;
+import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
 import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.NodeControllerState;
-import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
+import edu.uci.ics.hyracks.control.cc.job.ActivityClusterPlan;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
 import edu.uci.ics.hyracks.control.cc.job.Task;
 import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
@@ -66,8 +67,6 @@
 
     private final Set<TaskCluster> inProgressTaskClusters;
 
-    private Set<ActivityCluster> rootActivityClusters;
-
     public JobScheduler(ClusterControllerService ccs, JobRun jobRun, Collection<Constraint> constraints) {
         this.ccs = ccs;
         this.jobRun = jobRun;
@@ -75,8 +74,6 @@
         partitionProducingTaskClusterMap = new HashMap<PartitionId, TaskCluster>();
         inProgressTaskClusters = new HashSet<TaskCluster>();
         solver.addConstraints(constraints);
-        ActivityClusterGraphBuilder acgb = new ActivityClusterGraphBuilder(jobRun);
-        rootActivityClusters = acgb.inferActivityClusters(jobRun.getJobActivityGraph());
     }
 
     public JobRun getJobRun() {
@@ -91,7 +88,7 @@
         startRunnableActivityClusters();
     }
 
-    private void findRunnableTaskClusterRoots(Set<TaskCluster> frontier, Set<ActivityCluster> roots)
+    private void findRunnableTaskClusterRoots(Set<TaskCluster> frontier, Collection<ActivityCluster> roots)
             throws HyracksException {
         for (ActivityCluster root : roots) {
             findRunnableTaskClusterRoots(frontier, root);
@@ -107,14 +104,13 @@
                 findRunnableTaskClusterRoots(frontier, depAC);
             } else {
                 boolean tcRootsComplete = true;
-                Set<TaskCluster> depACTCRoots = new HashSet<TaskCluster>();
-                for (TaskCluster tc : depAC.getPlan().getTaskClusters()) {
+                for (TaskCluster tc : getActivityClusterPlan(depAC).getTaskClusters()) {
                     if (tc.getProducedPartitions().isEmpty()) {
                         TaskClusterAttempt tca = findLastTaskClusterAttempt(tc);
                         if (tca == null || tca.getStatus() != TaskClusterAttempt.TaskClusterStatus.COMPLETED) {
                             tcRootsComplete = false;
+                            break;
                         }
-                        depACTCRoots.add(tc);
                     }
                 }
                 if (!tcRootsComplete) {
@@ -126,10 +122,11 @@
         if (depsComplete) {
             if (!isPlanned(candidate)) {
                 ActivityClusterPlanner acp = new ActivityClusterPlanner(this);
-                acp.planActivityCluster(candidate);
+                ActivityClusterPlan acPlan = acp.planActivityCluster(candidate);
+                jobRun.getActivityClusterPlanMap().put(candidate.getId(), acPlan);
                 partitionProducingTaskClusterMap.putAll(acp.getPartitionProducingTaskClusterMap());
             }
-            for (TaskCluster tc : candidate.getPlan().getTaskClusters()) {
+            for (TaskCluster tc : getActivityClusterPlan(candidate).getTaskClusters()) {
                 if (tc.getProducedPartitions().isEmpty()) {
                     TaskClusterAttempt tca = findLastTaskClusterAttempt(tc);
                     if (tca == null || tca.getStatus() != TaskClusterAttempt.TaskClusterStatus.COMPLETED) {
@@ -140,13 +137,18 @@
         }
     }
 
+    private ActivityClusterPlan getActivityClusterPlan(ActivityCluster ac) {
+        return jobRun.getActivityClusterPlanMap().get(ac.getId());
+    }
+
     private boolean isPlanned(ActivityCluster ac) {
-        return ac.getPlan() != null;
+        return jobRun.getActivityClusterPlanMap().get(ac.getId()) != null;
     }
 
     private void startRunnableActivityClusters() throws HyracksException {
         Set<TaskCluster> taskClusterRoots = new HashSet<TaskCluster>();
-        findRunnableTaskClusterRoots(taskClusterRoots, rootActivityClusters);
+        findRunnableTaskClusterRoots(taskClusterRoots, jobRun.getActivityClusterGraph().getActivityClusterMap()
+                .values());
         if (LOGGER.isLoggable(Level.FINE)) {
             LOGGER.fine("Runnable TC roots: " + taskClusterRoots + ", inProgressTaskClusters: "
                     + inProgressTaskClusters);
@@ -286,7 +288,7 @@
 
     private void assignTaskLocations(TaskCluster tc, Map<String, List<TaskAttemptDescriptor>> taskAttemptMap)
             throws HyracksException {
-        JobActivityGraph jag = jobRun.getJobActivityGraph();
+        ActivityClusterGraph acg = jobRun.getActivityClusterGraph();
         Task[] tasks = tc.getTasks();
         List<TaskClusterAttempt> tcAttempts = tc.getAttempts();
         int attempts = tcAttempts.size();
@@ -309,7 +311,7 @@
             Task ts = tasks[i];
             TaskId tid = ts.getTaskId();
             TaskAttempt taskAttempt = taskAttempts.get(tid);
-            String nodeId = assignLocation(jag, locationMap, tid, taskAttempt);
+            String nodeId = assignLocation(acg, locationMap, tid, taskAttempt);
             taskAttempt.setNodeId(nodeId);
             taskAttempt.setStatus(TaskAttempt.TaskStatus.RUNNING, null);
             taskAttempt.setStartTime(System.currentTimeMillis());
@@ -355,10 +357,11 @@
         inProgressTaskClusters.add(tc);
     }
 
-    private String assignLocation(JobActivityGraph jag, Map<TaskId, LValueConstraintExpression> locationMap,
+    private String assignLocation(ActivityClusterGraph acg, Map<TaskId, LValueConstraintExpression> locationMap,
             TaskId tid, TaskAttempt taskAttempt) throws HyracksException {
         ActivityId aid = tid.getActivityId();
-        Set<ActivityId> blockers = jag.getBlocked2BlockerMap().get(aid);
+        ActivityCluster ac = acg.getActivityMap().get(aid);
+        Set<ActivityId> blockers = ac.getBlocked2BlockerMap().get(aid);
         String nodeId = null;
         if (blockers != null) {
             for (ActivityId blocker : blockers) {
@@ -404,8 +407,8 @@
 
     private String findTaskLocation(TaskId tid) {
         ActivityId aid = tid.getActivityId();
-        ActivityCluster ac = jobRun.getActivityClusterMap().get(aid);
-        Task[] tasks = ac.getPlan().getActivityPlanMap().get(aid).getTasks();
+        ActivityCluster ac = jobRun.getActivityClusterGraph().getActivityMap().get(aid);
+        Task[] tasks = getActivityClusterPlan(ac).getActivityPlanMap().get(aid).getTasks();
         List<TaskClusterAttempt> tcAttempts = tasks[tid.getPartition()].getTaskCluster().getAttempts();
         if (tcAttempts == null || tcAttempts.isEmpty()) {
             return null;
@@ -425,8 +428,8 @@
 
     private void startTasks(Map<String, List<TaskAttemptDescriptor>> taskAttemptMap) throws HyracksException {
         final JobId jobId = jobRun.getJobId();
-        final JobActivityGraph jag = jobRun.getJobActivityGraph();
-        final String appName = jag.getApplicationName();
+        final ActivityClusterGraph acg = jobRun.getActivityClusterGraph();
+        final String appName = jobRun.getApplicationName();
         final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = new HashMap<ConnectorDescriptorId, IConnectorPolicy>(
                 jobRun.getConnectorPolicyMap());
         for (Map.Entry<String, List<TaskAttemptDescriptor>> entry : taskAttemptMap.entrySet()) {
@@ -440,8 +443,9 @@
                     LOGGER.fine("Starting: " + taskDescriptors + " at " + entry.getKey());
                 }
                 try {
-                    byte[] jagBytes = changed ? JavaSerializationUtils.serialize(jag) : null;
-                    node.getNodeController().startTasks(appName, jobId, jagBytes, taskDescriptors, connectorPolicies);
+                    byte[] jagBytes = changed ? JavaSerializationUtils.serialize(acg) : null;
+                    node.getNodeController().startTasks(appName, jobId, jagBytes, taskDescriptors, connectorPolicies,
+                            jobRun.getFlags());
                 } catch (Exception e) {
                     e.printStackTrace();
                 }
@@ -606,7 +610,7 @@
                 lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.FAILED);
                 lastAttempt.setEndTime(System.currentTimeMillis());
                 abortDoomedTaskClusters();
-                if (lastAttempt.getAttempt() >= jobRun.getJobActivityGraph().getMaxReattempts()) {
+                if (lastAttempt.getAttempt() >= jobRun.getActivityClusterGraph().getMaxReattempts()) {
                     abortJob(new HyracksException(details));
                     return;
                 }
@@ -629,8 +633,8 @@
     public void notifyNodeFailures(Set<String> deadNodes) {
         try {
             jobRun.getPartitionMatchMaker().notifyNodeFailures(deadNodes);
-            for (ActivityCluster ac : jobRun.getActivityClusters()) {
-                TaskCluster[] taskClusters = ac.getPlan().getTaskClusters();
+            for (ActivityCluster ac : jobRun.getActivityClusterGraph().getActivityClusterMap().values()) {
+                TaskCluster[] taskClusters = getActivityClusterPlan(ac).getTaskClusters();
                 if (taskClusters != null) {
                     for (TaskCluster tc : taskClusters) {
                         TaskClusterAttempt lastTaskClusterAttempt = findLastTaskClusterAttempt(tc);
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/JobsRESTAPIFunction.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/JobsRESTAPIFunction.java
index c964e4e..98cef8d 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/JobsRESTAPIFunction.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/JobsRESTAPIFunction.java
@@ -19,7 +19,7 @@
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.web.util.IJSONOutputFunction;
-import edu.uci.ics.hyracks.control.cc.work.GetJobActivityGraphJSONWork;
+import edu.uci.ics.hyracks.control.cc.work.GetActivityClusterGraphJSONWork;
 import edu.uci.ics.hyracks.control.cc.work.GetJobRunJSONWork;
 import edu.uci.ics.hyracks.control.cc.work.GetJobSummariesJSONWork;
 
@@ -49,7 +49,7 @@
                 JobId jobId = JobId.parse(arguments[0]);
 
                 if ("job-activity-graph".equalsIgnoreCase(arguments[1])) {
-                    GetJobActivityGraphJSONWork gjage = new GetJobActivityGraphJSONWork(ccs, jobId);
+                    GetActivityClusterGraphJSONWork gjage = new GetActivityClusterGraphJSONWork(ccs, jobId);
                     ccs.getWorkQueue().scheduleAndSync(gjage);
                     result.put("result", gjage.getJSON());
                 } else if ("job-run".equalsIgnoreCase(arguments[1])) {
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/AbstractTaskLifecycleWork.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/AbstractTaskLifecycleWork.java
index 01f14f3..8a552f5 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/AbstractTaskLifecycleWork.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/AbstractTaskLifecycleWork.java
@@ -20,9 +20,9 @@
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.job.ActivityCluster;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
 import edu.uci.ics.hyracks.control.cc.job.ActivityPlan;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
 import edu.uci.ics.hyracks.control.cc.job.Task;
@@ -49,10 +49,11 @@
         JobRun run = ccs.getActiveRunMap().get(jobId);
         if (run != null) {
             TaskId tid = taId.getTaskId();
-            Map<ActivityId, ActivityCluster> activityClusterMap = run.getActivityClusterMap();
+            Map<ActivityId, ActivityCluster> activityClusterMap = run.getActivityClusterGraph().getActivityMap();
             ActivityCluster ac = activityClusterMap.get(tid.getActivityId());
             if (ac != null) {
-                Map<ActivityId, ActivityPlan> taskStateMap = ac.getPlan().getActivityPlanMap();
+                Map<ActivityId, ActivityPlan> taskStateMap = run.getActivityClusterPlanMap().get(ac.getId())
+                        .getActivityPlanMap();
                 Task[] taskStates = taskStateMap.get(tid.getActivityId()).getTasks();
                 if (taskStates != null && taskStates.length > tid.getPartition()) {
                     Task ts = taskStates[tid.getPartition()];
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobActivityGraphJSONWork.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetActivityClusterGraphJSONWork.java
similarity index 87%
rename from hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobActivityGraphJSONWork.java
rename to hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetActivityClusterGraphJSONWork.java
index 677bece..1f79763 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobActivityGraphJSONWork.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetActivityClusterGraphJSONWork.java
@@ -21,12 +21,12 @@
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
 import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
 
-public class GetJobActivityGraphJSONWork extends SynchronizableWork {
+public class GetActivityClusterGraphJSONWork extends SynchronizableWork {
     private final ClusterControllerService ccs;
     private final JobId jobId;
     private JSONObject json;
 
-    public GetJobActivityGraphJSONWork(ClusterControllerService ccs, JobId jobId) {
+    public GetActivityClusterGraphJSONWork(ClusterControllerService ccs, JobId jobId) {
         this.ccs = ccs;
         this.jobId = jobId;
     }
@@ -41,7 +41,7 @@
                 return;
             }
         }
-        json = run.getJobActivityGraph().toJSON();
+        json = run.getActivityClusterGraph().toJSON();
     }
 
     public JSONObject getJSON() {
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobSummariesJSONWork.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobSummariesJSONWork.java
index 5cc31f6..a0afd61 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobSummariesJSONWork.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobSummariesJSONWork.java
@@ -44,7 +44,7 @@
             JSONObject jo = new JSONObject();
             jo.put("type", "job-summary");
             jo.put("job-id", run.getJobId().toString());
-            jo.put("application-name", run.getJobActivityGraph().getApplicationName());
+            jo.put("application-name", run.getApplicationName());
             jo.put("create-time", run.getCreateTime());
             jo.put("start-time", run.getCreateTime());
             jo.put("end-time", run.getCreateTime());
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
index 8d8e85d..6b5ff03 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
@@ -21,7 +21,7 @@
 import org.json.JSONObject;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
@@ -69,7 +69,7 @@
                 }
             }
         } else {
-            CCApplicationContext appCtx = ccs.getApplicationMap().get(run.getJobActivityGraph().getApplicationName());
+            CCApplicationContext appCtx = ccs.getApplicationMap().get(run.getApplicationName());
             if (appCtx != null) {
                 try {
                     appCtx.notifyJobFinish(jobId);
@@ -91,8 +91,8 @@
     private JSONObject createJobLogObject(final JobRun run) {
         JSONObject jobLogObject = new JSONObject();
         try {
-            JobActivityGraph jag = run.getJobActivityGraph();
-            jobLogObject.put("job-activity-graph", jag.toJSON());
+            ActivityClusterGraph acg = run.getActivityClusterGraph();
+            jobLogObject.put("activity-cluster-graph", acg.toJSON());
             jobLogObject.put("job-run", run.toJSON());
         } catch (JSONException e) {
             throw new RuntimeException(e);
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java
index fa22691..b6a33cd 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java
@@ -15,43 +15,32 @@
 package edu.uci.ics.hyracks.control.cc.work;
 
 import java.util.EnumSet;
-import java.util.HashSet;
-import java.util.Set;
 
-import edu.uci.ics.hyracks.api.constraints.Constraint;
-import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
-import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGenerator;
+import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
-import edu.uci.ics.hyracks.control.cc.job.IConnectorDescriptorVisitor;
-import edu.uci.ics.hyracks.control.cc.job.IOperatorDescriptorVisitor;
-import edu.uci.ics.hyracks.control.cc.job.JobActivityGraphBuilder;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
-import edu.uci.ics.hyracks.control.cc.job.PlanUtils;
-import edu.uci.ics.hyracks.control.cc.scheduler.JobScheduler;
 import edu.uci.ics.hyracks.control.common.work.IResultCallback;
 import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
 
 public class JobStartWork extends SynchronizableWork {
     private final ClusterControllerService ccs;
-    private final byte[] jobSpec;
+    private final byte[] acggfBytes;
     private final EnumSet<JobFlag> jobFlags;
     private final JobId jobId;
     private final String appName;
     private final IResultCallback<JobId> callback;
 
-    public JobStartWork(ClusterControllerService ccs, String appName, byte[] jobSpec, EnumSet<JobFlag> jobFlags,
+    public JobStartWork(ClusterControllerService ccs, String appName, byte[] acggfBytes, EnumSet<JobFlag> jobFlags,
             JobId jobId, IResultCallback<JobId> callback) {
         this.jobId = jobId;
         this.ccs = ccs;
-        this.jobSpec = jobSpec;
+        this.acggfBytes = acggfBytes;
         this.jobFlags = jobFlags;
         this.appName = appName;
         this.callback = callback;
@@ -64,53 +53,13 @@
             if (appCtx == null) {
                 throw new HyracksException("No application with id " + appName + " found");
             }
-            JobSpecification spec = appCtx.createJobSpecification(jobSpec);
-
-            final JobActivityGraphBuilder builder = new JobActivityGraphBuilder(appName, spec, jobFlags);
-            PlanUtils.visit(spec, new IConnectorDescriptorVisitor() {
-                @Override
-                public void visit(IConnectorDescriptor conn) throws HyracksException {
-                    builder.addConnector(conn);
-                }
-            });
-            PlanUtils.visit(spec, new IOperatorDescriptorVisitor() {
-                @Override
-                public void visit(IOperatorDescriptor op) {
-                    op.contributeActivities(builder);
-                }
-            });
-            builder.finish();
-            final JobActivityGraph jag = builder.getActivityGraph();
-
-            JobRun run = new JobRun(jobId, jag);
-
+            IActivityClusterGraphGeneratorFactory acggf = appCtx.createActivityClusterGraphGeneratorFactory(acggfBytes);
+            IActivityClusterGraphGenerator acgg = acggf.createActivityClusterGraphGenerator(appName, jobId, appCtx,
+                    jobFlags);
+            JobRun run = new JobRun(ccs, jobId, appName, acgg, jobFlags);
             run.setStatus(JobStatus.INITIALIZED, null);
-
             ccs.getActiveRunMap().put(jobId, run);
-            final Set<Constraint> contributedConstraints = new HashSet<Constraint>();
-            final IConstraintAcceptor acceptor = new IConstraintAcceptor() {
-                @Override
-                public void addConstraint(Constraint constraint) {
-                    contributedConstraints.add(constraint);
-                }
-            };
-            PlanUtils.visit(spec, new IOperatorDescriptorVisitor() {
-                @Override
-                public void visit(IOperatorDescriptor op) {
-                    op.contributeSchedulingConstraints(acceptor, jag, appCtx);
-                }
-            });
-            PlanUtils.visit(spec, new IConnectorDescriptorVisitor() {
-                @Override
-                public void visit(IConnectorDescriptor conn) {
-                    conn.contributeSchedulingConstraints(acceptor, jag, appCtx);
-                }
-            });
-            contributedConstraints.addAll(spec.getUserConstraints());
-
-            JobScheduler jrs = new JobScheduler(ccs, run, contributedConstraints);
-            run.setScheduler(jrs);
-            appCtx.notifyJobCreation(jobId, spec);
+            appCtx.notifyJobCreation(jobId, acggf);
             run.setStatus(JobStatus.RUNNING, null);
             try {
                 run.getScheduler().startJob();
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
index dea4a8d..9927289 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
@@ -22,7 +22,7 @@
 import org.json.JSONObject;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.NodeControllerState;
@@ -59,7 +59,7 @@
             ncs.getActiveJobIds().remove(jobId);
         }
         if (cleanupPendingNodes.isEmpty()) {
-            CCApplicationContext appCtx = ccs.getApplicationMap().get(run.getJobActivityGraph().getApplicationName());
+            CCApplicationContext appCtx = ccs.getApplicationMap().get(run.getApplicationName());
             if (appCtx != null) {
                 try {
                     appCtx.notifyJobFinish(jobId);
@@ -82,8 +82,8 @@
     private JSONObject createJobLogObject(final JobRun run) {
         JSONObject jobLogObject = new JSONObject();
         try {
-            JobActivityGraph jag = run.getJobActivityGraph();
-            jobLogObject.put("job-activity-graph", jag.toJSON());
+            ActivityClusterGraph acg = run.getActivityClusterGraph();
+            jobLogObject.put("activity-cluster-graph", acg.toJSON());
             jobLogObject.put("job-run", run.toJSON());
         } catch (JSONException e) {
             throw new RuntimeException(e);
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java
index 2456c25..2a844ad 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java
@@ -15,7 +15,6 @@
 package edu.uci.ics.hyracks.control.cc.work;
 
 import java.util.List;
-import java.util.logging.Level;
 
 import org.apache.commons.lang3.tuple.Pair;
 
@@ -57,11 +56,6 @@
     }
 
     @Override
-    public Level logLevel() {
-        return Level.FINEST;
-    }
-
-    @Override
     public String toString() {
         return "PartitionAvailable@" + partitionDescriptor;
     }
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
index 602d860..0fff257 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
@@ -15,9 +15,9 @@
 package edu.uci.ics.hyracks.control.cc.work;
 
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.job.ActivityCluster;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
 import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
 
diff --git a/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java b/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
index 47f4ceb..049adf8 100644
--- a/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
+++ b/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.hyracks.control.common.base;
 
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 
@@ -21,6 +22,7 @@
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
@@ -28,7 +30,7 @@
 
 public interface INodeController {
     public void startTasks(String appName, JobId jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors,
-            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) throws Exception;
+            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, EnumSet<JobFlag> flags) throws Exception;
 
     public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception;
 
diff --git a/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java b/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java
index 350f527..6c208fe 100644
--- a/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java
+++ b/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.hyracks.control.common.controllers;
 
+import java.io.File;
 import java.util.List;
 
 import org.kohsuke.args4j.Option;
@@ -52,6 +53,9 @@
     @Option(name = "-cc-root", usage = "Sets the root folder used for file operations. (default: ClusterControllerService)")
     public String ccRoot = "ClusterControllerService";
 
+    @Option(name = "-cluster-topology", usage = "Sets the XML file that defines the cluster topology. (default: null)")
+    public File clusterTopologyDefinition = null;
+
     public void toCommandLine(List<String> cList) {
         cList.add("-client-net-ip-address");
         cList.add(clientNetIpAddress);
@@ -75,5 +79,9 @@
         cList.add(String.valueOf(jobHistorySize));
         cList.add("-cc-root");
         cList.add(ccRoot);
+        if (clusterTopologyDefinition != null) {
+            cList.add("-cluster-topology");
+            cList.add(clusterTopologyDefinition.getAbsolutePath());
+        }
     }
 }
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
index e5f0efc..8f0056f 100644
--- a/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -22,6 +22,7 @@
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.logging.Level;
@@ -34,6 +35,7 @@
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
@@ -498,15 +500,17 @@
         private final byte[] planBytes;
         private final List<TaskAttemptDescriptor> taskDescriptors;
         private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies;
+        private final EnumSet<JobFlag> flags;
 
         public StartTasksFunction(String appName, JobId jobId, byte[] planBytes,
                 List<TaskAttemptDescriptor> taskDescriptors,
-                Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) {
+                Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, EnumSet<JobFlag> flags) {
             this.appName = appName;
             this.jobId = jobId;
             this.planBytes = planBytes;
             this.taskDescriptors = taskDescriptors;
             this.connectorPolicies = connectorPolicies;
+            this.flags = flags;
         }
 
         @Override
@@ -533,6 +537,10 @@
         public Map<ConnectorDescriptorId, IConnectorPolicy> getConnectorPolicies() {
             return connectorPolicies;
         }
+
+        public EnumSet<JobFlag> getFlags() {
+            return flags;
+        }
     }
 
     public static class AbortTasksFunction extends Function {
diff --git a/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index 8176cb7..10c0a7c 100644
--- a/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.hyracks.control.common.ipc;
 
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 
@@ -21,6 +22,7 @@
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
@@ -37,9 +39,9 @@
 
     @Override
     public void startTasks(String appName, JobId jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors,
-            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) throws Exception {
+            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, EnumSet<JobFlag> flags) throws Exception {
         CCNCFunctions.StartTasksFunction stf = new CCNCFunctions.StartTasksFunction(appName, jobId, planBytes,
-                taskDescriptors, connectorPolicies);
+                taskDescriptors, connectorPolicies, flags);
         ipcHandle.send(-1, stf, null);
     }
 
diff --git a/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java b/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java
index 36f0c49..26e5412 100644
--- a/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java
+++ b/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java
@@ -70,11 +70,11 @@
 
     public void schedule(AbstractWork event) {
         enqueueCount.incrementAndGet();
-        if (LOGGER.isLoggable(Level.FINE)) {
-            LOGGER.fine("Enqueue: " + enqueueCount);
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("Enqueue: " + enqueueCount);
         }
-        if (LOGGER.isLoggable(event.logLevel())) {
-            LOGGER.log(event.logLevel(), "Scheduling: " + event);
+        if (LOGGER.isLoggable(Level.FINER)) {
+            LOGGER.finer("Scheduling: " + event);
         }
         queue.offer(event);
     }
@@ -105,8 +105,8 @@
                         continue;
                     }
                     dequeueCount.incrementAndGet();
-                    if (LOGGER.isLoggable(Level.FINE)) {
-                        LOGGER.fine("Dequeue: " + dequeueCount + "/" + enqueueCount);
+                    if (LOGGER.isLoggable(Level.FINEST)) {
+                        LOGGER.finest("Dequeue: " + dequeueCount + "/" + enqueueCount);
                     }
                     try {
                         if (LOGGER.isLoggable(r.logLevel())) {
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 10c9c3d..63a45db 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -32,11 +32,11 @@
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.io.IIOManager;
 import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
+import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
 import edu.uci.ics.hyracks.api.job.IGlobalJobDataFactory;
 import edu.uci.ics.hyracks.api.job.IJobletEventListener;
 import edu.uci.ics.hyracks.api.job.IJobletEventListenerFactory;
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
-import edu.uci.ics.hyracks.api.job.JobActivityGraph;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
@@ -60,7 +60,7 @@
 
     private final JobId jobId;
 
-    private final JobActivityGraph jag;
+    private final ActivityClusterGraph acg;
 
     private final Map<PartitionId, IPartitionCollector> partitionRequestMap;
 
@@ -84,11 +84,12 @@
 
     private boolean cleanupPending;
 
-    public Joblet(NodeControllerService nodeController, JobId jobId, INCApplicationContext appCtx, JobActivityGraph jag) {
+    public Joblet(NodeControllerService nodeController, JobId jobId, INCApplicationContext appCtx,
+            ActivityClusterGraph acg) {
         this.nodeController = nodeController;
         this.appCtx = appCtx;
         this.jobId = jobId;
-        this.jag = jag;
+        this.acg = acg;
         partitionRequestMap = new HashMap<PartitionId, IPartitionCollector>();
         env = new OperatorEnvironmentImpl(nodeController.getId());
         stateObjectMap = new HashMap<Object, IStateObject>();
@@ -97,7 +98,7 @@
         deallocatableRegistry = new DefaultDeallocatableRegistry();
         fileFactory = new WorkspaceFileFactory(this, (IOManager) appCtx.getRootContext().getIOManager());
         cleanupPending = false;
-        IJobletEventListenerFactory jelf = jag.getJobletEventListenerFactory();
+        IJobletEventListenerFactory jelf = acg.getJobletEventListenerFactory();
         if (jelf != null) {
             IJobletEventListener listener = jelf.createListener(this);
             this.jobletEventListener = listener;
@@ -105,7 +106,7 @@
         } else {
             jobletEventListener = null;
         }
-        IGlobalJobDataFactory gjdf = jag.getGlobalJobDataFactory();
+        IGlobalJobDataFactory gjdf = acg.getGlobalJobDataFactory();
         globalJobData = gjdf != null ? gjdf.createGlobalJobData(this) : null;
     }
 
@@ -114,8 +115,8 @@
         return jobId;
     }
 
-    public JobActivityGraph getJobActivityGraph() {
-        return jag;
+    public ActivityClusterGraph getActivityClusterGraph() {
+        return acg;
     }
 
     public IOperatorEnvironment getEnvironment() {
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 3cab638..ecdd839 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -377,7 +377,7 @@
                 case START_TASKS: {
                     CCNCFunctions.StartTasksFunction stf = (CCNCFunctions.StartTasksFunction) fn;
                     queue.schedule(new StartTasksWork(NodeControllerService.this, stf.getAppName(), stf.getJobId(), stf
-                            .getPlanBytes(), stf.getTaskDescriptors(), stf.getConnectorPolicies()));
+                            .getPlanBytes(), stf.getTaskDescriptors(), stf.getConnectorPolicies(), stf.getFlags()));
                     return;
                 }
 
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java
index e8c4052..8f8c032 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java
@@ -21,12 +21,12 @@
 
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
 import edu.uci.ics.hyracks.control.nc.Joblet;
 import edu.uci.ics.hyracks.control.nc.NodeControllerService;
 import edu.uci.ics.hyracks.control.nc.Task;
 
-public class AbortTasksWork extends SynchronizableWork {
+public class AbortTasksWork extends AbstractWork {
     private static final Logger LOGGER = Logger.getLogger(AbortTasksWork.class.getName());
 
     private final NodeControllerService ncs;
@@ -42,7 +42,7 @@
     }
 
     @Override
-    protected void doRun() throws Exception {
+    public void run() {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Aborting Tasks: " + jobId + ":" + tasks);
         }
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java
index b75a1fc..173ab92 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java
@@ -23,11 +23,11 @@
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.api.partitions.IPartition;
-import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
 import edu.uci.ics.hyracks.control.nc.Joblet;
 import edu.uci.ics.hyracks.control.nc.NodeControllerService;
 
-public class CleanupJobletWork extends SynchronizableWork {
+public class CleanupJobletWork extends AbstractWork {
     private static final Logger LOGGER = Logger.getLogger(CleanupJobletWork.class.getName());
 
     private final NodeControllerService ncs;
@@ -43,7 +43,7 @@
     }
 
     @Override
-    protected void doRun() throws Exception {
+    public void run() {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Cleaning up after job: " + jobId);
         }
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java
index 159ecb0..6eb1a95 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java
@@ -18,8 +18,6 @@
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.http.HttpResponse;
@@ -31,13 +29,11 @@
 import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
 import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
 import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
-import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
 import edu.uci.ics.hyracks.control.nc.NodeControllerService;
 import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
 
-public class CreateApplicationWork extends SynchronizableWork {
-    private static final Logger LOGGER = Logger.getLogger(CreateApplicationWork.class.getName());
-
+public class CreateApplicationWork extends AbstractWork {
     private final NodeControllerService ncs;
 
     private final String appName;
@@ -55,7 +51,7 @@
     }
 
     @Override
-    protected void doRun() throws Exception {
+    public void run() {
         try {
             NCApplicationContext appCtx;
             Map<String, NCApplicationContext> applications = ncs.getApplications();
@@ -86,8 +82,7 @@
             ncs.getClusterController()
                     .notifyApplicationStateChange(ncs.getId(), appName, ApplicationStatus.INITIALIZED);
         } catch (Exception e) {
-            LOGGER.warning("Error creating application: " + e.getMessage());
-            LOGGER.log(Level.WARNING, e.getLocalizedMessage(), e);
+            throw new RuntimeException(e);
         }
     }
 }
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DestroyApplicationWork.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DestroyApplicationWork.java
index cfe00f6..b104ce8 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DestroyApplicationWork.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DestroyApplicationWork.java
@@ -15,17 +15,14 @@
 package edu.uci.ics.hyracks.control.nc.work;
 
 import java.util.Map;
-import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
 import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
-import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
 import edu.uci.ics.hyracks.control.nc.NodeControllerService;
 import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
 
-public class DestroyApplicationWork extends SynchronizableWork {
-    private static final Logger LOGGER = Logger.getLogger(DestroyApplicationWork.class.getName());
-
+public class DestroyApplicationWork extends AbstractWork {
     private final NodeControllerService ncs;
 
     private final String appName;
@@ -36,16 +33,17 @@
     }
 
     @Override
-    protected void doRun() throws Exception {
+    public void run() {
         try {
             Map<String, NCApplicationContext> applications = ncs.getApplications();
             ApplicationContext appCtx = applications.remove(appName);
             if (appCtx != null) {
                 appCtx.deinitialize();
             }
+            ncs.getClusterController().notifyApplicationStateChange(ncs.getId(), appName,
+                    ApplicationStatus.DEINITIALIZED);
         } catch (Exception e) {
-            LOGGER.warning("Error destroying application: " + e.getMessage());
+            throw new RuntimeException(e);
         }
-        ncs.getClusterController().notifyApplicationStateChange(ncs.getId(), appName, ApplicationStatus.DEINITIALIZED);
     }
 }
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
index 9734567..2cf43da5b 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
@@ -17,20 +17,17 @@
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.Map;
-import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.comm.PartitionChannel;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
-import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
 import edu.uci.ics.hyracks.control.nc.Joblet;
 import edu.uci.ics.hyracks.control.nc.NodeControllerService;
 import edu.uci.ics.hyracks.control.nc.net.NetworkInputChannel;
 
-public class ReportPartitionAvailabilityWork extends SynchronizableWork {
-    private static final Logger LOGGER = Logger.getLogger(ReportPartitionAvailabilityWork.class.getName());
-
+public class ReportPartitionAvailabilityWork extends AbstractWork {
     private final NodeControllerService ncs;
 
     private final PartitionId pid;
@@ -44,14 +41,18 @@
     }
 
     @Override
-    protected void doRun() throws Exception {
-        Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
-        Joblet ji = jobletMap.get(pid.getJobId());
-        if (ji != null) {
-            PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(ncs.getRootContext(),
-                    ncs.getNetworkManager(), new InetSocketAddress(InetAddress.getByAddress(networkAddress
-                            .getIpAddress()), networkAddress.getPort()), pid, 5));
-            ji.reportPartitionAvailability(channel);
+    public void run() {
+        try {
+            Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
+            Joblet ji = jobletMap.get(pid.getJobId());
+            if (ji != null) {
+                PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(ncs.getRootContext(),
+                        ncs.getNetworkManager(), new InetSocketAddress(InetAddress.getByAddress(networkAddress
+                                .getIpAddress()), networkAddress.getPort()), pid, 5));
+                ji.reportPartitionAvailability(channel);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
         }
     }
 }
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
index 8d1b2a6..44be4b3 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
@@ -37,12 +37,13 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.job.ActivityCluster;
+import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
 import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
-import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
 import edu.uci.ics.hyracks.control.nc.Joblet;
 import edu.uci.ics.hyracks.control.nc.NodeControllerService;
 import edu.uci.ics.hyracks.control.nc.Task;
@@ -53,7 +54,7 @@
 import edu.uci.ics.hyracks.control.nc.partitions.ReceiveSideMaterializingCollector;
 import edu.uci.ics.hyracks.control.nc.profiling.ProfilingPartitionWriterFactory;
 
-public class StartTasksWork extends SynchronizableWork {
+public class StartTasksWork extends AbstractWork {
     private static final Logger LOGGER = Logger.getLogger(StartTasksWork.class.getName());
 
     private final NodeControllerService ncs;
@@ -62,50 +63,57 @@
 
     private final JobId jobId;
 
-    private final byte[] jagBytes;
+    private final byte[] acgBytes;
 
     private final List<TaskAttemptDescriptor> taskDescriptors;
 
     private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap;
 
-    public StartTasksWork(NodeControllerService ncs, String appName, JobId jobId, byte[] jagBytes,
+    private final EnumSet<JobFlag> flags;
+
+    public StartTasksWork(NodeControllerService ncs, String appName, JobId jobId, byte[] acgBytes,
             List<TaskAttemptDescriptor> taskDescriptors,
-            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap) {
+            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap, EnumSet<JobFlag> flags) {
         this.ncs = ncs;
         this.appName = appName;
         this.jobId = jobId;
-        this.jagBytes = jagBytes;
+        this.acgBytes = acgBytes;
         this.taskDescriptors = taskDescriptors;
         this.connectorPoliciesMap = connectorPoliciesMap;
+        this.flags = flags;
     }
 
     @Override
-    protected void doRun() throws Exception {
+    public void run() {
         try {
             Map<String, NCApplicationContext> applications = ncs.getApplications();
             NCApplicationContext appCtx = applications.get(appName);
-            final Joblet joblet = getOrCreateLocalJoblet(jobId, appCtx, jagBytes == null ? null
-                    : (JobActivityGraph) appCtx.deserialize(jagBytes));
-            final JobActivityGraph jag = joblet.getJobActivityGraph();
+            final Joblet joblet = getOrCreateLocalJoblet(jobId, appCtx, acgBytes == null ? null
+                    : (ActivityClusterGraph) appCtx.deserialize(acgBytes));
+            final ActivityClusterGraph acg = joblet.getActivityClusterGraph();
 
             IRecordDescriptorProvider rdp = new IRecordDescriptorProvider() {
                 @Override
                 public RecordDescriptor getOutputRecordDescriptor(ActivityId aid, int outputIndex) {
-                    IConnectorDescriptor conn = jag.getActivityOutputMap().get(aid).get(outputIndex);
-                    return jag.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
+                    ActivityCluster ac = acg.getActivityMap().get(aid);
+                    IConnectorDescriptor conn = ac.getActivityOutputMap().get(aid).get(outputIndex);
+                    return ac.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
                 }
 
                 @Override
                 public RecordDescriptor getInputRecordDescriptor(ActivityId aid, int inputIndex) {
-                    IConnectorDescriptor conn = jag.getActivityInputMap().get(aid).get(inputIndex);
-                    return jag.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
+                    ActivityCluster ac = acg.getActivityMap().get(aid);
+                    IConnectorDescriptor conn = ac.getActivityInputMap().get(aid).get(inputIndex);
+                    return ac.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
                 }
             };
 
             for (TaskAttemptDescriptor td : taskDescriptors) {
                 TaskAttemptId taId = td.getTaskAttemptId();
                 TaskId tid = taId.getTaskId();
-                IActivity han = jag.getActivityMap().get(tid.getActivityId());
+                ActivityId aid = tid.getActivityId();
+                ActivityCluster ac = acg.getActivityMap().get(aid);
+                IActivity han = ac.getActivityMap().get(aid);
                 if (LOGGER.isLoggable(Level.INFO)) {
                     LOGGER.info("Initializing " + taId + " -> " + han);
                 }
@@ -115,7 +123,7 @@
 
                 List<IPartitionCollector> collectors = new ArrayList<IPartitionCollector>();
 
-                List<IConnectorDescriptor> inputs = jag.getActivityInputMap().get(tid.getActivityId());
+                List<IConnectorDescriptor> inputs = ac.getActivityInputMap().get(aid);
                 if (inputs != null) {
                     for (int i = 0; i < inputs.size(); ++i) {
                         IConnectorDescriptor conn = inputs.get(i);
@@ -123,21 +131,21 @@
                         if (LOGGER.isLoggable(Level.INFO)) {
                             LOGGER.info("input: " + i + ": " + conn.getConnectorId());
                         }
-                        RecordDescriptor recordDesc = jag.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
+                        RecordDescriptor recordDesc = ac.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
                         IPartitionCollector collector = createPartitionCollector(td, partition, task, i, conn,
                                 recordDesc, cPolicy);
                         collectors.add(collector);
                     }
                 }
-                List<IConnectorDescriptor> outputs = jag.getActivityOutputMap().get(tid.getActivityId());
+                List<IConnectorDescriptor> outputs = ac.getActivityOutputMap().get(aid);
                 if (outputs != null) {
                     for (int i = 0; i < outputs.size(); ++i) {
                         final IConnectorDescriptor conn = outputs.get(i);
-                        RecordDescriptor recordDesc = jag.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
+                        RecordDescriptor recordDesc = ac.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
                         IConnectorPolicy cPolicy = connectorPoliciesMap.get(conn.getConnectorId());
 
                         IPartitionWriterFactory pwFactory = createPartitionWriterFactory(task, cPolicy, jobId, conn,
-                                partition, taId, jag.getJobFlags());
+                                partition, taId, flags);
 
                         if (LOGGER.isLoggable(Level.INFO)) {
                             LOGGER.info("output: " + i + ": " + conn.getConnectorId());
@@ -155,19 +163,19 @@
             }
         } catch (Exception e) {
             e.printStackTrace();
-            throw e;
+            throw new RuntimeException(e);
         }
     }
 
-    private Joblet getOrCreateLocalJoblet(JobId jobId, INCApplicationContext appCtx, JobActivityGraph jag)
+    private Joblet getOrCreateLocalJoblet(JobId jobId, INCApplicationContext appCtx, ActivityClusterGraph acg)
             throws Exception {
         Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
         Joblet ji = jobletMap.get(jobId);
         if (ji == null) {
-            if (jag == null) {
+            if (acg == null) {
                 throw new NullPointerException("JobActivityGraph was null");
             }
-            ji = new Joblet(ncs, jobId, appCtx, jag);
+            ji = new Joblet(ncs, jobId, appCtx, acg);
             jobletMap.put(jobId, ji);
         }
         return ji;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java
index 61d2733..ef479e1 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java
@@ -21,8 +21,8 @@
 import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.job.ActivityCluster;
 import edu.uci.ics.hyracks.api.job.IConnectorDescriptorRegistry;
-import edu.uci.ics.hyracks.api.job.JobActivityGraph;
 
 public abstract class AbstractConnectorDescriptor implements IConnectorDescriptor {
     private static final long serialVersionUID = 1L;
@@ -59,7 +59,7 @@
     }
 
     @Override
-    public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, JobActivityGraph plan,
+    public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, ActivityCluster ac,
             ICCApplicationContext appCtx) {
         // do nothing
     }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
index 190d890..153abd3 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
@@ -77,8 +77,7 @@
     }
 
     @Override
-    public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, JobActivityGraph plan,
-            ICCApplicationContext appCtx) {
+    public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, ICCApplicationContext appCtx) {
         // do nothing
     }
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
index a69fd01..7d007a4 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
@@ -27,8 +27,8 @@
 import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.ActivityCluster;
 import edu.uci.ics.hyracks.api.job.IConnectorDescriptorRegistry;
-import edu.uci.ics.hyracks.api.job.JobActivityGraph;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicChannelReader;
 import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicFrameReader;
@@ -60,10 +60,10 @@
     }
 
     @Override
-    public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, JobActivityGraph plan,
+    public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, ActivityCluster ac,
             ICCApplicationContext appCtx) {
-        OperatorDescriptorId consumer = plan.getConsumerActivity(getConnectorId()).getOperatorDescriptorId();
-        OperatorDescriptorId producer = plan.getProducerActivity(getConnectorId()).getOperatorDescriptorId();
+        OperatorDescriptorId consumer = ac.getConsumerActivity(getConnectorId()).getOperatorDescriptorId();
+        OperatorDescriptorId producer = ac.getProducerActivity(getConnectorId()).getOperatorDescriptorId();
 
         constraintAcceptor.addConstraint(new Constraint(new PartitionCountExpression(consumer),
                 new PartitionCountExpression(producer)));
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/IdentityOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/IdentityOperatorDescriptor.java
new file mode 100644
index 0000000..b58b1c2
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/IdentityOperatorDescriptor.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.misc;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+public class IdentityOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    public IdentityOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc) {
+        super(spec, 1, 1);
+        recordDescriptors[0] = rDesc;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+            throws HyracksDataException {
+        return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+            @Override
+            public void open() throws HyracksDataException {
+                writer.open();
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                writer.nextFrame(buffer);
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                writer.fail();
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                writer.close();
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
index 9ec08cf..dabc8a4 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
@@ -24,8 +24,6 @@
 public class ChannelSet {
     private static final Logger LOGGER = Logger.getLogger(ChannelSet.class.getName());
 
-    private static final int MAX_OPEN_CHANNELS = 1024;
-
     private static final int INITIAL_SIZE = 16;
 
     private final MultiplexedConnection mConn;
@@ -204,8 +202,8 @@
     }
 
     private ChannelControlBlock createChannel(int idx) throws NetException {
-        if (idx >= MAX_OPEN_CHANNELS) {
-            throw new NetException("More than " + MAX_OPEN_CHANNELS + " opened concurrently");
+        if (idx > MuxDemuxCommand.MAX_CHANNEL_ID) {
+            throw new NetException("Channel Id > " + MuxDemuxCommand.MAX_CHANNEL_ID + " being opened");
         }
         if (idx >= ccbArray.length) {
             expand(idx);
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxCommand.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxCommand.java
index 2e2636b..4610170 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxCommand.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxCommand.java
@@ -5,11 +5,11 @@
 import edu.uci.ics.hyracks.net.exceptions.NetException;
 
 class MuxDemuxCommand {
-    static final int MAX_CHANNEL_ID = 0x3ff;
+    static final int MAX_CHANNEL_ID = Integer.MAX_VALUE - 1;
 
-    static final int COMMAND_SIZE = 4;
+    static final int COMMAND_SIZE = 8;
 
-    static final int MAX_DATA_VALUE = 0x7ffff;
+    static final int MAX_DATA_VALUE = 0x1fffffff;
 
     enum CommandType {
         OPEN_CHANNEL,
@@ -57,15 +57,15 @@
     }
 
     public void write(ByteBuffer buffer) {
-        int cmd = (channelId << 22) | (type.ordinal() << 19) | (data & 0x7ffff);
-        buffer.putInt(cmd);
+        long cmd = (((long) channelId) << 32) | (((long) type.ordinal()) << 29) | (data & 0x1fffffff);
+        buffer.putLong(cmd);
     }
 
     public void read(ByteBuffer buffer) {
-        int cmd = buffer.getInt();
-        channelId = (cmd >> 22) & 0x3ff;
-        type = CommandType.values()[(cmd >> 19) & 0x7];
-        data = cmd & 0x7ffff;
+        long cmd = buffer.getLong();
+        channelId = (int) ((cmd >> 32) & 0x7fffffff);
+        type = CommandType.values()[(int) ((cmd >> 29) & 0x7)];
+        data = (int) (cmd & 0x1fffffff);
     }
 
     @Override