JOL-based scheduler on par with the original scheduler (except for stats reporting)
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks-next@18 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/job/statistics/JobStatistics.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/job/statistics/JobStatistics.java
index 921b239..8784b46 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/job/statistics/JobStatistics.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/job/statistics/JobStatistics.java
@@ -64,8 +64,8 @@
StringBuilder buffer = new StringBuilder();
buffer.append("{\n");
- indent(buffer, 1).append("startTime: '").append(df.format(startTime)).append("',\n");
- indent(buffer, 1).append("endTime: '").append(df.format(endTime)).append("',\n");
+ indent(buffer, 1).append("startTime: '").append(startTime == null ? null : df.format(startTime)).append("',\n");
+ indent(buffer, 1).append("endTime: '").append(endTime == null ? null : df.format(endTime)).append("',\n");
indent(buffer, 1).append("stages: [\n");
boolean first = true;
for (StageStatistics ss : stages) {
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java
index dbeb301..4513ca9 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java
@@ -40,6 +40,7 @@
import javax.servlet.http.HttpServletResponse;
import jol.core.Runtime;
+import jol.core.Runtime.DebugLevel;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
@@ -85,7 +86,9 @@
this.ccConfig = ccConfig;
nodeRegistry = new LinkedHashMap<String, NodeControllerState>();
if (ccConfig.useJOL) {
- jolRuntime = (Runtime) Runtime.create(Runtime.DEBUG_ALL, System.err);
+ Set<DebugLevel> jolDebugLevel = LOGGER.isLoggable(Level.FINE) ? Runtime.DEBUG_ALL
+ : new HashSet<DebugLevel>();
+ jolRuntime = (Runtime) Runtime.create(jolDebugLevel, System.err);
jobManager = new JOLJobManagerImpl(this, jolRuntime);
} else {
jobManager = new JobManagerImpl(this);
@@ -415,6 +418,34 @@
}
}
+ /**
+ * Remote operation that starts one stage of a job on a single node controller
+ * by calling {@code startStage(jobId, stageId)} on it.
+ */
+ static class StageStarter implements RemoteOp<Void> {
+ // Assigned once in the constructor and never changed afterwards.
+ private final String nodeId;
+ private final UUID jobId;
+ private final UUID stageId;
+
+ /**
+ * @param nodeId id reported by {@link #getNodeId()}
+ * @param jobId job whose stage is started
+ * @param stageId stage to start
+ */
+ public StageStarter(String nodeId, UUID jobId, UUID stageId) {
+ this.nodeId = nodeId;
+ this.jobId = jobId;
+ this.stageId = stageId;
+ }
+
+ /**
+ * Asks the given node controller to start the stage.
+ *
+ * @return always {@code null}; the operation produces no result
+ * @throws Exception whatever {@code startStage} throws
+ */
+ @Override
+ public Void execute(INodeController node) throws Exception {
+ node.startStage(jobId, stageId);
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return jobId + " Started Stage: " + stageId;
+ }
+
+ /** @return the node id supplied at construction */
+ @Override
+ public String getNodeId() {
+ return nodeId;
+ }
+ }
+
static class PortMapMergingAccumulator implements
Accumulator<Map<PortInstanceId, Endpoint>, Map<PortInstanceId, Endpoint>> {
Map<PortInstanceId, Endpoint> portMap = new HashMap<PortInstanceId, Endpoint>();
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
index e82a49f..3709753 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
@@ -15,6 +15,8 @@
package edu.uci.ics.hyracks.controller.clustercontroller;
import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@@ -23,6 +25,7 @@
import jol.types.basic.BasicTupleSet;
import jol.types.basic.Tuple;
import jol.types.basic.TupleSet;
+import jol.types.exception.BadKeyException;
import jol.types.table.BasicTable;
import jol.types.table.EventTable;
import jol.types.table.Key;
@@ -45,6 +48,7 @@
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
+import edu.uci.ics.hyracks.api.job.statistics.StageStatistics;
import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
public class JOLJobManagerImpl implements IJobManager {
@@ -74,6 +78,8 @@
private final StartMessageTable startMessageTable;
+ private final StageletCompleteTable stageletCompleteTable;
+
public JOLJobManagerImpl(final ClusterControllerService ccs, Runtime jolRuntime) throws Exception {
this.ccs = ccs;
this.jolRuntime = jolRuntime;
@@ -86,6 +92,7 @@
this.abTable = new ActivityBlockedTable(jolRuntime);
this.jobStartTable = new JobStartTable();
this.startMessageTable = new StartMessageTable(jolRuntime);
+ this.stageletCompleteTable = new StageletCompleteTable(jolRuntime);
jolRuntime.catalog().register(jobTable);
jolRuntime.catalog().register(odTable);
@@ -96,6 +103,19 @@
jolRuntime.catalog().register(abTable);
jolRuntime.catalog().register(jobStartTable);
jolRuntime.catalog().register(startMessageTable);
+ jolRuntime.catalog().register(stageletCompleteTable);
+
+ // Wake every thread blocked in jobTable.wait() whenever job tuples are
+ // inserted or deleted. Object.notifyAll() throws
+ // IllegalMonitorStateException unless the calling thread owns the monitor,
+ // so each callback acquires the jobTable lock before notifying.
+ jobTable.register(new JobTable.Callback() {
+ @Override
+ public void deletion(TupleSet arg0) {
+ synchronized (jobTable) {
+ jobTable.notifyAll();
+ }
+ }
+
+ @Override
+ public void insertion(TupleSet arg0) {
+ synchronized (jobTable) {
+ jobTable.notifyAll();
+ }
+ }
+ });
startMessageTable.register(new StartMessageTable.Callback() {
@Override
@@ -112,11 +132,11 @@
UUID jobId = (UUID) data[0];
UUID stageId = (UUID) data[1];
JobPlan plan = (JobPlan) data[2];
- TupleSet ts = (TupleSet) data[3];
+ Set<List> ts = (Set<List>) data[3];
ClusterControllerService.Phase1Installer[] p1is = new ClusterControllerService.Phase1Installer[ts
.size()];
int i = 0;
- for (Tuple t2 : ts) {
+ for (List t2 : ts) {
Object[] t2Data = t2.toArray();
p1is[i++] = new ClusterControllerService.Phase1Installer((String) t2Data[0], jobId,
plan, stageId, (Set<ActivityNodeId>) t2Data[1]);
@@ -125,22 +145,23 @@
new ClusterControllerService.PortMapMergingAccumulator());
ClusterControllerService.Phase2Installer[] p2is = new ClusterControllerService.Phase2Installer[ts
.size()];
- i = 0;
- for (Tuple t2 : ts) {
- Object[] t2Data = t2.toArray();
- p2is[i++] = new ClusterControllerService.Phase2Installer((String) t2Data[0], jobId,
- plan, stageId, (Set<ActivityNodeId>) t2Data[1], globalPortMap);
- }
- ccs.runRemote(p2is, null);
ClusterControllerService.Phase3Installer[] p3is = new ClusterControllerService.Phase3Installer[ts
.size()];
+ ClusterControllerService.StageStarter[] ss = new ClusterControllerService.StageStarter[ts
+ .size()];
i = 0;
- for (Tuple t2 : ts) {
+ for (List t2 : ts) {
Object[] t2Data = t2.toArray();
- p3is[i++] = new ClusterControllerService.Phase3Installer((String) t2Data[0], jobId,
+ p2is[i] = new ClusterControllerService.Phase2Installer((String) t2Data[0], jobId, plan,
+ stageId, (Set<ActivityNodeId>) t2Data[1], globalPortMap);
+ p3is[i] = new ClusterControllerService.Phase3Installer((String) t2Data[0], jobId,
stageId);
+ ss[i] = new ClusterControllerService.StageStarter((String) t2Data[0], jobId, stageId);
+ ++i;
}
+ ccs.runRemote(p2is, null);
ccs.runRemote(p3is, null);
+ ccs.runRemote(ss, null);
}
}
} catch (Exception e) {
@@ -235,7 +256,17 @@
@Override
public JobStatus getJobStatus(UUID jobId) {
- return null;
+ synchronized (jobTable) {
+ try {
+ Tuple jobTuple = jobTable.lookupJob(jobId);
+ if (jobTuple == null) {
+ return null;
+ }
+ return (JobStatus) jobTuple.value(1);
+ } catch (BadKeyException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
@Override
@@ -244,8 +275,14 @@
}
@Override
- public void notifyStageletComplete(UUID jobId, UUID stageId, String nodeId, StageletStatistics statistics)
- throws Exception {
+ public synchronized void notifyStageletComplete(UUID jobId, UUID stageId, String nodeId,
+ StageletStatistics statistics) throws Exception {
+ BasicTupleSet scTuples = new BasicTupleSet();
+ scTuples.add(StageletCompleteTable.createTuple(jobId, stageId, nodeId, statistics));
+
+ jolRuntime.schedule(JOL_SCOPE, StageletCompleteTable.TABLE_NAME, scTuples, null);
+
+ jolRuntime.evaluate();
}
@Override
@@ -260,7 +297,13 @@
@Override
public JobStatistics waitForCompletion(UUID jobId) throws Exception {
- return null;
+ synchronized (jobTable) {
+ Tuple jobTuple = null;
+ while ((jobTuple = jobTable.lookupJob(jobId)) != null && jobTuple.value(1) != JobStatus.TERMINATED) {
+ jobTable.wait();
+ }
+ return jobTuple == null ? null : jobTable.buildJobStatistics(jobTuple);
+ }
}
/*
@@ -272,7 +315,7 @@
private static Key PRIMARY_KEY = new Key(0);
private static final Class[] SCHEMA = new Class[] {
- UUID.class, JobStatus.class, JobSpecification.class, JobPlan.class
+ UUID.class, JobStatus.class, JobSpecification.class, JobPlan.class, Set.class
};
public JobTable(Runtime context) {
@@ -280,7 +323,30 @@
}
static Tuple createInitialJobTuple(UUID jobId, JobSpecification jobSpec, JobPlan plan) {
- return new Tuple(jobId, JobStatus.INITIALIZED, jobSpec, plan);
+ return new Tuple(jobId, JobStatus.INITIALIZED, jobSpec, plan, new HashSet());
+ }
+
+ /**
+ * Builds a JobStatistics object from column 4 of the given job tuple.
+ * Each inner set found there becomes one StageStatistics entry; a null
+ * column yields an empty JobStatistics.
+ */
+ JobStatistics buildJobStatistics(Tuple jobTuple) {
+ Set<Set<StageletStatistics>> perStage = (Set<Set<StageletStatistics>>) jobTuple.value(4);
+ JobStatistics jobStats = new JobStatistics();
+ if (perStage == null) {
+ return jobStats;
+ }
+ for (Set<StageletStatistics> stagelets : perStage) {
+ StageStatistics oneStage = new StageStatistics();
+ for (StageletStatistics stagelet : stagelets) {
+ oneStage.addStageletStatistics(stagelet);
+ }
+ jobStats.addStageStatistics(oneStage);
+ }
+ return jobStats;
+ }
+
+ /**
+ * Looks the job id up in the table's primary index.
+ *
+ * @return the first matching tuple, or null when the lookup result is empty
+ * @throws BadKeyException propagated from the primary-index lookup
+ */
+ Tuple lookupJob(UUID jobId) throws BadKeyException {
+ TupleSet matches = primary().lookupByKey(jobId);
+ return matches.isEmpty() ? null : (Tuple) matches.iterator().next();
}
}
@@ -457,7 +523,7 @@
private static class StartMessageTable extends BasicTable {
private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "startmessage");
- private static Key PRIMARY_KEY = new Key(0, 1, 2);
+ private static Key PRIMARY_KEY = new Key(0, 1);
private static final Class[] SCHEMA = new Class[] {
UUID.class, UUID.class, JobPlan.class, Set.class
@@ -467,4 +533,25 @@
super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
}
}
+
+ /*
+ * declare(stageletcomplete, keys(0, 1, 2), {JobId, StageId, NodeId, StageletStatistics})
+ *
+ * Table of stagelet-completion tuples, keyed on (JobId, StageId, NodeId).
+ * The table name, key and schema are constants and are declared final.
+ */
+ private static class StageletCompleteTable extends BasicTable {
+ private static final TableName TABLE_NAME = new TableName(JOL_SCOPE, "stageletcomplete");
+
+ private static final Key PRIMARY_KEY = new Key(0, 1, 2);
+
+ private static final Class[] SCHEMA = new Class[] {
+ UUID.class, UUID.class, String.class, StageletStatistics.class
+ };
+
+ public StageletCompleteTable(Runtime context) {
+ super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+ }
+
+ /** Creates a tuple whose columns follow SCHEMA order: job id, stage id, node id, statistics. */
+ public static Tuple createTuple(UUID jobId, UUID stageId, String nodeId, StageletStatistics statistics) {
+ return new Tuple(jobId, stageId, nodeId, statistics);
+ }
+ }
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg b/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
index d05dcb2..8f62f26 100644
--- a/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
+++ b/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
@@ -14,42 +14,58 @@
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.job.JobPlan;
-define(activitystage, keys(0, 1, 2), {UUID, OperatorDescriptorId, ActivityNodeId, Integer});
+define(activitystage_temp, keys(), {UUID, OperatorDescriptorId, ActivityNodeId, Integer});
-activitystage(JobId, OperatorId, ActivityId, 0) :-
+activitystage_INITIAL activitystage_temp(JobId, OperatorId, ActivityId, 0) :-
activitynode(JobId, OperatorId, ActivityId, _);
-activitystage(JobId, OperatorId2, ActivityId2, StageNumber) :-
- activitystage(JobId, OperatorId1, ActivityId1, StageNumber1),
- activitystage(JobId, OperatorId2, ActivityId2, StageNumber2),
+activitystage_BLOCKED activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber) :-
+ activitystage_temp(JobId, OperatorId1, ActivityId1, StageNumber1),
+ activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber2),
activityblocked(JobId, OperatorId1, ActivityId1, OperatorId2, ActivityId2),
- StageNumber1 == StageNumber2
+ StageNumber2 <= StageNumber1
{
StageNumber := StageNumber1 + 1;
};
-activitystage(JobId, OperatorId1, ActivityId1, StageNumber) :-
- activitystage(JobId, OperatorId1, ActivityId1, StageNumber1),
- activitystage(JobId, OperatorId2, ActivityId2, StageNumber2),
- activityconnection(JobId, OperatorId1, Operator1Port, _, ActivityId1, _),
- activityconnection(JobId, OperatorId2, Operator2Port, _, ActivityId2, _),
+activitystage_PIPELINED_1 activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber) :-
+ activitystage_temp(JobId, OperatorId1, ActivityId1, StageNumber1),
+ activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber2),
+ activityconnection(JobId, OperatorId1, Operator1Port, edu.uci.ics.hyracks.api.dataflow.Direction.OUTPUT, ActivityId1, _),
+ activityconnection(JobId, OperatorId2, Operator2Port, edu.uci.ics.hyracks.api.dataflow.Direction.INPUT, ActivityId2, _),
connectordescriptor(JobId, _, OperatorId1, Operator1Port, OperatorId2, Operator2Port, _),
StageNumber1 != StageNumber2
{
StageNumber := java.lang.Math.max(StageNumber1, StageNumber2);
};
-activitystage(JobId, OperatorId2, ActivityId2, StageNumber) :-
- activitystage(JobId, OperatorId1, ActivityId1, StageNumber1),
- activitystage(JobId, OperatorId2, ActivityId2, StageNumber2),
- activityconnection(JobId, OperatorId1, Operator1Port, _, ActivityId1, _),
- activityconnection(JobId, OperatorId2, Operator2Port, _, ActivityId2, _),
- connectordescriptor(JobId, _, OperatorId2, Operator2Port, OperatorId1, Operator1Port, _),
+activitystage_PIPELINED_2 activitystage_temp(JobId, OperatorId1, ActivityId1, StageNumber) :-
+ activitystage_temp(JobId, OperatorId1, ActivityId1, StageNumber1),
+ activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber2),
+ activityconnection(JobId, OperatorId1, Operator1Port, edu.uci.ics.hyracks.api.dataflow.Direction.OUTPUT, ActivityId1, _),
+ activityconnection(JobId, OperatorId2, Operator2Port, edu.uci.ics.hyracks.api.dataflow.Direction.INPUT, ActivityId2, _),
+ connectordescriptor(JobId, _, OperatorId1, Operator1Port, OperatorId2, Operator2Port, _),
StageNumber1 != StageNumber2
{
StageNumber := java.lang.Math.max(StageNumber1, StageNumber2);
};
+watch(activitystage_temp, a);
+
+watch(activityconnection, a);
+watch(activityblocked, a);
+watch(operatordescriptor, a);
+watch(connectordescriptor, a);
+
+watch(activitystage, a);
+watch(activitystage, i);
+watch(activitystage, d);
+
+define(activitystage, keys(0, 1, 2), {UUID, OperatorDescriptorId, ActivityNodeId, Integer});
+
+activitystage(JobId, OperatorId, ActivityId, max<StageNumber>) :-
+ activitystage_temp(JobId, OperatorId, ActivityId, StageNumber);
+
define(jobstage, keys(0, 1), {UUID, Integer, UUID});
jobstage(JobId, StageNumber, StageId) :-
@@ -61,22 +77,22 @@
watch(jobstage, a);
define(stagestart, keys(0), {UUID, Integer});
-define(stagefinish, keys(), {UUID, Integer});
+define(stagefinish, keys(0, 1), {UUID, Integer, Set});
watch(jobstart, i);
stagestart_INITIAL stagestart(JobId, 0) :-
jobstart(JobId, _),
- job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.INITIALIZED, _, _),
+ job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.INITIALIZED, _, _, _),
notin stagestart(JobId, _);
-update_job_status_RUNNING job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, JobSpec, JobPlan) :-
- job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.INITIALIZED, JobSpec, JobPlan),
+update_job_status_RUNNING job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, JobSpec, JobPlan, null) :-
+ job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.INITIALIZED, JobSpec, JobPlan, _),
jobstart(JobId, _);
+/* Start the next stage of a job when the current stage of that same job finishes. */
+/* stagefinish is (JobId, StageNumber, Set), so its first column is joined on JobId. */
stagestart_NEXT stagestart(JobId, NextStageNumber) :-
stagestart(JobId, StageNumber),
- stagefinish(StageId, StageNumber)
+ stagefinish#insert(JobId, StageNumber, _)
{
NextStageNumber := StageNumber + 1;
};
@@ -95,19 +111,42 @@
watch(activitystart, a);
-define(stageletstart, keys(0, 1), {UUID, UUID, JobPlan, String, Set});
+define(stageletstart, keys(0, 1, 3), {UUID, UUID, JobPlan, String, Set});
stageletstart(JobId, StageId, JobPlan, NodeId, set<ActivityId>) :-
- activitystart(JobId, _, ActivityId, _, StageId, NodeId),
- job(JobId, _, _, JobPlan);
+ activitystart#insert(JobId, _, ActivityId, StageNumber, StageId, NodeId),
+ job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, _, JobPlan, _);
watch(stageletstart, a);
watch(stageletstart, i);
-startmessage(JobId, StageId, JobPlan, set<Tuple>) :-
+define(startmessage_agg, keys(0, 1), {UUID, UUID, JobPlan, Set});
+
+startmessage_agg(JobId, StageId, JobPlan, set<Tuple>) :-
stageletstart(JobId, StageId, JobPlan, NodeId, ActivityIdSet)
{
- Tuple := new Tuple([NodeId, ActivityIdSet]);
+ Tuple := [NodeId, ActivityIdSet];
};
-watch(startmessage, i);
\ No newline at end of file
+startmessage(JobId, StageId, JobPlan, TSet) :-
+ startmessage_agg(JobId, StageId, JobPlan, TSet);
+
+watch(startmessage, a);
+watch(startmessage, i);
+
+define(stageletcomplete_agg, keys(0, 1), {UUID, UUID, Set});
+
+stageletcomplete_agg(JobId, StageId, set<Statistics>) :-
+ stageletcomplete(JobId, StageId, NodeId, Statistics);
+
+stagefinish(JobId, StageNumber, SSet) :-
+ startmessage_agg(JobId, StageId, _, TSet),
+ stageletcomplete_agg(JobId, StageId, SSet),
+ jobstage(JobId, StageNumber, StageId),
+ TSet.size() == SSet.size();
+
+update_job_status_TERMINATED job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.TERMINATED, JobSpec, JobPlan, null) :-
+ job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, JobSpec, JobPlan, _),
+ stagestart#insert(JobId, StageNumber),
+ stagefinish(JobId, _, SSet),
+ notin jobstage(JobId, StageNumber);
\ No newline at end of file