Added application support. Added examples. Added CLI

git-svn-id: https://hyracks.googlecode.com/svn/trunk/hyracks@57 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-cc/.classpath b/hyracks-control-cc/.classpath
index 88cebb7..ebe2cde 100644
--- a/hyracks-control-cc/.classpath
+++ b/hyracks-control-cc/.classpath
@@ -1,6 +1,7 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <classpath>
 	<classpathentry kind="src" path="src/main/java"/>
+	<classpathentry kind="src" path="src/main/resources"/>
 	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
 	<classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
 	<classpathentry kind="output" path="target/classes"/>
diff --git a/hyracks-control-cc/pom.xml b/hyracks-control-cc/pom.xml
index 96992af..a975238 100644
--- a/hyracks-control-cc/pom.xml
+++ b/hyracks-control-cc/pom.xml
@@ -25,20 +25,6 @@
   		<scope>compile</scope>
   	</dependency>
   	<dependency>
-  		<groupId>org.eclipse.jetty</groupId>
-  		<artifactId>jetty-server</artifactId>
-  		<version>8.0.0.M0</version>
-  		<type>jar</type>
-  		<scope>compile</scope>
-  	</dependency>
-  	<dependency>
-  		<groupId>args4j</groupId>
-  		<artifactId>args4j</artifactId>
-  		<version>2.0.12</version>
-  		<type>jar</type>
-  		<scope>compile</scope>
-  	</dependency>
-  	<dependency>
   		<groupId>org.apache.wicket</groupId>
   		<artifactId>wicket</artifactId>
   		<version>1.4.7</version>
@@ -52,5 +38,12 @@
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>
+  	<dependency>
+  		<groupId>org.eclipse.jetty</groupId>
+  		<artifactId>jetty-server</artifactId>
+  		<version>8.0.0.M1</version>
+  		<type>jar</type>
+  		<scope>compile</scope>
+  	</dependency>
   </dependencies>
 </project>
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 2597df7..1d8aca2 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -14,7 +14,12 @@
  */
 package edu.uci.ics.hyracks.control.cc;
 
+import java.io.ByteArrayOutputStream;
+import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
 import java.io.PrintWriter;
 import java.net.InetAddress;
 import java.rmi.registry.LocateRegistry;
@@ -22,6 +27,7 @@
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Hashtable;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -43,29 +49,36 @@
 import jol.core.Runtime;
 import jol.core.Runtime.DebugLevel;
 
+import org.apache.commons.io.IOUtils;
+import org.eclipse.jetty.http.HttpMethods;
 import org.eclipse.jetty.server.Handler;
 import org.eclipse.jetty.server.Request;
 import org.eclipse.jetty.server.handler.AbstractHandler;
 import org.eclipse.jetty.server.handler.ContextHandler;
 
+import edu.uci.ics.hyracks.api.client.ClusterControllerInfo;
 import edu.uci.ics.hyracks.api.client.IHyracksClientInterface;
+import edu.uci.ics.hyracks.api.comm.Endpoint;
+import edu.uci.ics.hyracks.api.control.IClusterController;
+import edu.uci.ics.hyracks.api.control.INodeController;
+import edu.uci.ics.hyracks.api.control.NodeParameters;
 import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
 import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobPlan;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
 import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
 import edu.uci.ics.hyracks.control.cc.web.WebServer;
 import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
-import edu.uci.ics.hyracks.control.common.NodeParameters;
-import edu.uci.ics.hyracks.control.common.api.IClusterController;
-import edu.uci.ics.hyracks.control.common.api.INodeController;
-import edu.uci.ics.hyracks.control.common.comm.Endpoint;
-import edu.uci.ics.hyracks.control.common.job.JobPlan;
+import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
+import edu.uci.ics.hyracks.control.common.context.ServerContext;
 
-public class ClusterControllerService extends AbstractRemoteService implements IClusterController {
+public class ClusterControllerService extends AbstractRemoteService implements IClusterController,
+        IHyracksClientInterface {
     private static final long serialVersionUID = 1L;
 
     private CCConfig ccConfig;
@@ -74,8 +87,14 @@
 
     private final Map<String, NodeControllerState> nodeRegistry;
 
+    private final Map<String, ApplicationContext> applications;
+
+    private final ServerContext serverCtx;
+
     private WebServer webServer;
 
+    private ClusterControllerInfo info;
+
     private final IJobManager jobManager;
 
     private final Executor taskExecutor;
@@ -87,11 +106,16 @@
     public ClusterControllerService(CCConfig ccConfig) throws Exception {
         this.ccConfig = ccConfig;
         nodeRegistry = new LinkedHashMap<String, NodeControllerState>();
+        applications = new Hashtable<String, ApplicationContext>();
+        serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(
+                ClusterControllerService.class.getName()));
         Set<DebugLevel> jolDebugLevel = LOGGER.isLoggable(Level.FINE) ? Runtime.DEBUG_ALL : new HashSet<DebugLevel>();
         jolRuntime = (Runtime) Runtime.create(jolDebugLevel, System.err);
         jobManager = new JOLJobManagerImpl(this, jolRuntime);
         taskExecutor = Executors.newCachedThreadPool();
-        webServer = new WebServer(new Handler[] { getAdminConsoleHandler(), getApplicationDataHandler() });
+        webServer = new WebServer();
+        webServer.addHandler(getAdminConsoleHandler());
+        webServer.addHandler(getApplicationInstallationHandler());
         this.timer = new Timer(true);
     }
 
@@ -103,6 +127,8 @@
         registry.rebind(IClusterController.class.getName(), this);
         webServer.setPort(ccConfig.httpPort);
         webServer.start();
+        info = new ClusterControllerInfo();
+        info.setWebPort(webServer.getListeningPort());
         timer.schedule(new DeadNodeSweeper(), 0, ccConfig.heartbeatPeriod);
         LOGGER.log(Level.INFO, "Started ClusterControllerService");
     }
