Added flag to decide task cluster scheduling

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@2059 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
index b5aa078..f36b7b3 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
@@ -53,6 +53,7 @@
         acg.setJobletEventListenerFactory(spec.getJobletEventListenerFactory());
         acg.setGlobalJobDataFactory(spec.getGlobalJobDataFactory());
         acg.setConnectorPolicyAssignmentPolicy(spec.getConnectorPolicyAssignmentPolicy());
+        acg.setUseConnectorPolicyForScheduling(spec.isUseConnectorPolicyForScheduling());
         final Set<Constraint> constraints = new HashSet<Constraint>();
         final IConstraintAcceptor acceptor = new IConstraintAcceptor() {
             @Override
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java
index e1b8390..32c93cd 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java
@@ -48,6 +48,8 @@
 
     private IConnectorPolicyAssignmentPolicy connectorPolicyAssignmentPolicy;
 
+    private boolean useConnectorPolicyForScheduling;
+
     public ActivityClusterGraph() {
         version = 0;
         activityClusterMap = new HashMap<ActivityClusterId, ActivityCluster>();
@@ -125,6 +127,14 @@
         this.connectorPolicyAssignmentPolicy = connectorPolicyAssignmentPolicy;
     }
 
+    public boolean isUseConnectorPolicyForScheduling() {
+        return useConnectorPolicyForScheduling;
+    }
+
+    public void setUseConnectorPolicyForScheduling(boolean useConnectorPolicyForScheduling) {
+        this.useConnectorPolicyForScheduling = useConnectorPolicyForScheduling;
+    }
+
     public JSONObject toJSON() throws JSONException {
         JSONObject acgj = new JSONObject();
 
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
index cf2eec2..7c523f1 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
@@ -64,6 +64,8 @@
 
     private IGlobalJobDataFactory globalJobDataFactory;
 
+    private boolean useConnectorPolicyForScheduling;
+
     private transient int operatorIdCounter;
 
     private transient int connectorIdCounter;
@@ -81,6 +83,7 @@
         connectorIdCounter = 0;
         frameSize = 32768;
         maxReattempts = 2;
+        useConnectorPolicyForScheduling = true;
     }
 
     @Override
@@ -253,6 +256,14 @@
         this.globalJobDataFactory = globalJobDataFactory;
     }
 
+    public boolean isUseConnectorPolicyForScheduling() {
+        return useConnectorPolicyForScheduling;
+    }
+
+    public void setUseConnectorPolicyForScheduling(boolean useConnectorPolicyForScheduling) {
+        this.useConnectorPolicyForScheduling = useConnectorPolicyForScheduling;
+    }
+
     private <K, V> void insertIntoIndexedMap(Map<K, List<V>> map, K key, int index, V value) {
         List<V> vList = map.get(key);
         if (vList == null) {
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
index f9ad14f..8879627 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
@@ -49,8 +49,6 @@
 import edu.uci.ics.hyracks.control.cc.job.TaskClusterId;
 
 public class ActivityClusterPlanner {
-    private static final boolean USE_CONNECTOR_POLICY_IN_TASK_CLUSTER_CONSTRUCTION = true;
-
     private static final Logger LOGGER = Logger.getLogger(ActivityClusterPlanner.class.getName());
 
     private final JobScheduler scheduler;
@@ -124,7 +122,7 @@
         Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity = computeTaskConnectivity(jobRun,
                 activityPlanMap, activities);
 
-        TaskCluster[] taskClusters = USE_CONNECTOR_POLICY_IN_TASK_CLUSTER_CONSTRUCTION ? buildConnectorPolicyAwareTaskClusters(
+        TaskCluster[] taskClusters = ac.getActivityClusterGraph().isUseConnectorPolicyForScheduling() ? buildConnectorPolicyAwareTaskClusters(
                 ac, activityPlanMap, taskConnectivity) : buildConnectorPolicyUnawareTaskClusters(ac, activityPlanMap);
 
         for (TaskCluster tc : taskClusters) {