Added code to propagate ConnectorPolicyAssignmentPolicy from the JobSpecification

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@1990 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
index 791f312..b5aa078 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
@@ -52,6 +52,7 @@
         acg.setMaxReattempts(spec.getMaxReattempts());
         acg.setJobletEventListenerFactory(spec.getJobletEventListenerFactory());
         acg.setGlobalJobDataFactory(spec.getGlobalJobDataFactory());
+        acg.setConnectorPolicyAssignmentPolicy(spec.getConnectorPolicyAssignmentPolicy());
         final Set<Constraint> constraints = new HashSet<Constraint>();
         final IConstraintAcceptor acceptor = new IConstraintAcceptor() {
             @Override
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java
index dd68747..e1b8390 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java
@@ -25,6 +25,7 @@
 
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
 
 public class ActivityClusterGraph implements Serializable {
     private static final long serialVersionUID = 1L;
@@ -45,6 +46,8 @@
 
     private IGlobalJobDataFactory globalJobDataFactory;
 
+    private IConnectorPolicyAssignmentPolicy connectorPolicyAssignmentPolicy;
+
     public ActivityClusterGraph() {
         version = 0;
         activityClusterMap = new HashMap<ActivityClusterId, ActivityCluster>();
@@ -114,6 +117,14 @@
         this.globalJobDataFactory = globalJobDataFactory;
     }
 
+    public IConnectorPolicyAssignmentPolicy getConnectorPolicyAssignmentPolicy() {
+        return connectorPolicyAssignmentPolicy;
+    }
+
+    public void setConnectorPolicyAssignmentPolicy(IConnectorPolicyAssignmentPolicy connectorPolicyAssignmentPolicy) {
+        this.connectorPolicyAssignmentPolicy = connectorPolicyAssignmentPolicy;
+    }
+
     public JSONObject toJSON() throws JSONException {
         JSONObject acgj = new JSONObject();
 
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
index 6c52aac..f9ad14f 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
@@ -361,6 +361,10 @@
         if (cpap != null) {
             return cpap.getConnectorPolicyAssignment(c, nProducers, nConsumers, fanouts);
         }
+        cpap = ac.getActivityClusterGraph().getConnectorPolicyAssignmentPolicy();
+        if (cpap != null) {
+            return cpap.getConnectorPolicyAssignment(c, nProducers, nConsumers, fanouts);
+        }
         return new PipeliningConnectorPolicy();
     }