Refactored controllers. Added a JobManager implementation based on JOL

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks-next@14 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-core/.classpath b/hyracks-core/.classpath
index 370f43c..2fb3f21 100644
--- a/hyracks-core/.classpath
+++ b/hyracks-core/.classpath
@@ -2,6 +2,7 @@
 <classpath>
 	<classpathentry kind="src" path="src/main/java"/>
 	<classpathentry kind="src" path="src/test/java"/>
+	<classpathentry kind="src" path="src/main/resources"/>
 	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
 	<classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
 	<classpathentry kind="output" path="target/classes"/>
diff --git a/hyracks-core/pom.xml b/hyracks-core/pom.xml
index 5f91b97..0222956 100644
--- a/hyracks-core/pom.xml
+++ b/hyracks-core/pom.xml
@@ -129,6 +129,13 @@
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>
+  	<dependency>
+  		<groupId>jol</groupId>
+  		<artifactId>jol</artifactId>
+  		<version>1.0.0</version>
+  		<type>jar</type>
+  		<scope>compile</scope>
+  	</dependency>
   </dependencies>
   <reporting>
     <plugins>
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/IClusterController.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/IClusterController.java
index af86093..19e81f1 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/IClusterController.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/IClusterController.java
@@ -17,7 +17,6 @@
 import java.rmi.Remote;
 import java.util.EnumSet;
 import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
 
 import edu.uci.ics.hyracks.api.job.JobFlag;
