JOL-based scheduler on par with the original scheduler (except for stats reporting)

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks-next@18 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/job/statistics/JobStatistics.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/job/statistics/JobStatistics.java
index 921b239..8784b46 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/job/statistics/JobStatistics.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/job/statistics/JobStatistics.java
@@ -64,8 +64,8 @@
         StringBuilder buffer = new StringBuilder();
 
         buffer.append("{\n");
-        indent(buffer, 1).append("startTime: '").append(df.format(startTime)).append("',\n");
-        indent(buffer, 1).append("endTime: '").append(df.format(endTime)).append("',\n");
+        indent(buffer, 1).append("startTime: '").append(startTime == null ? null : df.format(startTime)).append("',\n");
+        indent(buffer, 1).append("endTime: '").append(endTime == null ? null : df.format(endTime)).append("',\n");
         indent(buffer, 1).append("stages: [\n");
         boolean first = true;
         for (StageStatistics ss : stages) {
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java
index dbeb301..4513ca9 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java
@@ -40,6 +40,7 @@
 import javax.servlet.http.HttpServletResponse;
 
 import jol.core.Runtime;
+import jol.core.Runtime.DebugLevel;
 
 import org.eclipse.jetty.server.Handler;
 import org.eclipse.jetty.server.Request;
@@ -85,7 +86,9 @@
         this.ccConfig = ccConfig;
         nodeRegistry = new LinkedHashMap<String, NodeControllerState>();
         if (ccConfig.useJOL) {
-            jolRuntime = (Runtime) Runtime.create(Runtime.DEBUG_ALL, System.err);
+            Set<DebugLevel> jolDebugLevel = LOGGER.isLoggable(Level.FINE) ? Runtime.DEBUG_ALL
+                : new HashSet<DebugLevel>();
+            jolRuntime = (Runtime) Runtime.create(jolDebugLevel, System.err);
             jobManager = new JOLJobManagerImpl(this, jolRuntime);
         } else {
             jobManager = new JobManagerImpl(this);
@@ -415,6 +418,34 @@
         }
     }
 
