Fixed runnability computation. Added two materializing pipelining policies.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_scheduling@516 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskAttemptId.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskAttemptId.java
index 0b15865..d4e6972 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskAttemptId.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskAttemptId.java
@@ -52,6 +52,6 @@
@Override
public String toString() {
- return "TAID:[" + taskId + ":" + attempt + "]";
+ return "TAID:[" + taskId + "]:" + attempt;
}
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java
index 4420279..ee63355 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java
@@ -52,6 +52,6 @@
@Override
public String toString() {
- return "TID:[" + activityId + ":" + partition + ":" + "]";
+ return "TID:[" + activityId + "]:" + partition;
}
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/PipelinedConnectorPolicy.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/PipeliningConnectorPolicy.java
similarity index 93%
rename from hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/PipelinedConnectorPolicy.java
rename to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/PipeliningConnectorPolicy.java
index ba9d3a5..39b3904 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/PipelinedConnectorPolicy.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/PipeliningConnectorPolicy.java
@@ -14,7 +14,7 @@
*/
package edu.uci.ics.hyracks.api.dataflow.connectors;
-public final class PipelinedConnectorPolicy implements IConnectorPolicy {
+public final class PipeliningConnectorPolicy implements IConnectorPolicy {
private static final long serialVersionUID = 1L;
@Override
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/PipelinedConnectorPolicy.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedPipeliningConnectorPolicy.java
similarity index 90%
copy from hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/PipelinedConnectorPolicy.java
copy to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedPipeliningConnectorPolicy.java
index ba9d3a5..5d522c0 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/PipelinedConnectorPolicy.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedPipeliningConnectorPolicy.java
@@ -14,22 +14,22 @@
*/
package edu.uci.ics.hyracks.api.dataflow.connectors;
-public final class PipelinedConnectorPolicy implements IConnectorPolicy {
+public final class SendSideMaterializedPipeliningConnectorPolicy implements IConnectorPolicy {
private static final long serialVersionUID = 1L;
@Override
public boolean requiresProducerConsumerCoscheduling() {
- return true;
+ return false;
}
@Override
public boolean consumerWaitsForProducerToFinish() {
- return true;
+ return false;
}
@Override
public boolean materializeOnSendSide() {
- return false;
+ return true;
}
@Override
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/PipelinedConnectorPolicy.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedReceiveSideMaterializedPipeliningConnectorPolicy.java
similarity index 90%
copy from hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/PipelinedConnectorPolicy.java
copy to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedReceiveSideMaterializedPipeliningConnectorPolicy.java
index ba9d3a5..1cc0583 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/PipelinedConnectorPolicy.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedReceiveSideMaterializedPipeliningConnectorPolicy.java
@@ -14,26 +14,26 @@
*/
package edu.uci.ics.hyracks.api.dataflow.connectors;
-public final class PipelinedConnectorPolicy implements IConnectorPolicy {
+public final class SendSideMaterializedReceiveSideMaterializedPipeliningConnectorPolicy implements IConnectorPolicy {
private static final long serialVersionUID = 1L;
@Override
public boolean requiresProducerConsumerCoscheduling() {
- return true;
+ return false;
}
@Override
public boolean consumerWaitsForProducerToFinish() {
- return true;
+ return false;
}
@Override
public boolean materializeOnSendSide() {
- return false;
+ return true;
}
@Override
public boolean materializeOnReceiveSide() {
- return false;
+ return true;
}
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityClusterPlan.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityClusterPlan.java
index 3f97651..531dc92 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityClusterPlan.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityClusterPlan.java
@@ -17,19 +17,14 @@
import java.util.Map;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
-import edu.uci.ics.hyracks.api.partitions.PartitionId;
public class ActivityClusterPlan {
private final Map<ActivityId, Task[]> taskStateMap;
- private final Map<PartitionId, TaskCluster> partitionProducingTaskClusterMap;
-
private final TaskCluster[] taskClusters;
- public ActivityClusterPlan(TaskCluster[] taskClusters, Map<ActivityId, Task[]> taskStateMap,
- Map<PartitionId, TaskCluster> partitionProducingTaskClusterMap) {
+ public ActivityClusterPlan(TaskCluster[] taskClusters, Map<ActivityId, Task[]> taskStateMap) {
this.taskStateMap = taskStateMap;
- this.partitionProducingTaskClusterMap = partitionProducingTaskClusterMap;
this.taskClusters = taskClusters;
}
@@ -40,8 +35,4 @@
public TaskCluster[] getTaskClusters() {
return taskClusters;
}
-
- public Map<PartitionId, TaskCluster> getPartitionProducingTaskClusterMap() {
- return partitionProducingTaskClusterMap;
- }
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
index 7231842..57df351 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
@@ -34,7 +34,7 @@
import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
-import edu.uci.ics.hyracks.api.dataflow.connectors.PipelinedConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.PipeliningConnectorPolicy;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobActivityGraph;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
@@ -50,8 +50,11 @@
private final JobScheduler scheduler;
+ private final Map<PartitionId, TaskCluster> partitionProducingTaskClusterMap;
+
public ActivityClusterPlanner(JobScheduler newJobScheduler) {
this.scheduler = newJobScheduler;
+ partitionProducingTaskClusterMap = new HashMap<PartitionId, TaskCluster>();
}
public void planActivityCluster(ActivityCluster ac) throws HyracksException {
@@ -174,7 +177,6 @@
}
TaskCluster[] taskClusters = tcSet.toArray(new TaskCluster[tcSet.size()]);
- Map<PartitionId, TaskCluster> partitionProducingTaskClusterMap = new HashMap<PartitionId, TaskCluster>();
for (TaskCluster tc : taskClusters) {
Set<TaskCluster> tcDependencyTaskClusters = tc.getDependencyTaskClusters();
for (Task ts : tc.getTasks()) {
@@ -211,7 +213,7 @@
}
}
- ac.setPlan(new ActivityClusterPlan(taskClusters, taskMap, partitionProducingTaskClusterMap));
+ ac.setPlan(new ActivityClusterPlan(taskClusters, taskMap));
}
private TaskCluster getTaskCluster(TaskId tid) {
@@ -267,7 +269,7 @@
if (cpap != null) {
return cpap.getConnectorPolicyAssignment(c, nProducers, nConsumers, fanouts);
}
- return new PipelinedConnectorPolicy();
+ return new PipeliningConnectorPolicy();
}
private Map<ActivityId, ActivityPartitionDetails> computePartitionCounts(ActivityCluster ac)
@@ -322,4 +324,8 @@
}
return activityPartsMap;
}
+
+ public Map<? extends PartitionId, ? extends TaskCluster> getPartitionProducingTaskClusterMap() {
+ return partitionProducingTaskClusterMap;
+ }
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityPartitionDetails.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityPartitionDetails.java
index d3fc871..414b543 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityPartitionDetails.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityPartitionDetails.java
@@ -43,6 +43,7 @@
@Override
public String toString() {
- return nPartitions + ":" + Arrays.toString(nInputPartitions) + ":" + Arrays.toString(nOutputPartitions);
+ return nPartitions + ":" + (nInputPartitions == null ? "[]" : Arrays.toString(nInputPartitions)) + ":"
+ + (nOutputPartitions == null ? "[]" : Arrays.toString(nOutputPartitions));
}
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index a8d8ab7..356c96b 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -147,7 +147,8 @@
Set<TaskCluster> depACTCRoots = new HashSet<TaskCluster>();
for (TaskCluster tc : depAC.getPlan().getTaskClusters()) {
if (tc.getProducedPartitions().isEmpty()) {
- if (findLastTaskClusterAttempt(tc).getStatus() != TaskClusterAttempt.TaskClusterStatus.COMPLETED) {
+ TaskClusterAttempt tca = findLastTaskClusterAttempt(tc);
+ if (tca == null || tca.getStatus() != TaskClusterAttempt.TaskClusterStatus.COMPLETED) {
tcRootsComplete = false;
}
depACTCRoots.add(tc);
@@ -163,6 +164,7 @@
if (!isPlanned(candidate)) {
ActivityClusterPlanner acp = new ActivityClusterPlanner(this);
acp.planActivityCluster(candidate);
+ partitionProducingTaskClusterMap.putAll(acp.getPartitionProducingTaskClusterMap());
}
for (TaskCluster tc : candidate.getPlan().getTaskClusters()) {
if (tc.getProducedPartitions().isEmpty()) {
@@ -203,7 +205,6 @@
for (Map.Entry<TaskCluster, Runnability> e : runnabilityMap.entrySet()) {
TaskCluster tc = e.getKey();
Runnability runnability = e.getValue();
- assert runnability.getTag() != Runnability.Tag.UNSATISFIED_PREREQUISITES;
if (runnability.getTag() != Runnability.Tag.RUNNABLE) {
continue;
}
@@ -244,11 +245,17 @@
* Runnability(Non-schedulable TaskCluster) = {NOT_RUNNABLE, _}
*/
private Runnability assignRunnabilityRank(TaskCluster goal, Map<TaskCluster, Runnability> runnabilityMap) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Computing runnability: " + goal);
+ }
if (runnabilityMap.containsKey(goal)) {
return runnabilityMap.get(goal);
}
TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(goal);
if (lastAttempt != null) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Last Attempt Status: " + lastAttempt.getStatus());
+ }
if (lastAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED) {
Runnability runnability = new Runnability(Runnability.Tag.COMPLETED, Integer.MIN_VALUE);
runnabilityMap.put(goal, runnability);
@@ -264,22 +271,47 @@
PartitionMatchMaker pmm = jobRun.getPartitionMatchMaker();
Runnability aggregateRunnability = new Runnability(Runnability.Tag.RUNNABLE, 0);
for (PartitionId pid : goal.getRequiredPartitions()) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Inspecting required partition: " + pid);
+ }
Runnability runnability;
ConnectorDescriptorId cdId = pid.getConnectorDescriptorId();
IConnectorPolicy cPolicy = connectorPolicyMap.get(cdId);
PartitionState maxState = pmm.getMaximumAvailableState(pid);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Policy: " + cPolicy + " maxState: " + maxState);
+ }
if (PartitionState.COMMITTED.equals(maxState)) {
runnability = new Runnability(Runnability.Tag.RUNNABLE, 0);
} else if (PartitionState.STARTED.equals(maxState) && !cPolicy.consumerWaitsForProducerToFinish()) {
runnability = new Runnability(Runnability.Tag.RUNNABLE, 1);
} else {
runnability = assignRunnabilityRank(partitionProducingTaskClusterMap.get(pid), runnabilityMap);
- if (runnability.getTag() == Runnability.Tag.RUNNABLE && runnability.getPriority() > 0
- && cPolicy.consumerWaitsForProducerToFinish()) {
- runnability = new Runnability(Runnability.Tag.NOT_RUNNABLE, Integer.MAX_VALUE);
+ switch (runnability.getTag()) {
+ case RUNNABLE:
+ if (cPolicy.consumerWaitsForProducerToFinish()) {
+ runnability = new Runnability(Runnability.Tag.NOT_RUNNABLE, Integer.MAX_VALUE);
+ } else {
+ runnability = new Runnability(Runnability.Tag.RUNNABLE, runnability.getPriority() + 1);
+ }
+ break;
+
+ case NOT_RUNNABLE:
+ break;
+
+ case RUNNING:
+ if (cPolicy.consumerWaitsForProducerToFinish()) {
+ runnability = new Runnability(Runnability.Tag.NOT_RUNNABLE, Integer.MAX_VALUE);
+ } else {
+ runnability = new Runnability(Runnability.Tag.RUNNABLE, 1);
+ }
+ break;
}
}
aggregateRunnability = Runnability.getWorstCase(aggregateRunnability, runnability);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("aggregateRunnability: " + aggregateRunnability);
+ }
}
runnabilityMap.put(goal, aggregateRunnability);
return aggregateRunnability;
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/Runnability.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/Runnability.java
index f990806..56ae9c3 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/Runnability.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/Runnability.java
@@ -37,7 +37,6 @@
NOT_RUNNABLE,
RUNNABLE,
RUNNING,
- UNSATISFIED_PREREQUISITES,
}
public static Runnability getWorstCase(Runnability r1, Runnability r2) {
@@ -48,7 +47,6 @@
case NOT_RUNNABLE:
case RUNNABLE:
case RUNNING:
- case UNSATISFIED_PREREQUISITES:
return r2;
}
break;
@@ -60,9 +58,6 @@
case RUNNABLE:
case RUNNING:
return r1;
-
- case UNSATISFIED_PREREQUISITES:
- return r2;
}
break;
@@ -75,7 +70,6 @@
return r1.priority > 0 ? r1 : new Runnability(Tag.RUNNABLE, 1);
case NOT_RUNNABLE:
- case UNSATISFIED_PREREQUISITES:
return r2;
case RUNNABLE:
@@ -90,24 +84,12 @@
return r1;
case NOT_RUNNABLE:
- case UNSATISFIED_PREREQUISITES:
return r2;
case RUNNABLE:
return r2.priority > 0 ? r2 : new Runnability(Tag.RUNNABLE, 1);
}
break;
-
- case UNSATISFIED_PREREQUISITES:
- switch (r2.tag) {
- case COMPLETED:
- case NOT_RUNNABLE:
- case RUNNABLE:
- case RUNNING:
- case UNSATISFIED_PREREQUISITES:
- return r1;
- }
- break;
}
throw new IllegalArgumentException("Could not aggregate: " + r1 + " and " + r2);
}
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionDescriptor.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionDescriptor.java
index 63b7ad3..9fc0916 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionDescriptor.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionDescriptor.java
@@ -65,7 +65,7 @@
@Override
public String toString() {
- return "[" + pid + ":" + nodeId + ":" + producingTaskAttemptId + (reusable ? "reusable" : "non-reusable")
+ return "[" + pid + ":" + nodeId + ":" + producingTaskAttemptId + (reusable ? "reusable" : "non-reusable") + " "
+ state + "]";
}
}
\ No newline at end of file