@@ -115,13 +141,12 @@
     }
 
     @Override
-    public UUID createJob(JobSpecification jobSpec) throws Exception {
-        return jobManager.createJob(jobSpec, EnumSet.noneOf(JobFlag.class));
-    }
-
-    @Override
-    public UUID createJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
-        return jobManager.createJob(jobSpec, jobFlags);
+    public UUID createJob(String appName, byte[] jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
+        ApplicationContext appCtx = applications.get(appName);
+        if (appCtx == null) {
+            throw new HyracksException("No application with id " + appName + " found");
+        }
+        return jobManager.createJob(appName, (JobSpecification) appCtx.deserialize(jobSpec), jobFlags);
     }
 
     @Override
@@ -138,6 +163,7 @@
         jobManager.registerNode(id);
         LOGGER.log(Level.INFO, "Registered INodeController: id = " + id);
         NodeParameters params = new NodeParameters();
+        params.setClusterControllerInfo(info);
         params.setHeartbeatPeriod(ccConfig.heartbeatPeriod);
         return params;
     }
@@ -237,12 +263,54 @@
         return handler;
     }
 
-    private Handler getApplicationDataHandler() {
+    private Handler getApplicationInstallationHandler() {
         ContextHandler handler = new ContextHandler("/applications");
         handler.setHandler(new AbstractHandler() {
             @Override
             public void handle(String target, Request baseRequest, HttpServletRequest request,
                     HttpServletResponse response) throws IOException, ServletException {
+                try {
+                    while (target.startsWith("/")) {
+                        target = target.substring(1);
+                    }
+                    while (target.endsWith("/")) {
+                        target = target.substring(0, target.length() - 1);
+                    }
+                    String[] parts = target.split("/");
+                    if (parts.length != 1) {
+                        return;
+                    }
+                    String appName = parts[0];
+                    ApplicationContext appCtx;
+                    appCtx = applications.get(appName);
+                    if (appCtx != null) {
+                        if (HttpMethods.PUT.equals(request.getMethod())) {
+                            OutputStream os = appCtx.getHarOutputStream();
+                            try {
+                                IOUtils.copyLarge(request.getInputStream(), os);
+                            } finally {
+                                os.close();
+                            }
+                        } else if (HttpMethods.GET.equals(request.getMethod())) {
+                            if (!appCtx.containsHar()) {
+                                response.setStatus(HttpServletResponse.SC_NOT_FOUND);
+                            } else {
+                                InputStream is = appCtx.getHarInputStream();
+                                response.setContentType("application/octet-stream");
+                                response.setStatus(HttpServletResponse.SC_OK);
+                                try {
+                                    IOUtils.copyLarge(is, response.getOutputStream());
+                                } finally {
+                                    is.close();
+                                }
+                            }
+                        }
+                        baseRequest.setHandled(true);
+                    }
+                } catch (IOException e) {
+                    e.printStackTrace();
+                    throw e;
+                }
             }
         });
         return handler;
@@ -274,6 +342,47 @@
         jobManager.notifyNodeFailure(nodeId);
     }
 
+    @Override
+    public void createApplication(String appName) throws Exception {
+        synchronized (applications) {
+            if (applications.containsKey(appName)) {
+                throw new HyracksException("Duplicate application with name: " + appName + " being created.");
+            }
+            ApplicationContext appCtx = new ApplicationContext(serverCtx, appName);
+            applications.put(appName, appCtx);
+        }
+    }
+
+    @Override
+    public void destroyApplication(String appName) throws Exception {
+        ApplicationContext appCtx = applications.remove(appName);
+        if (appCtx != null) {
+            synchronized (this) {
+                for (NodeControllerState ncs : nodeRegistry.values()) {
+                    ncs.getNodeController().destroyApplication(appName);
+                }
+            }
+            appCtx.deinitialize();
+        }
+    }
+
+    @Override
+    public void startApplication(String appName) throws Exception {
+        ApplicationContext appCtx = applications.get(appName);
+        appCtx.initialize();
+        boolean deployHar = appCtx.containsHar();
+        synchronized (this) {
+            for (NodeControllerState ncs : nodeRegistry.values()) {
+                ncs.getNodeController().createApplication(appName, deployHar);
+            }
+        }
+    }
+
+    @Override
+    public ClusterControllerInfo getClusterControllerInfo() throws Exception {
+        return info;
+    }
+
     private class DeadNodeSweeper extends TimerTask {
         @Override
         public void run() {
@@ -346,19 +455,28 @@
         return accumulator == null ? null : accumulator.getResult();
     }
 
+    private static byte[] serialize(Object o) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutputStream oos = new ObjectOutputStream(baos);
+        oos.writeObject(o);
+        return baos.toByteArray();
+    }
+
     static class Phase1Installer implements RemoteOp<Map<PortInstanceId, Endpoint>> {
         private String nodeId;
         private UUID jobId;
+        private String appName;
         private JobPlan plan;
         private UUID stageId;
         private int attempt;
         private Map<ActivityNodeId, Set<Integer>> tasks;
         private Map<OperatorDescriptorId, Set<Integer>> opPartitions;
 
-        public Phase1Installer(String nodeId, UUID jobId, JobPlan plan, UUID stageId, int attempt,
+        public Phase1Installer(String nodeId, UUID jobId, String appName, JobPlan plan, UUID stageId, int attempt,
                 Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions) {
             this.nodeId = nodeId;
             this.jobId = jobId;
+            this.appName = appName;
             this.plan = plan;
             this.stageId = stageId;
             this.attempt = attempt;
@@ -368,7 +486,7 @@
 
         @Override
         public Map<PortInstanceId, Endpoint> execute(INodeController node) throws Exception {
-            return node.initializeJobletPhase1(jobId, plan, stageId, attempt, tasks, opPartitions);
+            return node.initializeJobletPhase1(appName, jobId, serialize(plan), stageId, attempt, tasks, opPartitions);
         }
 
         @Override
@@ -385,17 +503,19 @@
     static class Phase2Installer implements RemoteOp<Void> {
         private String nodeId;
         private UUID jobId;
+        private String appName;
         private JobPlan plan;
         private UUID stageId;
         private Map<ActivityNodeId, Set<Integer>> tasks;
         private Map<OperatorDescriptorId, Set<Integer>> opPartitions;
         private Map<PortInstanceId, Endpoint> globalPortMap;
 
-        public Phase2Installer(String nodeId, UUID jobId, JobPlan plan, UUID stageId,
+        public Phase2Installer(String nodeId, UUID jobId, String appName, JobPlan plan, UUID stageId,
                 Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions,
                 Map<PortInstanceId, Endpoint> globalPortMap) {
             this.nodeId = nodeId;
             this.jobId = jobId;
+            this.appName = appName;
             this.plan = plan;
             this.stageId = stageId;
             this.tasks = tasks;
@@ -405,7 +525,7 @@
 
         @Override
         public Void execute(INodeController node) throws Exception {
-            node.initializeJobletPhase2(jobId, plan, stageId, tasks, opPartitions, globalPortMap);
+            node.initializeJobletPhase2(appName, jobId, serialize(plan), stageId, tasks, opPartitions, globalPortMap);
             return null;
         }
 
@@ -544,19 +664,4 @@
             return portMap;
         }
     }
-
-    @Override
-    public void createApplication(String appName) throws Exception {
-
-    }
-
-    @Override
-    public void destroyApplication(String appName) throws Exception {
-
-    }
-
-    @Override
-    public void startApplication(String appName) throws Exception {
-
-    }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/IJobManager.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/IJobManager.java
index 9d500c0..bb90d0b 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/IJobManager.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/IJobManager.java
@@ -24,12 +24,12 @@
 import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
 
 public interface IJobManager {
-    public UUID createJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception;
+    public UUID createJob(String appName, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception;
 
     public void start(UUID jobId) throws Exception;
 
     public void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
-        StageletStatistics statistics) throws Exception;
+            StageletStatistics statistics) throws Exception;
 
     public void notifyStageletFailure(UUID jobId, UUID stageId, int attempt, String nodeId) throws Exception;
 
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JOLJobManagerImpl.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JOLJobManagerImpl.java
index d7fd61d..22cce67 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JOLJobManagerImpl.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JOLJobManagerImpl.java
@@ -36,6 +36,7 @@
 import jol.types.table.Function;
 import jol.types.table.Key;
 import jol.types.table.TableName;
+import edu.uci.ics.hyracks.api.comm.Endpoint;
 import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
 import edu.uci.ics.hyracks.api.constraints.ChoiceLocationConstraint;
 import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
@@ -52,13 +53,12 @@
 import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
 import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobPlan;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
 import edu.uci.ics.hyracks.api.job.statistics.StageStatistics;
 import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
-import edu.uci.ics.hyracks.control.common.comm.Endpoint;
-import edu.uci.ics.hyracks.control.common.job.JobPlan;
 
 public class JOLJobManagerImpl implements IJobManager {
     private static final Logger LOGGER = Logger.getLogger(JOLJobManagerImpl.class.getName());
@@ -194,8 +194,9 @@
                                 UUID jobId = (UUID) data[0];
                                 UUID stageId = (UUID) data[1];
                                 Integer attempt = (Integer) data[2];
-                                JobPlan plan = (JobPlan) data[3];
-                                Set<List> ts = (Set<List>) data[4];
+                                String appName = (String) data[3];
+                                JobPlan plan = (JobPlan) data[4];
+                                Set<List> ts = (Set<List>) data[5];
                                 Map<OperatorDescriptorId, Set<Integer>> opPartitions = new HashMap<OperatorDescriptorId, Set<Integer>>();
                                 for (List t2 : ts) {
                                     Object[] t2Data = t2.toArray();
@@ -229,7 +230,7 @@
                                         aParts.add((Integer) lData[1]);
                                     }
                                     p1is[i++] = new ClusterControllerService.Phase1Installer((String) t2Data[0], jobId,
-                                            plan, stageId, attempt, tasks, opPartitions);
+                                            appName, plan, stageId, attempt, tasks, opPartitions);
                                 }
                                 LOGGER.info("Stage start - Phase 1");
                                 Map<PortInstanceId, Endpoint> globalPortMap = ccs.runRemote(p1is,
@@ -257,7 +258,7 @@
                                         aParts.add((Integer) lData[1]);
                                     }
                                     p2is[i] = new ClusterControllerService.Phase2Installer((String) t2Data[0], jobId,
-                                            plan, stageId, tasks, opPartitions, globalPortMap);
+                                            appName, plan, stageId, tasks, opPartitions, globalPortMap);
                                     p3is[i] = new ClusterControllerService.Phase3Installer((String) t2Data[0], jobId,
                                             stageId);
                                     ss[i] = new ClusterControllerService.StageStarter((String) t2Data[0], jobId,
@@ -368,7 +369,7 @@
     }
 
     @Override
-    public UUID createJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
+    public UUID createJob(String appName, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
         final UUID jobId = UUID.randomUUID();
 
         final JobPlanBuilder builder = new JobPlanBuilder();
@@ -420,7 +421,8 @@
             cdTuples.add(ConnectorDescriptorTable.createTuple(jobId, jobSpec, e.getValue()));
         }
 
-        BasicTupleSet jobTuples = new BasicTupleSet(JobTable.createInitialJobTuple(jobId, jobSpec, builder.getPlan()));
+        BasicTupleSet jobTuples = new BasicTupleSet(JobTable.createInitialJobTuple(jobId, appName, jobSpec,
+                builder.getPlan()));
 
         jolRuntime.schedule(JOL_SCOPE, JobTable.TABLE_NAME, jobTuples, null);
         jolRuntime.schedule(JOL_SCOPE, OperatorDescriptorTable.TABLE_NAME, odTuples, null);
@@ -481,7 +483,7 @@
                 if (jobTuple == null) {
                     return null;
                 }
-                return (JobStatus) jobTuple.value(1);
+                return (JobStatus) jobTuple.value(JobTable.JOBSTATUS_FIELD_INDEX);
             } catch (BadKeyException e) {
                 throw new RuntimeException(e);
             }
@@ -583,7 +585,8 @@
     public JobStatistics waitForCompletion(UUID jobId) throws Exception {
         synchronized (jobTable) {
             Tuple jobTuple = null;
-            while ((jobTuple = jobTable.lookupJob(jobId)) != null && jobTuple.value(1) != JobStatus.TERMINATED) {
+            while ((jobTuple = jobTable.lookupJob(jobId)) != null
+                    && jobTuple.value(JobTable.JOBSTATUS_FIELD_INDEX) != JobStatus.TERMINATED) {
                 jobTable.wait();
             }
             return jobTuple == null ? null : jobTable.buildJobStatistics(jobTuple);
@@ -599,21 +602,29 @@
         private static Key PRIMARY_KEY = new Key(0);
 
         @SuppressWarnings("unchecked")
-        private static final Class[] SCHEMA = new Class[] { UUID.class, JobStatus.class, JobSpecification.class,
-                JobPlan.class, Set.class };
+        private static final Class[] SCHEMA = new Class[] { UUID.class, String.class, JobStatus.class,
+                JobSpecification.class, JobPlan.class, Set.class };
+
+        public static final int JOBID_FIELD_INDEX = 0;
+        public static final int APPNAME_FIELD_INDEX = 1;
+        public static final int JOBSTATUS_FIELD_INDEX = 2;
+        public static final int JOBSPEC_FIELD_INDEX = 3;
+        public static final int JOBPLAN_FIELD_INDEX = 4;
+        public static final int STATISTICS_FIELD_INDEX = 5;
 
         public JobTable(Runtime context) {
             super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
         }
 
         @SuppressWarnings("unchecked")
-        static Tuple createInitialJobTuple(UUID jobId, JobSpecification jobSpec, JobPlan plan) {
-            return new Tuple(jobId, JobStatus.INITIALIZED, jobSpec, plan, new HashSet());
+        static Tuple createInitialJobTuple(UUID jobId, String appName, JobSpecification jobSpec, JobPlan plan) {
+            return new Tuple(jobId, appName, JobStatus.INITIALIZED, jobSpec, plan, new HashSet());
         }
 
         @SuppressWarnings("unchecked")
         JobStatistics buildJobStatistics(Tuple jobTuple) {
-            Set<Set<StageletStatistics>> statsSet = (Set<Set<StageletStatistics>>) jobTuple.value(4);
+            Set<Set<StageletStatistics>> statsSet = (Set<Set<StageletStatistics>>) jobTuple
+                    .value(JobTable.STATISTICS_FIELD_INDEX);
             JobStatistics stats = new JobStatistics();
             if (statsSet != null) {
                 for (Set<StageletStatistics> stageStatsSet : statsSet) {
@@ -822,8 +833,8 @@
         private static Key PRIMARY_KEY = new Key(0, 1, 2);
 
         @SuppressWarnings("unchecked")
-        private static final Class[] SCHEMA = new Class[] { UUID.class, UUID.class, Integer.class, JobPlan.class,
-                Set.class };
+        private static final Class[] SCHEMA = new Class[] { UUID.class, UUID.class, Integer.class, String.class,
+                JobPlan.class, Set.class };
 
         public StartMessageTable(Runtime context) {
             super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
@@ -1010,6 +1021,11 @@
         private static final Class[] SCHEMA = new Class[] { UUID.class, OperatorDescriptorId.class, Integer.class,
                 Integer.class };
 
+        public static final int JOBID_FIELD_INDEX = 0;
+        public static final int ODID_FIELD_INDEX = 1;
+        public static final int NPARTITIONS_FIELD_INDEX = 2;
+        public static final int ORDINAL_FIELD_INDEX = 3;
+
         public ExpandPartitionCountConstraintTableFunction() {
             super(TABLE_NAME, SCHEMA);
         }
@@ -1019,9 +1035,9 @@
             TupleSet result = new BasicTupleSet();
             int counter = 0;
             for (Tuple t : tuples) {
-                int nPartitions = (Integer) t.value(2);
+                int nPartitions = (Integer) t.value(NPARTITIONS_FIELD_INDEX);
                 for (int i = 0; i < nPartitions; ++i) {
-                    result.add(new Tuple(t.value(0), t.value(1), i, counter++));
+                    result.add(new Tuple(t.value(JOBID_FIELD_INDEX), t.value(ODID_FIELD_INDEX), i, counter++));
                 }
             }
             return result;
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JobPlanBuilder.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JobPlanBuilder.java
index fcb796e..d5056d3 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JobPlanBuilder.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JobPlanBuilder.java
@@ -12,8 +12,8 @@
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
 import edu.uci.ics.hyracks.api.dataflow.IActivityNode;
 import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobPlan;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.control.common.job.JobPlan;
 
 public class JobPlanBuilder implements IActivityGraphBuilder {
     private static final Logger LOGGER = Logger.getLogger(JobPlanBuilder.class.getName());
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JobPlanner.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JobPlanner.java
index 956c658..af3b77f 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JobPlanner.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JobPlanner.java
@@ -29,12 +29,12 @@
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobPlan;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.job.JobStage;
 import edu.uci.ics.hyracks.api.util.Pair;
 import edu.uci.ics.hyracks.control.cc.job.IOperatorDescriptorVisitor;
 import edu.uci.ics.hyracks.control.cc.job.PlanUtils;
-import edu.uci.ics.hyracks.control.common.job.JobPlan;
-import edu.uci.ics.hyracks.control.common.job.JobStage;
 
 public class JobPlanner {
     private static final Logger LOGGER = Logger.getLogger(JobPlanner.class.getName());
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
index e4d611b..a638c09 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
@@ -1,6 +1,6 @@
 package edu.uci.ics.hyracks.control.cc;
 
-import edu.uci.ics.hyracks.control.common.api.INodeController;
+import edu.uci.ics.hyracks.api.control.INodeController;
 
 public class NodeControllerState {
     private final INodeController nodeController;
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/WebServer.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/WebServer.java
index dfffcb3..12dfe21 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/WebServer.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/WebServer.java
@@ -18,28 +18,33 @@
 import org.eclipse.jetty.server.Handler;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.handler.ContextHandlerCollection;
+import org.eclipse.jetty.server.handler.HandlerCollection;
 import org.eclipse.jetty.server.nio.SelectChannelConnector;
 
 public class WebServer {
-    private Server server;
-    private SelectChannelConnector connector;
+    private final Server server;
+    private final SelectChannelConnector connector;
+    private final HandlerCollection handlerCollection;
 
-    public WebServer(Handler[] handlers) throws Exception {
+    public WebServer() throws Exception {
         server = new Server();
 
         connector = new SelectChannelConnector();
 
         server.setConnectors(new Connector[] { connector });
 
-        ContextHandlerCollection handler = new ContextHandlerCollection();
-        handler.setHandlers(handlers);
-        server.setHandler(handler);
+        handlerCollection = new ContextHandlerCollection();
+        server.setHandler(handlerCollection);
     }
 
     public void setPort(int port) {
         connector.setPort(port);
     }
 
+    public int getListeningPort() {
+        return connector.getLocalPort();
+    }
+
     public void start() throws Exception {
         server.start();
     }
@@ -47,4 +52,8 @@
     public void stop() throws Exception {
         server.stop();
     }
+
+    public void addHandler(Handler handler) {
+        handlerCollection.addHandler(handler);
+    }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/scheduler.olg b/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/scheduler.olg
new file mode 100644
index 0000000..0be0401
--- /dev/null
+++ b/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/scheduler.olg
@@ -0,0 +1,314 @@
+program hyrackscc;
+
+import java.util.UUID;
+import java.util.Set;
+
+import jol.types.basic.Tuple;
+import jol.types.basic.TupleSet;
+
+import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.job.JobPlan;
+
+watch(job, a);
+watch(job, i);
+watch(job, d);
+
+define(activitystage_temp, keys(), {UUID, OperatorDescriptorId, ActivityNodeId, Integer});
+
+activitystage_INITIAL activitystage_temp(JobId, OperatorId, ActivityId, 0) :-
+    activitynode(JobId, OperatorId, ActivityId, _);
+
+activitystage_BLOCKED activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber) :-
+    activitystage_temp(JobId, OperatorId1, ActivityId1, StageNumber1),
+    activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber2),
+    activityblocked(JobId, OperatorId1, ActivityId1, OperatorId2, ActivityId2),
+    StageNumber2 <= StageNumber1
+    {
+        StageNumber := StageNumber1 + 1;
+    };
+
+activitystage_PIPELINED_1 activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber) :-
+    activitystage_temp(JobId, OperatorId1, ActivityId1, StageNumber1),
+    activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber2),
+    activityconnection(JobId, OperatorId1, Operator1Port, edu.uci.ics.hyracks.api.dataflow.Direction.OUTPUT, ActivityId1, _),
+    activityconnection(JobId, OperatorId2, Operator2Port, edu.uci.ics.hyracks.api.dataflow.Direction.INPUT, ActivityId2, _),
+    connectordescriptor(JobId, _, OperatorId1, Operator1Port, OperatorId2, Operator2Port, _),
+    StageNumber1 != StageNumber2
+    {
+        StageNumber := java.lang.Math.max(StageNumber1, StageNumber2);
+    };
+
+activitystage_PIPELINED_2 activitystage_temp(JobId, OperatorId1, ActivityId1, StageNumber) :-
+    activitystage_temp(JobId, OperatorId1, ActivityId1, StageNumber1),
+    activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber2),
+    activityconnection(JobId, OperatorId1, Operator1Port, edu.uci.ics.hyracks.api.dataflow.Direction.OUTPUT, ActivityId1, _),
+    activityconnection(JobId, OperatorId2, Operator2Port, edu.uci.ics.hyracks.api.dataflow.Direction.INPUT, ActivityId2, _),
+    connectordescriptor(JobId, _, OperatorId1, Operator1Port, OperatorId2, Operator2Port, _),
+    StageNumber1 != StageNumber2
+    {
+        StageNumber := java.lang.Math.max(StageNumber1, StageNumber2);
+    };
+
+watch(activitystage_temp, a);
+
+watch(activityconnection, a);
+watch(activityblocked, a);
+watch(operatordescriptor, a);
+watch(connectordescriptor, a);
+
+watch(activitystage, a);
+watch(activitystage, i);
+watch(activitystage, d);
+
+define(activitystage, keys(0, 1, 2), {UUID, OperatorDescriptorId, ActivityNodeId, Integer});
+
+activitystage(JobId, OperatorId, ActivityId, max<StageNumber>) :-
+    activitystage_temp(JobId, OperatorId, ActivityId, StageNumber);
+
+define(jobstage, keys(0, 1), {UUID, Integer, UUID});
+
+jobstage(JobId, StageNumber, StageId) :-
+    activitystage(JobId, _, _, StageNumber)
+    {
+        StageId := java.util.UUID.randomUUID();
+    };
+
+watch(jobstage, a);
+
+define(jobattempt, keys(), {UUID, Integer});
+
+jobattempt(JobId, 0) :-
+    job(JobId, _, edu.uci.ics.hyracks.api.job.JobStatus.INITIALIZED, _, _, _),
+    jobstart(JobId, _);
+
+jobattempt(JobId, NextAttempt) :-
+    jobattempt(JobId, Attempt),
+    stagestart(JobId, _, Attempt),
+    abortcomplete(JobId, _, Attempt)
+    {
+        NextAttempt := Attempt + 1;
+    };
+
+define(stagestart, keys(), {UUID, Integer, Integer});
+define(stagefinish, keys(0, 1, 2), {UUID, Integer, Integer, Set});
+
+watch(jobstart, i);
+
+stagestart_INITIAL stagestart(JobId, 0, Attempt) :-
+    jobattempt#insert(JobId, Attempt);
+
+update_job_status_RUNNING job(JobId, AppName, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, JobSpec, JobPlan, null) :-
+    job(JobId, AppName, edu.uci.ics.hyracks.api.job.JobStatus.INITIALIZED, JobSpec, JobPlan, _),
+    jobstart(JobId, _);
+
+stagestart_NEXT stagestart(JobId, NextStageNumber, Attempt) :-
+    stagestart(JobId, StageNumber, Attempt),
+    stagefinish#insert(StageId, StageNumber, Attempt, _)
+    {
+        NextStageNumber := StageNumber + 1;
+    };
+
+watch(stagestart, a);
+watch(stagestart, d);
+
+define(operatorlocationcandidates, keys(), {UUID, OperatorDescriptorId, String, Integer, Integer});
+
+operatorlocationcandidates(JobId, OperatorId, NodeId, Partition, Benefit) :-
+    operatorlocation(JobId, OperatorId, NodeId, Partition, Benefit),
+    availablenodes(NodeId);
+
+watch(availablenodes, a);
+watch(availablenodes, i);
+watch(availablenodes, d);
+
+define(availablenodecount, keys(0), {Integer, Integer});
+
+watch(availablenodecount, a);
+watch(availablenodecount, i);
+watch(availablenodecount, d);
+
+availablenodecount(0, count<NodeId>) :-
+    availablenodes(NodeId);
+
+watch(rankedavailablenodes, a);
+watch(rankedavailablenodes, i);
+watch(rankedavailablenodes, d);
+
+watch(operatorlocationcandidates, a);
+watch(operatorlocationcandidates, i);
+watch(operatorlocationcandidates, d);
+
+define(maxoperatorlocationbenefit, keys(0, 1, 2), {UUID, OperatorDescriptorId, Integer, Integer});
+
+maxoperatorlocationbenefit(JobId, OperatorId, Partition, max<Benefit>) :-
+    operatorlocationcandidates(JobId, OperatorId, _, Partition, Benefit);
+
+watch(maxoperatorlocationbenefit, a);
+watch(maxoperatorlocationbenefit, i);
+watch(maxoperatorlocationbenefit, d);
+
+define(attemptoperatorlocationdecision, keys(0, 1, 3, 4), {UUID, OperatorDescriptorId, String, Integer, Integer});
+
+watch(attemptoperatorlocationdecision, a);
+watch(attemptoperatorlocationdecision, i);
+watch(attemptoperatorlocationdecision, d);
+
+attemptoperatorlocationdecision(JobId, OperatorId, NodeId, Partition, Attempt) :-
+    jobattempt#insert(JobId, Attempt),
+    operatorlocationcandidates(JobId, OperatorId, NodeId, Partition, Benefit),
+    maxoperatorlocationbenefit(JobId, OperatorId, Partition, Benefit);
+
+attemptoperatorlocationdecision(JobId, OperatorId, NodeId, Partition, Attempt) :-
+    jobattempt#insert(JobId, Attempt),
+    operatorclonecountexpansiontotalorder(JobId, OperatorId, Partition, CloneRank),
+    rankedavailablenodes(NodeId, NodeRank),
+    availablenodecount(_, NodeCount),
+    NodeRank == CloneRank % NodeCount;
+
+define(operatorclonecount_temp, keys(), {UUID, OperatorDescriptorId, Integer, Integer});
+
+operatorclonecount_temp(JobId, OperatorId, NPartitions, 0) :-
+    operatorclonecount(JobId, OperatorId, NPartitions);
+
+define(operatorclonecountexpansiontotalorder, keys(0, 1, 2), {UUID, OperatorDescriptorId, Integer, Integer});
+
+operatorclonecountexpansiontotalorder(JobId, OperatorId, Partition, Rank) :-
+    expandpartitioncountconstraint(operatorclonecount_temp(JobId, OperatorId, Partition, Rank));
+
+watch(operatorclonecountexpansiontotalorder, a);
+watch(operatorclonecountexpansiontotalorder, i);
+watch(operatorclonecountexpansiontotalorder, d);
+
+watch(operatorclonecount, a);
+watch(operatorclonecount, i);
+watch(operatorclonecount, d);
+
+define(activitystart, keys(), {UUID, OperatorDescriptorId, ActivityNodeId, Integer, Integer, UUID, String, Integer});
+
+activitystart(JobId, OperatorId, ActivityId, StageNumber, Attempt, StageId, NodeId, Partition) :-
+    stagestart#insert(JobId, StageNumber, Attempt),
+    operatordescriptor(JobId, OperatorId, _, _),
+    activitystage(JobId, OperatorId, ActivityId, StageNumber),
+    jobstage(JobId, StageNumber, StageId),
+    attemptoperatorlocationdecision(JobId, OperatorId, NodeId, Partition, Attempt);
+
+watch(activitystart, a);
+
+define(stageletstart, keys(0, 1, 4, 5), {UUID, UUID, String, JobPlan, String, Integer, Set});
+
+stageletstart(JobId, StageId, AppName, JobPlan, NodeId, Attempt, set<ActivityInfo>) :-
+    activitystart#insert(JobId, _, ActivityId, StageNumber, Attempt, StageId, NodeId, Partition),
+    job(JobId, AppName, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, _, JobPlan, _)
+    {
+        ActivityInfo := [ActivityId, Partition];
+    };
+
+watch(stageletstart, a);
+watch(stageletstart, i);
+
+define(startmessage_agg, keys(0, 1, 2), {UUID, UUID, Integer, String, JobPlan, Set});
+
+startmessage_agg(JobId, StageId, Attempt, AppName, JobPlan, set<Tuple>) :-
+    stageletstart#insert(JobId, StageId, AppName, JobPlan, NodeId, Attempt, ActivityInfoSet),
+    availablenodes(NodeId),
+    ActivityInfoSet.size() != 0
+    {
+        Tuple := [NodeId, ActivityInfoSet];
+    };
+
+startmessage(JobId, StageId, Attempt, AppName, JobPlan, TSet) :-
+    startmessage_agg(JobId, StageId, Attempt, AppName, JobPlan, TSet);
+
+watch(startmessage, a);
+watch(startmessage, i);
+
+define(stageletabort, keys(0, 1, 3, 4), {UUID, UUID, JobPlan, String, Integer, Set});
+
+stageletabort(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet) :-
+    stageletfailure(JobId, StageId, NodeId, Attempt),
+    stageletstart(JobId, StageId, _, JobPlan, NodeId, Attempt, ActivityIdSet);
+
+stageletabort(JobId, StageId, JobPlan, NodeIdOther, Attempt, ActivityIdSet) :-
+    stageletstart(JobId, StageId, _, JobPlan, NodeId, Attempt, _),
+    stageletstart(JobId, StageId, _, _, NodeIdOther, Attempt, ActivityIdSet),
+    failednodes#insert(NodeId),
+    notin stageletcomplete(JobId, StageId, NodeId, Attempt, _);
+
+watch(stageletabort, a);
+watch(stageletabort, i);
+watch(stageletabort, d);
+
+define(stageabort, keys(0, 1, 2), {UUID, UUID, Integer, Set});
+
+stageabort(JobId, StageId, Attempt, set<NodeId>) :-
+    stageletabort#insert(JobId, StageId, _, NodeId, Attempt, _);
+
+define(abortmessage_agg, keys(0, 1, 2), {UUID, UUID, Integer, JobPlan, Set});
+
+abortmessage_agg(JobId, StageId, Attempt, JobPlan, set<Tuple>) :-
+    stageletabort#insert(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet),
+    availablenodes(NodeId)
+    {
+        Tuple := [NodeId, ActivityIdSet];
+    };
+
+abortmessage(JobId, StageId, Attempt, JobPlan, TSet) :-
+    abortmessage_agg(JobId, StageId, Attempt, JobPlan, TSet),
+    TSet.size() != 0;
+
+watch(abortmessage, a);
+watch(abortmessage, i);
+
+define(stageletabortcomplete, keys(), {UUID, UUID, String, Integer});
+
+stageletabortcomplete(JobId, StageId, NodeId, Attempt) :-
+    abortnotify(JobId, StageId, NodeId, Attempt);
+
+stageletabortcomplete(JobId, StageId, NodeId, Attempt) :-
+    stageletabort(JobId, StageId, _, NodeId, Attempt, _),
+    notin availablenodes(NodeId);
+
+define(stageletabortcomplete_agg, keys(0, 1, 2), {UUID, UUID, Integer, Set});
+
+stageletabortcomplete_agg(JobId, StageId, Attempt, set<NodeId>) :-
+    stageletabortcomplete(JobId, StageId, NodeId, Attempt);
+
+define(abortcomplete, keys(), {UUID, UUID, Integer});
+
+abortcomplete(JobId, StageId, Attempt) :-
+    stageletabortcomplete_agg(JobId, StageId, Attempt, NodeIdSet1),
+    stageabort(JobId, StageId, Attempt, NodeIdSet2),
+    NodeIdSet1.size() == NodeIdSet2.size();
+
+define(stageletcomplete_agg, keys(0, 1, 2), {UUID, UUID, Integer, Set});
+
+stageletcomplete_agg(JobId, StageId, Attempt, set<Statistics>) :-
+    stageletcomplete(JobId, StageId, NodeId, Attempt, Statistics);
+
+stagefinish(JobId, StageNumber, Attempt, SSet) :-
+    startmessage_agg(JobId, StageId, Attempt, _, _, TSet),
+    stageletcomplete_agg(JobId, StageId, Attempt, SSet),
+    jobstage(JobId, StageNumber, StageId),
+    TSet.size() == SSet.size();
+
+update_job_status_TERMINATED job(JobId, AppName, edu.uci.ics.hyracks.api.job.JobStatus.TERMINATED, JobSpec, JobPlan, null) :-
+    job(JobId, AppName, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, JobSpec, JobPlan, _),
+    stagestart#insert(JobId, StageNumber, Attempt),
+    stagefinish(JobId, _, Attempt, SSet),
+    notin jobstage(JobId, StageNumber);
+
+define(jobcleanup_agg, {UUID, Set});
+
+jobcleanup_agg(JobId, set<NodeId>) :-
+    stagestart#insert(JobId, StageNumber, Attempt),
+    stagefinish(JobId, _, Attempt, _),
+    attemptoperatorlocationdecision(JobId, _, NodeId, _, Attempt),
+    notin jobstage(JobId, StageNumber);
+
+jobcleanup(JobId, NodeIdSet) :-
+    jobcleanup_agg(JobId, NodeIdSet);