+    static class StageStarter implements RemoteOp<Void> { // remote op: start one stage on one node
+        private final String nodeId; // final: immutable once constructed, safe to hand off
+        private final UUID jobId;
+        private final UUID stageId;
+
+        public StageStarter(String nodeId, UUID jobId, UUID stageId) {
+            this.nodeId = nodeId;
+            this.jobId = jobId;
+            this.stageId = stageId;
+        }
+
+        @Override
+        public Void execute(INodeController node) throws Exception {
+            node.startStage(jobId, stageId);
+            return null;
+        }
+
+        @Override
+        public String toString() {
+            return jobId + " Started Stage: " + stageId;
+        }
+
+        @Override
+        public String getNodeId() {
+            return nodeId;
+        }
+    }
+
     static class PortMapMergingAccumulator implements
         Accumulator<Map<PortInstanceId, Endpoint>, Map<PortInstanceId, Endpoint>> {
         Map<PortInstanceId, Endpoint> portMap = new HashMap<PortInstanceId, Endpoint>();
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
index e82a49f..3709753 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
@@ -15,6 +15,8 @@
 package edu.uci.ics.hyracks.controller.clustercontroller;
 
 import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
@@ -23,6 +25,7 @@
 import jol.types.basic.BasicTupleSet;
 import jol.types.basic.Tuple;
 import jol.types.basic.TupleSet;
+import jol.types.exception.BadKeyException;
 import jol.types.table.BasicTable;
 import jol.types.table.EventTable;
 import jol.types.table.Key;
@@ -45,6 +48,7 @@
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
+import edu.uci.ics.hyracks.api.job.statistics.StageStatistics;
 import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
 
 public class JOLJobManagerImpl implements IJobManager {
@@ -74,6 +78,8 @@
 
     private final StartMessageTable startMessageTable;
 
+    private final StageletCompleteTable stageletCompleteTable;
+
     public JOLJobManagerImpl(final ClusterControllerService ccs, Runtime jolRuntime) throws Exception {
         this.ccs = ccs;
         this.jolRuntime = jolRuntime;
@@ -86,6 +92,7 @@
         this.abTable = new ActivityBlockedTable(jolRuntime);
         this.jobStartTable = new JobStartTable();
         this.startMessageTable = new StartMessageTable(jolRuntime);
+        this.stageletCompleteTable = new StageletCompleteTable(jolRuntime);
 
         jolRuntime.catalog().register(jobTable);
         jolRuntime.catalog().register(odTable);
@@ -96,6 +103,19 @@
         jolRuntime.catalog().register(abTable);
         jolRuntime.catalog().register(jobStartTable);
         jolRuntime.catalog().register(startMessageTable);
+        jolRuntime.catalog().register(stageletCompleteTable);
+
+        jobTable.register(new JobTable.Callback() {
+            @Override
+            public void deletion(TupleSet arg0) {
+                synchronized (jobTable) { jobTable.notifyAll(); } // must hold monitor or notifyAll() throws IllegalMonitorStateException
+            }
+
+            @Override
+            public void insertion(TupleSet arg0) {
+                synchronized (jobTable) { jobTable.notifyAll(); } // wakes waitForCompletion(), which waits on jobTable
+            }
+        });
 
         startMessageTable.register(new StartMessageTable.Callback() {
             @Override
@@ -112,11 +132,11 @@
                             UUID jobId = (UUID) data[0];
                             UUID stageId = (UUID) data[1];
                             JobPlan plan = (JobPlan) data[2];
-                            TupleSet ts = (TupleSet) data[3];
+                            Set<List> ts = (Set<List>) data[3];
                             ClusterControllerService.Phase1Installer[] p1is = new ClusterControllerService.Phase1Installer[ts
                                 .size()];
                             int i = 0;
-                            for (Tuple t2 : ts) {
+                            for (List t2 : ts) {
                                 Object[] t2Data = t2.toArray();
                                 p1is[i++] = new ClusterControllerService.Phase1Installer((String) t2Data[0], jobId,
                                     plan, stageId, (Set<ActivityNodeId>) t2Data[1]);
@@ -125,22 +145,23 @@
                                 new ClusterControllerService.PortMapMergingAccumulator());
                             ClusterControllerService.Phase2Installer[] p2is = new ClusterControllerService.Phase2Installer[ts
                                 .size()];
-                            i = 0;
-                            for (Tuple t2 : ts) {
-                                Object[] t2Data = t2.toArray();
-                                p2is[i++] = new ClusterControllerService.Phase2Installer((String) t2Data[0], jobId,
-                                    plan, stageId, (Set<ActivityNodeId>) t2Data[1], globalPortMap);
-                            }
-                            ccs.runRemote(p2is, null);
                             ClusterControllerService.Phase3Installer[] p3is = new ClusterControllerService.Phase3Installer[ts
                                 .size()];
+                            ClusterControllerService.StageStarter[] ss = new ClusterControllerService.StageStarter[ts
+                                .size()];
                             i = 0;
-                            for (Tuple t2 : ts) {
+                            for (List t2 : ts) {
                                 Object[] t2Data = t2.toArray();
-                                p3is[i++] = new ClusterControllerService.Phase3Installer((String) t2Data[0], jobId,
+                                p2is[i] = new ClusterControllerService.Phase2Installer((String) t2Data[0], jobId, plan,
+                                    stageId, (Set<ActivityNodeId>) t2Data[1], globalPortMap);
+                                p3is[i] = new ClusterControllerService.Phase3Installer((String) t2Data[0], jobId,
                                     stageId);
+                                ss[i] = new ClusterControllerService.StageStarter((String) t2Data[0], jobId, stageId);
+                                ++i;
                             }
+                            ccs.runRemote(p2is, null);
                             ccs.runRemote(p3is, null);
+                            ccs.runRemote(ss, null);
                         }
                     }
                 } catch (Exception e) {
@@ -235,7 +256,17 @@
 
     @Override
     public JobStatus getJobStatus(UUID jobId) {
-        return null;
+        synchronized (jobTable) {
+            try {
+                Tuple jobTuple = jobTable.lookupJob(jobId);
+                if (jobTuple == null) {
+                    return null;
+                }
+                return (JobStatus) jobTuple.value(1);
+            } catch (BadKeyException e) {
+                throw new RuntimeException(e);
+            }
+        }
     }
 
     @Override
@@ -244,8 +275,14 @@
     }
 
     @Override
-    public void notifyStageletComplete(UUID jobId, UUID stageId, String nodeId, StageletStatistics statistics)
-        throws Exception {
+    public synchronized void notifyStageletComplete(UUID jobId, UUID stageId, String nodeId,
+        StageletStatistics statistics) throws Exception {
+        BasicTupleSet scTuples = new BasicTupleSet();
+        scTuples.add(StageletCompleteTable.createTuple(jobId, stageId, nodeId, statistics));
+
+        jolRuntime.schedule(JOL_SCOPE, StageletCompleteTable.TABLE_NAME, scTuples, null);
+
+        jolRuntime.evaluate();
     }
 
     @Override
@@ -260,7 +297,13 @@
 
     @Override
     public JobStatistics waitForCompletion(UUID jobId) throws Exception {
-        return null;
+        synchronized (jobTable) {
+            Tuple jobTuple = null;
+            while ((jobTuple = jobTable.lookupJob(jobId)) != null && jobTuple.value(1) != JobStatus.TERMINATED) {
+                jobTable.wait();
+            }
+            return jobTuple == null ? null : jobTable.buildJobStatistics(jobTuple);
+        }
     }
 
     /*
@@ -272,7 +315,7 @@
         private static Key PRIMARY_KEY = new Key(0);
 
         private static final Class[] SCHEMA = new Class[] {
-            UUID.class, JobStatus.class, JobSpecification.class, JobPlan.class
+            UUID.class, JobStatus.class, JobSpecification.class, JobPlan.class, Set.class
         };
 
         public JobTable(Runtime context) {
@@ -280,7 +323,30 @@
         }
 
         static Tuple createInitialJobTuple(UUID jobId, JobSpecification jobSpec, JobPlan plan) {
-            return new Tuple(jobId, JobStatus.INITIALIZED, jobSpec, plan);
+            return new Tuple(jobId, JobStatus.INITIALIZED, jobSpec, plan, new HashSet());
+        }
+
+        JobStatistics buildJobStatistics(Tuple jobTuple) {
+            Set<Set<StageletStatistics>> statsSet = (Set<Set<StageletStatistics>>) jobTuple.value(4);
+            JobStatistics stats = new JobStatistics();
+            if (statsSet != null) {
+                for (Set<StageletStatistics> stageStatsSet : statsSet) {
+                    StageStatistics stageStats = new StageStatistics();
+                    for (StageletStatistics stageletStats : stageStatsSet) {
+                        stageStats.addStageletStatistics(stageletStats);
+                    }
+                    stats.addStageStatistics(stageStats);
+                }
+            }
+            return stats;
+        }
+
+        Tuple lookupJob(UUID jobId) throws BadKeyException {
+            TupleSet set = primary().lookupByKey(jobId);
+            if (set.isEmpty()) {
+                return null;
+            }
+            return (Tuple) set.toArray()[0];
         }
     }
 
@@ -457,7 +523,7 @@
     private static class StartMessageTable extends BasicTable {
         private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "startmessage");
 
-        private static Key PRIMARY_KEY = new Key(0, 1, 2);
+        private static Key PRIMARY_KEY = new Key(0, 1);
 
         private static final Class[] SCHEMA = new Class[] {
             UUID.class, UUID.class, JobPlan.class, Set.class
@@ -467,4 +533,25 @@
             super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
         }
     }
+
+    /*
+     * declare(stageletcomplete, keys(0, 1, 2), {JobId, StageId, NodeId, StageletStatistics})
+     */
+    private static class StageletCompleteTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "stageletcomplete");
+
+        private static Key PRIMARY_KEY = new Key(0, 1, 2);
+
+        private static final Class[] SCHEMA = new Class[] {
+            UUID.class, UUID.class, String.class, StageletStatistics.class
+        };
+
+        public StageletCompleteTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        public static Tuple createTuple(UUID jobId, UUID stageId, String nodeId, StageletStatistics statistics) {
+            return new Tuple(jobId, stageId, nodeId, statistics);
+        }
+    }
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg b/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
index d05dcb2..8f62f26 100644
--- a/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
+++ b/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
@@ -14,42 +14,58 @@
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.api.job.JobPlan;
 
-define(activitystage, keys(0, 1, 2), {UUID, OperatorDescriptorId, ActivityNodeId, Integer});
+define(activitystage_temp, keys(), {UUID, OperatorDescriptorId, ActivityNodeId, Integer});
 
-activitystage(JobId, OperatorId, ActivityId, 0) :-
+activitystage_INITIAL activitystage_temp(JobId, OperatorId, ActivityId, 0) :-
     activitynode(JobId, OperatorId, ActivityId, _);
 
-activitystage(JobId, OperatorId2, ActivityId2, StageNumber) :-
-    activitystage(JobId, OperatorId1, ActivityId1, StageNumber1),
-    activitystage(JobId, OperatorId2, ActivityId2, StageNumber2),
+activitystage_BLOCKED activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber) :-
+    activitystage_temp(JobId, OperatorId1, ActivityId1, StageNumber1),
+    activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber2),
     activityblocked(JobId, OperatorId1, ActivityId1, OperatorId2, ActivityId2),
-    StageNumber1 == StageNumber2
+    StageNumber2 <= StageNumber1
     {
         StageNumber := StageNumber1 + 1;
     };
 
-activitystage(JobId, OperatorId1, ActivityId1, StageNumber) :-
-    activitystage(JobId, OperatorId1, ActivityId1, StageNumber1),
-    activitystage(JobId, OperatorId2, ActivityId2, StageNumber2),
-    activityconnection(JobId, OperatorId1, Operator1Port, _, ActivityId1, _),
-    activityconnection(JobId, OperatorId2, Operator2Port, _, ActivityId2, _),
+activitystage_PIPELINED_1 activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber) :-
+    activitystage_temp(JobId, OperatorId1, ActivityId1, StageNumber1),
+    activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber2),
+    activityconnection(JobId, OperatorId1, Operator1Port, edu.uci.ics.hyracks.api.dataflow.Direction.OUTPUT, ActivityId1, _),
+    activityconnection(JobId, OperatorId2, Operator2Port, edu.uci.ics.hyracks.api.dataflow.Direction.INPUT, ActivityId2, _),
     connectordescriptor(JobId, _, OperatorId1, Operator1Port, OperatorId2, Operator2Port, _),
     StageNumber1 != StageNumber2
     {
         StageNumber := java.lang.Math.max(StageNumber1, StageNumber2);
     };
 
