Refactored JobScheduler

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_scheduling@506 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java
index 7194a20..a4b9b50 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java
@@ -69,11 +69,7 @@
         for (UUID jobId : affectedJobIds) {
             JobRun run = ccs.getRunMap().get(jobId);
             if (run != null) {
-                try {
-                    run.getScheduler().notifyNodeFailures(deadNodes);
-                } catch (HyracksException e) {
-                    throw new RuntimeException(e);
-                }
+                run.getScheduler().notifyNodeFailures(deadNodes);
             }
         }
     }
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskFailureEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskFailureEvent.java
index 9654417..5396e2c 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskFailureEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskFailureEvent.java
@@ -33,12 +33,8 @@
 
     @Override
     protected void performEvent(TaskAttempt ta) {
-        try {
-            ActivityCluster ac = ta.getTaskState().getTaskCluster().getActivityCluster();
-            ac.getJobRun().getScheduler().notifyTaskFailure(ta, ac, exception);
-        } catch (HyracksException e) {
-            e.printStackTrace();
-        }
+        ActivityCluster ac = ta.getTaskState().getTaskCluster().getActivityCluster();
+        ac.getJobRun().getScheduler().notifyTaskFailure(ta, ac, exception);
     }
 
     @Override
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index 14f5d5a..3c19397 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -126,7 +126,9 @@
             JobActivityGraph jag = jobRun.getJobActivityGraph();
             for (ActivityId aid : ac.getActivities()) {
                 Set<ActivityId> deps = jag.getBlocked2BlockerMap().get(aid);
-                prereqs.addAll(deps);
+                if (deps != null) {
+                    prereqs.addAll(deps);
+                }
             }
         } else {
 
@@ -140,7 +142,8 @@
     }
 
     private void findRunnableActivityClusters(Set<ActivityCluster> frontier, ActivityCluster candidate) {
-        if (frontier.contains(candidate) || inProgressClusters.contains(candidate)) {
+        if (frontier.contains(candidate) || inProgressClusters.contains(candidate)
+                || completedClusters.contains(candidate)) {
             return;
         }
         boolean depsComplete = true;
@@ -169,10 +172,7 @@
             }
         }
         if (depsComplete) {
-            if (runnable && candidate != rootActivityCluster) {
-                frontier.add(candidate);
-            }
-
+            frontier.add(candidate);
         }
     }