@@ -27,15 +26,15 @@
 import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
 
 public interface IClusterController extends Remote {
-    public void registerNode(INodeController nodeController) throws Exception;
+    public NodeParameters registerNode(INodeController nodeController) throws Exception;
 
     public void unregisterNode(INodeController nodeController) throws Exception;
 
-    public INodeController lookupNode(String id) throws Exception;
-
     public void notifyStageletComplete(UUID jobId, UUID stageId, String nodeId, StageletStatistics statistics)
             throws Exception;
 
+    public void nodeHeartbeat(String id) throws Exception;
+
     /*
      * Client Job Control methods.
      */
@@ -48,6 +47,6 @@
     public void start(UUID jobId) throws Exception;
 
     public JobStatistics waitForCompletion(UUID jobId) throws Exception;
-    
-    public Map<String,INodeController> getRegistry() throws Exception;
+
+    public Map<String, INodeController> getRegistry() throws Exception;
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/NodeParameters.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/NodeParameters.java
new file mode 100644
index 0000000..3856fea
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/NodeParameters.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.controller;
+
+import java.io.Serializable;
+
+public class NodeParameters implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private int heartbeatPeriod;
+
+    public int getHeartbeatPeriod() {
+        return heartbeatPeriod;
+    }
+
+    public void setHeartbeatPeriod(int heartbeatPeriod) {
+        this.heartbeatPeriod = heartbeatPeriod;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/ConnectorDescriptorId.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/ConnectorDescriptorId.java
index c7f3175..86d8118 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/ConnectorDescriptorId.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/ConnectorDescriptorId.java
@@ -46,4 +46,9 @@
         ConnectorDescriptorId other = (ConnectorDescriptorId) obj;
         return id.equals(other.id);
     }
+
+    @Override
+    public String toString() {
+        return "CDID:" + id;
+    }
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/Direction.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/Direction.java
new file mode 100644
index 0000000..c884998
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/Direction.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow;
+
+public enum Direction {
+    INPUT,
+    OUTPUT,
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/PortInstanceId.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/PortInstanceId.java
index d8057d9..844266d 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/PortInstanceId.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/PortInstanceId.java
@@ -17,11 +17,6 @@
 import java.io.Serializable;
 
 public final class PortInstanceId implements Serializable {
-    public enum Direction {
-        INPUT,
-        OUTPUT,
-    }
-
     private static final long serialVersionUID = 1L;
 
     private OperatorDescriptorId odId;
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/resources/IResource.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/resources/IResource.java
new file mode 100644
index 0000000..2d76c26
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/resources/IResource.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.resources;
+
+import java.io.Serializable;
+
+public interface IResource extends Serializable {
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/resources/ISpaceSharedResource.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/resources/ISpaceSharedResource.java
new file mode 100644
index 0000000..3a6f930
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/resources/ISpaceSharedResource.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.resources;
+
+public interface ISpaceSharedResource {
+
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/resources/ITimeSharedResource.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/resources/ITimeSharedResource.java
new file mode 100644
index 0000000..1fb0465
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/resources/ITimeSharedResource.java
@@ -0,0 +1,18 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.resources;
+
+public interface ITimeSharedResource {
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/config/CCConfig.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/config/CCConfig.java
index 950569d..081c3f2 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/config/CCConfig.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/config/CCConfig.java
@@ -22,4 +22,13 @@
 
     @Option(name = "-http-port", usage = "Sets the http port for the admin console")
     public int httpPort;
+
+    @Option(name = "-heartbeat-period", usage = "Sets the time duration between two heartbeats from each node controller in seconds (default: 10)")
+    public int heartbeatPeriod = 10;
+
+    @Option(name = "-max-heartbeat-lapse-periods", usage = "Sets the maximum number of missed heartbeats before a node is marked as dead (default: 5)")
+    public int maxHeartbeatLapsePeriods = 5;
+
+    @Option(name = "-use-jol", usage = "Forces Hyracks to use the JOL based scheduler (default: false)")
+    public boolean useJOL = false;
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/ClusterControllerService.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java
similarity index 67%
rename from hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/ClusterControllerService.java
rename to hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java
index 0461499..9b2fc26 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/ClusterControllerService.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java
@@ -12,15 +12,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.controller;
+package edu.uci.ics.hyracks.controller.clustercontroller;
 
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.rmi.registry.LocateRegistry;
 import java.rmi.registry.Registry;
 import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.UUID;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
@@ -31,6 +36,8 @@
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import jol.core.Runtime;
+
 import org.eclipse.jetty.server.Handler;
 import org.eclipse.jetty.server.Request;
 import org.eclipse.jetty.server.handler.AbstractHandler;
@@ -38,13 +45,14 @@
 
 import edu.uci.ics.hyracks.api.controller.IClusterController;
 import edu.uci.ics.hyracks.api.controller.INodeController;
+import edu.uci.ics.hyracks.api.controller.NodeParameters;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
 import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
 import edu.uci.ics.hyracks.config.CCConfig;
-import edu.uci.ics.hyracks.job.JobManager;
+import edu.uci.ics.hyracks.controller.AbstractRemoteService;
 import edu.uci.ics.hyracks.web.WebServer;
 
 public class ClusterControllerService extends AbstractRemoteService implements IClusterController {
@@ -54,20 +62,32 @@
 
     private static Logger LOGGER = Logger.getLogger(ClusterControllerService.class.getName());
 
-    private final Map<String, INodeController> nodeRegistry;
+    private final Map<String, NodeControllerState> nodeRegistry;
 
     private WebServer webServer;
 
-    private final JobManager jobManager;
+    private final IJobManager jobManager;
 
     private final Executor taskExecutor;
 
+    private final Timer timer;
+
+    private Runtime jolRuntime;
+
     public ClusterControllerService(CCConfig ccConfig) throws Exception {
         this.ccConfig = ccConfig;
-        nodeRegistry = new LinkedHashMap<String, INodeController>();
-        jobManager = new JobManager(this);
+        nodeRegistry = new LinkedHashMap<String, NodeControllerState>();
+        if (ccConfig.useJOL) {
+            jolRuntime = (Runtime) Runtime.create(Runtime.DEBUG_WATCH, System.err);
+            jobManager = new JOLJobManagerImpl(jolRuntime);
+        } else {
+            jobManager = new JobManagerImpl(this);
+        }
         taskExecutor = Executors.newCachedThreadPool();
-        webServer = new WebServer(new Handler[] { getAdminConsoleHandler() });
+        webServer = new WebServer(new Handler[] {
+            getAdminConsoleHandler()
+        });
+        this.timer = new Timer(true);
     }
 
     @Override
@@ -77,6 +97,7 @@
         registry.rebind(IClusterController.class.getName(), this);
         webServer.setPort(ccConfig.httpPort);
         webServer.start();
+        timer.schedule(new DeadNodeSweeper(), 0, ccConfig.heartbeatPeriod * 1000);
         LOGGER.log(Level.INFO, "Started ClusterControllerService");
     }
 
@@ -98,16 +119,20 @@
     }
 
     @Override
-    public void registerNode(INodeController nodeController) throws Exception {
+    public NodeParameters registerNode(INodeController nodeController) throws Exception {
         String id = nodeController.getId();
+        NodeControllerState state = new NodeControllerState(nodeController);
         synchronized (this) {
             if (nodeRegistry.containsKey(id)) {
                 throw new Exception("Node with this name already registered.");
             }
-            nodeRegistry.put(id, nodeController);
+            nodeRegistry.put(id, state);
         }
         nodeController.notifyRegistration(this);
         LOGGER.log(Level.INFO, "Registered INodeController: id = " + id);
+        NodeParameters params = new NodeParameters();
+        params.setHeartbeatPeriod(ccConfig.heartbeatPeriod);
+        return params;
     }
 
     @Override
@@ -119,8 +144,7 @@
         LOGGER.log(Level.INFO, "Unregistered INodeController");
     }
 
-    @Override
-    public INodeController lookupNode(String id) throws Exception {
+    public synchronized NodeControllerState lookupNode(String id) throws Exception {
         return nodeRegistry.get(id);
     }
 
@@ -129,12 +153,12 @@
     }
 
     public synchronized void notifyJobComplete(final UUID jobId) {
-        for (final INodeController nc : nodeRegistry.values()) {
+        for (final NodeControllerState ns : nodeRegistry.values()) {
             taskExecutor.execute(new Runnable() {
                 @Override
                 public void run() {
                     try {
-                        nc.cleanUpJob(jobId);
+                        ns.getNodeController().cleanUpJob(jobId);
                     } catch (Exception e) {
                     }
                 }
@@ -145,7 +169,7 @@
 
     @Override
     public void notifyStageletComplete(UUID jobId, UUID stageId, String nodeId, StageletStatistics statistics)
-            throws Exception {
+        throws Exception {
         jobManager.notifyStageletComplete(jobId, stageId, nodeId, statistics);
     }
 
@@ -169,7 +193,7 @@
         handler.setHandler(new AbstractHandler() {
             @Override
             public void handle(String target, Request baseRequest, HttpServletRequest request,
-                    HttpServletResponse response) throws IOException, ServletException {
+                HttpServletResponse response) throws IOException, ServletException {
                 if (!"/".equals(target)) {
                     return;
                 }
@@ -182,12 +206,12 @@
                 writer.println("<h2>Node Controllers</h2>");
                 writer.println("<table><tr><td>Node Id</td><td>Host</td></tr>");
                 synchronized (ClusterControllerService.this) {
-                    for (Map.Entry<String, INodeController> e : nodeRegistry.entrySet()) {
+                    for (Map.Entry<String, NodeControllerState> e : nodeRegistry.entrySet()) {
                         try {
-                            INodeController n = e.getValue();
                             writer.print("<tr><td>");
                             writer.print(e.getKey());
                             writer.print("</td><td>");
+                            writer.print(e.getValue().getLastHeartbeatDuration());
                             writer.print("</td></tr>");
                         } catch (Exception ex) {
                         }
@@ -203,6 +227,44 @@
 
     @Override
     public Map<String, INodeController> getRegistry() throws Exception {
-        return nodeRegistry;
+        Map<String, INodeController> map = new HashMap<String, INodeController>();
+        for (Map.Entry<String, NodeControllerState> e : nodeRegistry.entrySet()) {
+            map.put(e.getKey(), e.getValue().getNodeController());
+        }
+        return map;
+    }
+
+    @Override
+    public synchronized void nodeHeartbeat(String id) throws Exception {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Heartbeat from: " + id);
+        }
+        NodeControllerState ns = nodeRegistry.get(id);
+        if (ns != null) {
+            ns.notifyHeartbeat();
+        }
+    }
+
+    private void killNode(String nodeId) {
+        nodeRegistry.remove(nodeId);
+        jobManager.notifyNodeFailure(nodeId);
+    }
+
+    private class DeadNodeSweeper extends TimerTask {
+        @Override
+        public void run() {
+            Set<String> deadNodes = new HashSet<String>();
+            synchronized (ClusterControllerService.this) {
+                for (Map.Entry<String, NodeControllerState> e : nodeRegistry.entrySet()) {
+                    NodeControllerState state = e.getValue();
+                    if (state.incrementLastHeartbeatDuration() >= ccConfig.maxHeartbeatLapsePeriods) {
+                        deadNodes.add(e.getKey());
+                    }
+                }
+                for (String deadNode : deadNodes) {
+                    killNode(deadNode);
+                }
+            }
+        }
     }
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/IJobManager.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/IJobManager.java
new file mode 100644
index 0000000..7892b5c
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/IJobManager.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.controller.clustercontroller;
+
+import java.util.EnumSet;
+import java.util.UUID;
+
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
+import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
+
+public interface IJobManager {
+    public UUID createJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception;
+
+    public void start(UUID jobId) throws Exception;
+
+    public void advanceJob(JobControl jobControlImpl) throws Exception;
+
+    public void notifyStageletComplete(UUID jobId, UUID stageId, String nodeId, StageletStatistics statistics)
+            throws Exception;
+
+    public JobStatus getJobStatus(UUID jobId);
+
+    public JobStatistics waitForCompletion(UUID jobId) throws Exception;
+
+    public void notifyNodeFailure(String nodeId);
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/IJobPlanner.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/IJobPlanner.java
new file mode 100644
index 0000000..e9bffba
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/IJobPlanner.java
@@ -0,0 +1,9 @@
+package edu.uci.ics.hyracks.controller.clustercontroller;
+
+import java.util.Set;
+
+import edu.uci.ics.hyracks.api.job.JobStage;
+
+public interface IJobPlanner {
+    public Set<String> plan(JobControl jobControl, JobStage stage) throws Exception;
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
new file mode 100644
index 0000000..6e7cf74
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
@@ -0,0 +1,307 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.controller.clustercontroller;
+
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.UUID;
+
+import jol.core.Runtime;
+import jol.types.basic.BasicTupleSet;
+import jol.types.basic.Tuple;
+import jol.types.table.BasicTable;
+import jol.types.table.Key;
+import jol.types.table.TableName;
+import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.Direction;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IActivityNode;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
+import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
+
+public class JOLJobManagerImpl implements IJobManager {
+    public static final String JOL_SCOPE = "hyrackscc";
+
+    private static final String SCHEDULER_OLG_FILE = "edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg";
+
+    private final Runtime jolRuntime;
+
+    private final JobTable jobTable;
+
+    private final OperatorDescriptorTable odTable;
+
+    private final ConnectorDescriptorTable cdTable;
+
+    private final ActivityNodeTable anTable;
+
+    private final ActivityConnectionTable acTable;
+
+    private final ActivityBlockedTable abTable;
+
+    public JOLJobManagerImpl(Runtime jolRuntime) throws Exception {
+        this.jolRuntime = jolRuntime;
+        this.jobTable = new JobTable(jolRuntime);
+        this.odTable = new OperatorDescriptorTable(jolRuntime);
+        this.cdTable = new ConnectorDescriptorTable(jolRuntime);
+        this.anTable = new ActivityNodeTable(jolRuntime);
+        this.acTable = new ActivityConnectionTable(jolRuntime);
+        this.abTable = new ActivityBlockedTable(jolRuntime);
+
+        jolRuntime.catalog().register(jobTable);
+        jolRuntime.catalog().register(odTable);
+        jolRuntime.catalog().register(cdTable);
+        jolRuntime.catalog().register(anTable);
+        jolRuntime.catalog().register(acTable);
+        jolRuntime.catalog().register(abTable);
+
+        jolRuntime.install(JOL_SCOPE, ClassLoader.getSystemResource(SCHEDULER_OLG_FILE));
+        jolRuntime.evaluate();
+    }
+
+    @Override
+    public void advanceJob(JobControl jobControlImpl) throws Exception {
+
+    }
+
+    @Override
+    public UUID createJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
+        final UUID jobId = UUID.randomUUID();
+
+        BasicTupleSet jobTuples = new BasicTupleSet(JobTable.createInitialJobTuple(jobId, jobFlags));
+
+        final BasicTupleSet anTuples = new BasicTupleSet();
+        final BasicTupleSet acTuples = new BasicTupleSet();
+        final BasicTupleSet abTuples = new BasicTupleSet();
+        IActivityGraphBuilder gBuilder = new IActivityGraphBuilder() {
+            @Override
+            public void addTask(IActivityNode task) {
+                anTuples.add(ActivityNodeTable.createTuple(jobId, task));
+            }
+
+            @Override
+            public void addTargetEdge(int operatorOutputIndex, IActivityNode task, int taskOutputIndex) {
+                acTuples.add(ActivityConnectionTable.createTuple(jobId, task, Direction.OUTPUT, operatorOutputIndex,
+                    taskOutputIndex));
+            }
+
+            @Override
+            public void addSourceEdge(int operatorInputIndex, IActivityNode task, int taskInputIndex) {
+                acTuples.add(ActivityConnectionTable.createTuple(jobId, task, Direction.INPUT, operatorInputIndex,
+                    taskInputIndex));
+            }
+
+            @Override
+            public void addBlockingEdge(IActivityNode blocker, IActivityNode blocked) {
+                abTuples.add(ActivityBlockedTable.createTuple(jobId, blocker, blocked));
+            }
+        };
+
+        BasicTupleSet odTuples = new BasicTupleSet();
+        for (Map.Entry<OperatorDescriptorId, IOperatorDescriptor> e : jobSpec.getOperatorMap().entrySet()) {
+            IOperatorDescriptor od = e.getValue();
+            odTuples.add(OperatorDescriptorTable.createTuple(jobId, od));
+            od.contributeTaskGraph(gBuilder);
+        }
+
+        BasicTupleSet cdTuples = new BasicTupleSet();
+        for (Map.Entry<ConnectorDescriptorId, IConnectorDescriptor> e : jobSpec.getConnectorMap().entrySet()) {
+            cdTuples.add(ConnectorDescriptorTable.createTuple(jobId, jobSpec, e.getValue()));
+        }
+
+        jolRuntime.schedule(JOL_SCOPE, JobTable.TABLE_NAME, jobTuples, null);
+        jolRuntime.schedule(JOL_SCOPE, OperatorDescriptorTable.TABLE_NAME, odTuples, null);
+        jolRuntime.schedule(JOL_SCOPE, ConnectorDescriptorTable.TABLE_NAME, cdTuples, null);
+        jolRuntime.schedule(JOL_SCOPE, ActivityNodeTable.TABLE_NAME, anTuples, null);
+        jolRuntime.schedule(JOL_SCOPE, ActivityConnectionTable.TABLE_NAME, acTuples, null);
+        jolRuntime.schedule(JOL_SCOPE, ActivityBlockedTable.TABLE_NAME, abTuples, null);
+
+        jolRuntime.evaluate();
+
+        return jobId;
+    }
+
+    @Override
+    public JobStatus getJobStatus(UUID jobId) {
+        return null;
+    }
+
+    @Override
+    public void notifyNodeFailure(String nodeId) {
+
+    }
+
+    @Override
+    public void notifyStageletComplete(UUID jobId, UUID stageId, String nodeId, StageletStatistics statistics)
+        throws Exception {
+    }
+
+    @Override
+    public void start(UUID jobId) throws Exception {
+
+    }
+
+    @Override
+    public JobStatistics waitForCompletion(UUID jobId) throws Exception {
+        return null;
+    }
+
+    /*
+     * declare(job, keys(0), {JobId, Flags, Status})
+     */
+    private static class JobTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "job");
+
+        private static Key PRIMARY_KEY = new Key(0);
+
+        private static final Class[] SCHEMA = new Class[] {
+            UUID.class, EnumSet.class, JobStatus.class, PerJobCounter.class
+        };
+
+        public JobTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        static Tuple createInitialJobTuple(UUID jobId, EnumSet<JobFlag> jobFlags) {
+            return new Tuple(jobId, jobFlags, JobStatus.INITIALIZED, new PerJobCounter());
+        }
+    }
+
+    /*
+     * declare(operatordescriptor, keys(0, 1), {JobId, ODId, OperatorDescriptor})
+     */
+    private static class OperatorDescriptorTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "operatordescriptor");
+
+        private static Key PRIMARY_KEY = new Key(0, 1);
+
+        private static final Class[] SCHEMA = new Class[] {
+            UUID.class, OperatorDescriptorId.class, IOperatorDescriptor.class
+        };
+
+        public OperatorDescriptorTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        static Tuple createTuple(UUID jobId, IOperatorDescriptor od) {
+            return new Tuple(jobId, od.getOperatorId(), od);
+        }
+    }
+
+    /*
+     * declare(connectordescriptor, keys(0, 1), {JobId, CDId, SrcODId, SrcPort, DestODId, DestPort, ConnectorDescriptor})
+     */
+    private static class ConnectorDescriptorTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "connectordescriptor");
+
+        private static Key PRIMARY_KEY = new Key(0, 1);
+
+        private static final Class[] SCHEMA = new Class[] {
+            UUID.class,
+            ConnectorDescriptorId.class,
+            OperatorDescriptorId.class,
+            Integer.class,
+            OperatorDescriptorId.class,
+            Integer.class,
+            IConnectorDescriptor.class
+        };
+
+        public ConnectorDescriptorTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        static Tuple createTuple(UUID jobId, JobSpecification jobSpec, IConnectorDescriptor conn) {
+            IOperatorDescriptor srcOD = jobSpec.getProducer(conn);
+            int srcPort = jobSpec.getProducerOutputIndex(conn);
+            IOperatorDescriptor destOD = jobSpec.getConsumer(conn);
+            int destPort = jobSpec.getConsumerInputIndex(conn);
+            Tuple cdTuple = new Tuple(jobId, conn.getConnectorId(), srcOD.getOperatorId(), srcPort, destOD
+                .getOperatorId(), destPort, conn);
+            return cdTuple;
+        }
+    }
+
+    /*
+     * declare(activitynode, keys(0), {JobId, OperatorId, ActivityId, ActivityNode})
+     */
+    private static class ActivityNodeTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "activitynode");
+
+        private static Key PRIMARY_KEY = new Key(0, 1, 2);
+
+        private static final Class[] SCHEMA = new Class[] {
+            UUID.class, OperatorDescriptorId.class, ActivityNodeId.class, IActivityNode.class
+        };
+
+        public ActivityNodeTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        static Tuple createTuple(UUID jobId, IActivityNode aNode) {
+            return new Tuple(jobId, aNode.getActivityNodeId().getOperatorDescriptorId(), aNode.getActivityNodeId(),
+                aNode);
+        }
+    }
+
+    /*
+     * declare(activityconnection, keys(0, 1, 2, 3), {JobId, OperatorId, Integer, Direction, ActivityNodeId, Integer})
+     */
+    private static class ActivityConnectionTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "activityconnection");
+
+        private static Key PRIMARY_KEY = new Key(0);
+
+        private static final Class[] SCHEMA = new Class[] {
+            UUID.class, OperatorDescriptorId.class, Integer.class, Direction.class, ActivityNodeId.class, Integer.class
+        };
+
+        public ActivityConnectionTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        static Tuple createTuple(UUID jobId, IActivityNode aNode, Direction direction, int odPort, int activityPort) {
+            return new Tuple(jobId, aNode.getActivityNodeId().getOperatorDescriptorId(), odPort, direction, aNode
+                .getActivityNodeId(), activityPort);
+        }
+    }
+
+    /*
+     * declare(activityblocked, keys(0, 1, 2, 3), {JobId, OperatorId, BlockerActivityId, BlockedActivityId})
+     */
+    private static class ActivityBlockedTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "activityblocked");
+
+        private static Key PRIMARY_KEY = new Key(0);
+
+        private static final Class[] SCHEMA = new Class[] {
+            UUID.class, OperatorDescriptorId.class, ActivityNodeId.class, ActivityNodeId.class
+        };
+
+        public ActivityBlockedTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        static Tuple createTuple(UUID jobId, IActivityNode blocker, IActivityNode blocked) {
+            OperatorDescriptorId odId = blocker.getActivityNodeId().getOperatorDescriptorId();
+            return new Tuple(jobId, odId, blocker.getActivityNodeId(), blocked.getActivityNodeId());
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/JobControl.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobControl.java
similarity index 94%
rename from hyracks-core/src/main/java/edu/uci/ics/hyracks/job/JobControl.java
rename to hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobControl.java
index 3259649..a7e2fc1 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/JobControl.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobControl.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.job;
+package edu.uci.ics.hyracks.controller.clustercontroller;
 
 import java.rmi.RemoteException;
 import java.util.HashMap;
@@ -30,7 +30,7 @@
 public class JobControl {
     private static final long serialVersionUID = 1L;
 
-    private final JobManager jobManager;
+    private final IJobManager jobManager;
 
     private final JobPlan jobPlan;
 
@@ -44,7 +44,7 @@
 
     private JobStatistics jobStatistics;
 
-    public JobControl(JobManager jobManager, JobPlan jobPlan) throws RemoteException {
+    public JobControl(IJobManager jobManager, JobPlan jobPlan) throws RemoteException {
         this.jobManager = jobManager;
         this.jobPlan = jobPlan;
         jobId = UUID.randomUUID();
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/JobManager.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobManagerImpl.java
similarity index 79%
rename from hyracks-core/src/main/java/edu/uci/ics/hyracks/job/JobManager.java
rename to hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobManagerImpl.java
index f976bb9..828a6db 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/JobManager.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobManagerImpl.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.job;
+package edu.uci.ics.hyracks.controller.clustercontroller;
 
 import java.util.ArrayList;
 import java.util.Date;
@@ -29,13 +29,7 @@
 import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.comm.Endpoint;
-import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
-import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
-import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
 import edu.uci.ics.hyracks.api.controller.INodeController;
-import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobPlan;
@@ -44,19 +38,19 @@
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
 import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
-import edu.uci.ics.hyracks.controller.ClusterControllerService;
-import edu.uci.ics.hyracks.dataflow.base.IOperatorDescriptorVisitor;
-import edu.uci.ics.hyracks.dataflow.util.PlanUtils;
 
-public class JobManager {
-    private static final Logger LOGGER = Logger.getLogger(JobManager.class.getName());
+public class JobManagerImpl implements IJobManager {
+    private static final Logger LOGGER = Logger.getLogger(JobManagerImpl.class.getName());
     private ClusterControllerService ccs;
 
     private final Map<UUID, JobControl> jobMap;
 
-    public JobManager(ClusterControllerService ccs) {
+    private final IJobPlanner planner;
+
+    public JobManagerImpl(ClusterControllerService ccs) {
         this.ccs = ccs;
         jobMap = new HashMap<UUID, JobControl>();
+        planner = new NaiveJobPlannerImpl();
     }
 
     public synchronized UUID createJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
@@ -119,7 +113,7 @@
         stage.setStarted();
         Set<String> candidateNodes = deploy(jc, stage);
         for (String nodeId : candidateNodes) {
-            ccs.lookupNode(nodeId).startStage(jc.getJobId(), stage.getId());
+            ccs.lookupNode(nodeId).getNodeController().startStage(jc.getJobId(), stage.getId());
         }
     }
 
@@ -148,15 +142,15 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Deploying: " + stage);
         }
-        Set<String> candidateNodes = plan(jc, stage);
+        Set<String> participatingNodes = plan(jc, stage);
         StageProgress stageProgress = new StageProgress(stage.getId());
-        stageProgress.addPendingNodes(candidateNodes);
+        stageProgress.addPendingNodes(participatingNodes);
         Map<PortInstanceId, Endpoint> globalPortMap = runRemote(new Phase1Installer(jc, stage),
-                new PortMapMergingAccumulator(), candidateNodes);
-        runRemote(new Phase2Installer(jc, stage, globalPortMap), null, candidateNodes);
-        runRemote(new Phase3Installer(jc, stage), null, candidateNodes);
+                new PortMapMergingAccumulator(), participatingNodes);
+        runRemote(new Phase2Installer(jc, stage, globalPortMap), null, participatingNodes);
+        runRemote(new Phase3Installer(jc, stage), null, participatingNodes);
         jc.setStageProgress(stage.getId(), stageProgress);
-        return candidateNodes;
+        return participatingNodes;
     }
 
     private interface RemoteOp<T> {
@@ -255,7 +249,7 @@
         final Semaphore installComplete = new Semaphore(candidateNodes.size());
         final List<Exception> errors = new Vector<Exception>();
         for (final String nodeId : candidateNodes) {
-            final INodeController node = ccs.lookupNode(nodeId);
+            final INodeController node = ccs.lookupNode(nodeId).getNodeController();
 
             installComplete.acquire();
             Runnable remoteRunner = new Runnable() {
@@ -287,43 +281,11 @@
 
     private Set<String> plan(JobControl jc, JobStage stage) throws Exception {
         LOGGER.log(Level.INFO, String.valueOf(jc.getJobId()) + ": Planning");
-
-        final Set<OperatorDescriptorId> opSet = new HashSet<OperatorDescriptorId>();
-        for (ActivityNodeId t : stage.getTasks()) {
-            opSet.add(jc.getJobPlan().getActivityNodeMap().get(t).getOwner().getOperatorId());
-        }
-
-        final Set<String> candidateNodes = new HashSet<String>();
-
-        IOperatorDescriptorVisitor visitor = new IOperatorDescriptorVisitor() {
-            @Override
-            public void visit(IOperatorDescriptor op) throws Exception {
-                if (!opSet.contains(op.getOperatorId())) {
-                    return;
-                }
-                String[] partitions = op.getPartitions();
-                if (partitions == null) {
-                    PartitionConstraint pc = op.getPartitionConstraint();
-                    LocationConstraint[] lcs = pc.getLocationConstraints();
-                    String[] assignment = new String[lcs.length];
-                    for (int i = 0; i < lcs.length; ++i) {
-                        String nodeId = ((AbsoluteLocationConstraint) lcs[i]).getLocationId();
-                        assignment[i] = nodeId;
-                    }
-                    op.setPartitions(assignment);
-                    partitions = assignment;
-                }
-                for (String p : partitions) {
-                    candidateNodes.add(p);
-                }
-            }
-        };
-
-        PlanUtils.visit(jc.getJobPlan().getJobSpecification(), visitor);
+        Set<String> participatingNodes = planner.plan(jc, stage);
         if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info(stage + " Candidate nodes: " + candidateNodes);
+            LOGGER.info(stage + " Participating nodes: " + participatingNodes);
         }
-        return candidateNodes;
+        return participatingNodes;
     }
 
     public synchronized void notifyStageletComplete(UUID jobId, UUID stageId, String nodeId,
@@ -349,4 +311,11 @@
         }
         return null;
     }
+
+    public synchronized void notifyNodeFailure(String nodeId) {
+        for(Map.Entry<UUID, JobControl> e : jobMap.entrySet()) {
+            JobControl jc = e.getValue();
+            
+        }
+    }
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/JobPlanBuilder.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobPlanBuilder.java
similarity index 99%
rename from hyracks-core/src/main/java/edu/uci/ics/hyracks/job/JobPlanBuilder.java
rename to hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobPlanBuilder.java
index 61cf70f..723736c 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/JobPlanBuilder.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobPlanBuilder.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.job;
+package edu.uci.ics.hyracks.controller.clustercontroller;
 
 import java.util.ArrayList;
 import java.util.EnumSet;
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/NaiveJobPlannerImpl.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/NaiveJobPlannerImpl.java
new file mode 100644
index 0000000..f3ccfee
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/NaiveJobPlannerImpl.java
@@ -0,0 +1,53 @@
+package edu.uci.ics.hyracks.controller.clustercontroller;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
+import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.job.JobStage;
+import edu.uci.ics.hyracks.dataflow.base.IOperatorDescriptorVisitor;
+import edu.uci.ics.hyracks.dataflow.util.PlanUtils;
+
+public class NaiveJobPlannerImpl implements IJobPlanner {
+    @Override
+    public Set<String> plan(JobControl jc, JobStage stage) throws Exception {
+        final Set<OperatorDescriptorId> opSet = new HashSet<OperatorDescriptorId>();
+        for (ActivityNodeId t : stage.getTasks()) {
+            opSet.add(jc.getJobPlan().getActivityNodeMap().get(t).getOwner().getOperatorId());
+        }
+
+        final Set<String> candidateNodes = new HashSet<String>();
+
+        IOperatorDescriptorVisitor visitor = new IOperatorDescriptorVisitor() {
+            @Override
+            public void visit(IOperatorDescriptor op) throws Exception {
+                if (!opSet.contains(op.getOperatorId())) {
+                    return;
+                }
+                String[] partitions = op.getPartitions();
+                if (partitions == null) {
+                    PartitionConstraint pc = op.getPartitionConstraint();
+                    LocationConstraint[] lcs = pc.getLocationConstraints();
+                    String[] assignment = new String[lcs.length];
+                    for (int i = 0; i < lcs.length; ++i) {
+                        String nodeId = ((AbsoluteLocationConstraint) lcs[i]).getLocationId();
+                        assignment[i] = nodeId;
+                    }
+                    op.setPartitions(assignment);
+                    partitions = assignment;
+                }
+                for (String p : partitions) {
+                    candidateNodes.add(p);
+                }
+            }
+        };
+
+        PlanUtils.visit(jc.getJobPlan().getJobSpecification(), visitor);
+        return candidateNodes;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/NodeControllerState.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/NodeControllerState.java
new file mode 100644
index 0000000..e209515
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/NodeControllerState.java
@@ -0,0 +1,29 @@
+package edu.uci.ics.hyracks.controller.clustercontroller;
+
+import edu.uci.ics.hyracks.api.controller.INodeController;
+
+public class NodeControllerState {
+    private final INodeController nodeController;
+
+    private int lastHeartbeatDuration;
+
+    public NodeControllerState(INodeController nodeController) {
+        this.nodeController = nodeController;
+    }
+
+    void notifyHeartbeat() {
+        lastHeartbeatDuration = 0;
+    }
+
+    int incrementLastHeartbeatDuration() {
+        return lastHeartbeatDuration++;
+    }
+
+    int getLastHeartbeatDuration() {
+        return lastHeartbeatDuration;
+    }
+
+    public INodeController getNodeController() {
+        return nodeController;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/PerJobCounter.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/PerJobCounter.java
new file mode 100644
index 0000000..4644daf
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/PerJobCounter.java
@@ -0,0 +1,12 @@
+package edu.uci.ics.hyracks.controller.clustercontroller;
+
+public class PerJobCounter {
+    private int stageCounter;
+
+    public PerJobCounter() {
+    }
+
+    public int getStageCounterAndIncrement() {
+        return stageCounter++;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/StageProgress.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/StageProgress.java
similarity index 96%
rename from hyracks-core/src/main/java/edu/uci/ics/hyracks/job/StageProgress.java
rename to hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/StageProgress.java
index 3357012..69f7380 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/StageProgress.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/StageProgress.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.job;
+package edu.uci.ics.hyracks.controller.clustercontroller;
 
 import java.util.HashSet;
 import java.util.Set;
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/Joblet.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Joblet.java
similarity index 96%
rename from hyracks-core/src/main/java/edu/uci/ics/hyracks/job/Joblet.java
rename to hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Joblet.java
index 5f38e56..055216d 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/Joblet.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Joblet.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.job;
+package edu.uci.ics.hyracks.controller.nodecontroller;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -23,7 +23,6 @@
 import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
 import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
-import edu.uci.ics.hyracks.controller.NodeControllerService;
 
 public class Joblet {
     private static final long serialVersionUID = 1L;
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/NodeControllerService.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
similarity index 92%
rename from hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/NodeControllerService.java
rename to hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
index dbdf1c9..f8f6806 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/NodeControllerService.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.controller;
+package edu.uci.ics.hyracks.controller.nodecontroller;
 
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
@@ -23,6 +23,8 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.UUID;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
@@ -38,7 +40,9 @@
 import edu.uci.ics.hyracks.api.controller.IClusterController;
 import edu.uci.ics.hyracks.api.controller.INodeController;
 import edu.uci.ics.hyracks.api.controller.NodeCapability;
+import edu.uci.ics.hyracks.api.controller.NodeParameters;
 import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.Direction;
 import edu.uci.ics.hyracks.api.dataflow.IActivityNode;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.IEndpointDataWriterFactory;
@@ -56,8 +60,7 @@
 import edu.uci.ics.hyracks.comm.DemuxDataReceiveListenerFactory;
 import edu.uci.ics.hyracks.config.NCConfig;
 import edu.uci.ics.hyracks.context.HyracksContext;
-import edu.uci.ics.hyracks.job.Joblet;
-import edu.uci.ics.hyracks.job.Stagelet;
+import edu.uci.ics.hyracks.controller.AbstractRemoteService;
 import edu.uci.ics.hyracks.runtime.OperatorRunnable;
 
 public class NodeControllerService extends AbstractRemoteService implements INodeController {
@@ -73,12 +76,16 @@
 
     private final ConnectionManager connectionManager;
 
+    private final Timer timer;
+
     private IClusterController ccs;
 
     private Map<UUID, Joblet> jobletMap;
 
     private Executor executor;
 
+    private NodeParameters nodeParameters;
+
     public NodeControllerService(NCConfig ncConfig) throws Exception {
         this.ncConfig = ncConfig;
         id = ncConfig.nodeId;
@@ -90,6 +97,7 @@
         connectionManager = new ConnectionManager(ctx, getIpAddress(ncConfig));
         jobletMap = new HashMap<UUID, Joblet>();
         executor = Executors.newCachedThreadPool();
+        timer = new Timer(true);
     }
 
     private static Logger LOGGER = Logger.getLogger(NodeControllerService.class.getName());
@@ -100,7 +108,11 @@
         connectionManager.start();
         Registry registry = LocateRegistry.getRegistry(ncConfig.ccHost, ncConfig.ccPort);
         IClusterController cc = (IClusterController) registry.lookup(IClusterController.class.getName());
-        cc.registerNode(this);
+        this.nodeParameters = cc.registerNode(this);
+
+        // Schedule heartbeat generator.
+        timer.schedule(new HeartbeatTask(cc), 0, nodeParameters.getHeartbeatPeriod() * 1000);
+
         LOGGER.log(Level.INFO, "Started NodeControllerService");
     }
 
@@ -189,7 +201,7 @@
                             DemuxDataReceiveListenerFactory drlf = new DemuxDataReceiveListenerFactory(ctx);
                             connectionManager.acceptConnection(endpoint.getEndpointId(), drlf);
                             PortInstanceId piId = new PortInstanceId(op.getOperatorId(),
-                                    PortInstanceId.Direction.INPUT, plan.getTaskInputMap().get(hanId).get(j), i);
+                                    Direction.INPUT, plan.getTaskInputMap().get(hanId).get(j), i);
                             if (LOGGER.isLoggable(Level.FINEST)) {
                                 LOGGER.finest("Created endpoint " + piId + " -> " + endpoint);
                             }
@@ -270,7 +282,7 @@
                                 @Override
                                 public IFrameWriter createFrameWriter(int index) throws HyracksDataException {
                                     PortInstanceId piId = new PortInstanceId(spec.getConsumer(conn).getOperatorId(),
-                                            PortInstanceId.Direction.INPUT, spec.getConsumerInputIndex(conn), index);
+                                            Direction.INPUT, spec.getConsumerInputIndex(conn), index);
                                     Endpoint ep = globalPortMap.get(piId);
                                     if (LOGGER.isLoggable(Level.FINEST)) {
                                         LOGGER.finest("Probed endpoint " + piId + " -> " + ep);
@@ -367,8 +379,25 @@
         this.ccs = ccs;
     }
 
-	@Override
-	public NCConfig getConfiguration() throws Exception {
-		return ncConfig;
-	}
+    @Override
+    public NCConfig getConfiguration() throws Exception {
+        return ncConfig;
+    }
+
+    private class HeartbeatTask extends TimerTask {
+        private IClusterController cc;
+
+        public HeartbeatTask(IClusterController cc) {
+            this.cc = cc;
+        }
+
+        @Override
+        public void run() {
+            try {
+                cc.nodeHeartbeat(id);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/Stagelet.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java
similarity index 98%
rename from hyracks-core/src/main/java/edu/uci/ics/hyracks/job/Stagelet.java
rename to hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java
index e82fa5b..8acab28 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/Stagelet.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.job;
+package edu.uci.ics.hyracks.controller.nodecontroller;
 
 import java.rmi.RemoteException;
 import java.util.Date;
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/driver/CCDriver.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/driver/CCDriver.java
index 425f967..bb1ba84 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/driver/CCDriver.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/driver/CCDriver.java
@@ -17,7 +17,7 @@
 import org.kohsuke.args4j.CmdLineParser;
 
 import edu.uci.ics.hyracks.config.CCConfig;
-import edu.uci.ics.hyracks.controller.ClusterControllerService;
+import edu.uci.ics.hyracks.controller.clustercontroller.ClusterControllerService;
 
 public class CCDriver {
     public static void main(String args[]) throws Exception {
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/driver/NCDriver.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/driver/NCDriver.java
index 657b304..3c856ab 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/driver/NCDriver.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/driver/NCDriver.java
@@ -19,7 +19,7 @@
 import edu.uci.ics.dcache.client.DCacheClient;
 import edu.uci.ics.dcache.client.DCacheClientConfig;
 import edu.uci.ics.hyracks.config.NCConfig;
-import edu.uci.ics.hyracks.controller.NodeControllerService;
+import edu.uci.ics.hyracks.controller.nodecontroller.NodeControllerService;
 
 public class NCDriver {
     public static void main(String args[]) throws Exception {
diff --git a/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg b/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
new file mode 100644
index 0000000..abba642
--- /dev/null
+++ b/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
@@ -0,0 +1,50 @@
+program hyrackscc;
+
+import java.util.UUID;
+import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+
+define(testOp, keys(0, 1), {UUID, OperatorDescriptorId, IOperatorDescriptor});
+define(testConn, keys(0, 1), {UUID, ConnectorDescriptorId, OperatorDescriptorId, Integer, OperatorDescriptorId, Integer});
+
+testOp(JobId, Opid, Op) :- 
+    operatordescriptor(JobId, Opid, Op)
+    {
+        java.lang.System.err.println(Opid);
+    };
+
+testConn(JobId, Cid, SOpid, SPort, DOpid, DPort) :- 
+    connectordescriptor(JobId, Cid, SOpid, SPort, DOpid, DPort, _)
+    {
+        java.lang.System.err.println(Cid.toString() + " " + SOpid.toString() + ":" + SPort.toString() + " -> " + DOpid.toString() + ":" + DPort.toString());
+    };
+
+define(activitystage, keys(0, 1, 2, 3, 4), {UUID, OperatorDescriptorId, ActivityNodeId, OperatorDescriptorId, ActivityNodeId});
+
+activitystage(JobId, OperatorId, ActivityId, OperatorId, ActivityId) :-
+    activitynode(JobId, OperatorId, ActivityId, _)
+    {
+        java.lang.System.err.println("INITIAL: " + JobId.toString() + " " + OperatorId.toString() + " " + ActivityId.toString());
+    };
+
+activitystage(JobId, OperatorId2, ActivityId2, OperatorId1, ActivityId1) :-
+    activitystage(JobId, OperatorId1, ActivityId1, OperatorId2, ActivityId2)
+    {
+        java.lang.System.err.println("CHANGE1: " + JobId.toString() + " " + OperatorId2.toString() + " " + ActivityId2.toString() + " " + OperatorId1.toString() + " " + ActivityId1.toString());
+    };
+
+activitystage(JobId, OperatorId1, ActivityId1, OperatorId3, ActivityId3) :-
+    activitystage(JobId, OperatorId1, ActivityId1, OperatorId2, ActivityId2),
+    activitynode(JobId, OperatorId3, ActivityId3, _),
+    activityconnection(JobId, OperatorId2, Operator2Port, _, ActivityId2, _),
+    activityconnection(JobId, OperatorId3, Operator3Port, _, ActivityId3, _),
+    connectordescriptor(JobId, _, OperatorId2, Operator2Port, OperatorId3, Operator3Port, _)
+    {
+        java.lang.System.err.println("CHANGE2: " + JobId.toString() + " " + OperatorId1.toString() + " " + ActivityId1.toString() + " " + OperatorId3.toString() + " " + ActivityId3.toString());
+    };
+
+watch(activitynode, a);
+watch(activitystage, a);
diff --git a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
index 94bc57a..8505531 100644
--- a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -24,8 +24,8 @@
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.config.CCConfig;
 import edu.uci.ics.hyracks.config.NCConfig;
-import edu.uci.ics.hyracks.controller.ClusterControllerService;
-import edu.uci.ics.hyracks.controller.NodeControllerService;
+import edu.uci.ics.hyracks.controller.clustercontroller.ClusterControllerService;
+import edu.uci.ics.hyracks.controller.nodecontroller.NodeControllerService;
 
 public abstract class AbstractIntegrationTest {
     public static final String NC1_ID = "nc1";
@@ -39,6 +39,7 @@
     public static void init() throws Exception {
         CCConfig ccConfig = new CCConfig();
         ccConfig.port = 39001;
+        ccConfig.useJOL = true;
         cc = new ClusterControllerService(ccConfig);
         cc.start();