-activitystage(JobId, OperatorId2, ActivityId2, StageNumber) :-
-    activitystage(JobId, OperatorId1, ActivityId1, StageNumber1),
-    activitystage(JobId, OperatorId2, ActivityId2, StageNumber2),
-    activityconnection(JobId, OperatorId1, Operator1Port, _, ActivityId1, _),
-    activityconnection(JobId, OperatorId2, Operator2Port, _, ActivityId2, _),
-    connectordescriptor(JobId, _, OperatorId2, Operator2Port, OperatorId1, Operator1Port, _),
+activitystage_PIPELINED_2 activitystage_temp(JobId, OperatorId1, ActivityId1, StageNumber) :-
+    activitystage_temp(JobId, OperatorId1, ActivityId1, StageNumber1),
+    activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber2),
+    activityconnection(JobId, OperatorId1, Operator1Port, edu.uci.ics.hyracks.api.dataflow.Direction.OUTPUT, ActivityId1, _),
+    activityconnection(JobId, OperatorId2, Operator2Port, edu.uci.ics.hyracks.api.dataflow.Direction.INPUT, ActivityId2, _),
+    connectordescriptor(JobId, _, OperatorId1, Operator1Port, OperatorId2, Operator2Port, _),
     StageNumber1 != StageNumber2
     {
         StageNumber := java.lang.Math.max(StageNumber1, StageNumber2);
     };
 
