Fixed deadlock by removing synchronization on JOLJobManagerImpl in the JOL table listeners (start/cleanup/abort), and improved fault tolerance: rejoining nodes are now removed from the failed-nodes table, and the start-message aggregation rule fires only on stagelet insertion (stageletstart#insert). Fault tolerance works for all cases.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks-next@22 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
index 2a3b2b7..4335b87 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
@@ -154,86 +154,82 @@
@Override
public void insertion(TupleSet tuples) {
try {
- synchronized (JOLJobManagerImpl.this) {
- for (Tuple t : tuples) {
- Object[] data = t.toArray();
- UUID jobId = (UUID) data[0];
- UUID stageId = (UUID) data[1];
- Integer attempt = (Integer) data[2];
- JobPlan plan = (JobPlan) data[3];
- Set<List> ts = (Set<List>) data[4];
- Map<OperatorDescriptorId, Set<Integer>> opPartitions = new HashMap<OperatorDescriptorId, Set<Integer>>();
- for (List t2 : ts) {
- Object[] t2Data = t2.toArray();
- Set<List> activityInfoSet = (Set<List>) t2Data[1];
- for (List l : activityInfoSet) {
- Object[] lData = l.toArray();
- ActivityNodeId aid = (ActivityNodeId) lData[0];
- Set<Integer> opParts = opPartitions.get(aid.getOperatorDescriptorId());
- if (opParts == null) {
- opParts = new HashSet<Integer>();
- opPartitions.put(aid.getOperatorDescriptorId(), opParts);
- }
- opParts.add((Integer) lData[1]);
+ for (Tuple t : tuples) {
+ Object[] data = t.toArray();
+ UUID jobId = (UUID) data[0];
+ UUID stageId = (UUID) data[1];
+ Integer attempt = (Integer) data[2];
+ JobPlan plan = (JobPlan) data[3];
+ Set<List> ts = (Set<List>) data[4];
+ Map<OperatorDescriptorId, Set<Integer>> opPartitions = new HashMap<OperatorDescriptorId, Set<Integer>>();
+ for (List t2 : ts) {
+ Object[] t2Data = t2.toArray();
+ Set<List> activityInfoSet = (Set<List>) t2Data[1];
+ for (List l : activityInfoSet) {
+ Object[] lData = l.toArray();
+ ActivityNodeId aid = (ActivityNodeId) lData[0];
+ Set<Integer> opParts = opPartitions.get(aid.getOperatorDescriptorId());
+ if (opParts == null) {
+ opParts = new HashSet<Integer>();
+ opPartitions.put(aid.getOperatorDescriptorId(), opParts);
}
+ opParts.add((Integer) lData[1]);
}
- ClusterControllerService.Phase1Installer[] p1is = new ClusterControllerService.Phase1Installer[ts
- .size()];
- int i = 0;
- for (List t2 : ts) {
- Object[] t2Data = t2.toArray();
- Map<ActivityNodeId, Set<Integer>> tasks = new HashMap<ActivityNodeId, Set<Integer>>();
- Set<List> activityInfoSet = (Set<List>) t2Data[1];
- for (List l : activityInfoSet) {
- Object[] lData = l.toArray();
- ActivityNodeId aid = (ActivityNodeId) lData[0];
- Set<Integer> aParts = tasks.get(aid);
- if (aParts == null) {
- aParts = new HashSet<Integer>();
- tasks.put(aid, aParts);
- }
- aParts.add((Integer) lData[1]);
- }
- p1is[i++] = new ClusterControllerService.Phase1Installer((String) t2Data[0], jobId,
- plan, stageId, attempt, tasks, opPartitions);
- }
- Map<PortInstanceId, Endpoint> globalPortMap = ccs.runRemote(p1is,
- new ClusterControllerService.PortMapMergingAccumulator());
- ClusterControllerService.Phase2Installer[] p2is = new ClusterControllerService.Phase2Installer[ts
- .size()];
- ClusterControllerService.Phase3Installer[] p3is = new ClusterControllerService.Phase3Installer[ts
- .size()];
- ClusterControllerService.StageStarter[] ss = new ClusterControllerService.StageStarter[ts
- .size()];
- i = 0;
- for (List t2 : ts) {
- Object[] t2Data = t2.toArray();
- Map<ActivityNodeId, Set<Integer>> tasks = new HashMap<ActivityNodeId, Set<Integer>>();
- Set<List> activityInfoSet = (Set<List>) t2Data[1];
- for (List l : activityInfoSet) {
- Object[] lData = l.toArray();
- ActivityNodeId aid = (ActivityNodeId) lData[0];
- Set<Integer> aParts = tasks.get(aid);
- if (aParts == null) {
- aParts = new HashSet<Integer>();
- tasks.put(aid, aParts);
- }
- aParts.add((Integer) lData[1]);
- }
- p2is[i] = new ClusterControllerService.Phase2Installer((String) t2Data[0], jobId, plan,
- stageId, tasks, opPartitions, globalPortMap);
- p3is[i] = new ClusterControllerService.Phase3Installer((String) t2Data[0], jobId,
- stageId);
- ss[i] = new ClusterControllerService.StageStarter((String) t2Data[0], jobId, stageId);
- ++i;
- }
- ccs.runRemote(p2is, null);
- ccs.runRemote(p3is, null);
- ccs.runRemote(ss, null);
}
+ ClusterControllerService.Phase1Installer[] p1is = new ClusterControllerService.Phase1Installer[ts
+ .size()];
+ int i = 0;
+ for (List t2 : ts) {
+ Object[] t2Data = t2.toArray();
+ Map<ActivityNodeId, Set<Integer>> tasks = new HashMap<ActivityNodeId, Set<Integer>>();
+ Set<List> activityInfoSet = (Set<List>) t2Data[1];
+ for (List l : activityInfoSet) {
+ Object[] lData = l.toArray();
+ ActivityNodeId aid = (ActivityNodeId) lData[0];
+ Set<Integer> aParts = tasks.get(aid);
+ if (aParts == null) {
+ aParts = new HashSet<Integer>();
+ tasks.put(aid, aParts);
+ }
+ aParts.add((Integer) lData[1]);
+ }
+ p1is[i++] = new ClusterControllerService.Phase1Installer((String) t2Data[0], jobId, plan,
+ stageId, attempt, tasks, opPartitions);
+ }
+ Map<PortInstanceId, Endpoint> globalPortMap = ccs.runRemote(p1is,
+ new ClusterControllerService.PortMapMergingAccumulator());
+ ClusterControllerService.Phase2Installer[] p2is = new ClusterControllerService.Phase2Installer[ts
+ .size()];
+ ClusterControllerService.Phase3Installer[] p3is = new ClusterControllerService.Phase3Installer[ts
+ .size()];
+ ClusterControllerService.StageStarter[] ss = new ClusterControllerService.StageStarter[ts
+ .size()];
+ i = 0;
+ for (List t2 : ts) {
+ Object[] t2Data = t2.toArray();
+ Map<ActivityNodeId, Set<Integer>> tasks = new HashMap<ActivityNodeId, Set<Integer>>();
+ Set<List> activityInfoSet = (Set<List>) t2Data[1];
+ for (List l : activityInfoSet) {
+ Object[] lData = l.toArray();
+ ActivityNodeId aid = (ActivityNodeId) lData[0];
+ Set<Integer> aParts = tasks.get(aid);
+ if (aParts == null) {
+ aParts = new HashSet<Integer>();
+ tasks.put(aid, aParts);
+ }
+ aParts.add((Integer) lData[1]);
+ }
+ p2is[i] = new ClusterControllerService.Phase2Installer((String) t2Data[0], jobId, plan,
+ stageId, tasks, opPartitions, globalPortMap);
+ p3is[i] = new ClusterControllerService.Phase3Installer((String) t2Data[0], jobId, stageId);
+ ss[i] = new ClusterControllerService.StageStarter((String) t2Data[0], jobId, stageId);
+ ++i;
+ }
+ ccs.runRemote(p2is, null);
+ ccs.runRemote(p3is, null);
+ ccs.runRemote(ss, null);
}
} catch (Exception e) {
- e.printStackTrace();
}
}
});
@@ -247,24 +243,21 @@
@Override
public void insertion(TupleSet tuples) {
try {
- synchronized (JOLJobManagerImpl.this) {
- for (Tuple t : tuples) {
- Object[] data = t.toArray();
- UUID jobId = (UUID) data[0];
- Set<String> ts = (Set<String>) data[1];
- ClusterControllerService.JobCompleteNotifier[] jcns = new ClusterControllerService.JobCompleteNotifier[ts
- .size()];
- int i = 0;
- for (String n : ts) {
- jcns[i++] = new ClusterControllerService.JobCompleteNotifier(n, jobId);
- }
- ccs.runRemote(jcns, null);
- BasicTupleSet jccTuples = new BasicTupleSet(JobCleanUpCompleteTable.createTuple(jobId));
- jolRuntime.schedule(JOL_SCOPE, JobCleanUpCompleteTable.TABLE_NAME, jccTuples, null);
+ for (Tuple t : tuples) {
+ Object[] data = t.toArray();
+ UUID jobId = (UUID) data[0];
+ Set<String> ts = (Set<String>) data[1];
+ ClusterControllerService.JobCompleteNotifier[] jcns = new ClusterControllerService.JobCompleteNotifier[ts
+ .size()];
+ int i = 0;
+ for (String n : ts) {
+ jcns[i++] = new ClusterControllerService.JobCompleteNotifier(n, jobId);
}
+ ccs.runRemote(jcns, null);
+ BasicTupleSet jccTuples = new BasicTupleSet(JobCleanUpCompleteTable.createTuple(jobId));
+ jolRuntime.schedule(JOL_SCOPE, JobCleanUpCompleteTable.TABLE_NAME, jccTuples, null);
}
} catch (Exception e) {
- e.printStackTrace();
}
}
});
@@ -279,30 +272,27 @@
@Override
public void insertion(TupleSet tuples) {
try {
- synchronized (JOLJobManagerImpl.this) {
- for (Tuple t : tuples) {
- Object[] data = t.toArray();
- UUID jobId = (UUID) data[0];
- UUID stageId = (UUID) data[1];
- Integer attempt = (Integer) data[2];
- Set<List> ts = (Set<List>) data[4];
- ClusterControllerService.JobletAborter[] jas = new ClusterControllerService.JobletAborter[ts
- .size()];
- int i = 0;
- BasicTupleSet notificationTuples = new BasicTupleSet();
- for (List t2 : ts) {
- Object[] t2Data = t2.toArray();
- String nodeId = (String) t2Data[0];
- jas[i++] = new ClusterControllerService.JobletAborter(nodeId, jobId, stageId, attempt);
- notificationTuples.add(AbortNotifyTable.createTuple(jobId, stageId, nodeId, attempt));
- }
- ccs.runRemote(jas, null);
-
- jolRuntime.schedule(JOL_SCOPE, AbortNotifyTable.TABLE_NAME, notificationTuples, null);
+ for (Tuple t : tuples) {
+ Object[] data = t.toArray();
+ UUID jobId = (UUID) data[0];
+ UUID stageId = (UUID) data[1];
+ Integer attempt = (Integer) data[2];
+ Set<List> ts = (Set<List>) data[4];
+ ClusterControllerService.JobletAborter[] jas = new ClusterControllerService.JobletAborter[ts
+ .size()];
+ int i = 0;
+ BasicTupleSet notificationTuples = new BasicTupleSet();
+ for (List t2 : ts) {
+ Object[] t2Data = t2.toArray();
+ String nodeId = (String) t2Data[0];
+ jas[i++] = new ClusterControllerService.JobletAborter(nodeId, jobId, stageId, attempt);
+ notificationTuples.add(AbortNotifyTable.createTuple(jobId, stageId, nodeId, attempt));
}
+ ccs.runRemote(jas, null);
+
+ jolRuntime.schedule(JOL_SCOPE, AbortNotifyTable.TABLE_NAME, notificationTuples, null);
}
} catch (Exception e) {
- e.printStackTrace();
}
}
});
@@ -467,6 +457,12 @@
jolRuntime.schedule(JOL_SCOPE, AvailableNodesTable.TABLE_NAME, availableTuples, null);
jolRuntime.evaluate();
+
+ BasicTupleSet unfailedTuples = new BasicTupleSet(FailedNodesTable.createTuple(nodeId));
+
+ jolRuntime.schedule(JOL_SCOPE, FailedNodesTable.TABLE_NAME, null, unfailedTuples);
+
+ jolRuntime.evaluate();
}
@Override
diff --git a/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg b/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
index b120ba88..3813143 100644
--- a/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
+++ b/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
@@ -170,7 +170,7 @@
define(startmessage_agg, keys(0, 1, 2), {UUID, UUID, Integer, JobPlan, Set});
startmessage_agg(JobId, StageId, Attempt, JobPlan, set<Tuple>) :-
- stageletstart(JobId, StageId, JobPlan, NodeId, Attempt, ActivityInfoSet),
+ stageletstart#insert(JobId, StageId, JobPlan, NodeId, Attempt, ActivityInfoSet),
availablenodes(NodeId),
ActivityInfoSet.size() != 0
{