Refactored into hyracks-control-cc
git-svn-id: https://hyracks.googlecode.com/svn/trunk@52 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-control-cc/.classpath b/hyracks/hyracks-control-cc/.classpath
new file mode 100644
index 0000000..88cebb7
--- /dev/null
+++ b/hyracks/hyracks-control-cc/.classpath
@@ -0,0 +1,7 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<classpath>
+ <classpathentry kind="src" path="src/main/java"/>
+ <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
+ <classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
+ <classpathentry kind="output" path="target/classes"/>
+</classpath>
diff --git a/hyracks/hyracks-control-cc/.project b/hyracks/hyracks-control-cc/.project
new file mode 100644
index 0000000..271840b
--- /dev/null
+++ b/hyracks/hyracks-control-cc/.project
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+ <name>hyracks-control-cc</name>
+ <comment></comment>
+ <projects>
+ </projects>
+ <buildSpec>
+ <buildCommand>
+ <name>org.eclipse.jdt.core.javabuilder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ <buildCommand>
+ <name>org.maven.ide.eclipse.maven2Builder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ </buildSpec>
+ <natures>
+ <nature>org.maven.ide.eclipse.maven2Nature</nature>
+ <nature>org.eclipse.jdt.core.javanature</nature>
+ </natures>
+</projectDescription>
diff --git a/hyracks/hyracks-control-cc/.settings/org.eclipse.jdt.core.prefs b/hyracks/hyracks-control-cc/.settings/org.eclipse.jdt.core.prefs
new file mode 100644
index 0000000..1b67f41
--- /dev/null
+++ b/hyracks/hyracks-control-cc/.settings/org.eclipse.jdt.core.prefs
@@ -0,0 +1,6 @@
+#Fri Jul 30 17:52:26 PDT 2010
+eclipse.preferences.version=1
+org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
+org.eclipse.jdt.core.compiler.compliance=1.6
+org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
+org.eclipse.jdt.core.compiler.source=1.6
diff --git a/hyracks/hyracks-control-cc/.settings/org.maven.ide.eclipse.prefs b/hyracks/hyracks-control-cc/.settings/org.maven.ide.eclipse.prefs
new file mode 100644
index 0000000..d783847
--- /dev/null
+++ b/hyracks/hyracks-control-cc/.settings/org.maven.ide.eclipse.prefs
@@ -0,0 +1,9 @@
+#Fri Jul 30 17:52:26 PDT 2010
+activeProfiles=
+eclipse.preferences.version=1
+fullBuildGoals=process-test-resources
+includeModules=false
+resolveWorkspaceProjects=true
+resourceFilterGoals=process-resources resources\:testResources
+skipCompilerPlugin=true
+version=1
diff --git a/hyracks/hyracks-control-cc/pom.xml b/hyracks/hyracks-control-cc/pom.xml
new file mode 100644
index 0000000..96992af
--- /dev/null
+++ b/hyracks/hyracks-control-cc/pom.xml
@@ -0,0 +1,56 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-control-cc</artifactId>
+ <version>0.1.0</version>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-control-common</artifactId>
+ <version>0.1.0</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ <version>8.0.0.M0</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>args4j</groupId>
+ <artifactId>args4j</artifactId>
+ <version>2.0.12</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.wicket</groupId>
+ <artifactId>wicket</artifactId>
+ <version>1.4.7</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>jol</groupId>
+ <artifactId>jol</artifactId>
+ <version>1.0.0</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/CCConfig.java b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/CCConfig.java
new file mode 100644
index 0000000..c7597ee
--- /dev/null
+++ b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/CCConfig.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc;
+
+import org.kohsuke.args4j.Option;
+/** Cluster controller options; each public field carries the args4j {@code @Option} that names its command-line flag. */
+public class CCConfig {
+ @Option(name = "-port", usage = "Sets the port to listen for connections from node controllers (default 1099)")
+ public int port = 1099;
+
+ @Option(name = "-http-port", usage = "Sets the http port for the admin console")
+ public int httpPort; // no initializer, so this stays 0 unless -http-port is given
+
+ @Option(name = "-heartbeat-period", usage = "Sets the time duration between two heartbeats from each node controller in milliseconds (default: 10000)")
+ public int heartbeatPeriod = 10000;
+
+ @Option(name = "-max-heartbeat-lapse-periods", usage = "Sets the maximum number of missed heartbeats before a node is marked as dead (default: 5)")
+ public int maxHeartbeatLapsePeriods = 5;
+
+ @Option(name = "-use-jol", usage = "Forces Hyracks to use the JOL based scheduler (default: false)")
+ public boolean useJOL = false;
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/CCDriver.java b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/CCDriver.java
new file mode 100644
index 0000000..8104362
--- /dev/null
+++ b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/CCDriver.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc;
+
+import org.kohsuke.args4j.CmdLineParser;
+
+/** Command-line entry point: parses the options into a {@link CCConfig} and starts a {@link ClusterControllerService}. */
+public class CCDriver {
+    public static void main(String[] args) throws Exception {
+        CCConfig config = new CCConfig();
+        CmdLineParser parser = new CmdLineParser(config);
+        try {
+            parser.parseArgument(args);
+        } catch (Exception parseFailure) {
+            System.err.println(parseFailure.getMessage());
+            parser.printUsage(System.err);
+            return; // bad arguments: usage has been printed, nothing is started
+        }
+        ClusterControllerService service = new ClusterControllerService(config);
+        service.start();
+        for (;;) { // keep the main thread parked while the service runs
+            Thread.sleep(100000);
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
new file mode 100644
index 0000000..2597df7
--- /dev/null
+++ b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -0,0 +1,662 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.InetAddress;
+import java.rmi.registry.LocateRegistry;
+import java.rmi.registry.Registry;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.UUID;
+import java.util.Vector;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import jol.core.Runtime;
+import jol.core.Runtime.DebugLevel;
+
+import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.eclipse.jetty.server.handler.ContextHandler;
+
+import edu.uci.ics.hyracks.api.client.IHyracksClientInterface;
+import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
+import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
+import edu.uci.ics.hyracks.control.cc.web.WebServer;
+import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
+import edu.uci.ics.hyracks.control.common.NodeParameters;
+import edu.uci.ics.hyracks.control.common.api.IClusterController;
+import edu.uci.ics.hyracks.control.common.api.INodeController;
+import edu.uci.ics.hyracks.control.common.comm.Endpoint;
+import edu.uci.ics.hyracks.control.common.job.JobPlan;
+
+/**
+ * Cluster controller service. It keeps the registry of node controllers, delegates
+ * job operations to an {@link IJobManager}, serves the admin console over HTTP and
+ * periodically removes nodes whose heartbeats have lapsed.
+ * <p>
+ * Access to the node registry is guarded by this object's monitor.
+ */
+public class ClusterControllerService extends AbstractRemoteService implements IClusterController {
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOGGER = Logger.getLogger(ClusterControllerService.class.getName());
+
+    private final CCConfig ccConfig;
+
+    private final Map<String, NodeControllerState> nodeRegistry;
+
+    private final WebServer webServer;
+
+    private final IJobManager jobManager;
+
+    private final Executor taskExecutor;
+
+    private final Timer timer;
+
+    private final Runtime jolRuntime;
+
+    /**
+     * Builds the JOL runtime and the JOL based job manager, the task executor, the web
+     * server with the admin and application handlers, and the daemon timer.
+     * Note that {@code ccConfig.useJOL} is not consulted here.
+     *
+     * @param ccConfig configuration of this cluster controller
+     * @throws Exception if one of the components cannot be created
+     */
+    public ClusterControllerService(CCConfig ccConfig) throws Exception {
+        this.ccConfig = ccConfig;
+        nodeRegistry = new LinkedHashMap<String, NodeControllerState>();
+        // JOL debug output is enabled only when this logger accepts FINE records.
+        Set<DebugLevel> jolDebugLevel = LOGGER.isLoggable(Level.FINE) ? Runtime.DEBUG_ALL : new HashSet<DebugLevel>();
+        jolRuntime = (Runtime) Runtime.create(jolDebugLevel, System.err);
+        jobManager = new JOLJobManagerImpl(this, jolRuntime);
+        taskExecutor = Executors.newCachedThreadPool();
+        webServer = new WebServer(new Handler[] { getAdminConsoleHandler(), getApplicationDataHandler() });
+        this.timer = new Timer(true);
+    }
+
+    /**
+     * Creates an RMI registry on the configured port, binds this service under the
+     * client and cluster controller interface names, starts the web server and
+     * schedules the dead node sweeper once per heartbeat period.
+     */
+    @Override
+    public void start() throws Exception {
+        LOGGER.log(Level.INFO, "Starting ClusterControllerService");
+        Registry registry = LocateRegistry.createRegistry(ccConfig.port);
+        registry.rebind(IHyracksClientInterface.class.getName(), this);
+        registry.rebind(IClusterController.class.getName(), this);
+        webServer.setPort(ccConfig.httpPort);
+        webServer.start();
+        timer.schedule(new DeadNodeSweeper(), 0, ccConfig.heartbeatPeriod);
+        LOGGER.log(Level.INFO, "Started ClusterControllerService");
+    }
+
+    /**
+     * Stops the web server and cancels the timer so that the dead node sweeper does
+     * not keep running after the service has been stopped.
+     */
+    @Override
+    public void stop() throws Exception {
+        LOGGER.log(Level.INFO, "Stopping ClusterControllerService");
+        try {
+            webServer.stop();
+        } finally {
+            timer.cancel();
+        }
+        LOGGER.log(Level.INFO, "Stopped ClusterControllerService");
+    }
+
+    /** Creates a job with no flags set; see {@link #createJob(JobSpecification, EnumSet)}. */
+    @Override
+    public UUID createJob(JobSpecification jobSpec) throws Exception {
+        return jobManager.createJob(jobSpec, EnumSet.noneOf(JobFlag.class));
+    }
+
+    /** Delegates job creation to the job manager and returns the UUID it produces. */
+    @Override
+    public UUID createJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
+        return jobManager.createJob(jobSpec, jobFlags);
+    }
+
+    /**
+     * Adds the node to the registry, notifies the node and the job manager of the
+     * registration, and returns parameters carrying the configured heartbeat period.
+     *
+     * @throws Exception if a node with the same id is already registered
+     */
+    @Override
+    public NodeParameters registerNode(INodeController nodeController) throws Exception {
+        String id = nodeController.getId();
+        NodeControllerState state = new NodeControllerState(nodeController);
+        synchronized (this) {
+            if (nodeRegistry.containsKey(id)) {
+                throw new Exception("Node with this name already registered.");
+            }
+            nodeRegistry.put(id, state);
+        }
+        nodeController.notifyRegistration(this);
+        jobManager.registerNode(id);
+        LOGGER.log(Level.INFO, "Registered INodeController: id = " + id);
+        NodeParameters params = new NodeParameters();
+        params.setHeartbeatPeriod(ccConfig.heartbeatPeriod);
+        return params;
+    }
+
+    /** Removes the node from the registry. */
+    @Override
+    public void unregisterNode(INodeController nodeController) throws Exception {
+        String id = nodeController.getId();
+        synchronized (this) {
+            nodeRegistry.remove(id);
+        }
+        LOGGER.log(Level.INFO, "Unregistered INodeController: id = " + id);
+    }
+
+    /** Returns the state registered under {@code id}, or {@code null} if there is none. */
+    public synchronized NodeControllerState lookupNode(String id) throws Exception {
+        return nodeRegistry.get(id);
+    }
+
+    /** Returns the executor on which remote node calls are run. */
+    public Executor getExecutor() {
+        return taskExecutor;
+    }
+
+    /**
+     * Asks every registered node, asynchronously, to clean up the given job. A
+     * failure on one node is logged and does not affect the others.
+     */
+    public synchronized void notifyJobComplete(final UUID jobId) {
+        for (final NodeControllerState ns : nodeRegistry.values()) {
+            taskExecutor.execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        ns.getNodeController().cleanUpJob(jobId);
+                    } catch (Exception e) {
+                        LOGGER.log(Level.WARNING, "Failed to clean up job " + jobId, e);
+                    }
+                }
+            });
+        }
+    }
+
+    /** Forwards a stagelet completion report to the job manager. */
+    @Override
+    public void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
+            StageletStatistics statistics) throws Exception {
+        jobManager.notifyStageletComplete(jobId, stageId, attempt, nodeId, statistics);
+    }
+
+    /** Forwards a stagelet failure report to the job manager. */
+    @Override
+    public void notifyStageletFailure(UUID jobId, UUID stageId, int attempt, String nodeId) throws Exception {
+        jobManager.notifyStageletFailure(jobId, stageId, attempt, nodeId);
+    }
+
+    /** Returns the status the job manager reports for the job. */
+    @Override
+    public JobStatus getJobStatus(UUID jobId) throws Exception {
+        return jobManager.getJobStatus(jobId);
+    }
+
+    /** Asks the job manager to start the given job. */
+    @Override
+    public void start(UUID jobId) throws Exception {
+        jobManager.start(jobId);
+    }
+
+    /** Delegates to the job manager's {@code waitForCompletion} and returns its statistics. */
+    @Override
+    public JobStatistics waitForCompletion(UUID jobId) throws Exception {
+        return jobManager.waitForCompletion(jobId);
+    }
+
+    /**
+     * Builds the handler for the {@code /admin} context. It answers only the context
+     * root and renders one table row per registered node.
+     */
+    private Handler getAdminConsoleHandler() {
+        ContextHandler handler = new ContextHandler("/admin");
+        handler.setHandler(new AbstractHandler() {
+            @Override
+            public void handle(String target, Request baseRequest, HttpServletRequest request,
+                    HttpServletResponse response) throws IOException, ServletException {
+                if (!"/".equals(target)) {
+                    return;
+                }
+                response.setContentType("text/html;charset=utf-8");
+                response.setStatus(HttpServletResponse.SC_OK);
+                baseRequest.setHandled(true);
+                PrintWriter writer = response.getWriter();
+                writer.println("<html><head><title>Hyracks Admin Console</title></head><body>");
+                writer.println("<h1>Hyracks Admin Console</h1>");
+                writer.println("<h2>Node Controllers</h2>");
+                // The second column prints getLastHeartbeatDuration(), so it is labelled accordingly.
+                writer.println("<table><tr><td>Node Id</td><td>Last Heartbeat Duration</td></tr>");
+                synchronized (ClusterControllerService.this) {
+                    for (Map.Entry<String, NodeControllerState> e : nodeRegistry.entrySet()) {
+                        try {
+                            writer.print("<tr><td>");
+                            writer.print(e.getKey());
+                            writer.print("</td><td>");
+                            writer.print(e.getValue().getLastHeartbeatDuration());
+                            writer.print("</td></tr>");
+                        } catch (Exception ex) {
+                            LOGGER.log(Level.WARNING, "Failed to render node " + e.getKey(), ex);
+                        }
+                    }
+                }
+                writer.println("</table>");
+                writer.println("</body></html>");
+                writer.flush();
+            }
+        });
+        return handler;
+    }
+
+    /** Builds the handler for the {@code /applications} context; it currently does nothing. */
+    private Handler getApplicationDataHandler() {
+        ContextHandler handler = new ContextHandler("/applications");
+        handler.setHandler(new AbstractHandler() {
+            @Override
+            public void handle(String target, Request baseRequest, HttpServletRequest request,
+                    HttpServletResponse response) throws IOException, ServletException {
+            }
+        });
+        return handler;
+    }
+
+    /**
+     * Not implemented.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public Map<String, InetAddress[]> getRegistry() throws Exception {
+        throw new UnsupportedOperationException("getRegistry is not implemented");
+    }
+
+    /** Records a heartbeat for the node; heartbeats from unknown ids are ignored. */
+    @Override
+    public synchronized void nodeHeartbeat(String id) throws Exception {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Heartbeat from: " + id);
+        }
+        NodeControllerState ns = nodeRegistry.get(id);
+        if (ns != null) {
+            ns.notifyHeartbeat();
+        }
+    }
+
+    /**
+     * Removes the node from the registry and reports its failure to the job manager.
+     * The registry is not locked here, so the caller must hold this object's monitor.
+     */
+    private void killNode(String nodeId) throws Exception {
+        nodeRegistry.remove(nodeId);
+        jobManager.notifyNodeFailure(nodeId);
+    }
+
+    /**
+     * Timer task that calls {@code incrementLastHeartbeatDuration} on every node and
+     * kills those for which the result reaches {@code maxHeartbeatLapsePeriods}.
+     */
+    private class DeadNodeSweeper extends TimerTask {
+        @Override
+        public void run() {
+            Set<String> deadNodes = new HashSet<String>();
+            synchronized (ClusterControllerService.this) {
+                for (Map.Entry<String, NodeControllerState> e : nodeRegistry.entrySet()) {
+                    NodeControllerState state = e.getValue();
+                    if (state.incrementLastHeartbeatDuration() >= ccConfig.maxHeartbeatLapsePeriods) {
+                        deadNodes.add(e.getKey());
+                    }
+                }
+                for (String deadNode : deadNodes) {
+                    try {
+                        if (LOGGER.isLoggable(Level.INFO)) {
+                            LOGGER.info("Killing node: " + deadNode);
+                        }
+                        killNode(deadNode);
+                    } catch (Exception e) {
+                        LOGGER.log(Level.WARNING, "Failed to kill node: " + deadNode, e);
+                    }
+                }
+            }
+        }
+    }
+
+    /** An operation to run against the node controller identified by {@link #getNodeId()}. */
+    interface RemoteOp<T> {
+        public String getNodeId();
+
+        public T execute(INodeController node) throws Exception;
+    }
+
+    /** Collects the results of remote operations into a single value. */
+    interface Accumulator<T, R> {
+        public void accumulate(T o);
+
+        public R getResult();
+    }
+
+    /**
+     * Runs all operations concurrently on the executor and waits for every one of them
+     * to finish. Results are passed to the accumulator, if one is given.
+     *
+     * @return the accumulator's result, or {@code null} when no accumulator was given
+     * @throws IllegalStateException if a target node is not registered; nothing is
+     *             dispatched in that case
+     * @throws Exception the first failure recorded by any operation
+     */
+    <T, R> R runRemote(final RemoteOp<T>[] remoteOps, final Accumulator<T, R> accumulator) throws Exception {
+        // Resolve every target first so that an unknown node fails the call before anything is dispatched.
+        final INodeController[] nodes = new INodeController[remoteOps.length];
+        for (int i = 0; i < remoteOps.length; ++i) {
+            String nodeId = remoteOps[i].getNodeId();
+            NodeControllerState nodeState = lookupNode(nodeId);
+            if (nodeState == null) {
+                throw new IllegalStateException("Node " + nodeId + " is not registered; cannot run: " + remoteOps[i]);
+            }
+            nodes[i] = nodeState.getNodeController();
+        }
+        final Semaphore installComplete = new Semaphore(remoteOps.length);
+        final List<Exception> errors = new Vector<Exception>();
+        for (int i = 0; i < remoteOps.length; ++i) {
+            final RemoteOp<T> remoteOp = remoteOps[i];
+            final INodeController node = nodes[i];
+
+            installComplete.acquire();
+            Runnable remoteRunner = new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        T t = remoteOp.execute(node);
+                        if (accumulator != null) {
+                            synchronized (accumulator) {
+                                accumulator.accumulate(t);
+                            }
+                        }
+                    } catch (Exception e) {
+                        errors.add(e);
+                    } finally {
+                        installComplete.release();
+                    }
+                }
+            };
+
+            getExecutor().execute(remoteRunner);
+        }
+        // Blocks until every operation has returned its permit.
+        installComplete.acquire(remoteOps.length);
+        if (!errors.isEmpty()) {
+            throw errors.get(0);
+        }
+        return accumulator == null ? null : accumulator.getResult();
+    }
+
+    /** Remote operation that calls {@code initializeJobletPhase1} on a node. */
+    static class Phase1Installer implements RemoteOp<Map<PortInstanceId, Endpoint>> {
+        private final String nodeId;
+        private final UUID jobId;
+        private final JobPlan plan;
+        private final UUID stageId;
+        private final int attempt;
+        private final Map<ActivityNodeId, Set<Integer>> tasks;
+        private final Map<OperatorDescriptorId, Set<Integer>> opPartitions;
+
+        public Phase1Installer(String nodeId, UUID jobId, JobPlan plan, UUID stageId, int attempt,
+                Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions) {
+            this.nodeId = nodeId;
+            this.jobId = jobId;
+            this.plan = plan;
+            this.stageId = stageId;
+            this.attempt = attempt;
+            this.tasks = tasks;
+            this.opPartitions = opPartitions;
+        }
+
+        @Override
+        public Map<PortInstanceId, Endpoint> execute(INodeController node) throws Exception {
+            return node.initializeJobletPhase1(jobId, plan, stageId, attempt, tasks, opPartitions);
+        }
+
+        @Override
+        public String toString() {
+            return jobId + " Distribution Phase 1";
+        }
+
+        @Override
+        public String getNodeId() {
+            return nodeId;
+        }
+    }
+
+    /** Remote operation that calls {@code initializeJobletPhase2} on a node. */
+    static class Phase2Installer implements RemoteOp<Void> {
+        private final String nodeId;
+        private final UUID jobId;
+        private final JobPlan plan;
+        private final UUID stageId;
+        private final Map<ActivityNodeId, Set<Integer>> tasks;
+        private final Map<OperatorDescriptorId, Set<Integer>> opPartitions;
+        private final Map<PortInstanceId, Endpoint> globalPortMap;
+
+        public Phase2Installer(String nodeId, UUID jobId, JobPlan plan, UUID stageId,
+                Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions,
+                Map<PortInstanceId, Endpoint> globalPortMap) {
+            this.nodeId = nodeId;
+            this.jobId = jobId;
+            this.plan = plan;
+            this.stageId = stageId;
+            this.tasks = tasks;
+            this.opPartitions = opPartitions;
+            this.globalPortMap = globalPortMap;
+        }
+
+        @Override
+        public Void execute(INodeController node) throws Exception {
+            node.initializeJobletPhase2(jobId, plan, stageId, tasks, opPartitions, globalPortMap);
+            return null;
+        }
+
+        @Override
+        public String toString() {
+            return jobId + " Distribution Phase 2";
+        }
+
+        @Override
+        public String getNodeId() {
+            return nodeId;
+        }
+    }
+
+    /** Remote operation that calls {@code commitJobletInitialization} on a node. */
+    static class Phase3Installer implements RemoteOp<Void> {
+        private final String nodeId;
+        private final UUID jobId;
+        private final UUID stageId;
+
+        public Phase3Installer(String nodeId, UUID jobId, UUID stageId) {
+            this.nodeId = nodeId;
+            this.jobId = jobId;
+            this.stageId = stageId;
+        }
+
+        @Override
+        public Void execute(INodeController node) throws Exception {
+            node.commitJobletInitialization(jobId, stageId);
+            return null;
+        }
+
+        @Override
+        public String toString() {
+            return jobId + " Distribution Phase 3";
+        }
+
+        @Override
+        public String getNodeId() {
+            return nodeId;
+        }
+    }
+
+    /** Remote operation that calls {@code startStage} on a node. */
+    static class StageStarter implements RemoteOp<Void> {
+        private final String nodeId;
+        private final UUID jobId;
+        private final UUID stageId;
+
+        public StageStarter(String nodeId, UUID jobId, UUID stageId) {
+            this.nodeId = nodeId;
+            this.jobId = jobId;
+            this.stageId = stageId;
+        }
+
+        @Override
+        public Void execute(INodeController node) throws Exception {
+            node.startStage(jobId, stageId);
+            return null;
+        }
+
+        @Override
+        public String toString() {
+            return jobId + " Started Stage: " + stageId;
+        }
+
+        @Override
+        public String getNodeId() {
+            return nodeId;
+        }
+    }
+
+    /**
+     * Remote operation that calls {@code abortJoblet} on a node. The {@code attempt}
+     * constructor argument is accepted but not used.
+     */
+    static class JobletAborter implements RemoteOp<Void> {
+        private final String nodeId;
+        private final UUID jobId;
+        private final UUID stageId;
+
+        public JobletAborter(String nodeId, UUID jobId, UUID stageId, int attempt) {
+            this.nodeId = nodeId;
+            this.jobId = jobId;
+            this.stageId = stageId;
+        }
+
+        @Override
+        public Void execute(INodeController node) throws Exception {
+            node.abortJoblet(jobId, stageId);
+            return null;
+        }
+
+        @Override
+        public String toString() {
+            return jobId + " Aborting";
+        }
+
+        @Override
+        public String getNodeId() {
+            return nodeId;
+        }
+    }
+
+    /** Remote operation that calls {@code cleanUpJob} on a node. */
+    static class JobCompleteNotifier implements RemoteOp<Void> {
+        private final String nodeId;
+        private final UUID jobId;
+
+        public JobCompleteNotifier(String nodeId, UUID jobId) {
+            this.nodeId = nodeId;
+            this.jobId = jobId;
+        }
+
+        @Override
+        public Void execute(INodeController node) throws Exception {
+            node.cleanUpJob(jobId);
+            return null;
+        }
+
+        @Override
+        public String toString() {
+            return jobId + " Cleaning Up";
+        }
+
+        @Override
+        public String getNodeId() {
+            return nodeId;
+        }
+    }
+
+    /** Accumulator that merges every port map it receives into one map. */
+    static class PortMapMergingAccumulator implements
+            Accumulator<Map<PortInstanceId, Endpoint>, Map<PortInstanceId, Endpoint>> {
+        Map<PortInstanceId, Endpoint> portMap = new HashMap<PortInstanceId, Endpoint>();
+
+        @Override
+        public void accumulate(Map<PortInstanceId, Endpoint> o) {
+            portMap.putAll(o);
+        }
+
+        @Override
+        public Map<PortInstanceId, Endpoint> getResult() {
+            return portMap;
+        }
+    }
+
+    /** Not implemented: currently a no-op. */
+    @Override
+    public void createApplication(String appName) throws Exception {
+
+    }
+
+    /** Not implemented: currently a no-op. */
+    @Override
+    public void destroyApplication(String appName) throws Exception {
+
+    }
+
+    /** Not implemented: currently a no-op. */
+    @Override
+    public void startApplication(String appName) throws Exception {
+
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/IJobManager.java b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/IJobManager.java
new file mode 100644
index 0000000..9d500c0
--- /dev/null
+++ b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/IJobManager.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc;
+
+import java.util.EnumSet;
+import java.util.UUID;
+
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
+import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
+/** Job-management operations that {@code ClusterControllerService} delegates to its job manager. */
+public interface IJobManager {
+    UUID createJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception;
+
+    void start(UUID jobId) throws Exception;
+
+    void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
+            StageletStatistics statistics) throws Exception;
+
+    void notifyStageletFailure(UUID jobId, UUID stageId, int attempt, String nodeId) throws Exception;
+
+    JobStatus getJobStatus(UUID jobId); // the only operation declared without a throws clause
+
+    JobStatistics waitForCompletion(UUID jobId) throws Exception;
+
+    void notifyNodeFailure(String nodeId) throws Exception;
+
+    void registerNode(String nodeId) throws Exception;
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JOLJobManagerImpl.java b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JOLJobManagerImpl.java
new file mode 100644
index 0000000..d7fd61d
--- /dev/null
+++ b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JOLJobManagerImpl.java
@@ -0,0 +1,1052 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Logger;
+
+import jol.core.Runtime;
+import jol.types.basic.BasicTupleSet;
+import jol.types.basic.Tuple;
+import jol.types.basic.TupleSet;
+import jol.types.exception.BadKeyException;
+import jol.types.exception.UpdateException;
+import jol.types.table.BasicTable;
+import jol.types.table.EventTable;
+import jol.types.table.Function;
+import jol.types.table.Key;
+import jol.types.table.TableName;
+import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.ChoiceLocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
+import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
+import edu.uci.ics.hyracks.api.constraints.PartitionCountConstraint;
+import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.Direction;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IActivityNode;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
+import edu.uci.ics.hyracks.api.job.statistics.StageStatistics;
+import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
+import edu.uci.ics.hyracks.control.common.comm.Endpoint;
+import edu.uci.ics.hyracks.control.common.job.JobPlan;
+
+public class JOLJobManagerImpl implements IJobManager { // IJobManager backed by JOL tables plus the scheduler program installed by the constructor
+    private static final Logger LOGGER = Logger.getLogger(JOLJobManagerImpl.class.getName());
+
+    public static final String JOL_SCOPE = "hyrackscc"; // scope used for every TableName and for all schedule()/install() calls
+
+    private static final String SCHEDULER_OLG_FILE = "edu/uci/ics/hyracks/control/cc/scheduler.olg"; // resolved through ClassLoader.getSystemResource in the constructor
+
+    private final Runtime jolRuntime; // runtime on which tuples are scheduled and evaluated
+
+    private final LinkedBlockingQueue<Runnable> jobQueue; // work items added by the table callbacks registered in the constructor
+
+    private final JobTable jobTable; // also used as the monitor for getJobStatus()/waitForCompletion()
+
+    private final JobQueueThread jobQueueThread; // started in the constructor; class is declared outside this view - presumably drains jobQueue (TODO confirm)
+
+    private final OperatorDescriptorTable odTable;
+
+    private final OperatorLocationTable olTable;
+
+    private final OperatorCloneCountTable ocTable;
+
+    private final ConnectorDescriptorTable cdTable;
+
+    private final ActivityNodeTable anTable;
+
+    private final ActivityConnectionTable acTable;
+
+    private final ActivityBlockedTable abTable;
+
+    private final JobStartTable jobStartTable;
+
+    private final JobCleanUpTable jobCleanUpTable; // insertions trigger JobCompleteNotifier remote calls
+
+    private final JobCleanUpCompleteTable jobCleanUpCompleteTable;
+
+    private final StartMessageTable startMessageTable; // insertions trigger the phase 1-3 installers and the stage starter
+
+    private final StageletCompleteTable stageletCompleteTable;
+
+    private final StageletFailureTable stageletFailureTable;
+
+    private final AvailableNodesTable availableNodesTable;
+
+    private final RankedAvailableNodesTable rankedAvailableNodesTable;
+
+    private final FailedNodesTable failedNodesTable;
+
+    private final AbortMessageTable abortMessageTable; // insertions trigger JobletAborter remote calls
+
+    private final AbortNotifyTable abortNotifyTable;
+
+    private final ExpandPartitionCountConstraintTableFunction expandPartitionCountConstraintFunction;
+
+    private final List<String> rankedAvailableNodes; // registered node ids; the list index is the rank mirrored into rankedAvailableNodesTable
+
+    /**
+     * Creates the job manager. The constructor starts the job queue thread, creates every table
+     * and registers it (together with the partition-count table function) in the JOL catalog,
+     * hooks the callbacks that turn inserted scheduler tuples into remote operations run through
+     * {@code ccs}, and finally installs and evaluates the scheduler program.
+     *
+     * @param ccs
+     *            cluster controller service used to run the remote operations
+     * @param jolRuntime
+     *            JOL runtime hosting the tables and the scheduler program
+     * @throws Exception
+     *             if registering the tables or installing the program fails
+     */
+    public JOLJobManagerImpl(final ClusterControllerService ccs, final Runtime jolRuntime) throws Exception {
+        this.jolRuntime = jolRuntime;
+        jobQueue = new LinkedBlockingQueue<Runnable>();
+        jobQueueThread = new JobQueueThread();
+        jobQueueThread.start();
+
+        this.jobTable = new JobTable(jolRuntime);
+        this.odTable = new OperatorDescriptorTable(jolRuntime);
+        this.olTable = new OperatorLocationTable(jolRuntime);
+        this.ocTable = new OperatorCloneCountTable(jolRuntime);
+        this.cdTable = new ConnectorDescriptorTable(jolRuntime);
+        this.anTable = new ActivityNodeTable(jolRuntime);
+        this.acTable = new ActivityConnectionTable(jolRuntime);
+        this.abTable = new ActivityBlockedTable(jolRuntime);
+        this.jobStartTable = new JobStartTable();
+        this.jobCleanUpTable = new JobCleanUpTable(jolRuntime);
+        this.jobCleanUpCompleteTable = new JobCleanUpCompleteTable();
+        this.startMessageTable = new StartMessageTable(jolRuntime);
+        this.stageletCompleteTable = new StageletCompleteTable(jolRuntime);
+        this.stageletFailureTable = new StageletFailureTable(jolRuntime);
+        this.availableNodesTable = new AvailableNodesTable(jolRuntime);
+        this.rankedAvailableNodesTable = new RankedAvailableNodesTable(jolRuntime);
+        this.failedNodesTable = new FailedNodesTable(jolRuntime);
+        this.abortMessageTable = new AbortMessageTable(jolRuntime);
+        this.abortNotifyTable = new AbortNotifyTable(jolRuntime);
+        this.expandPartitionCountConstraintFunction = new ExpandPartitionCountConstraintTableFunction();
+        this.rankedAvailableNodes = new ArrayList<String>();
+
+        jolRuntime.catalog().register(jobTable);
+        jolRuntime.catalog().register(odTable);
+        jolRuntime.catalog().register(olTable);
+        jolRuntime.catalog().register(ocTable);
+        jolRuntime.catalog().register(cdTable);
+        jolRuntime.catalog().register(anTable);
+        jolRuntime.catalog().register(acTable);
+        jolRuntime.catalog().register(abTable);
+        jolRuntime.catalog().register(jobStartTable);
+        jolRuntime.catalog().register(jobCleanUpTable);
+        jolRuntime.catalog().register(jobCleanUpCompleteTable);
+        jolRuntime.catalog().register(startMessageTable);
+        jolRuntime.catalog().register(stageletCompleteTable);
+        jolRuntime.catalog().register(stageletFailureTable);
+        jolRuntime.catalog().register(availableNodesTable);
+        jolRuntime.catalog().register(rankedAvailableNodesTable);
+        jolRuntime.catalog().register(failedNodesTable);
+        jolRuntime.catalog().register(abortMessageTable);
+        jolRuntime.catalog().register(abortNotifyTable);
+        jolRuntime.catalog().register(expandPartitionCountConstraintFunction);
+
+        // Wake up threads blocked in waitForCompletion() whenever the job table changes.
+        // Object.notifyAll() may only be called by the owner of the monitor, so take the same
+        // monitor that waitForCompletion() waits on before signalling.
+        jobTable.register(new JobTable.Callback() {
+            @Override
+            public void deletion(TupleSet arg0) {
+                synchronized (jobTable) {
+                    jobTable.notifyAll();
+                }
+            }
+
+            @Override
+            public void insertion(TupleSet arg0) {
+                synchronized (jobTable) {
+                    jobTable.notifyAll();
+                }
+            }
+        });
+
+        // Each inserted start message is turned into a queued work item that runs the three
+        // installation phases followed by the stage start.
+        startMessageTable.register(new StartMessageTable.Callback() {
+            @Override
+            public void deletion(TupleSet tuples) {
+
+            }
+
+            @SuppressWarnings("unchecked")
+            @Override
+            public void insertion(TupleSet tuples) {
+                for (final Tuple t : tuples) {
+                    jobQueue.add(new Runnable() {
+                        @Override
+                        public void run() {
+                            try {
+                                Object[] data = t.toArray();
+                                UUID jobId = (UUID) data[0];
+                                UUID stageId = (UUID) data[1];
+                                Integer attempt = (Integer) data[2];
+                                JobPlan plan = (JobPlan) data[3];
+                                Set<List> ts = (Set<List>) data[4];
+
+                                // Partitions per operator, accumulated over all entries of the message.
+                                Map<OperatorDescriptorId, Set<Integer>> opPartitions = new HashMap<OperatorDescriptorId, Set<Integer>>();
+                                for (List t2 : ts) {
+                                    Object[] t2Data = t2.toArray();
+                                    Set<List> activityInfoSet = (Set<List>) t2Data[1];
+                                    for (List l : activityInfoSet) {
+                                        Object[] lData = l.toArray();
+                                        ActivityNodeId aid = (ActivityNodeId) lData[0];
+                                        Set<Integer> opParts = opPartitions.get(aid.getOperatorDescriptorId());
+                                        if (opParts == null) {
+                                            opParts = new HashSet<Integer>();
+                                            opPartitions.put(aid.getOperatorDescriptorId(), opParts);
+                                        }
+                                        opParts.add((Integer) lData[1]);
+                                    }
+                                }
+
+                                ClusterControllerService.Phase1Installer[] p1is = new ClusterControllerService.Phase1Installer[ts
+                                        .size()];
+                                int i = 0;
+                                for (List t2 : ts) {
+                                    Object[] t2Data = t2.toArray();
+                                    Map<ActivityNodeId, Set<Integer>> tasks = groupPartitionsByActivity((Set<List>) t2Data[1]);
+                                    p1is[i++] = new ClusterControllerService.Phase1Installer((String) t2Data[0], jobId,
+                                            plan, stageId, attempt, tasks, opPartitions);
+                                }
+                                LOGGER.info("Stage start - Phase 1");
+                                Map<PortInstanceId, Endpoint> globalPortMap = ccs.runRemote(p1is,
+                                        new ClusterControllerService.PortMapMergingAccumulator());
+
+                                ClusterControllerService.Phase2Installer[] p2is = new ClusterControllerService.Phase2Installer[ts
+                                        .size()];
+                                ClusterControllerService.Phase3Installer[] p3is = new ClusterControllerService.Phase3Installer[ts
+                                        .size()];
+                                ClusterControllerService.StageStarter[] ss = new ClusterControllerService.StageStarter[ts
+                                        .size()];
+                                i = 0;
+                                for (List t2 : ts) {
+                                    Object[] t2Data = t2.toArray();
+                                    Map<ActivityNodeId, Set<Integer>> tasks = groupPartitionsByActivity((Set<List>) t2Data[1]);
+                                    p2is[i] = new ClusterControllerService.Phase2Installer((String) t2Data[0], jobId,
+                                            plan, stageId, tasks, opPartitions, globalPortMap);
+                                    p3is[i] = new ClusterControllerService.Phase3Installer((String) t2Data[0], jobId,
+                                            stageId);
+                                    ss[i] = new ClusterControllerService.StageStarter((String) t2Data[0], jobId,
+                                            stageId);
+                                    ++i;
+                                }
+                                LOGGER.info("Stage start - Phase 2");
+                                ccs.runRemote(p2is, null);
+                                LOGGER.info("Stage start - Phase 3");
+                                ccs.runRemote(p3is, null);
+                                LOGGER.info("Stage start");
+                                ccs.runRemote(ss, null);
+                                LOGGER.info("Stage started");
+                            } catch (Exception e) {
+                                // Keep the worker alive, but never lose the failure silently.
+                                LOGGER.log(java.util.logging.Level.SEVERE, "Failed to process start message", e);
+                            }
+                        }
+                    });
+                }
+            }
+        });
+
+        // Each inserted clean-up tuple notifies the listed nodes, then always reports the
+        // clean-up as complete back to the scheduler.
+        jobCleanUpTable.register(new JobCleanUpTable.Callback() {
+            @Override
+            public void deletion(TupleSet tuples) {
+            }
+
+            @SuppressWarnings("unchecked")
+            @Override
+            public void insertion(TupleSet tuples) {
+                for (final Tuple t : tuples) {
+                    jobQueue.add(new Runnable() {
+                        @Override
+                        public void run() {
+                            try {
+                                Object[] data = t.toArray();
+                                UUID jobId = (UUID) data[0];
+                                Set<String> ts = (Set<String>) data[1];
+                                ClusterControllerService.JobCompleteNotifier[] jcns = new ClusterControllerService.JobCompleteNotifier[ts
+                                        .size()];
+                                int i = 0;
+                                for (String n : ts) {
+                                    jcns[i++] = new ClusterControllerService.JobCompleteNotifier(n, jobId);
+                                }
+                                try {
+                                    ccs.runRemote(jcns, null);
+                                } finally {
+                                    BasicTupleSet jccTuples = new BasicTupleSet(JobCleanUpCompleteTable
+                                            .createTuple(jobId));
+                                    jolRuntime.schedule(JOL_SCOPE, JobCleanUpCompleteTable.TABLE_NAME, jccTuples, null);
+                                    jolRuntime.evaluate();
+                                }
+                            } catch (Exception e) {
+                                LOGGER.log(java.util.logging.Level.SEVERE, "Failed to process job clean-up message", e);
+                            }
+                        }
+                    });
+                }
+            }
+        });
+
+        // Each inserted abort message aborts the joblets on the listed nodes, then always
+        // schedules the matching abort notifications.
+        abortMessageTable.register(new AbortMessageTable.Callback() {
+            @Override
+            public void deletion(TupleSet tuples) {
+
+            }
+
+            @SuppressWarnings("unchecked")
+            @Override
+            public void insertion(TupleSet tuples) {
+                for (final Tuple t : tuples) {
+                    jobQueue.add(new Runnable() {
+                        @Override
+                        public void run() {
+                            try {
+                                Object[] data = t.toArray();
+                                UUID jobId = (UUID) data[0];
+                                UUID stageId = (UUID) data[1];
+                                Integer attempt = (Integer) data[2];
+                                Set<List> ts = (Set<List>) data[4];
+                                ClusterControllerService.JobletAborter[] jas = new ClusterControllerService.JobletAborter[ts
+                                        .size()];
+                                int i = 0;
+                                BasicTupleSet notificationTuples = new BasicTupleSet();
+                                for (List t2 : ts) {
+                                    Object[] t2Data = t2.toArray();
+                                    String nodeId = (String) t2Data[0];
+                                    jas[i++] = new ClusterControllerService.JobletAborter(nodeId, jobId, stageId,
+                                            attempt);
+                                    notificationTuples.add(AbortNotifyTable
+                                            .createTuple(jobId, stageId, nodeId, attempt));
+                                }
+                                try {
+                                    ccs.runRemote(jas, null);
+                                } finally {
+                                    jolRuntime.schedule(JOL_SCOPE, AbortNotifyTable.TABLE_NAME, notificationTuples,
+                                            null);
+                                    jolRuntime.evaluate();
+                                }
+                            } catch (Exception e) {
+                                LOGGER.log(java.util.logging.Level.SEVERE, "Failed to process abort message", e);
+                            }
+                        }
+                    });
+                }
+            }
+        });
+
+        jolRuntime.install(JOL_SCOPE, ClassLoader.getSystemResource(SCHEDULER_OLG_FILE));
+        jolRuntime.evaluate();
+    }
+
+    /**
+     * Groups the (activity node id, partition) pairs of one activity-info set by activity node id.
+     *
+     * @param activityInfoSet
+     *            lists whose element 0 is an ActivityNodeId and whose element 1 is an Integer partition
+     * @return a fresh map from activity node id to the set of its partitions
+     */
+    @SuppressWarnings("unchecked")
+    private static Map<ActivityNodeId, Set<Integer>> groupPartitionsByActivity(Set<List> activityInfoSet) {
+        Map<ActivityNodeId, Set<Integer>> tasks = new HashMap<ActivityNodeId, Set<Integer>>();
+        for (List l : activityInfoSet) {
+            Object[] lData = l.toArray();
+            ActivityNodeId aid = (ActivityNodeId) lData[0];
+            Set<Integer> aParts = tasks.get(aid);
+            if (aParts == null) {
+                aParts = new HashSet<Integer>();
+                tasks.put(aid, aParts);
+            }
+            aParts.add((Integer) lData[1]);
+        }
+        return tasks;
+    }
+
+    /**
+     * Builds the job plan for {@code jobSpec} and, alongside it, the tuples that describe the job:
+     * operators and their partition constraints, connectors, activity nodes, activity connections
+     * and blocking edges. All tuple sets are scheduled on the JOL runtime, which is then evaluated
+     * once.
+     *
+     * @return the randomly generated id of the new job
+     */
+    @Override
+    public UUID createJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
+        final UUID jobId = UUID.randomUUID();
+
+        final JobPlanBuilder planBuilder = new JobPlanBuilder();
+        planBuilder.init(jobSpec, jobFlags);
+
+        final BasicTupleSet activityNodeTuples = new BasicTupleSet();
+        final BasicTupleSet activityConnectionTuples = new BasicTupleSet();
+        final BasicTupleSet activityBlockedTuples = new BasicTupleSet();
+
+        // Records every graph contribution as a tuple and forwards it to the plan builder.
+        IActivityGraphBuilder recordingBuilder = new IActivityGraphBuilder() {
+            @Override
+            public void addTask(IActivityNode task) {
+                activityNodeTuples.add(ActivityNodeTable.createTuple(jobId, task));
+                planBuilder.addTask(task);
+            }
+
+            @Override
+            public void addTargetEdge(int operatorOutputIndex, IActivityNode task, int taskOutputIndex) {
+                activityConnectionTuples.add(ActivityConnectionTable.createTuple(jobId, task, Direction.OUTPUT,
+                        operatorOutputIndex, taskOutputIndex));
+                planBuilder.addTargetEdge(operatorOutputIndex, task, taskOutputIndex);
+            }
+
+            @Override
+            public void addSourceEdge(int operatorInputIndex, IActivityNode task, int taskInputIndex) {
+                activityConnectionTuples.add(ActivityConnectionTable.createTuple(jobId, task, Direction.INPUT,
+                        operatorInputIndex, taskInputIndex));
+                planBuilder.addSourceEdge(operatorInputIndex, task, taskInputIndex);
+            }
+
+            @Override
+            public void addBlockingEdge(IActivityNode blocker, IActivityNode blocked) {
+                activityBlockedTuples.add(ActivityBlockedTable.createTuple(jobId, blocker, blocked));
+                planBuilder.addBlockingEdge(blocker, blocked);
+            }
+        };
+
+        BasicTupleSet operatorTuples = new BasicTupleSet();
+        BasicTupleSet locationTuples = new BasicTupleSet();
+        BasicTupleSet cloneCountTuples = new BasicTupleSet();
+        for (IOperatorDescriptor operator : jobSpec.getOperatorMap().values()) {
+            int partitionCount = addPartitionConstraintTuples(jobId, operator, locationTuples, cloneCountTuples);
+            operatorTuples.add(OperatorDescriptorTable.createTuple(jobId, partitionCount, operator));
+            operator.contributeTaskGraph(recordingBuilder);
+        }
+
+        BasicTupleSet connectorTuples = new BasicTupleSet();
+        for (IConnectorDescriptor connector : jobSpec.getConnectorMap().values()) {
+            connectorTuples.add(ConnectorDescriptorTable.createTuple(jobId, jobSpec, connector));
+        }
+
+        Tuple initialJobTuple = JobTable.createInitialJobTuple(jobId, jobSpec, planBuilder.getPlan());
+        BasicTupleSet jobTuples = new BasicTupleSet(initialJobTuple);
+
+        jolRuntime.schedule(JOL_SCOPE, JobTable.TABLE_NAME, jobTuples, null);
+        jolRuntime.schedule(JOL_SCOPE, OperatorDescriptorTable.TABLE_NAME, operatorTuples, null);
+        jolRuntime.schedule(JOL_SCOPE, OperatorLocationTable.TABLE_NAME, locationTuples, null);
+        jolRuntime.schedule(JOL_SCOPE, OperatorCloneCountTable.TABLE_NAME, cloneCountTuples, null);
+        jolRuntime.schedule(JOL_SCOPE, ConnectorDescriptorTable.TABLE_NAME, connectorTuples, null);
+        jolRuntime.schedule(JOL_SCOPE, ActivityNodeTable.TABLE_NAME, activityNodeTuples, null);
+        jolRuntime.schedule(JOL_SCOPE, ActivityConnectionTable.TABLE_NAME, activityConnectionTuples, null);
+        jolRuntime.schedule(JOL_SCOPE, ActivityBlockedTable.TABLE_NAME, activityBlockedTuples, null);
+
+        jolRuntime.evaluate();
+
+        return jobId;
+    }
+
+    /**
+     * Translates the partition constraint of {@code od} into tuples: a COUNT constraint adds one
+     * clone-count tuple to {@code ocTuples}; an EXPLICIT constraint adds location tuples for each
+     * of its location constraints to {@code olTuples}, using the array index as the partition.
+     *
+     * @return the number of partitions implied by the constraint
+     * @throws IllegalArgumentException
+     *             if the operator has no partition constraint or its type is not supported
+     */
+    private int addPartitionConstraintTuples(UUID jobId, IOperatorDescriptor od, BasicTupleSet olTuples,
+            BasicTupleSet ocTuples) {
+        PartitionConstraint pc = od.getPartitionConstraint();
+        if (pc == null) {
+            throw new IllegalArgumentException("Operator " + od.getOperatorId() + " has no partition constraint");
+        }
+
+        switch (pc.getPartitionConstraintType()) {
+            case COUNT:
+                int count = ((PartitionCountConstraint) pc).getCount();
+                ocTuples.add(OperatorCloneCountTable.createTuple(jobId, od.getOperatorId(), count));
+                return count;
+
+            case EXPLICIT:
+                LocationConstraint[] locationConstraints = ((ExplicitPartitionConstraint) pc).getLocationConstraints();
+                for (int i = 0; i < locationConstraints.length; ++i) {
+                    addLocationConstraintTuple(olTuples, jobId, od.getOperatorId(), i, locationConstraints[i], 0);
+                }
+                return locationConstraints.length;
+
+            default:
+                // Name the offending operator and type instead of throwing a bare exception.
+                throw new IllegalArgumentException("Unsupported partition constraint type "
+                        + pc.getPartitionConstraintType() + " on operator " + od.getOperatorId());
+        }
+    }
+
+    /**
+     * Adds the location tuples for one partition of an operator. An ABSOLUTE constraint yields a
+     * single tuple with the given benefit; a CHOICE constraint recurses into its choices, lowering
+     * the benefit by one for each later choice. Other constraint types add nothing.
+     */
+    private void addLocationConstraintTuple(BasicTupleSet olTuples, UUID jobId, OperatorDescriptorId opId, int i,
+            LocationConstraint locationConstraint, int benefit) {
+        switch (locationConstraint.getConstraintType()) {
+            case ABSOLUTE:
+                AbsoluteLocationConstraint absolute = (AbsoluteLocationConstraint) locationConstraint;
+                olTuples.add(OperatorLocationTable.createTuple(jobId, opId, absolute.getLocationId(), i, benefit));
+                break;
+
+            case CHOICE:
+                ChoiceLocationConstraint choice = (ChoiceLocationConstraint) locationConstraint;
+                int choiceBenefit = benefit;
+                for (LocationConstraint alternative : choice.getChoices()) {
+                    addLocationConstraintTuple(olTuples, jobId, opId, i, alternative, choiceBenefit);
+                    choiceBenefit--;
+                }
+                break;
+        }
+    }
+
+    /**
+     * Looks up the job in the job table while holding the table's monitor.
+     *
+     * @return the status stored in column 1 of the job tuple, or {@code null} if no tuple exists
+     * @throws RuntimeException
+     *             wrapping a {@link BadKeyException} raised by the lookup
+     */
+    @Override
+    public JobStatus getJobStatus(UUID jobId) {
+        synchronized (jobTable) {
+            try {
+                final Tuple row = jobTable.lookupJob(jobId);
+                return row == null ? null : (JobStatus) row.value(1);
+            } catch (BadKeyException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /**
+     * Handles the failure of a node. If the node is not in the ranked list nothing happens.
+     * Otherwise its rank tuple is deleted, every node ranked after it moves up by one, the node is
+     * removed from the available-nodes table and, after an evaluation, added to the failed-nodes
+     * table, followed by a second evaluation.
+     *
+     * @param nodeId
+     *            id of the failed node
+     */
+    @Override
+    public synchronized void notifyNodeFailure(String nodeId) throws Exception {
+        // List.indexOf replaces the hand-written search and does not fail on a null id.
+        final int delIndex = rankedAvailableNodes.indexOf(nodeId);
+        if (delIndex < 0) {
+            return;
+        }
+        BasicTupleSet delRANTuples = new BasicTupleSet();
+        delRANTuples.add(RankedAvailableNodesTable.createTuple(nodeId, delIndex));
+
+        // After the removal the list index of each remaining node is exactly its new rank.
+        rankedAvailableNodes.remove(delIndex);
+
+        BasicTupleSet insRANTuples = new BasicTupleSet();
+        for (int i = delIndex; i < rankedAvailableNodes.size(); ++i) {
+            insRANTuples.add(RankedAvailableNodesTable.createTuple(rankedAvailableNodes.get(i), i));
+        }
+
+        jolRuntime.schedule(JOL_SCOPE, RankedAvailableNodesTable.TABLE_NAME, insRANTuples, delRANTuples);
+
+        BasicTupleSet unavailableTuples = new BasicTupleSet(AvailableNodesTable.createTuple(nodeId));
+
+        jolRuntime.schedule(JOL_SCOPE, AvailableNodesTable.TABLE_NAME, null, unavailableTuples);
+
+        jolRuntime.evaluate();
+
+        BasicTupleSet failedTuples = new BasicTupleSet(FailedNodesTable.createTuple(nodeId));
+
+        jolRuntime.schedule(JOL_SCOPE, FailedNodesTable.TABLE_NAME, failedTuples, null);
+
+        jolRuntime.evaluate();
+    }
+
+    /**
+     * Schedules a stagelet-complete tuple for the given job, stage, node and attempt (carrying the
+     * reported statistics) and evaluates the runtime.
+     */
+    @Override
+    public synchronized void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
+            StageletStatistics statistics) throws Exception {
+        Tuple completion = StageletCompleteTable.createTuple(jobId, stageId, nodeId, attempt, statistics);
+        BasicTupleSet completions = new BasicTupleSet(completion);
+
+        jolRuntime.schedule(JOL_SCOPE, StageletCompleteTable.TABLE_NAME, completions, null);
+        jolRuntime.evaluate();
+    }
+
+    /**
+     * Schedules a stagelet-failure tuple for the given job, stage, node and attempt and evaluates
+     * the runtime.
+     */
+    @Override
+    public synchronized void notifyStageletFailure(UUID jobId, UUID stageId, int attempt, String nodeId)
+            throws Exception {
+        Tuple failure = StageletFailureTable.createTuple(jobId, stageId, nodeId, attempt);
+        BasicTupleSet failures = new BasicTupleSet(failure);
+
+        jolRuntime.schedule(JOL_SCOPE, StageletFailureTable.TABLE_NAME, failures, null);
+        jolRuntime.evaluate();
+    }
+
+    /**
+     * Schedules a job-start event stamped with the current wall-clock time and evaluates the
+     * runtime.
+     */
+    @Override
+    public void start(UUID jobId) throws Exception {
+        Tuple startEvent = JobStartTable.createTuple(jobId, System.currentTimeMillis());
+        BasicTupleSet startEvents = new BasicTupleSet(startEvent);
+
+        jolRuntime.schedule(JOL_SCOPE, JobStartTable.TABLE_NAME, startEvents, null);
+        jolRuntime.evaluate();
+    }
+
+    /**
+     * Appends the node to the ranked list (its rank is its list index), schedules its rank and
+     * availability tuples and evaluates; then schedules the deletion of its failed-nodes tuple and
+     * evaluates again.
+     */
+    @Override
+    public synchronized void registerNode(String nodeId) throws Exception {
+        final int rank = rankedAvailableNodes.size();
+        rankedAvailableNodes.add(nodeId);
+
+        BasicTupleSet rankTuples = new BasicTupleSet();
+        rankTuples.add(RankedAvailableNodesTable.createTuple(nodeId, rank));
+        jolRuntime.schedule(JOL_SCOPE, RankedAvailableNodesTable.TABLE_NAME, rankTuples, null);
+
+        BasicTupleSet nowAvailable = new BasicTupleSet(AvailableNodesTable.createTuple(nodeId));
+        jolRuntime.schedule(JOL_SCOPE, AvailableNodesTable.TABLE_NAME, nowAvailable, null);
+
+        jolRuntime.evaluate();
+
+        BasicTupleSet noLongerFailed = new BasicTupleSet(FailedNodesTable.createTuple(nodeId));
+        jolRuntime.schedule(JOL_SCOPE, FailedNodesTable.TABLE_NAME, null, noLongerFailed);
+
+        jolRuntime.evaluate();
+    }
+
+    /**
+     * Waits on the job table's monitor until the job's tuple either disappears or reports
+     * {@link JobStatus#TERMINATED}.
+     *
+     * @return the statistics built from the terminated job tuple, or {@code null} if the job has
+     *         no tuple in the table
+     */
+    @Override
+    public JobStatistics waitForCompletion(UUID jobId) throws Exception {
+        synchronized (jobTable) {
+            for (;;) {
+                Tuple row = jobTable.lookupJob(jobId);
+                if (row == null) {
+                    return null;
+                }
+                if (row.value(1) == JobStatus.TERMINATED) {
+                    return jobTable.buildJobStatistics(row);
+                }
+                jobTable.wait();
+            }
+        }
+    }
+
+    /*
+     * declare(job, keys(0), {JobId, Status, JobSpec, JobPlan, Statistics})
+     *
+     * One tuple per job id. The fifth column holds a set of per-stage sets of
+     * StageletStatistics, which buildJobStatistics() folds into a JobStatistics object.
+     */
+    private static class JobTable extends BasicTable {
+        private static final TableName TABLE_NAME = new TableName(JOL_SCOPE, "job");
+
+        private static final Key PRIMARY_KEY = new Key(0);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] { UUID.class, JobStatus.class, JobSpecification.class,
+                JobPlan.class, Set.class };
+
+        public JobTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        /* Creates the tuple of a new job: status INITIALIZED and an empty statistics set. */
+        @SuppressWarnings("unchecked")
+        static Tuple createInitialJobTuple(UUID jobId, JobSpecification jobSpec, JobPlan plan) {
+            return new Tuple(jobId, JobStatus.INITIALIZED, jobSpec, plan, new HashSet());
+        }
+
+        /*
+         * Builds a JobStatistics with one StageStatistics per inner set found in column 4 of the
+         * tuple; a null column yields empty statistics.
+         */
+        @SuppressWarnings("unchecked")
+        JobStatistics buildJobStatistics(Tuple jobTuple) {
+            Set<Set<StageletStatistics>> statsSet = (Set<Set<StageletStatistics>>) jobTuple.value(4);
+            JobStatistics stats = new JobStatistics();
+            if (statsSet != null) {
+                for (Set<StageletStatistics> stageStatsSet : statsSet) {
+                    StageStatistics stageStats = new StageStatistics();
+                    for (StageletStatistics stageletStats : stageStatsSet) {
+                        stageStats.addStageletStatistics(stageletStats);
+                    }
+                    stats.addStageStatistics(stageStats);
+                }
+            }
+            return stats;
+        }
+
+        /* Returns the tuple stored under the given job id, or null when the lookup is empty. */
+        Tuple lookupJob(UUID jobId) throws BadKeyException {
+            TupleSet set = primary().lookupByKey(jobId);
+            if (set.isEmpty()) {
+                return null;
+            }
+            return (Tuple) set.toArray()[0];
+        }
+    }
+
+    /*
+     * declare(operatordescriptor, keys(0, 1), {JobId, ODId, NPartitions, OperatorDescriptor})
+     *
+     * One tuple per operator of a job, together with its partition count.
+     */
+    private static class OperatorDescriptorTable extends BasicTable {
+        private static final TableName TABLE_NAME = new TableName(JOL_SCOPE, "operatordescriptor");
+
+        private static final Key PRIMARY_KEY = new Key(0, 1);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] { UUID.class, OperatorDescriptorId.class, Integer.class,
+                IOperatorDescriptor.class };
+
+        public OperatorDescriptorTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        static Tuple createTuple(UUID jobId, int nPartitions, IOperatorDescriptor od) {
+            return new Tuple(jobId, od.getOperatorId(), nPartitions, od);
+        }
+    }
+
+    /*
+     * declare(operatorlocation, keys(), {JobId, ODId, NodeId, Partition, Benefit})
+     *
+     * Candidate node for one partition of an operator. The key is created without columns, so
+     * one operator partition may have several tuples.
+     */
+    private static class OperatorLocationTable extends BasicTable {
+        private static final TableName TABLE_NAME = new TableName(JOL_SCOPE, "operatorlocation");
+
+        private static final Key PRIMARY_KEY = new Key();
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] { UUID.class, OperatorDescriptorId.class, String.class,
+                Integer.class, Integer.class };
+
+        public OperatorLocationTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        static Tuple createTuple(UUID jobId, OperatorDescriptorId opId, String nodeId, int partition, int benefit) {
+            return new Tuple(jobId, opId, nodeId, partition, benefit);
+        }
+    }
+
+    /*
+     * declare(operatorclonecount, keys(), {JobId, ODId, Count})
+     *
+     * Requested number of clones of an operator; filled from COUNT partition constraints.
+     */
+    private static class OperatorCloneCountTable extends BasicTable {
+        private static final TableName TABLE_NAME = new TableName(JOL_SCOPE, "operatorclonecount");
+
+        private static final Key PRIMARY_KEY = new Key();
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] { UUID.class, OperatorDescriptorId.class, Integer.class };
+
+        public OperatorCloneCountTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        static Tuple createTuple(UUID jobId, OperatorDescriptorId opId, int cloneCount) {
+            return new Tuple(jobId, opId, cloneCount);
+        }
+    }
+
+    /*
+     * declare(connectordescriptor, keys(0, 1), {JobId, CDId, SrcODId, SrcPort, DestODId, DestPort, ConnectorDescriptor})
+     *
+     * One tuple per connector of a job, with the producing and consuming operator ports taken
+     * from the job specification.
+     */
+    private static class ConnectorDescriptorTable extends BasicTable {
+        private static final TableName TABLE_NAME = new TableName(JOL_SCOPE, "connectordescriptor");
+
+        private static final Key PRIMARY_KEY = new Key(0, 1);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] { UUID.class, ConnectorDescriptorId.class,
+                OperatorDescriptorId.class, Integer.class, OperatorDescriptorId.class, Integer.class,
+                IConnectorDescriptor.class };
+
+        public ConnectorDescriptorTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        static Tuple createTuple(UUID jobId, JobSpecification jobSpec, IConnectorDescriptor conn) {
+            IOperatorDescriptor srcOD = jobSpec.getProducer(conn);
+            int srcPort = jobSpec.getProducerOutputIndex(conn);
+            IOperatorDescriptor destOD = jobSpec.getConsumer(conn);
+            int destPort = jobSpec.getConsumerInputIndex(conn);
+            Tuple cdTuple = new Tuple(jobId, conn.getConnectorId(), srcOD.getOperatorId(), srcPort,
+                    destOD.getOperatorId(), destPort, conn);
+            return cdTuple;
+        }
+    }
+
+    /*
+     * declare(activitynode, keys(0, 1, 2), {JobId, OperatorId, ActivityId, ActivityNode})
+     *
+     * One tuple per activity node contributed to a job's task graph.
+     */
+    private static class ActivityNodeTable extends BasicTable {
+        private static final TableName TABLE_NAME = new TableName(JOL_SCOPE, "activitynode");
+
+        private static final Key PRIMARY_KEY = new Key(0, 1, 2);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] { UUID.class, OperatorDescriptorId.class,
+                ActivityNodeId.class, IActivityNode.class };
+
+        public ActivityNodeTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        static Tuple createTuple(UUID jobId, IActivityNode aNode) {
+            return new Tuple(jobId, aNode.getActivityNodeId().getOperatorDescriptorId(), aNode.getActivityNodeId(),
+                    aNode);
+        }
+    }
+
+    /*
+     * declare(activityconnection, keys(0, 1, 2, 3), {JobId, OperatorId, OperatorPort, Direction, ActivityNodeId, ActivityPort})
+     *
+     * Maps an operator input or output port to the activity port that serves it.
+     */
+    private static class ActivityConnectionTable extends BasicTable {
+        private static final TableName TABLE_NAME = new TableName(JOL_SCOPE, "activityconnection");
+
+        private static final Key PRIMARY_KEY = new Key(0, 1, 2, 3);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] { UUID.class, OperatorDescriptorId.class, Integer.class,
+                Direction.class, ActivityNodeId.class, Integer.class };
+
+        public ActivityConnectionTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        static Tuple createTuple(UUID jobId, IActivityNode aNode, Direction direction, int odPort, int activityPort) {
+            return new Tuple(jobId, aNode.getActivityNodeId().getOperatorDescriptorId(), odPort, direction,
+                    aNode.getActivityNodeId(), activityPort);
+        }
+    }
+
+    /*
+     * declare(activityblocked, keys(0, 1, 2, 3, 4), {JobId, BlockerOperatorId, BlockerActivityId, BlockedOperatorId, BlockedActivityId})
+     *
+     * One tuple per blocking edge between two activities of a job.
+     */
+    private static class ActivityBlockedTable extends BasicTable {
+        private static final TableName TABLE_NAME = new TableName(JOL_SCOPE, "activityblocked");
+
+        private static final Key PRIMARY_KEY = new Key(0, 1, 2, 3, 4);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] { UUID.class, OperatorDescriptorId.class,
+                ActivityNodeId.class, OperatorDescriptorId.class, ActivityNodeId.class };
+
+        public ActivityBlockedTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        static Tuple createTuple(UUID jobId, IActivityNode blocker, IActivityNode blocked) {
+            ActivityNodeId blockerANId = blocker.getActivityNodeId();
+            OperatorDescriptorId blockerODId = blockerANId.getOperatorDescriptorId();
+            ActivityNodeId blockedANId = blocked.getActivityNodeId();
+            OperatorDescriptorId blockedODId = blockedANId.getOperatorDescriptorId();
+            return new Tuple(jobId, blockerODId, blockerANId, blockedODId, blockedANId);
+        }
+    }
+
+    /*
+     * declare(jobstart, keys(0), {JobId, SubmitTime})
+     *
+     * Event table (no stored key on the Java side); start() schedules one tuple per request.
+     */
+    private static class JobStartTable extends EventTable {
+        private static final TableName TABLE_NAME = new TableName(JOL_SCOPE, "jobstart");
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] { UUID.class, Long.class };
+
+        public JobStartTable() {
+            super(TABLE_NAME, SCHEMA);
+        }
+
+        static Tuple createTuple(UUID jobId, long submitTime) {
+            return new Tuple(jobId, submitTime);
+        }
+    }
+
+    /*
+     * declare(startmessage, keys(0, 1, 2), {JobId, StageId, Attempt, JobPlan, TupleSet})
+     *
+     * The constructor's callback reads each inserted tuple in this column order and starts the
+     * stage on the listed nodes.
+     */
+    private static class StartMessageTable extends BasicTable {
+        private static final TableName TABLE_NAME = new TableName(JOL_SCOPE, "startmessage");
+
+        private static final Key PRIMARY_KEY = new Key(0, 1, 2);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] { UUID.class, UUID.class, Integer.class, JobPlan.class,
+                Set.class };
+
+        public StartMessageTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+    }
+
+    /*
+     * declare(jobcleanup, keys(0), {JobId, Set<NodeId>})
+     *
+     * The constructor's callback notifies every listed node that the job is complete.
+     */
+    private static class JobCleanUpTable extends BasicTable {
+        private static final TableName TABLE_NAME = new TableName(JOL_SCOPE, "jobcleanup");
+
+        private static final Key PRIMARY_KEY = new Key(0);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] { UUID.class, Set.class };
+
+        public JobCleanUpTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+    }
+
+    /*
+     * declare(jobcleanupcomplete, keys(0), {JobId})
+     *
+     * Event table; one tuple is scheduled after the clean-up notifications of a job have run.
+     */
+    private static class JobCleanUpCompleteTable extends EventTable {
+        private static final TableName TABLE_NAME = new TableName(JOL_SCOPE, "jobcleanupcomplete");
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] { UUID.class };
+
+        public JobCleanUpCompleteTable() {
+            super(TABLE_NAME, SCHEMA);
+        }
+
+        public static Tuple createTuple(UUID jobId) {
+            return new Tuple(jobId);
+        }
+    }
+
+    /*
+     * declare(stageletcomplete, keys(0, 1, 2, 3), {JobId, StageId, NodeId, Attempt, StageletStatistics})
+     *
+     * Filled by notifyStageletComplete().
+     */
+    private static class StageletCompleteTable extends BasicTable {
+        private static final TableName TABLE_NAME = new TableName(JOL_SCOPE, "stageletcomplete");
+
+        private static final Key PRIMARY_KEY = new Key(0, 1, 2, 3);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] { UUID.class, UUID.class, String.class, Integer.class,
+                StageletStatistics.class };
+
+        public StageletCompleteTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        public static Tuple createTuple(UUID jobId, UUID stageId, String nodeId, int attempt,
+                StageletStatistics statistics) {
+            return new Tuple(jobId, stageId, nodeId, attempt, statistics);
+        }
+    }
+
+    /*
+     * declare(stageletfailure, keys(0, 1, 2, 3), {JobId, StageId, NodeId, Attempt})
+     *
+     * Filled by notifyStageletFailure().
+     */
+    private static class StageletFailureTable extends BasicTable {
+        private static final TableName TABLE_NAME = new TableName(JOL_SCOPE, "stageletfailure");
+
+        private static final Key PRIMARY_KEY = new Key(0, 1, 2, 3);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] { UUID.class, UUID.class, String.class, Integer.class };
+
+        public StageletFailureTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        public static Tuple createTuple(UUID jobId, UUID stageId, String nodeId, int attempt) {
+            return new Tuple(jobId, stageId, nodeId, attempt);
+        }
+    }
+
+ /*
+ * declare(availablenodes, keys(0), {NodeId})
+ */
+    private static class AvailableNodesTable extends BasicTable { // table "availablenodes"
+        private static final TableName TABLE_NAME = new TableName(JOL_SCOPE, "availablenodes"); // final: shared constant
+
+        private static final Key PRIMARY_KEY = new Key(0); // keyed on the only column
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] { String.class }; // NodeId
+
+        public AvailableNodesTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        public static Tuple createTuple(String nodeId) { // one-field tuple, same arity as SCHEMA
+            return new Tuple(nodeId);
+        }
+    }
+
+ /*
+ * declare(rankedavailablenodes, keys(0), {NodeId, Integer})
+ */
+    private static class RankedAvailableNodesTable extends BasicTable { // table "rankedavailablenodes"
+        private static final TableName TABLE_NAME = new TableName(JOL_SCOPE, "rankedavailablenodes"); // final: shared constant
+
+        private static final Key PRIMARY_KEY = new Key(0); // keyed on column 0 (the node id) only
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] { String.class, Integer.class }; // NodeId, rank
+
+        public RankedAvailableNodesTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        public static Tuple createTuple(String nodeId, int rank) { // argument order mirrors SCHEMA
+            return new Tuple(nodeId, rank);
+        }
+    }
+
+ /*
+ * declare(failednodes, keys(0), {NodeId})
+ */
+    private static class FailedNodesTable extends BasicTable { // table "failednodes"
+        private static final TableName TABLE_NAME = new TableName(JOL_SCOPE, "failednodes"); // final: shared constant
+
+        private static final Key PRIMARY_KEY = new Key(0); // keyed on the only column
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] { String.class }; // NodeId
+
+        public FailedNodesTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        public static Tuple createTuple(String nodeId) { // one-field tuple, same arity as SCHEMA
+            return new Tuple(nodeId);
+        }
+    }
+
+ /*
+ * declare(abortmessage, keys(0, 1, 2), {JobId, StageId, Attempt, JobPlan, TupleSet})
+ */
+    private static class AbortMessageTable extends BasicTable { // table "abortmessage"
+        private static final TableName TABLE_NAME = new TableName(JOL_SCOPE, "abortmessage"); // final: shared constant
+
+        private static final Key PRIMARY_KEY = new Key(0, 1, 2); // key = the two UUID columns plus the Integer column
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] { UUID.class, UUID.class, Integer.class, JobPlan.class,
+                Set.class }; // five columns; the last two are not part of the key
+
+        public AbortMessageTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+    }
+
+ /*
+ * declare(abortnotify, keys(0, 1, 2, 3), {JobId, StageId, NodeId, Attempt})
+ */
+    private static class AbortNotifyTable extends BasicTable { // table "abortnotify"
+        private static final TableName TABLE_NAME = new TableName(JOL_SCOPE, "abortnotify"); // final: shared constant
+
+        private static final Key PRIMARY_KEY = new Key(0, 1, 2, 3); // every column is part of the key
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] { UUID.class, UUID.class, String.class, Integer.class }; // four columns, matching createTuple
+
+        public AbortNotifyTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        public static Tuple createTuple(UUID jobId, UUID stageId, String nodeId, int attempt) { // argument order mirrors SCHEMA
+            return new Tuple(jobId, stageId, nodeId, attempt);
+        }
+    }
+
+    private static class ExpandPartitionCountConstraintTableFunction extends Function { // table function
+        private static final String TABLE_NAME = "expandpartitioncountconstraint";
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] { UUID.class, OperatorDescriptorId.class, Integer.class,
+                Integer.class }; // output row: value(0), value(1), partition index, running ordinal
+
+        public ExpandPartitionCountConstraintTableFunction() {
+            super(TABLE_NAME, SCHEMA);
+        }
+
+        @Override
+        public TupleSet insert(TupleSet tuples, TupleSet conflicts) throws UpdateException {
+            TupleSet expanded = new BasicTupleSet();
+            int ordinal = 0; // keeps counting across all input tuples, never reset
+            for (Tuple source : tuples) {
+                int partitionCount = (Integer) source.value(2); // column 2 holds how many rows to emit
+                for (int partition = 0; partition < partitionCount; partition++, ordinal++) {
+                    expanded.add(new Tuple(source.value(0), source.value(1), partition, ordinal));
+                }
+            }
+            return expanded; // 'conflicts' is not consulted
+        }
+    }
+
+    private class JobQueueThread extends Thread { // drains jobQueue, running one Runnable at a time
+        public JobQueueThread() {
+            setDaemon(true);
+        }
+
+        public void run() {
+            for (;;) { // loops forever; there is no exit condition
+                final Runnable task;
+                try {
+                    task = jobQueue.take();
+                } catch (InterruptedException e) {
+                    continue; // interruption is ignored; go back to waiting for work
+                }
+                try {
+                    task.run();
+                } catch (Exception e) {
+                    e.printStackTrace(); // a failing task is reported and the loop carries on
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JobPlanBuilder.java b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JobPlanBuilder.java
new file mode 100644
index 0000000..fcb796e
--- /dev/null
+++ b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JobPlanBuilder.java
@@ -0,0 +1,90 @@
+package edu.uci.ics.hyracks.control.cc;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IActivityNode;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.control.common.job.JobPlan;
+
+public class JobPlanBuilder implements IActivityGraphBuilder {
+    private static final Logger LOGGER = Logger.getLogger(JobPlanBuilder.class.getName());
+
+    private JobPlan plan; // created by init(); every add* method writes into it
+
+    @Override
+    public void addBlockingEdge(IActivityNode blocker, IActivityNode blocked) {
+        addToValueSet(plan.getBlocker2BlockedMap(), blocker.getActivityNodeId(), blocked.getActivityNodeId()); // blocker -> blocked
+        addToValueSet(plan.getBlocked2BlockerMap(), blocked.getActivityNodeId(), blocker.getActivityNodeId()); // blocked -> blocker
+    }
+
+    @Override
+    public void addSourceEdge(int operatorInputIndex, IActivityNode task, int taskInputIndex) {
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            String operatorEnd = task.getOwner().getOperatorId() + ":" + operatorInputIndex;
+            LOGGER.finest("Adding source edge: " + operatorEnd + " -> " + task.getActivityNodeId() + ":" + taskInputIndex);
+        }
+        insertIntoIndexedMap(plan.getTaskInputMap(), task.getActivityNodeId(), taskInputIndex, operatorInputIndex);
+        insertIntoIndexedMap(plan.getOperatorInputMap(), task.getOwner().getOperatorId(), operatorInputIndex,
+                task.getActivityNodeId());
+    }
+
+    @Override
+    public void addTargetEdge(int operatorOutputIndex, IActivityNode task, int taskOutputIndex) {
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            String operatorEnd = task.getOwner().getOperatorId() + ":" + operatorOutputIndex;
+            LOGGER.finest("Adding target edge: " + operatorEnd + " -> " + task.getActivityNodeId() + ":" + taskOutputIndex);
+        }
+        insertIntoIndexedMap(plan.getTaskOutputMap(), task.getActivityNodeId(), taskOutputIndex, operatorOutputIndex);
+        insertIntoIndexedMap(plan.getOperatorOutputMap(), task.getOwner().getOperatorId(), operatorOutputIndex,
+                task.getActivityNodeId());
+    }
+
+    @Override
+    public void addTask(IActivityNode task) {
+        plan.getActivityNodeMap().put(task.getActivityNodeId(), task);
+        addToValueSet(plan.getOperatorTaskMap(), task.getOwner().getOperatorId(), task.getActivityNodeId());
+    }
+
+    private <K, V> void addToValueSet(Map<K, Set<V>> map, K n1, V n2) {
+        Set<V> bucket = map.get(n1);
+        if (bucket == null) { // first value for this key: create and register its set
+            bucket = new HashSet<V>();
+            map.put(n1, bucket);
+        }
+        bucket.add(n2);
+    }
+
+    private <T> void extend(List<T> list, int index) {
+        // pad with nulls until position 'index' exists in the list
+        while (list.size() <= index) {
+            list.add(null);
+        }
+    }
+
+    public void init(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) {
+        this.plan = new JobPlan(jobSpec, jobFlags);
+    }
+
+    private <K, V> void insertIntoIndexedMap(Map<K, List<V>> map, K key, int index, V value) {
+        List<V> slots = map.get(key);
+        if (slots == null) { // first entry for this key: create and register its list
+            slots = new ArrayList<V>();
+            map.put(key, slots);
+        }
+        extend(slots, index); // make sure set(index, ...) below is in range
+        slots.set(index, value);
+    }
+
+    public JobPlan getPlan() {
+        return this.plan;
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JobPlanner.java b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JobPlanner.java
new file mode 100644
index 0000000..956c658
--- /dev/null
+++ b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JobPlanner.java
@@ -0,0 +1,169 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc;
+
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.IActivityNode;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.util.Pair;
+import edu.uci.ics.hyracks.control.cc.job.IOperatorDescriptorVisitor;
+import edu.uci.ics.hyracks.control.cc.job.PlanUtils;
+import edu.uci.ics.hyracks.control.common.job.JobPlan;
+import edu.uci.ics.hyracks.control.common.job.JobStage;
+
+public class JobPlanner {
+    private static final Logger LOGGER = Logger.getLogger(JobPlanner.class.getName());
+
+    private Pair<ActivityNodeId, ActivityNodeId> findMergePair(JobPlan plan, JobSpecification spec, Set<JobStage> eqSets) {
+        Map<ActivityNodeId, IActivityNode> nodesById = plan.getActivityNodeMap();
+        for (JobStage candidate : eqSets) {
+            for (ActivityNodeId member : candidate.getTasks()) {
+                IOperatorDescriptor owner = nodesById.get(member).getOwner();
+                List<Integer> inputs = plan.getTaskInputMap().get(member);
+                if (inputs != null) {
+                    for (Integer inputIdx : inputs) {
+                        IConnectorDescriptor inConn = spec.getInputConnectorDescriptor(owner, inputIdx);
+                        OperatorDescriptorId producerId = spec.getProducer(inConn).getOperatorId();
+                        int producerPort = spec.getProducerOutputIndex(inConn);
+                        ActivityNodeId upstream = plan.getOperatorOutputMap().get(producerId).get(producerPort);
+                        if (!candidate.getTasks().contains(upstream)) { // producer sits in a different set
+                            return new Pair<ActivityNodeId, ActivityNodeId>(member, upstream);
+                        }
+                    }
+                }
+                List<Integer> outputs = plan.getTaskOutputMap().get(member);
+                if (outputs != null) {
+                    for (Integer outputIdx : outputs) {
+                        IConnectorDescriptor outConn = spec.getOutputConnectorDescriptor(owner, outputIdx);
+                        OperatorDescriptorId consumerId = spec.getConsumer(outConn).getOperatorId();
+                        int consumerPort = spec.getConsumerInputIndex(outConn);
+                        ActivityNodeId downstream = plan.getOperatorInputMap().get(consumerId).get(consumerPort);
+                        if (!candidate.getTasks().contains(downstream)) { // consumer sits in a different set
+                            return new Pair<ActivityNodeId, ActivityNodeId>(member, downstream);
+                        }
+                    }
+                }
+            }
+        }
+        return null; // every connected neighbour already shares a set with its peer
+    }
+
+    private JobStage inferStages(JobPlan plan) throws Exception {
+        JobSpecification spec = plan.getJobSpecification();
+
+        /*
+         * Seed the equivalence sets: every activity node id starts out alone in its own stage, t -> { t }
+         */
+        Map<ActivityNodeId, JobStage> stageOf = new HashMap<ActivityNodeId, JobStage>();
+        Set<JobStage> stages = new HashSet<JobStage>();
+        for (Set<ActivityNodeId> operatorTasks : plan.getOperatorTaskMap().values()) {
+            for (ActivityNodeId taskId : operatorTasks) {
+                Set<ActivityNodeId> singleton = new HashSet<ActivityNodeId>();
+                singleton.add(taskId);
+                JobStage seed = new JobStage(singleton);
+                stageOf.put(taskId, seed);
+                stages.add(seed);
+            }
+        }
+
+        /*
+         * Keep merging the stages of any two connected tasks that still live in
+         * different stages, until findMergePair reports that none are left.
+         */
+        Pair<ActivityNodeId, ActivityNodeId> pair = findMergePair(plan, spec, stages);
+        while (pair != null) {
+            merge(stageOf, stages, pair.first, pair.second);
+            pair = findMergePair(plan, spec, stages);
+        }
+
+        JobStage endStage = new JobStage(new HashSet<ActivityNodeId>());
+        Map<ActivityNodeId, Set<ActivityNodeId>> blockedByBlocker = plan.getBlocker2BlockedMap();
+        for (JobStage current : stages) {
+            endStage.addDependency(current); // the end stage depends on every inferred stage
+            current.addDependent(endStage);
+            Set<JobStage> blockedStages = new HashSet<JobStage>();
+            for (ActivityNodeId blocker : current.getTasks()) {
+                Set<ActivityNodeId> blockedTasks = blockedByBlocker.get(blocker);
+                if (blockedTasks != null) {
+                    for (ActivityNodeId blockedTask : blockedTasks) {
+                        blockedStages.add(stageOf.get(blockedTask));
+                    }
+                }
+            }
+            for (JobStage blockedStage : blockedStages) {
+                blockedStage.addDependency(current);
+                current.addDependent(blockedStage);
+            }
+        }
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Inferred " + (stages.size() + 1) + " stages"); // +1 accounts for the end stage
+            for (JobStage inferred : stages) {
+                LOGGER.info(inferred.toString());
+            }
+            LOGGER.info("SID: ENDSTAGE");
+        }
+        return endStage;
+    }
+
+    private void merge(Map<ActivityNodeId, JobStage> eqSetMap, Set<JobStage> eqSets, ActivityNodeId t1,
+            ActivityNodeId t2) {
+        JobStage left = eqSetMap.get(t1);
+        Set<ActivityNodeId> leftTasks = left.getTasks();
+        JobStage right = eqSetMap.get(t2);
+        Set<ActivityNodeId> rightTasks = right.getTasks();
+
+        Set<ActivityNodeId> union = new HashSet<ActivityNodeId>();
+        union.addAll(leftTasks);
+        union.addAll(rightTasks);
+
+        eqSets.remove(left);
+        eqSets.remove(right);
+        JobStage combined = new JobStage(union);
+        eqSets.add(combined);
+
+        for (ActivityNodeId task : union) { // repoint every member at the combined stage
+            eqSetMap.put(task, combined);
+        }
+    }
+
+    public JobPlan plan(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
+        final JobPlanBuilder planBuilder = new JobPlanBuilder();
+        planBuilder.init(jobSpec, jobFlags);
+        PlanUtils.visit(jobSpec, new IOperatorDescriptorVisitor() {
+            @Override
+            public void visit(IOperatorDescriptor op) throws Exception {
+                op.contributeTaskGraph(planBuilder); // each operator adds its tasks and edges to the builder
+            }
+        });
+        JobPlan built = planBuilder.getPlan();
+        JobStage endStage = inferStages(built);
+        built.setEndStage(endStage);
+
+        return built;
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
new file mode 100644
index 0000000..e4d611b
--- /dev/null
+++ b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
@@ -0,0 +1,29 @@
+package edu.uci.ics.hyracks.control.cc;
+
+import edu.uci.ics.hyracks.control.common.api.INodeController;
+
+public class NodeControllerState { // pairs a node controller handle with a heartbeat counter
+    private final INodeController nodeController;
+
+    private int lastHeartbeatDuration; // reset by notifyHeartbeat(), bumped by incrementLastHeartbeatDuration()
+
+    public NodeControllerState(INodeController nodeController) {
+        this.nodeController = nodeController;
+    }
+
+    void notifyHeartbeat() {
+        this.lastHeartbeatDuration = 0;
+    }
+
+    int incrementLastHeartbeatDuration() {
+        return this.lastHeartbeatDuration++; // post-increment: the value handed back is the one before the bump
+    }
+
+    int getLastHeartbeatDuration() {
+        return this.lastHeartbeatDuration;
+    }
+
+    public INodeController getNodeController() {
+        return this.nodeController;
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/StageProgress.java b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/StageProgress.java
new file mode 100644
index 0000000..438e235
--- /dev/null
+++ b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/StageProgress.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+
+import edu.uci.ics.hyracks.api.job.statistics.StageStatistics;
+
+public class StageProgress { // tracks which nodes are still pending for one stage
+    private final UUID stageId;
+
+    private final Set<String> pendingNodes; // node ids added via addPendingNodes and not yet marked complete
+
+    private final StageStatistics stageStatistics;
+
+    public StageProgress(UUID stageId) {
+        this.stageId = stageId;
+        this.pendingNodes = new HashSet<String>();
+        this.stageStatistics = new StageStatistics();
+        this.stageStatistics.setStageId(stageId);
+    }
+
+    public UUID getStageId() {
+        return this.stageId;
+    }
+
+    public void addPendingNodes(Set<String> nodes) {
+        this.pendingNodes.addAll(nodes);
+    }
+
+    public void markNodeComplete(String nodeId) {
+        this.pendingNodes.remove(nodeId);
+    }
+
+    public boolean stageComplete() {
+        return this.pendingNodes.size() == 0; // also true before any node has been added
+    }
+
+    public StageStatistics getStageStatistics() {
+        return this.stageStatistics;
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/IOperatorDescriptorVisitor.java b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/IOperatorDescriptorVisitor.java
new file mode 100644
index 0000000..65d8771
--- /dev/null
+++ b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/IOperatorDescriptorVisitor.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.job;
+
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+
+public interface IOperatorDescriptorVisitor { // callback handed to PlanUtils.visit
+    void visit(IOperatorDescriptor op) throws Exception; // interface methods are implicitly public
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/PlanUtils.java b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/PlanUtils.java
new file mode 100644
index 0000000..274ffdb
--- /dev/null
+++ b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/PlanUtils.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.job;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class PlanUtils {
+    public static void visit(JobSpecification spec, IOperatorDescriptorVisitor visitor) throws Exception { // offers each operator in spec's operator map to the visitor
+        Set<OperatorDescriptorId> seen = new HashSet<OperatorDescriptorId>();
+        for (IOperatorDescriptor op : spec.getOperatorMap().values()) {
+            visitOperator(visitor, seen, op);
+        }
+    }
+
+    private static void visitOperator(IOperatorDescriptorVisitor visitor, Set<OperatorDescriptorId> seen,
+            IOperatorDescriptor op) throws Exception {
+        // Deduplicate on the operator id: 'seen' stores OperatorDescriptorId values, so a lookup by descriptor would never match.
+        if (seen.add(op.getOperatorId())) { // add() returns true only the first time an id is recorded
+            visitor.visit(op);
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/WebServer.java b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/WebServer.java
new file mode 100644
index 0000000..dfffcb3
--- /dev/null
+++ b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/WebServer.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.web;
+
+import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.handler.ContextHandlerCollection;
+import org.eclipse.jetty.server.nio.SelectChannelConnector;
+
+public class WebServer { // thin wrapper around a Jetty Server with a single connector
+    private Server server;
+    private SelectChannelConnector connector;
+
+    public WebServer(Handler[] handlers) throws Exception {
+        this.server = new Server();
+        // one NIO connector; its port is assigned later through setPort
+        this.connector = new SelectChannelConnector();
+        // register that connector as the server's only connector
+        this.server.setConnectors(new Connector[] { this.connector });
+        // route requests through one context collection wrapping the supplied handlers
+        ContextHandlerCollection contexts = new ContextHandlerCollection();
+        contexts.setHandlers(handlers);
+        this.server.setHandler(contexts);
+    }
+
+    public void setPort(int port) {
+        this.connector.setPort(port);
+    }
+
+    public void start() throws Exception {
+        this.server.start();
+    }
+
+    public void stop() throws Exception {
+        this.server.stop();
+    }
+}
\ No newline at end of file