Fixed deadlock. Fault tolerance works for all cases.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks-next@22 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
index 2a3b2b7..4335b87 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
@@ -154,86 +154,82 @@
             @Override
             public void insertion(TupleSet tuples) {
                 try {
-                    synchronized (JOLJobManagerImpl.this) {
-                        for (Tuple t : tuples) {
-                            Object[] data = t.toArray();
-                            UUID jobId = (UUID) data[0];
-                            UUID stageId = (UUID) data[1];
-                            Integer attempt = (Integer) data[2];
-                            JobPlan plan = (JobPlan) data[3];
-                            Set<List> ts = (Set<List>) data[4];
-                            Map<OperatorDescriptorId, Set<Integer>> opPartitions = new HashMap<OperatorDescriptorId, Set<Integer>>();
-                            for (List t2 : ts) {
-                                Object[] t2Data = t2.toArray();
-                                Set<List> activityInfoSet = (Set<List>) t2Data[1];
-                                for (List l : activityInfoSet) {
-                                    Object[] lData = l.toArray();
-                                    ActivityNodeId aid = (ActivityNodeId) lData[0];
-                                    Set<Integer> opParts = opPartitions.get(aid.getOperatorDescriptorId());
-                                    if (opParts == null) {
-                                        opParts = new HashSet<Integer>();
-                                        opPartitions.put(aid.getOperatorDescriptorId(), opParts);
-                                    }
-                                    opParts.add((Integer) lData[1]);
+                    for (Tuple t : tuples) {
+                        Object[] data = t.toArray();
+                        UUID jobId = (UUID) data[0];
+                        UUID stageId = (UUID) data[1];
+                        Integer attempt = (Integer) data[2];
+                        JobPlan plan = (JobPlan) data[3];
+                        Set<List> ts = (Set<List>) data[4];
+                        Map<OperatorDescriptorId, Set<Integer>> opPartitions = new HashMap<OperatorDescriptorId, Set<Integer>>();
+                        for (List t2 : ts) {
+                            Object[] t2Data = t2.toArray();
+                            Set<List> activityInfoSet = (Set<List>) t2Data[1];
+                            for (List l : activityInfoSet) {
+                                Object[] lData = l.toArray();
+                                ActivityNodeId aid = (ActivityNodeId) lData[0];
+                                Set<Integer> opParts = opPartitions.get(aid.getOperatorDescriptorId());
+                                if (opParts == null) {
+                                    opParts = new HashSet<Integer>();
+                                    opPartitions.put(aid.getOperatorDescriptorId(), opParts);
                                 }
+                                opParts.add((Integer) lData[1]);
                             }
-                            ClusterControllerService.Phase1Installer[] p1is = new ClusterControllerService.Phase1Installer[ts
-                                .size()];
-                            int i = 0;
-                            for (List t2 : ts) {
-                                Object[] t2Data = t2.toArray();
-                                Map<ActivityNodeId, Set<Integer>> tasks = new HashMap<ActivityNodeId, Set<Integer>>();
-                                Set<List> activityInfoSet = (Set<List>) t2Data[1];
-                                for (List l : activityInfoSet) {
-                                    Object[] lData = l.toArray();
-                                    ActivityNodeId aid = (ActivityNodeId) lData[0];
-                                    Set<Integer> aParts = tasks.get(aid);
-                                    if (aParts == null) {
-                                        aParts = new HashSet<Integer>();
-                                        tasks.put(aid, aParts);
-                                    }
-                                    aParts.add((Integer) lData[1]);
-                                }
-                                p1is[i++] = new ClusterControllerService.Phase1Installer((String) t2Data[0], jobId,
-                                    plan, stageId, attempt, tasks, opPartitions);
-                            }
-                            Map<PortInstanceId, Endpoint> globalPortMap = ccs.runRemote(p1is,
-                                new ClusterControllerService.PortMapMergingAccumulator());
-                            ClusterControllerService.Phase2Installer[] p2is = new ClusterControllerService.Phase2Installer[ts
-                                .size()];
-                            ClusterControllerService.Phase3Installer[] p3is = new ClusterControllerService.Phase3Installer[ts
-                                .size()];
-                            ClusterControllerService.StageStarter[] ss = new ClusterControllerService.StageStarter[ts
-                                .size()];
-                            i = 0;
-                            for (List t2 : ts) {
-                                Object[] t2Data = t2.toArray();
-                                Map<ActivityNodeId, Set<Integer>> tasks = new HashMap<ActivityNodeId, Set<Integer>>();
-                                Set<List> activityInfoSet = (Set<List>) t2Data[1];
-                                for (List l : activityInfoSet) {
-                                    Object[] lData = l.toArray();
-                                    ActivityNodeId aid = (ActivityNodeId) lData[0];
-                                    Set<Integer> aParts = tasks.get(aid);
-                                    if (aParts == null) {
-                                        aParts = new HashSet<Integer>();
-                                        tasks.put(aid, aParts);
-                                    }
-                                    aParts.add((Integer) lData[1]);
-                                }
-                                p2is[i] = new ClusterControllerService.Phase2Installer((String) t2Data[0], jobId, plan,
-                                    stageId, tasks, opPartitions, globalPortMap);
-                                p3is[i] = new ClusterControllerService.Phase3Installer((String) t2Data[0], jobId,
-                                    stageId);
-                                ss[i] = new ClusterControllerService.StageStarter((String) t2Data[0], jobId, stageId);
-                                ++i;
-                            }
-                            ccs.runRemote(p2is, null);
-                            ccs.runRemote(p3is, null);
-                            ccs.runRemote(ss, null);
                         }
+                        ClusterControllerService.Phase1Installer[] p1is = new ClusterControllerService.Phase1Installer[ts
+                            .size()];
+                        int i = 0;
+                        for (List t2 : ts) {
+                            Object[] t2Data = t2.toArray();
+                            Map<ActivityNodeId, Set<Integer>> tasks = new HashMap<ActivityNodeId, Set<Integer>>();
+                            Set<List> activityInfoSet = (Set<List>) t2Data[1];
+                            for (List l : activityInfoSet) {
+                                Object[] lData = l.toArray();
+                                ActivityNodeId aid = (ActivityNodeId) lData[0];
+                                Set<Integer> aParts = tasks.get(aid);
+                                if (aParts == null) {
+                                    aParts = new HashSet<Integer>();
+                                    tasks.put(aid, aParts);
+                                }
+                                aParts.add((Integer) lData[1]);
+                            }
+                            p1is[i++] = new ClusterControllerService.Phase1Installer((String) t2Data[0], jobId, plan,
+                                stageId, attempt, tasks, opPartitions);
+                        }
+                        Map<PortInstanceId, Endpoint> globalPortMap = ccs.runRemote(p1is,
+                            new ClusterControllerService.PortMapMergingAccumulator());
+                        ClusterControllerService.Phase2Installer[] p2is = new ClusterControllerService.Phase2Installer[ts
+                            .size()];
+                        ClusterControllerService.Phase3Installer[] p3is = new ClusterControllerService.Phase3Installer[ts
+                            .size()];
+                        ClusterControllerService.StageStarter[] ss = new ClusterControllerService.StageStarter[ts
+                            .size()];
+                        i = 0;
+                        for (List t2 : ts) {
+                            Object[] t2Data = t2.toArray();
+                            Map<ActivityNodeId, Set<Integer>> tasks = new HashMap<ActivityNodeId, Set<Integer>>();
+                            Set<List> activityInfoSet = (Set<List>) t2Data[1];
+                            for (List l : activityInfoSet) {
+                                Object[] lData = l.toArray();
+                                ActivityNodeId aid = (ActivityNodeId) lData[0];
+                                Set<Integer> aParts = tasks.get(aid);
+                                if (aParts == null) {
+                                    aParts = new HashSet<Integer>();
+                                    tasks.put(aid, aParts);
+                                }
+                                aParts.add((Integer) lData[1]);
+                            }
+                            p2is[i] = new ClusterControllerService.Phase2Installer((String) t2Data[0], jobId, plan,
+                                stageId, tasks, opPartitions, globalPortMap);
+                            p3is[i] = new ClusterControllerService.Phase3Installer((String) t2Data[0], jobId, stageId);
+                            ss[i] = new ClusterControllerService.StageStarter((String) t2Data[0], jobId, stageId);
+                            ++i;
+                        }
+                        ccs.runRemote(p2is, null);
+                        ccs.runRemote(p3is, null);
+                        ccs.runRemote(ss, null);
                     }
                 } catch (Exception e) {
-                    e.printStackTrace();
                 }
             }
         });
@@ -247,24 +243,21 @@
             @Override
             public void insertion(TupleSet tuples) {
                 try {
-                    synchronized (JOLJobManagerImpl.this) {
-                        for (Tuple t : tuples) {
-                            Object[] data = t.toArray();
-                            UUID jobId = (UUID) data[0];
-                            Set<String> ts = (Set<String>) data[1];
-                            ClusterControllerService.JobCompleteNotifier[] jcns = new ClusterControllerService.JobCompleteNotifier[ts
-                                .size()];
-                            int i = 0;
-                            for (String n : ts) {
-                                jcns[i++] = new ClusterControllerService.JobCompleteNotifier(n, jobId);
-                            }
-                            ccs.runRemote(jcns, null);
-                            BasicTupleSet jccTuples = new BasicTupleSet(JobCleanUpCompleteTable.createTuple(jobId));
-                            jolRuntime.schedule(JOL_SCOPE, JobCleanUpCompleteTable.TABLE_NAME, jccTuples, null);
+                    for (Tuple t : tuples) {
+                        Object[] data = t.toArray();
+                        UUID jobId = (UUID) data[0];
+                        Set<String> ts = (Set<String>) data[1];
+                        ClusterControllerService.JobCompleteNotifier[] jcns = new ClusterControllerService.JobCompleteNotifier[ts
+                            .size()];
+                        int i = 0;
+                        for (String n : ts) {
+                            jcns[i++] = new ClusterControllerService.JobCompleteNotifier(n, jobId);
                         }
+                        ccs.runRemote(jcns, null);
+                        BasicTupleSet jccTuples = new BasicTupleSet(JobCleanUpCompleteTable.createTuple(jobId));
+                        jolRuntime.schedule(JOL_SCOPE, JobCleanUpCompleteTable.TABLE_NAME, jccTuples, null);
                     }
                 } catch (Exception e) {
-                    e.printStackTrace();
                 }
             }
         });
@@ -279,30 +272,27 @@
             @Override
             public void insertion(TupleSet tuples) {
                 try {
-                    synchronized (JOLJobManagerImpl.this) {
-                        for (Tuple t : tuples) {
-                            Object[] data = t.toArray();
-                            UUID jobId = (UUID) data[0];
-                            UUID stageId = (UUID) data[1];
-                            Integer attempt = (Integer) data[2];
-                            Set<List> ts = (Set<List>) data[4];
-                            ClusterControllerService.JobletAborter[] jas = new ClusterControllerService.JobletAborter[ts
-                                .size()];
-                            int i = 0;
-                            BasicTupleSet notificationTuples = new BasicTupleSet();
-                            for (List t2 : ts) {
-                                Object[] t2Data = t2.toArray();
-                                String nodeId = (String) t2Data[0];
-                                jas[i++] = new ClusterControllerService.JobletAborter(nodeId, jobId, stageId, attempt);
-                                notificationTuples.add(AbortNotifyTable.createTuple(jobId, stageId, nodeId, attempt));
-                            }
-                            ccs.runRemote(jas, null);
-
-                            jolRuntime.schedule(JOL_SCOPE, AbortNotifyTable.TABLE_NAME, notificationTuples, null);
+                    for (Tuple t : tuples) {
+                        Object[] data = t.toArray();
+                        UUID jobId = (UUID) data[0];
+                        UUID stageId = (UUID) data[1];
+                        Integer attempt = (Integer) data[2];
+                        Set<List> ts = (Set<List>) data[4];
+                        ClusterControllerService.JobletAborter[] jas = new ClusterControllerService.JobletAborter[ts
+                            .size()];
+                        int i = 0;
+                        BasicTupleSet notificationTuples = new BasicTupleSet();
+                        for (List t2 : ts) {
+                            Object[] t2Data = t2.toArray();
+                            String nodeId = (String) t2Data[0];
+                            jas[i++] = new ClusterControllerService.JobletAborter(nodeId, jobId, stageId, attempt);
+                            notificationTuples.add(AbortNotifyTable.createTuple(jobId, stageId, nodeId, attempt));
                         }
+                        ccs.runRemote(jas, null);
+
+                        jolRuntime.schedule(JOL_SCOPE, AbortNotifyTable.TABLE_NAME, notificationTuples, null);
                     }
                 } catch (Exception e) {
-                    e.printStackTrace();
                 }
             }
         });
@@ -467,6 +457,12 @@
         jolRuntime.schedule(JOL_SCOPE, AvailableNodesTable.TABLE_NAME, availableTuples, null);
 
         jolRuntime.evaluate();
+
+        BasicTupleSet unfailedTuples = new BasicTupleSet(FailedNodesTable.createTuple(nodeId));
+
+        jolRuntime.schedule(JOL_SCOPE, FailedNodesTable.TABLE_NAME, null, unfailedTuples);
+
+        jolRuntime.evaluate();
     }
 
     @Override
diff --git a/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg b/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
index b120ba88..3813143 100644
--- a/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
+++ b/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
@@ -170,7 +170,7 @@
 define(startmessage_agg, keys(0, 1, 2), {UUID, UUID, Integer, JobPlan, Set});
 
 startmessage_agg(JobId, StageId, Attempt, JobPlan, set<Tuple>) :-
-    stageletstart(JobId, StageId, JobPlan, NodeId, Attempt, ActivityInfoSet),
+    stageletstart#insert(JobId, StageId, JobPlan, NodeId, Attempt, ActivityInfoSet),
     availablenodes(NodeId),
     ActivityInfoSet.size() != 0
     {