+watch(activitystage_temp, a);
+
+watch(activityconnection, a);
+watch(activityblocked, a);
+watch(operatordescriptor, a);
+watch(connectordescriptor, a);
+
+watch(activitystage, a);
+watch(activitystage, i);
+watch(activitystage, d);
+
+define(activitystage, keys(0, 1, 2), {UUID, OperatorDescriptorId, ActivityNodeId, Integer});
+
+activitystage(JobId, OperatorId, ActivityId, max<StageNumber>) :-
+    activitystage_temp(JobId, OperatorId, ActivityId, StageNumber);
+
 define(jobstage, keys(0, 1), {UUID, Integer, UUID});
 
 jobstage(JobId, StageNumber, StageId) :-
@@ -61,22 +77,22 @@
 watch(jobstage, a);
 
 define(stagestart, keys(0), {UUID, Integer});
-define(stagefinish, keys(), {UUID, Integer});
+define(stagefinish, keys(0, 1), {UUID, Integer, Set});
 
 watch(jobstart, i);
 
 stagestart_INITIAL stagestart(JobId, 0) :-
     jobstart(JobId, _),
-    job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.INITIALIZED, _, _),
+    job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.INITIALIZED, _, _, _),
     notin stagestart(JobId, _);
 
-update_job_status_RUNNING job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, JobSpec, JobPlan) :-
-    job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.INITIALIZED, JobSpec, JobPlan),
+update_job_status_RUNNING job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, JobSpec, JobPlan, null) :-
+    job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.INITIALIZED, JobSpec, JobPlan, _),
     jobstart(JobId, _);
 
 stagestart_NEXT stagestart(JobId, NextStageNumber) :-
     stagestart(JobId, StageNumber),
