Added flag to decide task cluster scheduling
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@2059 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
index b5aa078..f36b7b3 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
@@ -53,6 +53,7 @@
acg.setJobletEventListenerFactory(spec.getJobletEventListenerFactory());
acg.setGlobalJobDataFactory(spec.getGlobalJobDataFactory());
acg.setConnectorPolicyAssignmentPolicy(spec.getConnectorPolicyAssignmentPolicy());
+ acg.setUseConnectorPolicyForScheduling(spec.isUseConnectorPolicyForScheduling());
final Set<Constraint> constraints = new HashSet<Constraint>();
final IConstraintAcceptor acceptor = new IConstraintAcceptor() {
@Override
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java
index e1b8390..32c93cd 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java
@@ -48,6 +48,8 @@
private IConnectorPolicyAssignmentPolicy connectorPolicyAssignmentPolicy;
+ private boolean useConnectorPolicyForScheduling;
+
public ActivityClusterGraph() {
version = 0;
activityClusterMap = new HashMap<ActivityClusterId, ActivityCluster>();
@@ -125,6 +127,14 @@
this.connectorPolicyAssignmentPolicy = connectorPolicyAssignmentPolicy;
}
+ public boolean isUseConnectorPolicyForScheduling() {
+ return useConnectorPolicyForScheduling;
+ }
+
+ public void setUseConnectorPolicyForScheduling(boolean useConnectorPolicyForScheduling) {
+ this.useConnectorPolicyForScheduling = useConnectorPolicyForScheduling;
+ }
+
public JSONObject toJSON() throws JSONException {
JSONObject acgj = new JSONObject();
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
index cf2eec2..7c523f1 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
@@ -64,6 +64,8 @@
private IGlobalJobDataFactory globalJobDataFactory;
+ private boolean useConnectorPolicyForScheduling;
+
private transient int operatorIdCounter;
private transient int connectorIdCounter;
@@ -81,6 +83,7 @@
connectorIdCounter = 0;
frameSize = 32768;
maxReattempts = 2;
+ useConnectorPolicyForScheduling = true;
}
@Override
@@ -253,6 +256,14 @@
this.globalJobDataFactory = globalJobDataFactory;
}
+ public boolean isUseConnectorPolicyForScheduling() {
+ return useConnectorPolicyForScheduling;
+ }
+
+ public void setUseConnectorPolicyForScheduling(boolean useConnectorPolicyForScheduling) {
+ this.useConnectorPolicyForScheduling = useConnectorPolicyForScheduling;
+ }
+
private <K, V> void insertIntoIndexedMap(Map<K, List<V>> map, K key, int index, V value) {
List<V> vList = map.get(key);
if (vList == null) {
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
index f9ad14f..8879627 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
@@ -49,8 +49,6 @@
import edu.uci.ics.hyracks.control.cc.job.TaskClusterId;
public class ActivityClusterPlanner {
- private static final boolean USE_CONNECTOR_POLICY_IN_TASK_CLUSTER_CONSTRUCTION = true;
-
private static final Logger LOGGER = Logger.getLogger(ActivityClusterPlanner.class.getName());
private final JobScheduler scheduler;
@@ -124,7 +122,7 @@
Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity = computeTaskConnectivity(jobRun,
activityPlanMap, activities);
- TaskCluster[] taskClusters = USE_CONNECTOR_POLICY_IN_TASK_CLUSTER_CONSTRUCTION ? buildConnectorPolicyAwareTaskClusters(
+ TaskCluster[] taskClusters = ac.getActivityClusterGraph().isUseConnectorPolicyForScheduling() ? buildConnectorPolicyAwareTaskClusters(
ac, activityPlanMap, taskConnectivity) : buildConnectorPolicyUnawareTaskClusters(ac, activityPlanMap);
for (TaskCluster tc : taskClusters) {