Added code to propagate ConnectorPolicyAssignmentPolicy from the JobSpecification
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@1990 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
index 791f312..b5aa078 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
@@ -52,6 +52,7 @@
acg.setMaxReattempts(spec.getMaxReattempts());
acg.setJobletEventListenerFactory(spec.getJobletEventListenerFactory());
acg.setGlobalJobDataFactory(spec.getGlobalJobDataFactory());
+ acg.setConnectorPolicyAssignmentPolicy(spec.getConnectorPolicyAssignmentPolicy());
final Set<Constraint> constraints = new HashSet<Constraint>();
final IConstraintAcceptor acceptor = new IConstraintAcceptor() {
@Override
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java
index dd68747..e1b8390 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java
@@ -25,6 +25,7 @@
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
public class ActivityClusterGraph implements Serializable {
private static final long serialVersionUID = 1L;
@@ -45,6 +46,8 @@
private IGlobalJobDataFactory globalJobDataFactory;
+ private IConnectorPolicyAssignmentPolicy connectorPolicyAssignmentPolicy;
+
public ActivityClusterGraph() {
version = 0;
activityClusterMap = new HashMap<ActivityClusterId, ActivityCluster>();
@@ -114,6 +117,14 @@
this.globalJobDataFactory = globalJobDataFactory;
}
+ public IConnectorPolicyAssignmentPolicy getConnectorPolicyAssignmentPolicy() {
+ return connectorPolicyAssignmentPolicy;
+ }
+
+ public void setConnectorPolicyAssignmentPolicy(IConnectorPolicyAssignmentPolicy connectorPolicyAssignmentPolicy) {
+ this.connectorPolicyAssignmentPolicy = connectorPolicyAssignmentPolicy;
+ }
+
public JSONObject toJSON() throws JSONException {
JSONObject acgj = new JSONObject();
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
index 6c52aac..f9ad14f 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
@@ -361,6 +361,10 @@
if (cpap != null) {
return cpap.getConnectorPolicyAssignment(c, nProducers, nConsumers, fanouts);
}
+ cpap = ac.getActivityClusterGraph().getConnectorPolicyAssignmentPolicy();
+ if (cpap != null) {
+ return cpap.getConnectorPolicyAssignment(c, nProducers, nConsumers, fanouts);
+ }
return new PipeliningConnectorPolicy();
}