Fixed runnability computation. Added two materializing pipelining policies.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_scheduling@516 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskAttemptId.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskAttemptId.java
index 0b15865..d4e6972 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskAttemptId.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskAttemptId.java
@@ -52,6 +52,6 @@
 
     @Override
     public String toString() {
-        return "TAID:[" + taskId + ":" + attempt + "]";
+        return "TAID:[" + taskId + "]:" + attempt;
     }
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java
index 4420279..ee63355 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java
@@ -52,6 +52,6 @@
 
     @Override
     public String toString() {
-        return "TID:[" + activityId + ":" + partition + ":" + "]";
+        return "TID:[" + activityId + "]:" + partition;
     }
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/PipelinedConnectorPolicy.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/PipeliningConnectorPolicy.java
similarity index 93%
rename from hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/PipelinedConnectorPolicy.java
rename to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/PipeliningConnectorPolicy.java
index ba9d3a5..39b3904 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/PipelinedConnectorPolicy.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/PipeliningConnectorPolicy.java
@@ -14,7 +14,7 @@
  */
 package edu.uci.ics.hyracks.api.dataflow.connectors;
 
-public final class PipelinedConnectorPolicy implements IConnectorPolicy {
+public final class PipeliningConnectorPolicy implements IConnectorPolicy {
     private static final long serialVersionUID = 1L;
 
     @Override
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/PipelinedConnectorPolicy.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedPipeliningConnectorPolicy.java
similarity index 90%
copy from hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/PipelinedConnectorPolicy.java
copy to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedPipeliningConnectorPolicy.java
index ba9d3a5..5d522c0 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/PipelinedConnectorPolicy.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedPipeliningConnectorPolicy.java
@@ -14,22 +14,22 @@
  */
 package edu.uci.ics.hyracks.api.dataflow.connectors;
 
-public final class PipelinedConnectorPolicy implements IConnectorPolicy {
+public final class SendSideMaterializedPipeliningConnectorPolicy implements IConnectorPolicy {
     private static final long serialVersionUID = 1L;
 
     @Override
     public boolean requiresProducerConsumerCoscheduling() {
-        return true;
+        return false;
     }
 
     @Override
     public boolean consumerWaitsForProducerToFinish() {
-        return true;
+        return false;
     }
 
     @Override
     public boolean materializeOnSendSide() {
-        return false;
+        return true;
     }
 
     @Override
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/PipelinedConnectorPolicy.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedReceiveSideMaterializedPipeliningConnectorPolicy.java
similarity index 90%
copy from hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/PipelinedConnectorPolicy.java
copy to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedReceiveSideMaterializedPipeliningConnectorPolicy.java
index ba9d3a5..1cc0583 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/PipelinedConnectorPolicy.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedReceiveSideMaterializedPipeliningConnectorPolicy.java
@@ -14,26 +14,26 @@
  */
 package edu.uci.ics.hyracks.api.dataflow.connectors;
 
-public final class PipelinedConnectorPolicy implements IConnectorPolicy {
+public final class SendSideMaterializedReceiveSideMaterializedPipeliningConnectorPolicy implements IConnectorPolicy {
     private static final long serialVersionUID = 1L;
 
     @Override
     public boolean requiresProducerConsumerCoscheduling() {
-        return true;
+        return false;
     }
 
     @Override
     public boolean consumerWaitsForProducerToFinish() {
-        return true;
+        return false;
     }
 
     @Override
     public boolean materializeOnSendSide() {
-        return false;
+        return true;
     }
 
     @Override
     public boolean materializeOnReceiveSide() {
-        return false;
+        return true;
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityClusterPlan.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityClusterPlan.java
index 3f97651..531dc92 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityClusterPlan.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityClusterPlan.java
@@ -17,19 +17,14 @@
 import java.util.Map;
 
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
-import edu.uci.ics.hyracks.api.partitions.PartitionId;
 
 public class ActivityClusterPlan {
     private final Map<ActivityId, Task[]> taskStateMap;
 
-    private final Map<PartitionId, TaskCluster> partitionProducingTaskClusterMap;
-
     private final TaskCluster[] taskClusters;
 
-    public ActivityClusterPlan(TaskCluster[] taskClusters, Map<ActivityId, Task[]> taskStateMap,
-            Map<PartitionId, TaskCluster> partitionProducingTaskClusterMap) {
+    public ActivityClusterPlan(TaskCluster[] taskClusters, Map<ActivityId, Task[]> taskStateMap) {
         this.taskStateMap = taskStateMap;
-        this.partitionProducingTaskClusterMap = partitionProducingTaskClusterMap;
         this.taskClusters = taskClusters;
     }
 
@@ -40,8 +35,4 @@
     public TaskCluster[] getTaskClusters() {
         return taskClusters;
     }
-
-    public Map<PartitionId, TaskCluster> getPartitionProducingTaskClusterMap() {
-        return partitionProducingTaskClusterMap;
-    }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
index 7231842..57df351 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
@@ -34,7 +34,7 @@
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
-import edu.uci.ics.hyracks.api.dataflow.connectors.PipelinedConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.PipeliningConnectorPolicy;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.JobActivityGraph;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
@@ -50,8 +50,11 @@
 
     private final JobScheduler scheduler;
 
+    private final Map<PartitionId, TaskCluster> partitionProducingTaskClusterMap;
+
     public ActivityClusterPlanner(JobScheduler newJobScheduler) {
         this.scheduler = newJobScheduler;
+        partitionProducingTaskClusterMap = new HashMap<PartitionId, TaskCluster>();
     }
 
     public void planActivityCluster(ActivityCluster ac) throws HyracksException {
@@ -174,7 +177,6 @@
         }
         TaskCluster[] taskClusters = tcSet.toArray(new TaskCluster[tcSet.size()]);
 
-        Map<PartitionId, TaskCluster> partitionProducingTaskClusterMap = new HashMap<PartitionId, TaskCluster>();
         for (TaskCluster tc : taskClusters) {
             Set<TaskCluster> tcDependencyTaskClusters = tc.getDependencyTaskClusters();
             for (Task ts : tc.getTasks()) {
@@ -211,7 +213,7 @@
             }
         }
 
-        ac.setPlan(new ActivityClusterPlan(taskClusters, taskMap, partitionProducingTaskClusterMap));
+        ac.setPlan(new ActivityClusterPlan(taskClusters, taskMap));
     }
 
     private TaskCluster getTaskCluster(TaskId tid) {
@@ -267,7 +269,7 @@
         if (cpap != null) {
             return cpap.getConnectorPolicyAssignment(c, nProducers, nConsumers, fanouts);
         }
-        return new PipelinedConnectorPolicy();
+        return new PipeliningConnectorPolicy();
     }
 
     private Map<ActivityId, ActivityPartitionDetails> computePartitionCounts(ActivityCluster ac)
@@ -322,4 +324,8 @@
         }
         return activityPartsMap;
     }
+
+    public Map<? extends PartitionId, ? extends TaskCluster> getPartitionProducingTaskClusterMap() {
+        return partitionProducingTaskClusterMap;
+    }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityPartitionDetails.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityPartitionDetails.java
index d3fc871..414b543 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityPartitionDetails.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityPartitionDetails.java
@@ -43,6 +43,7 @@
 
     @Override
     public String toString() {
-        return nPartitions + ":" + Arrays.toString(nInputPartitions) + ":" + Arrays.toString(nOutputPartitions);
+        return nPartitions + ":" + (nInputPartitions == null ? "[]" : Arrays.toString(nInputPartitions)) + ":"
+                + (nOutputPartitions == null ? "[]" : Arrays.toString(nOutputPartitions));
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index a8d8ab7..356c96b 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -147,7 +147,8 @@
                 Set<TaskCluster> depACTCRoots = new HashSet<TaskCluster>();
                 for (TaskCluster tc : depAC.getPlan().getTaskClusters()) {
                     if (tc.getProducedPartitions().isEmpty()) {
-                        if (findLastTaskClusterAttempt(tc).getStatus() != TaskClusterAttempt.TaskClusterStatus.COMPLETED) {
+                        TaskClusterAttempt tca = findLastTaskClusterAttempt(tc);
+                        if (tca == null || tca.getStatus() != TaskClusterAttempt.TaskClusterStatus.COMPLETED) {
                             tcRootsComplete = false;
                         }
                         depACTCRoots.add(tc);
@@ -163,6 +164,7 @@
             if (!isPlanned(candidate)) {
                 ActivityClusterPlanner acp = new ActivityClusterPlanner(this);
                 acp.planActivityCluster(candidate);
+                partitionProducingTaskClusterMap.putAll(acp.getPartitionProducingTaskClusterMap());
             }
             for (TaskCluster tc : candidate.getPlan().getTaskClusters()) {
                 if (tc.getProducedPartitions().isEmpty()) {
@@ -203,7 +205,6 @@
         for (Map.Entry<TaskCluster, Runnability> e : runnabilityMap.entrySet()) {
             TaskCluster tc = e.getKey();
             Runnability runnability = e.getValue();
-            assert runnability.getTag() != Runnability.Tag.UNSATISFIED_PREREQUISITES;
             if (runnability.getTag() != Runnability.Tag.RUNNABLE) {
                 continue;
             }
@@ -244,11 +245,17 @@
      * Runnability(Non-schedulable TaskCluster) = {NOT_RUNNABLE, _} 
      */
     private Runnability assignRunnabilityRank(TaskCluster goal, Map<TaskCluster, Runnability> runnabilityMap) {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Computing runnability: " + goal);
+        }
         if (runnabilityMap.containsKey(goal)) {
             return runnabilityMap.get(goal);
         }
         TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(goal);
         if (lastAttempt != null) {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Last Attempt Status: " + lastAttempt.getStatus());
+            }
             if (lastAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED) {
                 Runnability runnability = new Runnability(Runnability.Tag.COMPLETED, Integer.MIN_VALUE);
                 runnabilityMap.put(goal, runnability);
@@ -264,22 +271,47 @@
         PartitionMatchMaker pmm = jobRun.getPartitionMatchMaker();
         Runnability aggregateRunnability = new Runnability(Runnability.Tag.RUNNABLE, 0);
         for (PartitionId pid : goal.getRequiredPartitions()) {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Inspecting required partition: " + pid);
+            }
             Runnability runnability;
             ConnectorDescriptorId cdId = pid.getConnectorDescriptorId();
             IConnectorPolicy cPolicy = connectorPolicyMap.get(cdId);
             PartitionState maxState = pmm.getMaximumAvailableState(pid);
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Policy: " + cPolicy + " maxState: " + maxState);
+            }
             if (PartitionState.COMMITTED.equals(maxState)) {
                 runnability = new Runnability(Runnability.Tag.RUNNABLE, 0);
             } else if (PartitionState.STARTED.equals(maxState) && !cPolicy.consumerWaitsForProducerToFinish()) {
                 runnability = new Runnability(Runnability.Tag.RUNNABLE, 1);
             } else {
                 runnability = assignRunnabilityRank(partitionProducingTaskClusterMap.get(pid), runnabilityMap);
-                if (runnability.getTag() == Runnability.Tag.RUNNABLE && runnability.getPriority() > 0
-                        && cPolicy.consumerWaitsForProducerToFinish()) {
-                    runnability = new Runnability(Runnability.Tag.NOT_RUNNABLE, Integer.MAX_VALUE);
+                switch (runnability.getTag()) {
+                    case RUNNABLE:
+                        if (cPolicy.consumerWaitsForProducerToFinish()) {
+                            runnability = new Runnability(Runnability.Tag.NOT_RUNNABLE, Integer.MAX_VALUE);
+                        } else {
+                            runnability = new Runnability(Runnability.Tag.RUNNABLE, runnability.getPriority() + 1);
+                        }
+                        break;
+
+                    case NOT_RUNNABLE:
+                        break;
+
+                    case RUNNING:
+                        if (cPolicy.consumerWaitsForProducerToFinish()) {
+                            runnability = new Runnability(Runnability.Tag.NOT_RUNNABLE, Integer.MAX_VALUE);
+                        } else {
+                            runnability = new Runnability(Runnability.Tag.RUNNABLE, 1);
+                        }
+                        break;
                 }
             }
             aggregateRunnability = Runnability.getWorstCase(aggregateRunnability, runnability);
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("aggregateRunnability: " + aggregateRunnability);
+            }
         }
         runnabilityMap.put(goal, aggregateRunnability);
         return aggregateRunnability;
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/Runnability.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/Runnability.java
index f990806..56ae9c3 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/Runnability.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/Runnability.java
@@ -37,7 +37,6 @@
         NOT_RUNNABLE,
         RUNNABLE,
         RUNNING,
-        UNSATISFIED_PREREQUISITES,
     }
 
     public static Runnability getWorstCase(Runnability r1, Runnability r2) {
@@ -48,7 +47,6 @@
                     case NOT_RUNNABLE:
                     case RUNNABLE:
                     case RUNNING:
-                    case UNSATISFIED_PREREQUISITES:
                         return r2;
                 }
                 break;
@@ -60,9 +58,6 @@
                     case RUNNABLE:
                     case RUNNING:
                         return r1;
-
-                    case UNSATISFIED_PREREQUISITES:
-                        return r2;
                 }
                 break;
 
@@ -75,7 +70,6 @@
                         return r1.priority > 0 ? r1 : new Runnability(Tag.RUNNABLE, 1);
 
                     case NOT_RUNNABLE:
-                    case UNSATISFIED_PREREQUISITES:
                         return r2;
 
                     case RUNNABLE:
@@ -90,24 +84,12 @@
                         return r1;
 
                     case NOT_RUNNABLE:
-                    case UNSATISFIED_PREREQUISITES:
                         return r2;
 
                     case RUNNABLE:
                         return r2.priority > 0 ? r2 : new Runnability(Tag.RUNNABLE, 1);
                 }
                 break;
-
-            case UNSATISFIED_PREREQUISITES:
-                switch (r2.tag) {
-                    case COMPLETED:
-                    case NOT_RUNNABLE:
-                    case RUNNABLE:
-                    case RUNNING:
-                    case UNSATISFIED_PREREQUISITES:
-                        return r1;
-                }
-                break;
         }
         throw new IllegalArgumentException("Could not aggregate: " + r1 + " and " + r2);
     }
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionDescriptor.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionDescriptor.java
index 63b7ad3..9fc0916 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionDescriptor.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionDescriptor.java
@@ -65,7 +65,7 @@
 
     @Override
     public String toString() {
-        return "[" + pid + ":" + nodeId + ":" + producingTaskAttemptId + (reusable ? "reusable" : "non-reusable")
+        return "[" + pid + ":" + nodeId + ":" + producingTaskAttemptId + (reusable ? "reusable" : "non-reusable") + " "
                 + state + "]";
     }
 }
\ No newline at end of file