-    stagefinish(StageId, StageNumber)
+    stagefinish#insert(StageId, StageNumber, _)
     {
         NextStageNumber := StageNumber + 1;
     };
@@ -95,19 +111,42 @@
 
 watch(activitystart, a);
 
-define(stageletstart, keys(0, 1), {UUID, UUID, JobPlan, String, Set});
+define(stageletstart, keys(0, 1, 3), {UUID, UUID, JobPlan, String, Set});
 
 stageletstart(JobId, StageId, JobPlan, NodeId, set<ActivityId>) :-
-    activitystart(JobId, _, ActivityId, _, StageId, NodeId),
-    job(JobId, _, _, JobPlan);
+    activitystart#insert(JobId, _, ActivityId, StageNumber, StageId, NodeId),
+    job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, _, JobPlan, _);
 
 watch(stageletstart, a);
 watch(stageletstart, i);
 
-startmessage(JobId, StageId, JobPlan, set<Tuple>) :-
+define(startmessage_agg, keys(0, 1), {UUID, UUID, JobPlan, Set});
+
+startmessage_agg(JobId, StageId, JobPlan, set<Tuple>) :-
     stageletstart(JobId, StageId, JobPlan, NodeId, ActivityIdSet)
     {
-        Tuple := new Tuple([NodeId, ActivityIdSet]);
+        Tuple := [NodeId, ActivityIdSet];
     };
 
-watch(startmessage, i);
\ No newline at end of file
+startmessage(JobId, StageId, JobPlan, TSet) :-
+    startmessage_agg(JobId, StageId, JobPlan, TSet);
+
+watch(startmessage, a);
+watch(startmessage, i);
+
+define(stageletcomplete_agg, keys(0, 1), {UUID, UUID, Set});
+
+stageletcomplete_agg(JobId, StageId, set<Statistics>) :-
+    stageletcomplete(JobId, StageId, NodeId, Statistics);
+
+stagefinish(JobId, StageNumber, SSet) :-
+    startmessage_agg(JobId, StageId, _, TSet),
+    stageletcomplete_agg(JobId, StageId, SSet),
+    jobstage(JobId, StageNumber, StageId),
+    TSet.size() == SSet.size();
+
+update_job_status_TERMINATED job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.TERMINATED, JobSpec, JobPlan, null) :-
+    job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, JobSpec, JobPlan, _),
+    stagestart#insert(JobId, StageNumber),
+    stagefinish(JobId, _, SSet),
+    notin jobstage(JobId, StageNumber);
\ No newline at end of file