Create set<Tuple> for startmessage
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks-next@17 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
index 3c39379..e82a49f 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
@@ -24,6 +24,7 @@
import jol.types.basic.Tuple;
import jol.types.basic.TupleSet;
import jol.types.table.BasicTable;
+import jol.types.table.EventTable;
import jol.types.table.Key;
import jol.types.table.TableName;
import edu.uci.ics.hyracks.api.comm.Endpoint;
@@ -83,7 +84,7 @@
this.anTable = new ActivityNodeTable(jolRuntime);
this.acTable = new ActivityConnectionTable(jolRuntime);
this.abTable = new ActivityBlockedTable(jolRuntime);
- this.jobStartTable = new JobStartTable(jolRuntime);
+ this.jobStartTable = new JobStartTable();
this.startMessageTable = new StartMessageTable(jolRuntime);
jolRuntime.catalog().register(jobTable);
@@ -434,17 +435,15 @@
/*
* declare(jobstart, keys(0), {JobId, SubmitTime})
*/
- private static class JobStartTable extends BasicTable {
+ private static class JobStartTable extends EventTable {
private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "jobstart");
- private static Key PRIMARY_KEY = new Key(0);
-
private static final Class[] SCHEMA = new Class[] {
UUID.class, Long.class
};
- public JobStartTable(Runtime context) {
- super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+ public JobStartTable() {
+ super(TABLE_NAME, SCHEMA);
}
static Tuple createTuple(UUID jobId, long submitTime) {
@@ -458,10 +457,10 @@
private static class StartMessageTable extends BasicTable {
private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "startmessage");
- private static Key PRIMARY_KEY = new Key(0, 1);
+ private static Key PRIMARY_KEY = new Key(0, 1, 2);
private static final Class[] SCHEMA = new Class[] {
- UUID.class, UUID.class, JobPlan.class, TupleSet.class
+ UUID.class, UUID.class, JobPlan.class, Set.class
};
public StartMessageTable(Runtime context) {
diff --git a/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg b/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
index dd62758..d05dcb2 100644
--- a/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
+++ b/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
@@ -70,11 +70,9 @@
job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.INITIALIZED, _, _),
notin stagestart(JobId, _);
-/*
update_job_status_RUNNING job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, JobSpec, JobPlan) :-
job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.INITIALIZED, JobSpec, JobPlan),
jobstart(JobId, _);
-*/
stagestart_NEXT stagestart(JobId, NextStageNumber) :-
stagestart(JobId, StageNumber),
@@ -97,19 +95,19 @@
watch(activitystart, a);
-define(stageletstart, keys(0, 1), {UUID, UUID, JobPlan, String, ActivityNodeId});
+define(stageletstart, keys(0, 1), {UUID, UUID, JobPlan, String, Set});
-stageletstart(JobId, StageId, JobPlan, NodeId, ActivityId) :-
+stageletstart(JobId, StageId, JobPlan, NodeId, set<ActivityId>) :-
activitystart(JobId, _, ActivityId, _, StageId, NodeId),
job(JobId, _, _, JobPlan);
watch(stageletstart, a);
watch(stageletstart, i);
-startmessage(JobId, StageId, JobPlan, tupleset<Tuple>) :-
- stageletstart(JobId, StageId, JobPlan, NodeId, ActivityId)
+startmessage(JobId, StageId, JobPlan, set<Tuple>) :-
+ stageletstart(JobId, StageId, JobPlan, NodeId, ActivityIdSet)
{
- Tuple := new Tuple([NodeId, ActivityId]);
+ Tuple := new Tuple([NodeId, ActivityIdSet]);
};
watch(startmessage, i);
\ No newline at end of file