Added JOL code for starting stagelets

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks-next@16 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/INodeController.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/INodeController.java
index c271356..e7152f9 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/INodeController.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/INodeController.java
@@ -16,28 +16,29 @@
 
 import java.rmi.Remote;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 
 import edu.uci.ics.hyracks.api.comm.Endpoint;
+import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
 import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
 import edu.uci.ics.hyracks.api.job.JobPlan;
-import edu.uci.ics.hyracks.api.job.JobStage;
 import edu.uci.ics.hyracks.config.NCConfig;
 
 public interface INodeController extends Remote {
     public String getId() throws Exception;
-    
+
     public NCConfig getConfiguration() throws Exception;
 
     public NodeCapability getNodeCapability() throws Exception;
 
-    public Map<PortInstanceId, Endpoint> initializeJobletPhase1(UUID jobId, JobPlan plan, JobStage stage)
-            throws Exception;
+    public Map<PortInstanceId, Endpoint> initializeJobletPhase1(UUID jobId, JobPlan plan, UUID stageId,
+        Set<ActivityNodeId> activities) throws Exception;
 
-    public void initializeJobletPhase2(UUID jobId, JobPlan plan, JobStage stage,
-            Map<PortInstanceId, Endpoint> globalPortMap) throws Exception;
+    public void initializeJobletPhase2(UUID jobId, JobPlan plan, UUID stageId, Set<ActivityNodeId> activities,
+        Map<PortInstanceId, Endpoint> globalPortMap) throws Exception;
 
-    public void commitJobletInitialization(UUID jobId, JobPlan plan, JobStage stage) throws Exception;
+    public void commitJobletInitialization(UUID jobId, UUID stageId) throws Exception;
 
     public void cleanUpJob(UUID jobId) throws Exception;
 
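The revised INodeController contract replaces the serialized JobStage argument with a stage UUID plus an explicit set of activities, and the commit call now needs only the two ids. A minimal sketch of driving the three-phase handshake from a controller; the nodes collection and activitiesFor helper are hypothetical:

    // Phase 1: each node creates its stagelet and reports local port endpoints.
    Map<PortInstanceId, Endpoint> globalPortMap = new HashMap<PortInstanceId, Endpoint>();
    for (INodeController node : nodes) {
        globalPortMap.putAll(node.initializeJobletPhase1(jobId, plan, stageId, activitiesFor(node)));
    }
    // Phase 2: ship the merged map so senders can resolve receiver endpoints.
    for (INodeController node : nodes) {
        node.initializeJobletPhase2(jobId, plan, stageId, activitiesFor(node), globalPortMap);
    }
    // Phase 3: commit initialization; the plan is no longer needed at this point.
    for (INodeController node : nodes) {
        node.commitJobletInitialization(jobId, stageId);
    }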
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java
index 9b2fc26..dbeb301 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java
@@ -22,13 +22,16 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.UUID;
+import java.util.Vector;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -43,10 +46,14 @@
 import org.eclipse.jetty.server.handler.AbstractHandler;
 import org.eclipse.jetty.server.handler.ContextHandler;
 
+import edu.uci.ics.hyracks.api.comm.Endpoint;
 import edu.uci.ics.hyracks.api.controller.IClusterController;
 import edu.uci.ics.hyracks.api.controller.INodeController;
 import edu.uci.ics.hyracks.api.controller.NodeParameters;
+import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
 import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobPlan;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
@@ -78,8 +85,8 @@
         this.ccConfig = ccConfig;
         nodeRegistry = new LinkedHashMap<String, NodeControllerState>();
         if (ccConfig.useJOL) {
-            jolRuntime = (Runtime) Runtime.create(Runtime.DEBUG_WATCH, System.err);
-            jobManager = new JOLJobManagerImpl(jolRuntime);
+            jolRuntime = (Runtime) Runtime.create(Runtime.DEBUG_ALL, System.err);
+            jobManager = new JOLJobManagerImpl(this, jolRuntime);
         } else {
             jobManager = new JobManagerImpl(this);
         }
@@ -267,4 +274,159 @@
             }
         }
     }
+
+    interface RemoteOp<T> {
+        public String getNodeId();
+
+        public T execute(INodeController node) throws Exception;
+    }
+
+    interface Accumulator<T, R> {
+        public void accumulate(T o);
+
+        public R getResult();
+    }
+
+    <T, R> R runRemote(final RemoteOp<T>[] remoteOps, final Accumulator<T, R> accumulator) throws Exception {
+        final Semaphore installComplete = new Semaphore(remoteOps.length);
+        final List<Exception> errors = new Vector<Exception>();
+        for (final RemoteOp<T> remoteOp : remoteOps) {
+            final INodeController node = lookupNode(remoteOp.getNodeId()).getNodeController();
+
+            installComplete.acquire();
+            Runnable remoteRunner = new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        T t = remoteOp.execute(node);
+                        if (accumulator != null) {
+                            synchronized (accumulator) {
+                                accumulator.accumulate(t);
+                            }
+                        }
+                    } catch (Exception e) {
+                        errors.add(e);
+                    } finally {
+                        installComplete.release();
+                    }
+                }
+            };
+
+            getExecutor().execute(remoteRunner);
+        }
+        installComplete.acquire(remoteOps.length);
+        if (!errors.isEmpty()) {
+            throw errors.get(0);
+        }
+        return accumulator == null ? null : accumulator.getResult();
+    }
+
+    static class Phase1Installer implements RemoteOp<Map<PortInstanceId, Endpoint>> {
+        private String nodeId;
+        private UUID jobId;
+        private JobPlan plan;
+        private UUID stageId;
+        private Set<ActivityNodeId> tasks;
+
+        public Phase1Installer(String nodeId, UUID jobId, JobPlan plan, UUID stageId, Set<ActivityNodeId> tasks) {
+            this.nodeId = nodeId;
+            this.jobId = jobId;
+            this.plan = plan;
+            this.stageId = stageId;
+            this.tasks = tasks;
+        }
+
+        @Override
+        public Map<PortInstanceId, Endpoint> execute(INodeController node) throws Exception {
+            return node.initializeJobletPhase1(jobId, plan, stageId, tasks);
+        }
+
+        @Override
+        public String toString() {
+            return jobId + " Distribution Phase 1";
+        }
+
+        @Override
+        public String getNodeId() {
+            return nodeId;
+        }
+    }
+
+    static class Phase2Installer implements RemoteOp<Void> {
+        private String nodeId;
+        private UUID jobId;
+        private JobPlan plan;
+        private UUID stageId;
+        private Set<ActivityNodeId> tasks;
+        private Map<PortInstanceId, Endpoint> globalPortMap;
+
+        public Phase2Installer(String nodeId, UUID jobId, JobPlan plan, UUID stageId, Set<ActivityNodeId> tasks,
+            Map<PortInstanceId, Endpoint> globalPortMap) {
+            this.nodeId = nodeId;
+            this.jobId = jobId;
+            this.plan = plan;
+            this.stageId = stageId;
+            this.tasks = tasks;
+            this.globalPortMap = globalPortMap;
+        }
+
+        @Override
+        public Void execute(INodeController node) throws Exception {
+            node.initializeJobletPhase2(jobId, plan, stageId, tasks, globalPortMap);
+            return null;
+        }
+
+        @Override
+        public String toString() {
+            return jobId + " Distribution Phase 2";
+        }
+
+        @Override
+        public String getNodeId() {
+            return nodeId;
+        }
+    }
+
+    static class Phase3Installer implements RemoteOp<Void> {
+        private String nodeId;
+        private UUID jobId;
+        private UUID stageId;
+
+        public Phase3Installer(String nodeId, UUID jobId, UUID stageId) {
+            this.nodeId = nodeId;
+            this.jobId = jobId;
+            this.stageId = stageId;
+        }
+
+        @Override
+        public Void execute(INodeController node) throws Exception {
+            node.commitJobletInitialization(jobId, stageId);
+            return null;
+        }
+
+        @Override
+        public String toString() {
+            return jobId + " Distribution Phase 3";
+        }
+
+        @Override
+        public String getNodeId() {
+            return nodeId;
+        }
+    }
+
+    static class PortMapMergingAccumulator implements
+        Accumulator<Map<PortInstanceId, Endpoint>, Map<PortInstanceId, Endpoint>> {
+        Map<PortInstanceId, Endpoint> portMap = new HashMap<PortInstanceId, Endpoint>();
+
+        @Override
+        public void accumulate(Map<PortInstanceId, Endpoint> o) {
+            portMap.putAll(o);
+        }
+
+        @Override
+        public Map<PortInstanceId, Endpoint> getResult() {
+            return portMap;
+        }
+    }
 }
\ No newline at end of file
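runRemote fans one RemoteOp per node onto the service executor and blocks until all of them finish: the semaphore starts with one permit per op, each dispatch takes one, each completed runner returns one, and the trailing acquire(remoteOps.length) succeeds only after every release. The first remote exception, if any, is rethrown. A sketch of the intended call pattern, assuming jobId, plan, stageId, tasks, and the participating nodeIds are in scope:

    ClusterControllerService.Phase1Installer[] p1is =
        new ClusterControllerService.Phase1Installer[nodeIds.size()];
    int i = 0;
    for (String nodeId : nodeIds) {
        p1is[i++] = new ClusterControllerService.Phase1Installer(nodeId, jobId, plan, stageId, tasks);
    }
    // Run all installers concurrently; merge each node's port map into one view.
    Map<PortInstanceId, Endpoint> globalPortMap =
        ccs.runRemote(p1is, new ClusterControllerService.PortMapMergingAccumulator());

Passing a null accumulator, as the Phase 2 and Phase 3 calls do, turns runRemote into a plain completion barrier.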
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
index 6e7cf74..3c39379 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
@@ -16,14 +16,20 @@
 
 import java.util.EnumSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 
 import jol.core.Runtime;
 import jol.types.basic.BasicTupleSet;
 import jol.types.basic.Tuple;
+import jol.types.basic.TupleSet;
 import jol.types.table.BasicTable;
 import jol.types.table.Key;
 import jol.types.table.TableName;
+import edu.uci.ics.hyracks.api.comm.Endpoint;
+import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
 import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.Direction;
@@ -32,7 +38,9 @@
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
 import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobPlan;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
@@ -43,12 +51,16 @@
 
     private static final String SCHEDULER_OLG_FILE = "edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg";
 
+    private final ClusterControllerService ccs;
+
     private final Runtime jolRuntime;
 
     private final JobTable jobTable;
 
     private final OperatorDescriptorTable odTable;
 
+    private final OperatorLocationTable olTable;
+
     private final ConnectorDescriptorTable cdTable;
 
     private final ActivityNodeTable anTable;
@@ -57,21 +69,84 @@
 
     private final ActivityBlockedTable abTable;
 
-    public JOLJobManagerImpl(Runtime jolRuntime) throws Exception {
+    private final JobStartTable jobStartTable;
+
+    private final StartMessageTable startMessageTable;
+
+    public JOLJobManagerImpl(final ClusterControllerService ccs, Runtime jolRuntime) throws Exception {
+        this.ccs = ccs;
         this.jolRuntime = jolRuntime;
         this.jobTable = new JobTable(jolRuntime);
         this.odTable = new OperatorDescriptorTable(jolRuntime);
+        this.olTable = new OperatorLocationTable(jolRuntime);
         this.cdTable = new ConnectorDescriptorTable(jolRuntime);
         this.anTable = new ActivityNodeTable(jolRuntime);
         this.acTable = new ActivityConnectionTable(jolRuntime);
         this.abTable = new ActivityBlockedTable(jolRuntime);
+        this.jobStartTable = new JobStartTable(jolRuntime);
+        this.startMessageTable = new StartMessageTable(jolRuntime);
 
         jolRuntime.catalog().register(jobTable);
         jolRuntime.catalog().register(odTable);
+        jolRuntime.catalog().register(olTable);
         jolRuntime.catalog().register(cdTable);
         jolRuntime.catalog().register(anTable);
         jolRuntime.catalog().register(acTable);
         jolRuntime.catalog().register(abTable);
+        jolRuntime.catalog().register(jobStartTable);
+        jolRuntime.catalog().register(startMessageTable);
+
+        startMessageTable.register(new StartMessageTable.Callback() {
+            @Override
+            public void deletion(TupleSet tuples) {
+
+            }
+
+            @Override
+            public void insertion(TupleSet tuples) {
+                try {
+                    synchronized (JOLJobManagerImpl.this) {
+                        for (Tuple t : tuples) {
+                            Object[] data = t.toArray();
+                            UUID jobId = (UUID) data[0];
+                            UUID stageId = (UUID) data[1];
+                            JobPlan plan = (JobPlan) data[2];
+                            TupleSet ts = (TupleSet) data[3];
+                            ClusterControllerService.Phase1Installer[] p1is = new ClusterControllerService.Phase1Installer[ts
+                                .size()];
+                            int i = 0;
+                            for (Tuple t2 : ts) {
+                                Object[] t2Data = t2.toArray();
+                                p1is[i++] = new ClusterControllerService.Phase1Installer((String) t2Data[0], jobId,
+                                    plan, stageId, (Set<ActivityNodeId>) t2Data[1]);
+                            }
+                            Map<PortInstanceId, Endpoint> globalPortMap = ccs.runRemote(p1is,
+                                new ClusterControllerService.PortMapMergingAccumulator());
+                            ClusterControllerService.Phase2Installer[] p2is = new ClusterControllerService.Phase2Installer[ts
+                                .size()];
+                            i = 0;
+                            for (Tuple t2 : ts) {
+                                Object[] t2Data = t2.toArray();
+                                p2is[i++] = new ClusterControllerService.Phase2Installer((String) t2Data[0], jobId,
+                                    plan, stageId, (Set<ActivityNodeId>) t2Data[1], globalPortMap);
+                            }
+                            ccs.runRemote(p2is, null);
+                            ClusterControllerService.Phase3Installer[] p3is = new ClusterControllerService.Phase3Installer[ts
+                                .size()];
+                            i = 0;
+                            for (Tuple t2 : ts) {
+                                Object[] t2Data = t2.toArray();
+                                p3is[i++] = new ClusterControllerService.Phase3Installer((String) t2Data[0], jobId,
+                                    stageId);
+                            }
+                            ccs.runRemote(p3is, null);
+                        }
+                    }
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        });
 
         jolRuntime.install(JOL_SCOPE, ClassLoader.getSystemResource(SCHEDULER_OLG_FILE));
         jolRuntime.evaluate();
@@ -83,10 +158,11 @@
     }
 
     @Override
-    public UUID createJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
+    public synchronized UUID createJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
         final UUID jobId = UUID.randomUUID();
 
-        BasicTupleSet jobTuples = new BasicTupleSet(JobTable.createInitialJobTuple(jobId, jobFlags));
+        final JobPlanBuilder builder = new JobPlanBuilder();
+        builder.init(jobSpec, jobFlags);
 
         final BasicTupleSet anTuples = new BasicTupleSet();
         final BasicTupleSet acTuples = new BasicTupleSet();
@@ -95,30 +171,44 @@
             @Override
             public void addTask(IActivityNode task) {
                 anTuples.add(ActivityNodeTable.createTuple(jobId, task));
+                builder.addTask(task);
             }
 
             @Override
             public void addTargetEdge(int operatorOutputIndex, IActivityNode task, int taskOutputIndex) {
                 acTuples.add(ActivityConnectionTable.createTuple(jobId, task, Direction.OUTPUT, operatorOutputIndex,
                     taskOutputIndex));
+                builder.addTargetEdge(operatorOutputIndex, task, taskOutputIndex);
             }
 
             @Override
             public void addSourceEdge(int operatorInputIndex, IActivityNode task, int taskInputIndex) {
                 acTuples.add(ActivityConnectionTable.createTuple(jobId, task, Direction.INPUT, operatorInputIndex,
                     taskInputIndex));
+                builder.addSourceEdge(operatorInputIndex, task, taskInputIndex);
             }
 
             @Override
             public void addBlockingEdge(IActivityNode blocker, IActivityNode blocked) {
                 abTuples.add(ActivityBlockedTable.createTuple(jobId, blocker, blocked));
+                builder.addBlockingEdge(blocker, blocked);
             }
         };
 
         BasicTupleSet odTuples = new BasicTupleSet();
+        BasicTupleSet olTuples = new BasicTupleSet();
         for (Map.Entry<OperatorDescriptorId, IOperatorDescriptor> e : jobSpec.getOperatorMap().entrySet()) {
             IOperatorDescriptor od = e.getValue();
             odTuples.add(OperatorDescriptorTable.createTuple(jobId, od));
+            PartitionConstraint pc = od.getPartitionConstraint();
+            LocationConstraint[] locationConstraints = pc.getLocationConstraints();
+            String[] partitions = new String[locationConstraints.length];
+            for (int i = 0; i < locationConstraints.length; ++i) {
+                String nodeId = ((AbsoluteLocationConstraint) locationConstraints[i]).getLocationId();
+                olTuples.add(OperatorLocationTable.createTuple(jobId, od.getOperatorId(), nodeId));
+                partitions[i] = nodeId;
+            }
+            od.setPartitions(partitions);
             od.contributeTaskGraph(gBuilder);
         }
 
@@ -127,8 +217,11 @@
             cdTuples.add(ConnectorDescriptorTable.createTuple(jobId, jobSpec, e.getValue()));
         }
 
+        BasicTupleSet jobTuples = new BasicTupleSet(JobTable.createInitialJobTuple(jobId, jobSpec, builder.getPlan()));
+
         jolRuntime.schedule(JOL_SCOPE, JobTable.TABLE_NAME, jobTuples, null);
         jolRuntime.schedule(JOL_SCOPE, OperatorDescriptorTable.TABLE_NAME, odTuples, null);
+        jolRuntime.schedule(JOL_SCOPE, OperatorLocationTable.TABLE_NAME, olTuples, null);
         jolRuntime.schedule(JOL_SCOPE, ConnectorDescriptorTable.TABLE_NAME, cdTuples, null);
         jolRuntime.schedule(JOL_SCOPE, ActivityNodeTable.TABLE_NAME, anTuples, null);
         jolRuntime.schedule(JOL_SCOPE, ActivityConnectionTable.TABLE_NAME, acTuples, null);
@@ -155,8 +248,13 @@
     }
 
     @Override
-    public void start(UUID jobId) throws Exception {
+    public synchronized void start(UUID jobId) throws Exception {
+        BasicTupleSet jsTuples = new BasicTupleSet();
+        jsTuples.add(JobStartTable.createTuple(jobId, System.currentTimeMillis()));
 
+        jolRuntime.schedule(JOL_SCOPE, JobStartTable.TABLE_NAME, jsTuples, null);
+
+        jolRuntime.evaluate();
     }
 
     @Override
@@ -165,7 +263,7 @@
     }
 
     /*
-     * declare(job, keys(0), {JobId, Flags, Status})
+     * declare(job, keys(0), {JobId, Status, JobSpec, JobPlan})
      */
     private static class JobTable extends BasicTable {
         private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "job");
@@ -173,15 +271,15 @@
         private static Key PRIMARY_KEY = new Key(0);
 
         private static final Class[] SCHEMA = new Class[] {
-            UUID.class, EnumSet.class, JobStatus.class, PerJobCounter.class
+            UUID.class, JobStatus.class, JobSpecification.class, JobPlan.class
         };
 
         public JobTable(Runtime context) {
             super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
         }
 
-        static Tuple createInitialJobTuple(UUID jobId, EnumSet<JobFlag> jobFlags) {
-            return new Tuple(jobId, jobFlags, JobStatus.INITIALIZED, new PerJobCounter());
+        static Tuple createInitialJobTuple(UUID jobId, JobSpecification jobSpec, JobPlan plan) {
+            return new Tuple(jobId, JobStatus.INITIALIZED, jobSpec, plan);
         }
     }
 
@@ -207,6 +305,27 @@
     }
 
     /*
+     * declare(operatorlocation, keys(), {JobId, ODId, NodeId})
+     */
+    private static class OperatorLocationTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "operatorlocation");
+
+        private static Key PRIMARY_KEY = new Key();
+
+        private static final Class[] SCHEMA = new Class[] {
+            UUID.class, OperatorDescriptorId.class, String.class
+        };
+
+        public OperatorLocationTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        static Tuple createTuple(UUID jobId, OperatorDescriptorId opId, String nodeId) {
+            return new Tuple(jobId, opId, nodeId);
+        }
+    }
+
+    /*
      * declare(connectordescriptor, keys(0, 1), {JobId, CDId, SrcODId, SrcPort, DestODId, DestPort, ConnectorDescriptor})
      */
     private static class ConnectorDescriptorTable extends BasicTable {
@@ -240,7 +359,7 @@
     }
 
     /*
-     * declare(activitynode, keys(0), {JobId, OperatorId, ActivityId, ActivityNode})
+     * declare(activitynode, keys(0, 1, 2), {JobId, OperatorId, ActivityId, ActivityNode})
      */
     private static class ActivityNodeTable extends BasicTable {
         private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "activitynode");
@@ -267,7 +386,7 @@
     private static class ActivityConnectionTable extends BasicTable {
         private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "activityconnection");
 
-        private static Key PRIMARY_KEY = new Key(0);
+        private static Key PRIMARY_KEY = new Key(0, 1, 2, 3);
 
         private static final Class[] SCHEMA = new Class[] {
             UUID.class, OperatorDescriptorId.class, Integer.class, Direction.class, ActivityNodeId.class, Integer.class
@@ -289,10 +408,14 @@
     private static class ActivityBlockedTable extends BasicTable {
         private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "activityblocked");
 
-        private static Key PRIMARY_KEY = new Key(0);
+        private static Key PRIMARY_KEY = new Key(0, 1, 2, 3, 4);
 
         private static final Class[] SCHEMA = new Class[] {
-            UUID.class, OperatorDescriptorId.class, ActivityNodeId.class, ActivityNodeId.class
+            UUID.class,
+            OperatorDescriptorId.class,
+            ActivityNodeId.class,
+            OperatorDescriptorId.class,
+            ActivityNodeId.class
         };
 
         public ActivityBlockedTable(Runtime context) {
@@ -300,8 +423,49 @@
         }
 
         static Tuple createTuple(UUID jobId, IActivityNode blocker, IActivityNode blocked) {
-            OperatorDescriptorId odId = blocker.getActivityNodeId().getOperatorDescriptorId();
-            return new Tuple(jobId, odId, blocker.getActivityNodeId(), blocked.getActivityNodeId());
+            ActivityNodeId blockerANId = blocker.getActivityNodeId();
+            OperatorDescriptorId blockerODId = blockerANId.getOperatorDescriptorId();
+            ActivityNodeId blockedANId = blocked.getActivityNodeId();
+            OperatorDescriptorId blockedODId = blockedANId.getOperatorDescriptorId();
+            return new Tuple(jobId, blockerODId, blockerANId, blockedODId, blockedANId);
+        }
+    }
+
+    /*
+     * declare(jobstart, keys(0), {JobId, SubmitTime})
+     */
+    private static class JobStartTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "jobstart");
+
+        private static Key PRIMARY_KEY = new Key(0);
+
+        private static final Class[] SCHEMA = new Class[] {
+            UUID.class, Long.class
+        };
+
+        public JobStartTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        static Tuple createTuple(UUID jobId, long submitTime) {
+            return new Tuple(jobId, submitTime);
+        }
+    }
+
+    /*
+     * declare(startmessage, keys(0, 1), {JobId, StageId, JobPlan, TupleSet})
+     */
+    private static class StartMessageTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "startmessage");
+
+        private static Key PRIMARY_KEY = new Key(0, 1);
+
+        private static final Class[] SCHEMA = new Class[] {
+            UUID.class, UUID.class, JobPlan.class, TupleSet.class
+        };
+
+        public StartMessageTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
         }
     }
 }
\ No newline at end of file
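The startmessage callback above unpacks a nested payload: each outer tuple is (JobId, StageId, JobPlan, TupleSet), and each tuple of the inner TupleSet pairs a node id with the activities that node runs for the stage. Schematically (variable names are illustrative; the shape mirrors the casts in the insertion handler):

    // Outer tuple: (UUID jobId, UUID stageId, JobPlan plan, TupleSet perNode)
    Object[] data = outer.toArray();
    TupleSet perNode = (TupleSet) data[3];
    for (Tuple inner : perNode) {
        Object[] d = inner.toArray();
        String nodeId = (String) d[0];
        Set<ActivityNodeId> activities = (Set<ActivityNodeId>) d[1]; // unchecked cast
        // one Phase1Installer(nodeId, jobId, plan, stageId, activities) per node
    }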
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobManagerImpl.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobManagerImpl.java
index 828a6db..11a7bdd 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobManagerImpl.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobManagerImpl.java
@@ -23,13 +23,10 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.Vector;
-import java.util.concurrent.Semaphore;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.comm.Endpoint;
-import edu.uci.ics.hyracks.api.controller.INodeController;
 import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobPlan;
@@ -54,9 +51,8 @@
     }
 
     public synchronized UUID createJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
-        JobPlanBuilder builder = new JobPlanBuilder();
-        builder.init(jobSpec, jobFlags);
-        JobControl jc = new JobControl(this, builder.plan());
+        JobPlanner planner = new JobPlanner();
+        JobControl jc = new JobControl(this, planner.plan(jobSpec, jobFlags));
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info(jc.getJobPlan().toString());
         }
@@ -68,8 +64,7 @@
     public synchronized void start(UUID jobId) throws Exception {
         JobControl jobControlImpl = jobMap.get(jobId);
         LOGGER
-                .info("Starting job: " + jobControlImpl.getJobId() + ", Current status: "
-                        + jobControlImpl.getJobStatus());
+            .info("Starting job: " + jobControlImpl.getJobId() + ", Current status: " + jobControlImpl.getJobStatus());
         if (jobControlImpl.getJobStatus() != JobStatus.INITIALIZED) {
             return;
         }
@@ -142,143 +137,39 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Deploying: " + stage);
         }
+        UUID jobId = jc.getJobId();
+        JobPlan plan = jc.getJobPlan();
+        UUID stageId = stage.getId();
         Set<String> participatingNodes = plan(jc, stage);
         StageProgress stageProgress = new StageProgress(stage.getId());
         stageProgress.addPendingNodes(participatingNodes);
-        Map<PortInstanceId, Endpoint> globalPortMap = runRemote(new Phase1Installer(jc, stage),
-                new PortMapMergingAccumulator(), participatingNodes);
-        runRemote(new Phase2Installer(jc, stage, globalPortMap), null, participatingNodes);
-        runRemote(new Phase3Installer(jc, stage), null, participatingNodes);
+        ClusterControllerService.Phase1Installer[] p1is = new ClusterControllerService.Phase1Installer[participatingNodes
+            .size()];
+        ClusterControllerService.Phase2Installer[] p2is = new ClusterControllerService.Phase2Installer[participatingNodes
+            .size()];
+        ClusterControllerService.Phase3Installer[] p3is = new ClusterControllerService.Phase3Installer[participatingNodes
+            .size()];
+        int i = 0;
+        for (String nodeId : participatingNodes) {
+            p1is[i++] = new ClusterControllerService.Phase1Installer(nodeId, jobId, plan, stageId, stage.getTasks());
+        }
+        Map<PortInstanceId, Endpoint> globalPortMap = ccs.runRemote(p1is,
+            new ClusterControllerService.PortMapMergingAccumulator());
+        i = 0;
+        for (String nodeId : participatingNodes) {
+            p2is[i++] = new ClusterControllerService.Phase2Installer(nodeId, jobId, plan, stageId, stage.getTasks(),
+                globalPortMap);
+        }
+        ccs.runRemote(p2is, null);
+        i = 0;
+        for (String nodeId : participatingNodes) {
+            p3is[i++] = new ClusterControllerService.Phase3Installer(nodeId, jobId, stageId);
+        }
+        ccs.runRemote(p3is, null);
         jc.setStageProgress(stage.getId(), stageProgress);
         return participatingNodes;
     }
 
-    private interface RemoteOp<T> {
-        public T execute(INodeController node) throws Exception;
-    }
-
-    private interface Accumulator<T, R> {
-        public void accumulate(T o);
-
-        public R getResult();
-    }
-
-    private static class Phase1Installer implements RemoteOp<Map<PortInstanceId, Endpoint>> {
-        private JobControl jc;
-        private JobStage stage;
-
-        public Phase1Installer(JobControl jc, JobStage stage) {
-            this.jc = jc;
-            this.stage = stage;
-        }
-
-        @Override
-        public Map<PortInstanceId, Endpoint> execute(INodeController node) throws Exception {
-            return node.initializeJobletPhase1(jc.getJobId(), jc.getJobPlan(), stage);
-        }
-
-        @Override
-        public String toString() {
-            return jc.getJobId() + " Distribution Phase 1";
-        }
-    }
-
-    private static class Phase2Installer implements RemoteOp<Void> {
-        private JobControl jc;
-        private JobStage stage;
-        private Map<PortInstanceId, Endpoint> globalPortMap;
-
-        public Phase2Installer(JobControl jc, JobStage stage, Map<PortInstanceId, Endpoint> globalPortMap) {
-            this.jc = jc;
-            this.stage = stage;
-            this.globalPortMap = globalPortMap;
-        }
-
-        @Override
-        public Void execute(INodeController node) throws Exception {
-            node.initializeJobletPhase2(jc.getJobId(), jc.getJobPlan(), stage, globalPortMap);
-            return null;
-        }
-
-        @Override
-        public String toString() {
-            return jc.getJobId() + " Distribution Phase 2";
-        }
-    }
-
-    private static class Phase3Installer implements RemoteOp<Void> {
-        private JobControl jc;
-        private JobStage stage;
-
-        public Phase3Installer(JobControl jc, JobStage stage) {
-            this.jc = jc;
-            this.stage = stage;
-        }
-
-        @Override
-        public Void execute(INodeController node) throws Exception {
-            node.commitJobletInitialization(jc.getJobId(), jc.getJobPlan(), stage);
-            return null;
-        }
-
-        @Override
-        public String toString() {
-            return jc.getJobId() + " Distribution Phase 3";
-        }
-    }
-
-    private static class PortMapMergingAccumulator implements
-            Accumulator<Map<PortInstanceId, Endpoint>, Map<PortInstanceId, Endpoint>> {
-        Map<PortInstanceId, Endpoint> portMap = new HashMap<PortInstanceId, Endpoint>();
-
-        @Override
-        public void accumulate(Map<PortInstanceId, Endpoint> o) {
-            portMap.putAll(o);
-        }
-
-        @Override
-        public Map<PortInstanceId, Endpoint> getResult() {
-            return portMap;
-        }
-    }
-
-    private <T, R> R runRemote(final RemoteOp<T> remoteOp, final Accumulator<T, R> accumulator,
-            Set<String> candidateNodes) throws Exception {
-        LOGGER.log(Level.INFO, remoteOp + " : " + candidateNodes);
-
-        final Semaphore installComplete = new Semaphore(candidateNodes.size());
-        final List<Exception> errors = new Vector<Exception>();
-        for (final String nodeId : candidateNodes) {
-            final INodeController node = ccs.lookupNode(nodeId).getNodeController();
-
-            installComplete.acquire();
-            Runnable remoteRunner = new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        T t = remoteOp.execute(node);
-                        if (accumulator != null) {
-                            synchronized (accumulator) {
-                                accumulator.accumulate(t);
-                            }
-                        }
-                    } catch (Exception e) {
-                        errors.add(e);
-                    } finally {
-                        installComplete.release();
-                    }
-                }
-            };
-
-            ccs.getExecutor().execute(remoteRunner);
-        }
-        installComplete.acquire(candidateNodes.size());
-        if (!errors.isEmpty()) {
-            throw errors.get(0);
-        }
-        return accumulator == null ? null : accumulator.getResult();
-    }
-
     private Set<String> plan(JobControl jc, JobStage stage) throws Exception {
         LOGGER.log(Level.INFO, String.valueOf(jc.getJobId()) + ": Planning");
         Set<String> participatingNodes = planner.plan(jc, stage);
@@ -289,7 +180,7 @@
     }
 
     public synchronized void notifyStageletComplete(UUID jobId, UUID stageId, String nodeId,
-            StageletStatistics statistics) throws Exception {
+        StageletStatistics statistics) throws Exception {
         JobControl jc = jobMap.get(jobId);
         if (jc != null) {
             jc.notifyStageletComplete(stageId, nodeId, statistics);
@@ -312,10 +203,7 @@
         return null;
     }
 
+    @Override
     public synchronized void notifyNodeFailure(String nodeId) {
-        for(Map.Entry<UUID, JobControl> e : jobMap.entrySet()) {
-            JobControl jc = e.getValue();
-            
-        }
     }
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobPlanBuilder.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobPlanBuilder.java
index 723736c..d748a13 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobPlanBuilder.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobPlanBuilder.java
@@ -1,22 +1,7 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package edu.uci.ics.hyracks.controller.clustercontroller;
 
 import java.util.ArrayList;
 import java.util.EnumSet;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -24,19 +9,11 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
 import edu.uci.ics.hyracks.api.dataflow.IActivityNode;
-import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobPlan;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.api.job.JobStage;
-import edu.uci.ics.hyracks.api.util.Pair;
-import edu.uci.ics.hyracks.dataflow.base.IOperatorDescriptorVisitor;
-import edu.uci.ics.hyracks.dataflow.util.PlanUtils;
 
 public class JobPlanBuilder implements IActivityGraphBuilder {
     private static final Logger LOGGER = Logger.getLogger(JobPlanBuilder.class.getName());
@@ -53,22 +30,22 @@
     public void addSourceEdge(int operatorInputIndex, IActivityNode task, int taskInputIndex) {
         if (LOGGER.isLoggable(Level.FINEST)) {
             LOGGER.finest("Adding source edge: " + task.getOwner().getOperatorId() + ":" + operatorInputIndex + " -> "
-                    + task.getActivityNodeId() + ":" + taskInputIndex);
+                + task.getActivityNodeId() + ":" + taskInputIndex);
         }
         insertIntoIndexedMap(plan.getTaskInputMap(), task.getActivityNodeId(), taskInputIndex, operatorInputIndex);
         insertIntoIndexedMap(plan.getOperatorInputMap(), task.getOwner().getOperatorId(), operatorInputIndex, task
-                .getActivityNodeId());
+            .getActivityNodeId());
     }
 
     @Override
     public void addTargetEdge(int operatorOutputIndex, IActivityNode task, int taskOutputIndex) {
         if (LOGGER.isLoggable(Level.FINEST)) {
             LOGGER.finest("Adding target edge: " + task.getOwner().getOperatorId() + ":" + operatorOutputIndex + " -> "
-                    + task.getActivityNodeId() + ":" + taskOutputIndex);
+                + task.getActivityNodeId() + ":" + taskOutputIndex);
         }
         insertIntoIndexedMap(plan.getTaskOutputMap(), task.getActivityNodeId(), taskOutputIndex, operatorOutputIndex);
         insertIntoIndexedMap(plan.getOperatorOutputMap(), task.getOwner().getOperatorId(), operatorOutputIndex, task
-                .getActivityNodeId());
+            .getActivityNodeId());
     }
 
     @Override
@@ -93,98 +70,6 @@
         }
     }
 
-    private Pair<ActivityNodeId, ActivityNodeId> findMergePair(JobPlan plan, JobSpecification spec, Set<JobStage> eqSets) {
-        Map<ActivityNodeId, IActivityNode> activityNodeMap = plan.getActivityNodeMap();
-        for (JobStage eqSet : eqSets) {
-            for (ActivityNodeId t : eqSet.getTasks()) {
-                IOperatorDescriptor owner = activityNodeMap.get(t).getOwner();
-                List<Integer> inputList = plan.getTaskInputMap().get(t);
-                if (inputList != null) {
-                    for (Integer idx : inputList) {
-                        IConnectorDescriptor conn = spec.getInputConnectorDescriptor(owner, idx);
-                        OperatorDescriptorId producerId = spec.getProducer(conn).getOperatorId();
-                        int producerOutputIndex = spec.getProducerOutputIndex(conn);
-                        ActivityNodeId inTask = plan.getOperatorOutputMap().get(producerId).get(producerOutputIndex);
-                        if (!eqSet.getTasks().contains(inTask)) {
-                            return new Pair<ActivityNodeId, ActivityNodeId>(t, inTask);
-                        }
-                    }
-                }
-                List<Integer> outputList = plan.getTaskOutputMap().get(t);
-                if (outputList != null) {
-                    for (Integer idx : outputList) {
-                        IConnectorDescriptor conn = spec.getOutputConnectorDescriptor(owner, idx);
-                        OperatorDescriptorId consumerId = spec.getConsumer(conn).getOperatorId();
-                        int consumerInputIndex = spec.getConsumerInputIndex(conn);
-                        ActivityNodeId outTask = plan.getOperatorInputMap().get(consumerId).get(consumerInputIndex);
-                        if (!eqSet.getTasks().contains(outTask)) {
-                            return new Pair<ActivityNodeId, ActivityNodeId>(t, outTask);
-                        }
-                    }
-                }
-            }
-        }
-        return null;
-    }
-
-    private JobStage inferStages() throws Exception {
-        JobSpecification spec = plan.getJobSpecification();
-
-        /*
-         * Build initial equivalence sets map. We create a map such that for
-         * each IOperatorTask, t -> { t }
-         */
-        Map<ActivityNodeId, JobStage> stageMap = new HashMap<ActivityNodeId, JobStage>();
-        Set<JobStage> stages = new HashSet<JobStage>();
-        for (Set<ActivityNodeId> taskIds : plan.getOperatorTaskMap().values()) {
-            for (ActivityNodeId taskId : taskIds) {
-                Set<ActivityNodeId> eqSet = new HashSet<ActivityNodeId>();
-                eqSet.add(taskId);
-                JobStage stage = new JobStage(eqSet);
-                stageMap.put(taskId, stage);
-                stages.add(stage);
-            }
-        }
-
-        boolean changed = true;
-        while (changed) {
-            changed = false;
-            Pair<ActivityNodeId, ActivityNodeId> pair = findMergePair(plan, spec, stages);
-            if (pair != null) {
-                merge(stageMap, stages, pair.first, pair.second);
-                changed = true;
-            }
-        }
-
-        JobStage endStage = new JobStage(new HashSet<ActivityNodeId>());
-        Map<ActivityNodeId, Set<ActivityNodeId>> blocker2BlockedMap = plan.getBlocker2BlockedMap();
-        for (JobStage s : stages) {
-            endStage.addDependency(s);
-            s.addDependent(endStage);
-            Set<JobStage> blockedStages = new HashSet<JobStage>();
-            for (ActivityNodeId t : s.getTasks()) {
-                Set<ActivityNodeId> blockedTasks = blocker2BlockedMap.get(t);
-                if (blockedTasks != null) {
-                    for (ActivityNodeId bt : blockedTasks) {
-                        blockedStages.add(stageMap.get(bt));
-                    }
-                }
-            }
-            for (JobStage bs : blockedStages) {
-                bs.addDependency(s);
-                s.addDependent(bs);
-            }
-        }
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Inferred " + (stages.size() + 1) + " stages");
-            for (JobStage s : stages) {
-                LOGGER.info(s.toString());
-            }
-            LOGGER.info("SID: ENDSTAGE");
-        }
-        return endStage;
-    }
-
     public void init(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) {
         plan = new JobPlan(jobSpec, jobFlags);
     }
@@ -199,37 +84,7 @@
         vList.set(index, value);
     }
 
-    private void merge(Map<ActivityNodeId, JobStage> eqSetMap, Set<JobStage> eqSets, ActivityNodeId t1,
-            ActivityNodeId t2) {
-        JobStage stage1 = eqSetMap.get(t1);
-        Set<ActivityNodeId> s1 = stage1.getTasks();
-        JobStage stage2 = eqSetMap.get(t2);
-        Set<ActivityNodeId> s2 = stage2.getTasks();
-
-        Set<ActivityNodeId> mergedSet = new HashSet<ActivityNodeId>();
-        mergedSet.addAll(s1);
-        mergedSet.addAll(s2);
-
-        eqSets.remove(stage1);
-        eqSets.remove(stage2);
-        JobStage mergedStage = new JobStage(mergedSet);
-        eqSets.add(mergedStage);
-
-        for (ActivityNodeId t : mergedSet) {
-            eqSetMap.put(t, mergedStage);
-        }
-    }
-
-    public JobPlan plan() throws Exception {
-        PlanUtils.visit(plan.getJobSpecification(), new IOperatorDescriptorVisitor() {
-            @Override
-            public void visit(IOperatorDescriptor op) throws Exception {
-                op.contributeTaskGraph(JobPlanBuilder.this);
-            }
-        });
-        JobStage endStage = inferStages();
-        plan.setEndStage(endStage);
-
+    public JobPlan getPlan() {
         return plan;
     }
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobPlanner.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobPlanner.java
new file mode 100644
index 0000000..f992de1
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobPlanner.java
@@ -0,0 +1,169 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.controller.clustercontroller;
+
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.IActivityNode;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobPlan;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.job.JobStage;
+import edu.uci.ics.hyracks.api.util.Pair;
+import edu.uci.ics.hyracks.dataflow.base.IOperatorDescriptorVisitor;
+import edu.uci.ics.hyracks.dataflow.util.PlanUtils;
+
+public class JobPlanner {
+    private static final Logger LOGGER = Logger.getLogger(JobPlanner.class.getName());
+
+    private Pair<ActivityNodeId, ActivityNodeId> findMergePair(JobPlan plan, JobSpecification spec, Set<JobStage> eqSets) {
+        Map<ActivityNodeId, IActivityNode> activityNodeMap = plan.getActivityNodeMap();
+        for (JobStage eqSet : eqSets) {
+            for (ActivityNodeId t : eqSet.getTasks()) {
+                IOperatorDescriptor owner = activityNodeMap.get(t).getOwner();
+                List<Integer> inputList = plan.getTaskInputMap().get(t);
+                if (inputList != null) {
+                    for (Integer idx : inputList) {
+                        IConnectorDescriptor conn = spec.getInputConnectorDescriptor(owner, idx);
+                        OperatorDescriptorId producerId = spec.getProducer(conn).getOperatorId();
+                        int producerOutputIndex = spec.getProducerOutputIndex(conn);
+                        ActivityNodeId inTask = plan.getOperatorOutputMap().get(producerId).get(producerOutputIndex);
+                        if (!eqSet.getTasks().contains(inTask)) {
+                            return new Pair<ActivityNodeId, ActivityNodeId>(t, inTask);
+                        }
+                    }
+                }
+                List<Integer> outputList = plan.getTaskOutputMap().get(t);
+                if (outputList != null) {
+                    for (Integer idx : outputList) {
+                        IConnectorDescriptor conn = spec.getOutputConnectorDescriptor(owner, idx);
+                        OperatorDescriptorId consumerId = spec.getConsumer(conn).getOperatorId();
+                        int consumerInputIndex = spec.getConsumerInputIndex(conn);
+                        ActivityNodeId outTask = plan.getOperatorInputMap().get(consumerId).get(consumerInputIndex);
+                        if (!eqSet.getTasks().contains(outTask)) {
+                            return new Pair<ActivityNodeId, ActivityNodeId>(t, outTask);
+                        }
+                    }
+                }
+            }
+        }
+        return null;
+    }
+
+    private JobStage inferStages(JobPlan plan) throws Exception {
+        JobSpecification spec = plan.getJobSpecification();
+
+        /*
+         * Build initial equivalence sets map. We create a map such that for each IOperatorTask, t -> { t }
+         */
+        Map<ActivityNodeId, JobStage> stageMap = new HashMap<ActivityNodeId, JobStage>();
+        Set<JobStage> stages = new HashSet<JobStage>();
+        for (Set<ActivityNodeId> taskIds : plan.getOperatorTaskMap().values()) {
+            for (ActivityNodeId taskId : taskIds) {
+                Set<ActivityNodeId> eqSet = new HashSet<ActivityNodeId>();
+                eqSet.add(taskId);
+                JobStage stage = new JobStage(eqSet);
+                stageMap.put(taskId, stage);
+                stages.add(stage);
+            }
+        }
+
+        boolean changed = true;
+        while (changed) {
+            changed = false;
+            Pair<ActivityNodeId, ActivityNodeId> pair = findMergePair(plan, spec, stages);
+            if (pair != null) {
+                merge(stageMap, stages, pair.first, pair.second);
+                changed = true;
+            }
+        }
+
+        JobStage endStage = new JobStage(new HashSet<ActivityNodeId>());
+        Map<ActivityNodeId, Set<ActivityNodeId>> blocker2BlockedMap = plan.getBlocker2BlockedMap();
+        for (JobStage s : stages) {
+            endStage.addDependency(s);
+            s.addDependent(endStage);
+            Set<JobStage> blockedStages = new HashSet<JobStage>();
+            for (ActivityNodeId t : s.getTasks()) {
+                Set<ActivityNodeId> blockedTasks = blocker2BlockedMap.get(t);
+                if (blockedTasks != null) {
+                    for (ActivityNodeId bt : blockedTasks) {
+                        blockedStages.add(stageMap.get(bt));
+                    }
+                }
+            }
+            for (JobStage bs : blockedStages) {
+                bs.addDependency(s);
+                s.addDependent(bs);
+            }
+        }
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Inferred " + (stages.size() + 1) + " stages");
+            for (JobStage s : stages) {
+                LOGGER.info(s.toString());
+            }
+            LOGGER.info("SID: ENDSTAGE");
+        }
+        return endStage;
+    }
+
+    private void merge(Map<ActivityNodeId, JobStage> eqSetMap, Set<JobStage> eqSets, ActivityNodeId t1,
+        ActivityNodeId t2) {
+        JobStage stage1 = eqSetMap.get(t1);
+        Set<ActivityNodeId> s1 = stage1.getTasks();
+        JobStage stage2 = eqSetMap.get(t2);
+        Set<ActivityNodeId> s2 = stage2.getTasks();
+
+        Set<ActivityNodeId> mergedSet = new HashSet<ActivityNodeId>();
+        mergedSet.addAll(s1);
+        mergedSet.addAll(s2);
+
+        eqSets.remove(stage1);
+        eqSets.remove(stage2);
+        JobStage mergedStage = new JobStage(mergedSet);
+        eqSets.add(mergedStage);
+
+        for (ActivityNodeId t : mergedSet) {
+            eqSetMap.put(t, mergedStage);
+        }
+    }
+
+    public JobPlan plan(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
+        final JobPlanBuilder builder = new JobPlanBuilder();
+        builder.init(jobSpec, jobFlags);
+        PlanUtils.visit(jobSpec, new IOperatorDescriptorVisitor() {
+            @Override
+            public void visit(IOperatorDescriptor op) throws Exception {
+                op.contributeTaskGraph(builder);
+            }
+        });
+        JobPlan plan = builder.getPlan();
+        JobStage endStage = inferStages(plan);
+        plan.setEndStage(endStage);
+
+        return plan;
+    }
+}
\ No newline at end of file
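JobPlanner now does at plan time what JobPlanBuilder.plan() used to: build the activity graph, repeatedly merge activities joined by a connector into equivalence-set stages until no merge pair remains, then convert blocking edges into stage dependencies and hang a synthetic end stage off every inferred stage. A sketch of invoking it, assuming a populated JobSpecification jobSpec and a getter matching the setEndStage call above:

    JobPlanner planner = new JobPlanner();
    JobPlan plan = planner.plan(jobSpec, EnumSet.noneOf(JobFlag.class));
    JobStage endStage = plan.getEndStage(); // synthetic sink: depends on every real stage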
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/PerJobCounter.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/PerJobCounter.java
deleted file mode 100644
index 4644daf..0000000
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/PerJobCounter.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package edu.uci.ics.hyracks.controller.clustercontroller;
-
-public class PerJobCounter {
-    private int stageCounter;
-
-    public PerJobCounter() {
-    }
-
-    public int getStageCounterAndIncrement() {
-        return stageCounter++;
-    }
-}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
index f8f6806..cde2ec4 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
@@ -23,6 +23,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.UUID;
@@ -54,7 +55,6 @@
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobPlan;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.api.job.JobStage;
 import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
 import edu.uci.ics.hyracks.comm.ConnectionManager;
 import edu.uci.ics.hyracks.comm.DemuxDataReceiveListenerFactory;
@@ -150,7 +150,7 @@
         Matcher m = pattern.matcher(ipaddrStr);
         if (!m.matches()) {
             throw new Exception(MessageFormat.format(
-                    "Connection Manager IP Address String %s does is not a valid IP Address.", ipaddrStr));
+                "Connection Manager IP Address String %s does is not a valid IP Address.", ipaddrStr));
         }
         byte[] ipBytes = new byte[4];
         ipBytes[0] = (byte) Integer.parseInt(m.group(1));
@@ -161,22 +161,21 @@
     }
 
     @Override
-    public Map<PortInstanceId, Endpoint> initializeJobletPhase1(UUID jobId, JobPlan plan, JobStage stage)
-            throws Exception {
-        LOGGER.log(Level.INFO, String.valueOf(jobId) + "[" + id + ":" + stage.getId()
-                + "]: Initializing Joblet Phase 1");
+    public Map<PortInstanceId, Endpoint> initializeJobletPhase1(UUID jobId, JobPlan plan, UUID stageId,
+        Set<ActivityNodeId> activities) throws Exception {
+        LOGGER.log(Level.INFO, String.valueOf(jobId) + "[" + id + ":" + stageId + "]: Initializing Joblet Phase 1");
 
         final Joblet joblet = getLocalJoblet(jobId);
 
-        Stagelet stagelet = new Stagelet(joblet, stage.getId(), id);
-        joblet.setStagelet(stage.getId(), stagelet);
+        Stagelet stagelet = new Stagelet(joblet, stageId, id);
+        joblet.setStagelet(stageId, stagelet);
 
         final Map<PortInstanceId, Endpoint> portMap = new HashMap<PortInstanceId, Endpoint>();
         Map<OperatorInstanceId, OperatorRunnable> honMap = stagelet.getOperatorMap();
 
         List<Endpoint> endpointList = new ArrayList<Endpoint>();
 
-        for (ActivityNodeId hanId : stage.getTasks()) {
+        for (ActivityNodeId hanId : activities) {
             IActivityNode han = plan.getActivityNodeMap().get(hanId);
             if (LOGGER.isLoggable(Level.FINEST)) {
                 LOGGER.finest("Initializing " + hanId + " -> " + han);
@@ -200,8 +199,8 @@
                             endpointList.add(endpoint);
                             DemuxDataReceiveListenerFactory drlf = new DemuxDataReceiveListenerFactory(ctx);
                             connectionManager.acceptConnection(endpoint.getEndpointId(), drlf);
-                            PortInstanceId piId = new PortInstanceId(op.getOperatorId(),
-                                    Direction.INPUT, plan.getTaskInputMap().get(hanId).get(j), i);
+                            PortInstanceId piId = new PortInstanceId(op.getOperatorId(), Direction.INPUT, plan
+                                .getTaskInputMap().get(hanId).get(j), i);
                             if (LOGGER.isLoggable(Level.FINEST)) {
                                 LOGGER.finest("Created endpoint " + piId + " -> " + endpoint);
                             }
@@ -221,7 +220,7 @@
     }
 
     private IFrameReader createReader(final IConnectorDescriptor conn, IConnectionDemultiplexer demux,
-            final int receiverIndex, JobPlan plan, final Stagelet stagelet) throws HyracksDataException {
+        final int receiverIndex, JobPlan plan, final Stagelet stagelet) throws HyracksDataException {
         final IFrameReader reader = conn.createReceiveSideReader(ctx, plan, demux, receiverIndex);
 
         return plan.getJobFlags().contains(JobFlag.COLLECT_FRAME_COUNTS) ? new IFrameReader() {
@@ -246,26 +245,25 @@
             public void close() throws HyracksDataException {
                 reader.close();
                 stagelet.getStatistics().getStatisticsMap().put(
-                        "framecount." + conn.getConnectorId().getId() + ".receiver." + receiverIndex,
-                        String.valueOf(frameCount));
+                    "framecount." + conn.getConnectorId().getId() + ".receiver." + receiverIndex,
+                    String.valueOf(frameCount));
             }
         } : reader;
     }
 
     @Override
-    public void initializeJobletPhase2(UUID jobId, final JobPlan plan, JobStage stage,
-            final Map<PortInstanceId, Endpoint> globalPortMap) throws Exception {
-        LOGGER.log(Level.INFO, String.valueOf(jobId) + "[" + id + ":" + stage.getId()
-                + "]: Initializing Joblet Phase 2");
+    public void initializeJobletPhase2(UUID jobId, final JobPlan plan, UUID stageId, Set<ActivityNodeId> activities,
+        final Map<PortInstanceId, Endpoint> globalPortMap) throws Exception {
+        LOGGER.log(Level.INFO, String.valueOf(jobId) + "[" + id + ":" + stageId + "]: Initializing Joblet Phase 2");
         final Joblet ji = getLocalJoblet(jobId);
-        Stagelet si = (Stagelet) ji.getStagelet(stage.getId());
+        Stagelet si = (Stagelet) ji.getStagelet(stageId);
         final Map<OperatorInstanceId, OperatorRunnable> honMap = si.getOperatorMap();
 
-        final Stagelet stagelet = (Stagelet) ji.getStagelet(stage.getId());
+        final Stagelet stagelet = (Stagelet) ji.getStagelet(stageId);
 
         final JobSpecification spec = plan.getJobSpecification();
 
-        for (ActivityNodeId hanId : stage.getTasks()) {
+        for (ActivityNodeId hanId : activities) {
             IActivityNode han = plan.getActivityNodeMap().get(hanId);
             IOperatorDescriptor op = han.getOwner();
             List<IConnectorDescriptor> outputs = plan.getTaskOutputs(hanId);
@@ -282,13 +280,13 @@
                                 @Override
                                 public IFrameWriter createFrameWriter(int index) throws HyracksDataException {
                                     PortInstanceId piId = new PortInstanceId(spec.getConsumer(conn).getOperatorId(),
-                                            Direction.INPUT, spec.getConsumerInputIndex(conn), index);
+                                        Direction.INPUT, spec.getConsumerInputIndex(conn), index);
                                     Endpoint ep = globalPortMap.get(piId);
                                     if (LOGGER.isLoggable(Level.FINEST)) {
                                         LOGGER.finest("Probed endpoint " + piId + " -> " + ep);
                                     }
                                     return createWriter(connectionManager.connect(ep.getNetworkAddress(), ep
-                                            .getEndpointId(), senderIndex), plan, conn, senderIndex, index, stagelet);
+                                        .getEndpointId(), senderIndex), plan, conn, senderIndex, index, stagelet);
                                 }
                             };
                             or.setFrameWriter(j, conn.createSendSideWriter(ctx, plan, edwFactory, i));
@@ -301,7 +299,7 @@
     }
 
     private IFrameWriter createWriter(final IFrameWriter writer, JobPlan plan, final IConnectorDescriptor conn,
-            final int senderIndex, final int receiverIndex, final Stagelet stagelet) throws HyracksDataException {
+        final int senderIndex, final int receiverIndex, final Stagelet stagelet) throws HyracksDataException {
         return plan.getJobFlags().contains(JobFlag.COLLECT_FRAME_COUNTS) ? new IFrameWriter() {
             private int frameCount;
 
@@ -321,16 +319,16 @@
             public void close() throws HyracksDataException {
                 writer.close();
                 stagelet.getStatistics().getStatisticsMap().put(
-                        "framecount." + conn.getConnectorId().getId() + ".sender." + senderIndex + "." + receiverIndex,
-                        String.valueOf(frameCount));
+                    "framecount." + conn.getConnectorId().getId() + ".sender." + senderIndex + "." + receiverIndex,
+                    String.valueOf(frameCount));
             }
         } : writer;
     }
 
     @Override
-    public void commitJobletInitialization(UUID jobId, JobPlan plan, JobStage stage) throws Exception {
+    public void commitJobletInitialization(UUID jobId, UUID stageId) throws Exception {
         final Joblet ji = getLocalJoblet(jobId);
-        Stagelet si = (Stagelet) ji.getStagelet(stage.getId());
+        Stagelet si = (Stagelet) ji.getStagelet(stageId);
         for (Endpoint e : si.getEndpointList()) {
             connectionManager.unacceptConnection(e.getEndpointId());
         }
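
The three methods above form the node-controller side of a two-phase (plus commit) stagelet start: phase 1 creates the stagelet and registers the receive-side endpoints, phase 2 wires send-side writers against the merged global port map, and the commit withdraws the temporary accept registrations. A minimal sketch of the cluster-controller side of this handshake, assuming `jobId`, `plan`, and hypothetical `nodes` and `perNodeActivities` collections are in scope (not part of this patch):

    Map<PortInstanceId, Endpoint> globalPortMap = new HashMap<PortInstanceId, Endpoint>();
    UUID stageId = UUID.randomUUID();

    // Phase 1: each node creates its stagelet and reports the input endpoints it accepts on.
    for (INodeController node : nodes) {
        Set<ActivityNodeId> activities = perNodeActivities.get(node.getId());
        globalPortMap.putAll(node.initializeJobletPhase1(jobId, plan, stageId, activities));
    }

    // Phase 2: redistribute the merged port map so send-side writers can connect.
    for (INodeController node : nodes) {
        node.initializeJobletPhase2(jobId, plan, stageId, perNodeActivities.get(node.getId()), globalPortMap);
    }

    // Commit: withdraw the temporary accept registrations on every node.
    for (INodeController node : nodes) {
        node.commitJobletInitialization(jobId, stageId);
    }
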
diff --git a/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg b/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
index abba642..dd62758 100644
--- a/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
+++ b/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
@@ -1,50 +1,115 @@
 program hyrackscc;
 
 import java.util.UUID;
+import java.util.Set;
+
+import jol.types.basic.Tuple;
+import jol.types.basic.TupleSet;
+
 import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
 import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.job.JobPlan;
 
-define(testOp, keys(0, 1), {UUID, OperatorDescriptorId, IOperatorDescriptor});
-define(testConn, keys(0, 1), {UUID, ConnectorDescriptorId, OperatorDescriptorId, Integer, OperatorDescriptorId, Integer});
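+/* activitystage(JobId, OperatorId, ActivityId, StageNumber): assigns each
+   activity of a job to the number of the stage it runs in. */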
+define(activitystage, keys(0, 1, 2), {UUID, OperatorDescriptorId, ActivityNodeId, Integer});
 
-testOp(JobId, Opid, Op) :- 
-    operatordescriptor(JobId, Opid, Op)
+activitystage(JobId, OperatorId, ActivityId, 0) :-
+    activitynode(JobId, OperatorId, ActivityId, _);
+
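+/* An activity that is blocked behind another activity in the same stage
+   moves to the next stage. */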
+activitystage(JobId, OperatorId2, ActivityId2, StageNumber) :-
+    activitystage(JobId, OperatorId1, ActivityId1, StageNumber1),
+    activitystage(JobId, OperatorId2, ActivityId2, StageNumber2),
+    activityblocked(JobId, OperatorId1, ActivityId1, OperatorId2, ActivityId2),
+    StageNumber1 == StageNumber2
     {
-        java.lang.System.err.println(Opid);
+        StageNumber := StageNumber1 + 1;
     };
 
-testConn(JobId, Cid, SOpid, SPort, DOpid, DPort) :- 
-    connectordescriptor(JobId, Cid, SOpid, SPort, DOpid, DPort, _)
-    {
-        java.lang.System.err.println(Cid.toString() + " " + SOpid.toString() + ":" + SPort.toString() + " -> " + DOpid.toString() + ":" + DPort.toString());
-    };
-
-define(activitystage, keys(0, 1, 2, 3, 4), {UUID, OperatorDescriptorId, ActivityNodeId, OperatorDescriptorId, ActivityNodeId});
-
-activitystage(JobId, OperatorId, ActivityId, OperatorId, ActivityId) :-
-    activitynode(JobId, OperatorId, ActivityId, _)
-    {
-        java.lang.System.err.println("INITIAL: " + JobId.toString() + " " + OperatorId.toString() + " " + ActivityId.toString());
-    };
-
-activitystage(JobId, OperatorId2, ActivityId2, OperatorId1, ActivityId1) :-
-    activitystage(JobId, OperatorId1, ActivityId1, OperatorId2, ActivityId2)
-    {
-        java.lang.System.err.println("CHANGE1: " + JobId.toString() + " " + OperatorId2.toString() + " " + ActivityId2.toString() + " " + OperatorId1.toString() + " " + ActivityId1.toString());
-    };
-
-activitystage(JobId, OperatorId1, ActivityId1, OperatorId3, ActivityId3) :-
-    activitystage(JobId, OperatorId1, ActivityId1, OperatorId2, ActivityId2),
-    activitynode(JobId, OperatorId3, ActivityId3, _),
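+/* Activities whose operators are joined by a connector run in the same
+   stage: both take the larger of their stage numbers. The rule is stated
+   once per connector direction. */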
+activitystage(JobId, OperatorId1, ActivityId1, StageNumber) :-
+    activitystage(JobId, OperatorId1, ActivityId1, StageNumber1),
+    activitystage(JobId, OperatorId2, ActivityId2, StageNumber2),
+    activityconnection(JobId, OperatorId1, Operator1Port, _, ActivityId1, _),
     activityconnection(JobId, OperatorId2, Operator2Port, _, ActivityId2, _),
-    activityconnection(JobId, OperatorId3, Operator3Port, _, ActivityId3, _),
-    connectordescriptor(JobId, _, OperatorId2, Operator2Port, OperatorId3, Operator3Port, _)
+    connectordescriptor(JobId, _, OperatorId1, Operator1Port, OperatorId2, Operator2Port, _),
+    StageNumber1 != StageNumber2
     {
-        java.lang.System.err.println("CHANGE2: " + JobId.toString() + " " + OperatorId1.toString() + " " + ActivityId1.toString() + " " + OperatorId3.toString() + " " + ActivityId3.toString());
+        StageNumber := java.lang.Math.max(StageNumber1, StageNumber2);
     };
 
-watch(activitynode, a);
-watch(activitystage, a);
+activitystage(JobId, OperatorId2, ActivityId2, StageNumber) :-
+    activitystage(JobId, OperatorId1, ActivityId1, StageNumber1),
+    activitystage(JobId, OperatorId2, ActivityId2, StageNumber2),
+    activityconnection(JobId, OperatorId1, Operator1Port, _, ActivityId1, _),
+    activityconnection(JobId, OperatorId2, Operator2Port, _, ActivityId2, _),
+    connectordescriptor(JobId, _, OperatorId2, Operator2Port, OperatorId1, Operator1Port, _),
+    StageNumber1 != StageNumber2
+    {
+        StageNumber := java.lang.Math.max(StageNumber1, StageNumber2);
+    };
+
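+/* jobstage(JobId, StageNumber, StageId): mint a fresh UUID for each stage. */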
+define(jobstage, keys(0, 1), {UUID, Integer, UUID});
+
+jobstage(JobId, StageNumber, StageId) :-
+    activitystage(JobId, _, _, StageNumber)
+    {
+        StageId := java.util.UUID.randomUUID();
+    };
+
+watch(jobstage, a);
+
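+/* stagestart/stagefinish sequence the stages of a job: stage 0 starts when
+   the job starts, and stage N + 1 starts when stage N finishes. */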
+define(stagestart, keys(0), {UUID, Integer});
+define(stagefinish, keys(), {UUID, Integer});
+
+watch(jobstart, i);
+
+stagestart_INITIAL stagestart(JobId, 0) :-
+    jobstart(JobId, _),
+    job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.INITIALIZED, _, _),
+    notin stagestart(JobId, _);
+
+/*
+update_job_status_RUNNING job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, JobSpec, JobPlan) :-
+    job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.INITIALIZED, JobSpec, JobPlan),
+    jobstart(JobId, _);
+*/
+
+stagestart_NEXT stagestart(JobId, NextStageNumber) :-
+    stagestart(JobId, StageNumber),
+    stagefinish(StageId, StageNumber)
+    {
+        NextStageNumber := StageNumber + 1;
+    };
+
+watch(stagestart, a);
+watch(stagestart, d);
+
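+/* activitystart: the activities of the stage being started, expanded to the
+   nodes hosting their operators. */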
+define(activitystart, keys(), {UUID, OperatorDescriptorId, ActivityNodeId, Integer, UUID, String});
+
+activitystart(JobId, OperatorId, ActivityId, StageNumber, StageId, NodeId) :-
+    stagestart(JobId, StageNumber),
+    operatordescriptor(JobId, OperatorId, _),
+    activitystage(JobId, OperatorId, ActivityId, StageNumber),
+    jobstage(JobId, StageNumber, StageId),
+    operatorlocation(JobId, OperatorId, NodeId);
+
+watch(activitystart, a);
+
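+/* stageletstart: pairs each activity of the stage with the node that runs
+   it, carrying the job plan that is shipped to the node. */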
+define(stageletstart, keys(0, 1), {UUID, UUID, JobPlan, String, ActivityNodeId});
+
+stageletstart(JobId, StageId, JobPlan, NodeId, ActivityId) :-
+    activitystart(JobId, _, ActivityId, _, StageId, NodeId),
+    job(JobId, _, _, JobPlan);
+
+watch(stageletstart, a);
+watch(stageletstart, i);
+
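+/* startmessage: fold the (NodeId, ActivityId) pairs of a stage into a single
+   tuple set for dispatch to the node controllers. */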
+startmessage(JobId, StageId, JobPlan, tupleset<Tuple>) :-
+    stageletstart(JobId, StageId, JobPlan, NodeId, ActivityId)
+    {
+        Tuple := new Tuple([NodeId, ActivityId]);
+    };
+
+watch(startmessage, i);
\ No newline at end of file
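
The recursive activitystage rules above amount to a stage-numbering fixpoint over the activity graph. A rough, non-normative Java rendering of that computation, with hypothetical `activities`, `blocked` (blocker-to-blocked edges), and `connected` (connector endpoints) maps standing in for the Overlog relations (both maps assumed total, with empty sets where there are no edges):

    Map<ActivityNodeId, Integer> stageOf = new HashMap<ActivityNodeId, Integer>();
    for (ActivityNodeId a : activities) {
        stageOf.put(a, 0); // every activity starts in stage 0
    }
    boolean changed = true;
    while (changed) { // iterate to fixpoint, mirroring the recursive rules
        changed = false;
        for (ActivityNodeId a : activities) {
            // an activity blocked behind a must run in a strictly later stage
            for (ActivityNodeId b : blocked.get(a)) {
                if (stageOf.get(b) <= stageOf.get(a)) {
                    stageOf.put(b, stageOf.get(a) + 1);
                    changed = true;
                }
            }
            // activities joined by a connector are pulled into the same stage
            for (ActivityNodeId c : connected.get(a)) {
                int m = Math.max(stageOf.get(a), stageOf.get(c));
                if (stageOf.get(c) != m || stageOf.get(a) != m) {
                    stageOf.put(c, m);
                    stageOf.put(a, m);
                    changed = true;
                }
            }
        }
    }
    // each distinct stage number then gets a fresh UUID, as in the jobstage rule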