Added application support. Added examples. Added CLI.
git-svn-id: https://hyracks.googlecode.com/svn/trunk/hyracks@57 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-cc/.classpath b/hyracks-control-cc/.classpath
index 88cebb7..ebe2cde 100644
--- a/hyracks-control-cc/.classpath
+++ b/hyracks-control-cc/.classpath
@@ -1,6 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" path="src/main/java"/>
+ <classpathentry kind="src" path="src/main/resources"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
<classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
<classpathentry kind="output" path="target/classes"/>
diff --git a/hyracks-control-cc/pom.xml b/hyracks-control-cc/pom.xml
index 96992af..a975238 100644
--- a/hyracks-control-cc/pom.xml
+++ b/hyracks-control-cc/pom.xml
@@ -25,20 +25,6 @@
<scope>compile</scope>
</dependency>
<dependency>
- <groupId>org.eclipse.jetty</groupId>
- <artifactId>jetty-server</artifactId>
- <version>8.0.0.M0</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>args4j</groupId>
- <artifactId>args4j</artifactId>
- <version>2.0.12</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
- <dependency>
<groupId>org.apache.wicket</groupId>
<artifactId>wicket</artifactId>
<version>1.4.7</version>
@@ -52,5 +38,12 @@
<type>jar</type>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ <version>8.0.0.M1</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 2597df7..1d8aca2 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -14,7 +14,12 @@
*/
package edu.uci.ics.hyracks.control.cc;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.rmi.registry.LocateRegistry;
@@ -22,6 +27,7 @@
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Hashtable;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -43,29 +49,36 @@
import jol.core.Runtime;
import jol.core.Runtime.DebugLevel;
+import org.apache.commons.io.IOUtils;
+import org.eclipse.jetty.http.HttpMethods;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.server.handler.ContextHandler;
+import edu.uci.ics.hyracks.api.client.ClusterControllerInfo;
import edu.uci.ics.hyracks.api.client.IHyracksClientInterface;
+import edu.uci.ics.hyracks.api.comm.Endpoint;
+import edu.uci.ics.hyracks.api.control.IClusterController;
+import edu.uci.ics.hyracks.api.control.INodeController;
+import edu.uci.ics.hyracks.api.control.NodeParameters;
import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobPlan;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
import edu.uci.ics.hyracks.control.cc.web.WebServer;
import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
-import edu.uci.ics.hyracks.control.common.NodeParameters;
-import edu.uci.ics.hyracks.control.common.api.IClusterController;
-import edu.uci.ics.hyracks.control.common.api.INodeController;
-import edu.uci.ics.hyracks.control.common.comm.Endpoint;
-import edu.uci.ics.hyracks.control.common.job.JobPlan;
+import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
+import edu.uci.ics.hyracks.control.common.context.ServerContext;
-public class ClusterControllerService extends AbstractRemoteService implements IClusterController {
+public class ClusterControllerService extends AbstractRemoteService implements IClusterController,
+ IHyracksClientInterface {
private static final long serialVersionUID = 1L;
private CCConfig ccConfig;
@@ -74,8 +87,14 @@
private final Map<String, NodeControllerState> nodeRegistry;
+ private final Map<String, ApplicationContext> applications;
+
+ private final ServerContext serverCtx;
+
private WebServer webServer;
+ private ClusterControllerInfo info;
+
private final IJobManager jobManager;
private final Executor taskExecutor;
@@ -87,11 +106,16 @@
public ClusterControllerService(CCConfig ccConfig) throws Exception {
this.ccConfig = ccConfig;
nodeRegistry = new LinkedHashMap<String, NodeControllerState>();
+ applications = new Hashtable<String, ApplicationContext>();
+ serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(
+ ClusterControllerService.class.getName()));
Set<DebugLevel> jolDebugLevel = LOGGER.isLoggable(Level.FINE) ? Runtime.DEBUG_ALL : new HashSet<DebugLevel>();
jolRuntime = (Runtime) Runtime.create(jolDebugLevel, System.err);
jobManager = new JOLJobManagerImpl(this, jolRuntime);
taskExecutor = Executors.newCachedThreadPool();
- webServer = new WebServer(new Handler[] { getAdminConsoleHandler(), getApplicationDataHandler() });
+ webServer = new WebServer();
+ webServer.addHandler(getAdminConsoleHandler());
+ webServer.addHandler(getApplicationInstallationHandler());
this.timer = new Timer(true);
}
@@ -103,6 +127,8 @@
registry.rebind(IClusterController.class.getName(), this);
webServer.setPort(ccConfig.httpPort);
webServer.start();
+ info = new ClusterControllerInfo();
+ info.setWebPort(webServer.getListeningPort());
timer.schedule(new DeadNodeSweeper(), 0, ccConfig.heartbeatPeriod);
LOGGER.log(Level.INFO, "Started ClusterControllerService");
}
@@ -115,13 +141,12 @@
}
@Override
- public UUID createJob(JobSpecification jobSpec) throws Exception {
- return jobManager.createJob(jobSpec, EnumSet.noneOf(JobFlag.class));
- }
-
- @Override
- public UUID createJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
- return jobManager.createJob(jobSpec, jobFlags);
+ public UUID createJob(String appName, byte[] jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
+ ApplicationContext appCtx = applications.get(appName);
+ if (appCtx == null) {
+ throw new HyracksException("No application with id " + appName + " found");
+ }
+ return jobManager.createJob(appName, (JobSpecification) appCtx.deserialize(jobSpec), jobFlags);
}
@Override
@@ -138,6 +163,7 @@
jobManager.registerNode(id);
LOGGER.log(Level.INFO, "Registered INodeController: id = " + id);
NodeParameters params = new NodeParameters();
+ params.setClusterControllerInfo(info);
params.setHeartbeatPeriod(ccConfig.heartbeatPeriod);
return params;
}
@@ -237,12 +263,54 @@
return handler;
}
- private Handler getApplicationDataHandler() {
+ private Handler getApplicationInstallationHandler() {
ContextHandler handler = new ContextHandler("/applications");
handler.setHandler(new AbstractHandler() {
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request,
HttpServletResponse response) throws IOException, ServletException {
+ try {
+ while (target.startsWith("/")) {
+ target = target.substring(1);
+ }
+ while (target.endsWith("/")) {
+ target = target.substring(0, target.length() - 1);
+ }
+ String[] parts = target.split("/");
+ if (parts.length != 1) {
+ return;
+ }
+ String appName = parts[0];
+ ApplicationContext appCtx;
+ appCtx = applications.get(appName);
+ if (appCtx != null) {
+ if (HttpMethods.PUT.equals(request.getMethod())) {
+ OutputStream os = appCtx.getHarOutputStream();
+ try {
+ IOUtils.copyLarge(request.getInputStream(), os);
+ } finally {
+ os.close();
+ }
+ } else if (HttpMethods.GET.equals(request.getMethod())) {
+ if (!appCtx.containsHar()) {
+ response.setStatus(HttpServletResponse.SC_NOT_FOUND);
+ } else {
+ InputStream is = appCtx.getHarInputStream();
+ response.setContentType("application/octet-stream");
+ response.setStatus(HttpServletResponse.SC_OK);
+ try {
+ IOUtils.copyLarge(is, response.getOutputStream());
+ } finally {
+ is.close();
+ }
+ }
+ }
+ baseRequest.setHandled(true);
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ throw e;
+ }
}
});
return handler;
@@ -274,6 +342,47 @@
jobManager.notifyNodeFailure(nodeId);
}
+ @Override
+ public void createApplication(String appName) throws Exception {
+ synchronized (applications) {
+ if (applications.containsKey(appName)) {
+ throw new HyracksException("Duplicate application with name: " + appName + " being created.");
+ }
+ ApplicationContext appCtx = new ApplicationContext(serverCtx, appName);
+ applications.put(appName, appCtx);
+ }
+ }
+
+ @Override
+ public void destroyApplication(String appName) throws Exception {
+ ApplicationContext appCtx = applications.remove(appName);
+ if (appCtx != null) {
+ synchronized (this) {
+ for (NodeControllerState ncs : nodeRegistry.values()) {
+ ncs.getNodeController().destroyApplication(appName);
+ }
+ }
+ appCtx.deinitialize();
+ }
+ }
+
+ @Override
+ public void startApplication(String appName) throws Exception {
+ ApplicationContext appCtx = applications.get(appName);
+ appCtx.initialize();
+ boolean deployHar = appCtx.containsHar();
+ synchronized (this) {
+ for (NodeControllerState ncs : nodeRegistry.values()) {
+ ncs.getNodeController().createApplication(appName, deployHar);
+ }
+ }
+ }
+
+ @Override
+ public ClusterControllerInfo getClusterControllerInfo() throws Exception {
+ return info;
+ }
+
private class DeadNodeSweeper extends TimerTask {
@Override
public void run() {
@@ -346,19 +455,28 @@
return accumulator == null ? null : accumulator.getResult();
}
+ private static byte[] serialize(Object o) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeObject(o);
+ return baos.toByteArray();
+ }
+
static class Phase1Installer implements RemoteOp<Map<PortInstanceId, Endpoint>> {
private String nodeId;
private UUID jobId;
+ private String appName;
private JobPlan plan;
private UUID stageId;
private int attempt;
private Map<ActivityNodeId, Set<Integer>> tasks;
private Map<OperatorDescriptorId, Set<Integer>> opPartitions;
- public Phase1Installer(String nodeId, UUID jobId, JobPlan plan, UUID stageId, int attempt,
+ public Phase1Installer(String nodeId, UUID jobId, String appName, JobPlan plan, UUID stageId, int attempt,
Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions) {
this.nodeId = nodeId;
this.jobId = jobId;
+ this.appName = appName;
this.plan = plan;
this.stageId = stageId;
this.attempt = attempt;
@@ -368,7 +486,7 @@
@Override
public Map<PortInstanceId, Endpoint> execute(INodeController node) throws Exception {
- return node.initializeJobletPhase1(jobId, plan, stageId, attempt, tasks, opPartitions);
+ return node.initializeJobletPhase1(appName, jobId, serialize(plan), stageId, attempt, tasks, opPartitions);
}
@Override
@@ -385,17 +503,19 @@
static class Phase2Installer implements RemoteOp<Void> {
private String nodeId;
private UUID jobId;
+ private String appName;
private JobPlan plan;
private UUID stageId;
private Map<ActivityNodeId, Set<Integer>> tasks;
private Map<OperatorDescriptorId, Set<Integer>> opPartitions;
private Map<PortInstanceId, Endpoint> globalPortMap;
- public Phase2Installer(String nodeId, UUID jobId, JobPlan plan, UUID stageId,
+ public Phase2Installer(String nodeId, UUID jobId, String appName, JobPlan plan, UUID stageId,
Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions,
Map<PortInstanceId, Endpoint> globalPortMap) {
this.nodeId = nodeId;
this.jobId = jobId;
+ this.appName = appName;
this.plan = plan;
this.stageId = stageId;
this.tasks = tasks;
@@ -405,7 +525,7 @@
@Override
public Void execute(INodeController node) throws Exception {
- node.initializeJobletPhase2(jobId, plan, stageId, tasks, opPartitions, globalPortMap);
+ node.initializeJobletPhase2(appName, jobId, serialize(plan), stageId, tasks, opPartitions, globalPortMap);
return null;
}
@@ -544,19 +664,4 @@
return portMap;
}
}
-
- @Override
- public void createApplication(String appName) throws Exception {
-
- }
-
- @Override
- public void destroyApplication(String appName) throws Exception {
-
- }
-
- @Override
- public void startApplication(String appName) throws Exception {
-
- }
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/IJobManager.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/IJobManager.java
index 9d500c0..bb90d0b 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/IJobManager.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/IJobManager.java
@@ -24,12 +24,12 @@
import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
public interface IJobManager {
- public UUID createJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception;
+ public UUID createJob(String appName, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception;
public void start(UUID jobId) throws Exception;
public void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
- StageletStatistics statistics) throws Exception;
+ StageletStatistics statistics) throws Exception;
public void notifyStageletFailure(UUID jobId, UUID stageId, int attempt, String nodeId) throws Exception;
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JOLJobManagerImpl.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JOLJobManagerImpl.java
index d7fd61d..22cce67 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JOLJobManagerImpl.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JOLJobManagerImpl.java
@@ -36,6 +36,7 @@
import jol.types.table.Function;
import jol.types.table.Key;
import jol.types.table.TableName;
+import edu.uci.ics.hyracks.api.comm.Endpoint;
import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
import edu.uci.ics.hyracks.api.constraints.ChoiceLocationConstraint;
import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
@@ -52,13 +53,12 @@
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobPlan;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
import edu.uci.ics.hyracks.api.job.statistics.StageStatistics;
import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
-import edu.uci.ics.hyracks.control.common.comm.Endpoint;
-import edu.uci.ics.hyracks.control.common.job.JobPlan;
public class JOLJobManagerImpl implements IJobManager {
private static final Logger LOGGER = Logger.getLogger(JOLJobManagerImpl.class.getName());
@@ -194,8 +194,9 @@
UUID jobId = (UUID) data[0];
UUID stageId = (UUID) data[1];
Integer attempt = (Integer) data[2];
- JobPlan plan = (JobPlan) data[3];
- Set<List> ts = (Set<List>) data[4];
+ String appName = (String) data[3];
+ JobPlan plan = (JobPlan) data[4];
+ Set<List> ts = (Set<List>) data[5];
Map<OperatorDescriptorId, Set<Integer>> opPartitions = new HashMap<OperatorDescriptorId, Set<Integer>>();
for (List t2 : ts) {
Object[] t2Data = t2.toArray();
@@ -229,7 +230,7 @@
aParts.add((Integer) lData[1]);
}
p1is[i++] = new ClusterControllerService.Phase1Installer((String) t2Data[0], jobId,
- plan, stageId, attempt, tasks, opPartitions);
+ appName, plan, stageId, attempt, tasks, opPartitions);
}
LOGGER.info("Stage start - Phase 1");
Map<PortInstanceId, Endpoint> globalPortMap = ccs.runRemote(p1is,
@@ -257,7 +258,7 @@
aParts.add((Integer) lData[1]);
}
p2is[i] = new ClusterControllerService.Phase2Installer((String) t2Data[0], jobId,
- plan, stageId, tasks, opPartitions, globalPortMap);
+ appName, plan, stageId, tasks, opPartitions, globalPortMap);
p3is[i] = new ClusterControllerService.Phase3Installer((String) t2Data[0], jobId,
stageId);
ss[i] = new ClusterControllerService.StageStarter((String) t2Data[0], jobId,
@@ -368,7 +369,7 @@
}
@Override
- public UUID createJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
+ public UUID createJob(String appName, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
final UUID jobId = UUID.randomUUID();
final JobPlanBuilder builder = new JobPlanBuilder();
@@ -420,7 +421,8 @@
cdTuples.add(ConnectorDescriptorTable.createTuple(jobId, jobSpec, e.getValue()));
}
- BasicTupleSet jobTuples = new BasicTupleSet(JobTable.createInitialJobTuple(jobId, jobSpec, builder.getPlan()));
+ BasicTupleSet jobTuples = new BasicTupleSet(JobTable.createInitialJobTuple(jobId, appName, jobSpec,
+ builder.getPlan()));
jolRuntime.schedule(JOL_SCOPE, JobTable.TABLE_NAME, jobTuples, null);
jolRuntime.schedule(JOL_SCOPE, OperatorDescriptorTable.TABLE_NAME, odTuples, null);
@@ -481,7 +483,7 @@
if (jobTuple == null) {
return null;
}
- return (JobStatus) jobTuple.value(1);
+ return (JobStatus) jobTuple.value(JobTable.JOBSTATUS_FIELD_INDEX);
} catch (BadKeyException e) {
throw new RuntimeException(e);
}
@@ -583,7 +585,8 @@
public JobStatistics waitForCompletion(UUID jobId) throws Exception {
synchronized (jobTable) {
Tuple jobTuple = null;
- while ((jobTuple = jobTable.lookupJob(jobId)) != null && jobTuple.value(1) != JobStatus.TERMINATED) {
+ while ((jobTuple = jobTable.lookupJob(jobId)) != null
+ && jobTuple.value(JobTable.JOBSTATUS_FIELD_INDEX) != JobStatus.TERMINATED) {
jobTable.wait();
}
return jobTuple == null ? null : jobTable.buildJobStatistics(jobTuple);
@@ -599,21 +602,29 @@
private static Key PRIMARY_KEY = new Key(0);
@SuppressWarnings("unchecked")
- private static final Class[] SCHEMA = new Class[] { UUID.class, JobStatus.class, JobSpecification.class,
- JobPlan.class, Set.class };
+ private static final Class[] SCHEMA = new Class[] { UUID.class, String.class, JobStatus.class,
+ JobSpecification.class, JobPlan.class, Set.class };
+
+ public static final int JOBID_FIELD_INDEX = 0;
+ public static final int APPNAME_FIELD_INDEX = 1;
+ public static final int JOBSTATUS_FIELD_INDEX = 2;
+ public static final int JOBSPEC_FIELD_INDEX = 3;
+ public static final int JOBPLAN_FIELD_INDEX = 4;
+ public static final int STATISTICS_FIELD_INDEX = 5;
public JobTable(Runtime context) {
super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
}
@SuppressWarnings("unchecked")
- static Tuple createInitialJobTuple(UUID jobId, JobSpecification jobSpec, JobPlan plan) {
- return new Tuple(jobId, JobStatus.INITIALIZED, jobSpec, plan, new HashSet());
+ static Tuple createInitialJobTuple(UUID jobId, String appName, JobSpecification jobSpec, JobPlan plan) {
+ return new Tuple(jobId, appName, JobStatus.INITIALIZED, jobSpec, plan, new HashSet());
}
@SuppressWarnings("unchecked")
JobStatistics buildJobStatistics(Tuple jobTuple) {
- Set<Set<StageletStatistics>> statsSet = (Set<Set<StageletStatistics>>) jobTuple.value(4);
+ Set<Set<StageletStatistics>> statsSet = (Set<Set<StageletStatistics>>) jobTuple
+ .value(JobTable.STATISTICS_FIELD_INDEX);
JobStatistics stats = new JobStatistics();
if (statsSet != null) {
for (Set<StageletStatistics> stageStatsSet : statsSet) {
@@ -822,8 +833,8 @@
private static Key PRIMARY_KEY = new Key(0, 1, 2);
@SuppressWarnings("unchecked")
- private static final Class[] SCHEMA = new Class[] { UUID.class, UUID.class, Integer.class, JobPlan.class,
- Set.class };
+ private static final Class[] SCHEMA = new Class[] { UUID.class, UUID.class, Integer.class, String.class,
+ JobPlan.class, Set.class };
public StartMessageTable(Runtime context) {
super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
@@ -1010,6 +1021,11 @@
private static final Class[] SCHEMA = new Class[] { UUID.class, OperatorDescriptorId.class, Integer.class,
Integer.class };
+ public static final int JOBID_FIELD_INDEX = 0;
+ public static final int ODID_FIELD_INDEX = 1;
+ public static final int NPARTITIONS_FIELD_INDEX = 2;
+ public static final int ORDINAL_FIELD_INDEX = 3;
+
public ExpandPartitionCountConstraintTableFunction() {
super(TABLE_NAME, SCHEMA);
}
@@ -1019,9 +1035,9 @@
TupleSet result = new BasicTupleSet();
int counter = 0;
for (Tuple t : tuples) {
- int nPartitions = (Integer) t.value(2);
+ int nPartitions = (Integer) t.value(NPARTITIONS_FIELD_INDEX);
for (int i = 0; i < nPartitions; ++i) {
- result.add(new Tuple(t.value(0), t.value(1), i, counter++));
+ result.add(new Tuple(t.value(JOBID_FIELD_INDEX), t.value(ODID_FIELD_INDEX), i, counter++));
}
}
return result;
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JobPlanBuilder.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JobPlanBuilder.java
index fcb796e..d5056d3 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JobPlanBuilder.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JobPlanBuilder.java
@@ -12,8 +12,8 @@
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
import edu.uci.ics.hyracks.api.dataflow.IActivityNode;
import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobPlan;
import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.control.common.job.JobPlan;
public class JobPlanBuilder implements IActivityGraphBuilder {
private static final Logger LOGGER = Logger.getLogger(JobPlanBuilder.class.getName());
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JobPlanner.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JobPlanner.java
index 956c658..af3b77f 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JobPlanner.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JobPlanner.java
@@ -29,12 +29,12 @@
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobPlan;
import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.job.JobStage;
import edu.uci.ics.hyracks.api.util.Pair;
import edu.uci.ics.hyracks.control.cc.job.IOperatorDescriptorVisitor;
import edu.uci.ics.hyracks.control.cc.job.PlanUtils;
-import edu.uci.ics.hyracks.control.common.job.JobPlan;
-import edu.uci.ics.hyracks.control.common.job.JobStage;
public class JobPlanner {
private static final Logger LOGGER = Logger.getLogger(JobPlanner.class.getName());
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
index e4d611b..a638c09 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
@@ -1,6 +1,6 @@
package edu.uci.ics.hyracks.control.cc;
-import edu.uci.ics.hyracks.control.common.api.INodeController;
+import edu.uci.ics.hyracks.api.control.INodeController;
public class NodeControllerState {
private final INodeController nodeController;
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/WebServer.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/WebServer.java
index dfffcb3..12dfe21 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/WebServer.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/WebServer.java
@@ -18,28 +18,33 @@
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.ContextHandlerCollection;
+import org.eclipse.jetty.server.handler.HandlerCollection;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
public class WebServer {
- private Server server;
- private SelectChannelConnector connector;
+ private final Server server;
+ private final SelectChannelConnector connector;
+ private final HandlerCollection handlerCollection;
- public WebServer(Handler[] handlers) throws Exception {
+ public WebServer() throws Exception {
server = new Server();
connector = new SelectChannelConnector();
server.setConnectors(new Connector[] { connector });
- ContextHandlerCollection handler = new ContextHandlerCollection();
- handler.setHandlers(handlers);
- server.setHandler(handler);
+ handlerCollection = new ContextHandlerCollection();
+ server.setHandler(handlerCollection);
}
public void setPort(int port) {
connector.setPort(port);
}
+ public int getListeningPort() {
+ return connector.getLocalPort();
+ }
+
public void start() throws Exception {
server.start();
}
@@ -47,4 +52,8 @@
public void stop() throws Exception {
server.stop();
}
+
+ public void addHandler(Handler handler) {
+ handlerCollection.addHandler(handler);
+ }
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/scheduler.olg b/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/scheduler.olg
new file mode 100644
index 0000000..0be0401
--- /dev/null
+++ b/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/scheduler.olg
@@ -0,0 +1,314 @@
+program hyrackscc;
+
+import java.util.UUID;
+import java.util.Set;
+
+import jol.types.basic.Tuple;
+import jol.types.basic.TupleSet;
+
+import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.job.JobPlan;
+
+watch(job, a);
+watch(job, i);
+watch(job, d);
+
+define(activitystage_temp, keys(), {UUID, OperatorDescriptorId, ActivityNodeId, Integer});
+
+activitystage_INITIAL activitystage_temp(JobId, OperatorId, ActivityId, 0) :-
+ activitynode(JobId, OperatorId, ActivityId, _);
+
+activitystage_BLOCKED activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber) :-
+ activitystage_temp(JobId, OperatorId1, ActivityId1, StageNumber1),
+ activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber2),
+ activityblocked(JobId, OperatorId1, ActivityId1, OperatorId2, ActivityId2),
+ StageNumber2 <= StageNumber1
+ {
+ StageNumber := StageNumber1 + 1;
+ };
+
+activitystage_PIPELINED_1 activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber) :-
+ activitystage_temp(JobId, OperatorId1, ActivityId1, StageNumber1),
+ activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber2),
+ activityconnection(JobId, OperatorId1, Operator1Port, edu.uci.ics.hyracks.api.dataflow.Direction.OUTPUT, ActivityId1, _),
+ activityconnection(JobId, OperatorId2, Operator2Port, edu.uci.ics.hyracks.api.dataflow.Direction.INPUT, ActivityId2, _),
+ connectordescriptor(JobId, _, OperatorId1, Operator1Port, OperatorId2, Operator2Port, _),
+ StageNumber1 != StageNumber2
+ {
+ StageNumber := java.lang.Math.max(StageNumber1, StageNumber2);
+ };
+
+activitystage_PIPELINED_2 activitystage_temp(JobId, OperatorId1, ActivityId1, StageNumber) :-
+ activitystage_temp(JobId, OperatorId1, ActivityId1, StageNumber1),
+ activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber2),
+ activityconnection(JobId, OperatorId1, Operator1Port, edu.uci.ics.hyracks.api.dataflow.Direction.OUTPUT, ActivityId1, _),
+ activityconnection(JobId, OperatorId2, Operator2Port, edu.uci.ics.hyracks.api.dataflow.Direction.INPUT, ActivityId2, _),
+ connectordescriptor(JobId, _, OperatorId1, Operator1Port, OperatorId2, Operator2Port, _),
+ StageNumber1 != StageNumber2
+ {
+ StageNumber := java.lang.Math.max(StageNumber1, StageNumber2);
+ };
+
+watch(activitystage_temp, a);
+
+watch(activityconnection, a);
+watch(activityblocked, a);
+watch(operatordescriptor, a);
+watch(connectordescriptor, a);
+
+watch(activitystage, a);
+watch(activitystage, i);
+watch(activitystage, d);
+
+define(activitystage, keys(0, 1, 2), {UUID, OperatorDescriptorId, ActivityNodeId, Integer});
+
+activitystage(JobId, OperatorId, ActivityId, max<StageNumber>) :-
+ activitystage_temp(JobId, OperatorId, ActivityId, StageNumber);
+
+define(jobstage, keys(0, 1), {UUID, Integer, UUID});
+
+jobstage(JobId, StageNumber, StageId) :-
+ activitystage(JobId, _, _, StageNumber)
+ {
+ StageId := java.util.UUID.randomUUID();
+ };
+
+watch(jobstage, a);
+
+define(jobattempt, keys(), {UUID, Integer});
+
+jobattempt(JobId, 0) :-
+ job(JobId, _, edu.uci.ics.hyracks.api.job.JobStatus.INITIALIZED, _, _, _),
+ jobstart(JobId, _);
+
+jobattempt(JobId, NextAttempt) :-
+ jobattempt(JobId, Attempt),
+ stagestart(JobId, _, Attempt),
+ abortcomplete(JobId, _, Attempt)
+ {
+ NextAttempt := Attempt + 1;
+ };
+
+define(stagestart, keys(), {UUID, Integer, Integer});
+define(stagefinish, keys(0, 1, 2), {UUID, Integer, Integer, Set});
+
+watch(jobstart, i);
+
+stagestart_INITIAL stagestart(JobId, 0, Attempt) :-
+ jobattempt#insert(JobId, Attempt);
+
+update_job_status_RUNNING job(JobId, AppName, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, JobSpec, JobPlan, null) :-
+ job(JobId, AppName, edu.uci.ics.hyracks.api.job.JobStatus.INITIALIZED, JobSpec, JobPlan, _),
+ jobstart(JobId, _);
+
+stagestart_NEXT stagestart(JobId, NextStageNumber, Attempt) :-
+ stagestart(JobId, StageNumber, Attempt),
+ stagefinish#insert(StageId, StageNumber, Attempt, _)
+ {
+ NextStageNumber := StageNumber + 1;
+ };
+
+watch(stagestart, a);
+watch(stagestart, d);
+
+define(operatorlocationcandidates, keys(), {UUID, OperatorDescriptorId, String, Integer, Integer});
+
+operatorlocationcandidates(JobId, OperatorId, NodeId, Partition, Benefit) :-
+ operatorlocation(JobId, OperatorId, NodeId, Partition, Benefit),
+ availablenodes(NodeId);
+
+watch(availablenodes, a);
+watch(availablenodes, i);
+watch(availablenodes, d);
+
+define(availablenodecount, keys(0), {Integer, Integer});
+
+watch(availablenodecount, a);
+watch(availablenodecount, i);
+watch(availablenodecount, d);
+
+availablenodecount(0, count<NodeId>) :-
+ availablenodes(NodeId);
+
+watch(rankedavailablenodes, a);
+watch(rankedavailablenodes, i);
+watch(rankedavailablenodes, d);
+
+watch(operatorlocationcandidates, a);
+watch(operatorlocationcandidates, i);
+watch(operatorlocationcandidates, d);
+
+define(maxoperatorlocationbenefit, keys(0, 1, 2), {UUID, OperatorDescriptorId, Integer, Integer});
+
+maxoperatorlocationbenefit(JobId, OperatorId, Partition, max<Benefit>) :-
+ operatorlocationcandidates(JobId, OperatorId, _, Partition, Benefit);
+
+watch(maxoperatorlocationbenefit, a);
+watch(maxoperatorlocationbenefit, i);
+watch(maxoperatorlocationbenefit, d);
+
+/* Final placement decision per (job, operator, partition, attempt):
+ * which node runs which partition of which operator. */
+define(attemptoperatorlocationdecision, keys(0, 1, 3, 4), {UUID, OperatorDescriptorId, String, Integer, Integer});
+
+watch(attemptoperatorlocationdecision, a);
+watch(attemptoperatorlocationdecision, i);
+watch(attemptoperatorlocationdecision, d);
+
+/* Case 1: when a new job attempt starts, place a partition on any
+ * candidate node whose benefit equals the maximum for that partition. */
+attemptoperatorlocationdecision(JobId, OperatorId, NodeId, Partition, Attempt) :-
+ jobattempt#insert(JobId, Attempt),
+ operatorlocationcandidates(JobId, OperatorId, NodeId, Partition, Benefit),
+ maxoperatorlocationbenefit(JobId, OperatorId, Partition, Benefit);
+
+/* Case 2: spread operator clones round-robin over the ranked available
+ * nodes — a clone with rank CloneRank goes to the node whose rank is
+ * CloneRank modulo the available-node count. */
+attemptoperatorlocationdecision(JobId, OperatorId, NodeId, Partition, Attempt) :-
+ jobattempt#insert(JobId, Attempt),
+ operatorclonecountexpansiontotalorder(JobId, OperatorId, Partition, CloneRank),
+ rankedavailablenodes(NodeId, NodeRank),
+ availablenodecount(_, NodeCount),
+ NodeRank == CloneRank % NodeCount;
+
+/* Copy of operatorclonecount with a trailing 0, in the shape expected
+ * by expandpartitioncountconstraint below. */
+define(operatorclonecount_temp, keys(), {UUID, OperatorDescriptorId, Integer, Integer});
+
+operatorclonecount_temp(JobId, OperatorId, NPartitions, 0) :-
+ operatorclonecount(JobId, OperatorId, NPartitions);
+
+/* One tuple per clone (partition) of each operator, each carrying a
+ * rank in a total order, as produced by the
+ * expandpartitioncountconstraint table function. */
+define(operatorclonecountexpansiontotalorder, keys(0, 1, 2), {UUID, OperatorDescriptorId, Integer, Integer});
+
+operatorclonecountexpansiontotalorder(JobId, OperatorId, Partition, Rank) :-
+ expandpartitioncountconstraint(operatorclonecount_temp(JobId, OperatorId, Partition, Rank));
+
+watch(operatorclonecountexpansiontotalorder, a);
+watch(operatorclonecountexpansiontotalorder, i);
+watch(operatorclonecountexpansiontotalorder, d);
+
+watch(operatorclonecount, a);
+watch(operatorclonecount, i);
+watch(operatorclonecount, d);
+
+/* Fan a starting stage out to execution sites: one activitystart per
+ * (activity in the stage) x (node/partition chosen for its operator). */
+define(activitystart, keys(), {UUID, OperatorDescriptorId, ActivityNodeId, Integer, Integer, UUID, String, Integer});
+
+activitystart(JobId, OperatorId, ActivityId, StageNumber, Attempt, StageId, NodeId, Partition) :-
+ stagestart#insert(JobId, StageNumber, Attempt),
+ operatordescriptor(JobId, OperatorId, _, _),
+ activitystage(JobId, OperatorId, ActivityId, StageNumber),
+ jobstage(JobId, StageNumber, StageId),
+ attemptoperatorlocationdecision(JobId, OperatorId, NodeId, Partition, Attempt);
+
+watch(activitystart, a);
+
+/* Group a stage's activities per node into a stagelet: the set
+ * aggregate collects [ActivityId, Partition] pairs for each
+ * (JobId, StageId, NodeId, Attempt); AppName and JobPlan come from the
+ * RUNNING job tuple. */
+define(stageletstart, keys(0, 1, 4, 5), {UUID, UUID, String, JobPlan, String, Integer, Set});
+
+stageletstart(JobId, StageId, AppName, JobPlan, NodeId, Attempt, set<ActivityInfo>) :-
+ activitystart#insert(JobId, _, ActivityId, StageNumber, Attempt, StageId, NodeId, Partition),
+ job(JobId, AppName, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, _, JobPlan, _)
+ {
+ ActivityInfo := [ActivityId, Partition];
+ };
+
+watch(stageletstart, a);
+watch(stageletstart, i);
+
+/* Per-stage start message: aggregate [NodeId, ActivityInfoSet] tuples
+ * over the stagelets whose node is still available and whose activity
+ * set is non-empty; startmessage then exposes the aggregate. */
+define(startmessage_agg, keys(0, 1, 2), {UUID, UUID, Integer, String, JobPlan, Set});
+
+startmessage_agg(JobId, StageId, Attempt, AppName, JobPlan, set<Tuple>) :-
+ stageletstart#insert(JobId, StageId, AppName, JobPlan, NodeId, Attempt, ActivityInfoSet),
+ availablenodes(NodeId),
+ ActivityInfoSet.size() != 0
+ {
+ Tuple := [NodeId, ActivityInfoSet];
+ };
+
+startmessage(JobId, StageId, Attempt, AppName, JobPlan, TSet) :-
+ startmessage_agg(JobId, StageId, Attempt, AppName, JobPlan, TSet);
+
+watch(startmessage, a);
+watch(startmessage, i);
+
+/* Stagelet abort triggers.
+ * Case 1: a node explicitly reported failure of its stagelet
+ * (stageletfailure) — abort that stagelet.
+ * Case 2: a node failed outright (failednodes) before completing its
+ * stagelet — abort every stagelet of the affected (stage, attempt),
+ * on all nodes (NodeIdOther ranges over all stagelets of the stage). */
+define(stageletabort, keys(0, 1, 3, 4), {UUID, UUID, JobPlan, String, Integer, Set});
+
+stageletabort(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet) :-
+ stageletfailure(JobId, StageId, NodeId, Attempt),
+ stageletstart(JobId, StageId, _, JobPlan, NodeId, Attempt, ActivityIdSet);
+
+stageletabort(JobId, StageId, JobPlan, NodeIdOther, Attempt, ActivityIdSet) :-
+ stageletstart(JobId, StageId, _, JobPlan, NodeId, Attempt, _),
+ stageletstart(JobId, StageId, _, _, NodeIdOther, Attempt, ActivityIdSet),
+ failednodes#insert(NodeId),
+ notin stageletcomplete(JobId, StageId, NodeId, Attempt, _);
+
+watch(stageletabort, a);
+watch(stageletabort, i);
+watch(stageletabort, d);
+
+/* The set of nodes whose stagelets were aborted for a (stage, attempt). */
+define(stageabort, keys(0, 1, 2), {UUID, UUID, Integer, Set});
+
+stageabort(JobId, StageId, Attempt, set<NodeId>) :-
+ stageletabort#insert(JobId, StageId, _, NodeId, Attempt, _);
+
+/* Abort message payload: [NodeId, ActivityIdSet] pairs for aborted
+ * stagelets whose nodes are still reachable; only emitted when the
+ * aggregate is non-empty. */
+define(abortmessage_agg, keys(0, 1, 2), {UUID, UUID, Integer, JobPlan, Set});
+
+abortmessage_agg(JobId, StageId, Attempt, JobPlan, set<Tuple>) :-
+ stageletabort#insert(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet),
+ availablenodes(NodeId)
+ {
+ Tuple := [NodeId, ActivityIdSet];
+ };
+
+abortmessage(JobId, StageId, Attempt, JobPlan, TSet) :-
+ abortmessage_agg(JobId, StageId, Attempt, JobPlan, TSet),
+ TSet.size() != 0;
+
+watch(abortmessage, a);
+watch(abortmessage, i);
+
+/* A stagelet abort is complete either when the node acknowledges it
+ * (abortnotify) or when the node is no longer available, in which case
+ * there is nothing to wait for. */
+define(stageletabortcomplete, keys(), {UUID, UUID, String, Integer});
+
+stageletabortcomplete(JobId, StageId, NodeId, Attempt) :-
+ abortnotify(JobId, StageId, NodeId, Attempt);
+
+stageletabortcomplete(JobId, StageId, NodeId, Attempt) :-
+ stageletabort(JobId, StageId, _, NodeId, Attempt, _),
+ notin availablenodes(NodeId);
+
+define(stageletabortcomplete_agg, keys(0, 1, 2), {UUID, UUID, Integer, Set});
+
+stageletabortcomplete_agg(JobId, StageId, Attempt, set<NodeId>) :-
+ stageletabortcomplete(JobId, StageId, NodeId, Attempt);
+
+/* The stage-level abort is done once every node in the stage's abort
+ * set has completed its abort (compared via set cardinalities). */
+define(abortcomplete, keys(), {UUID, UUID, Integer});
+
+abortcomplete(JobId, StageId, Attempt) :-
+ stageletabortcomplete_agg(JobId, StageId, Attempt, NodeIdSet1),
+ stageabort(JobId, StageId, Attempt, NodeIdSet2),
+ NodeIdSet1.size() == NodeIdSet2.size();
+
+/* Collect per-node stagelet statistics for a (stage, attempt); the
+ * stage is finished when as many stagelets have completed as were
+ * started (cardinality match against the startmessage aggregate). */
+define(stageletcomplete_agg, keys(0, 1, 2), {UUID, UUID, Integer, Set});
+
+stageletcomplete_agg(JobId, StageId, Attempt, set<Statistics>) :-
+ stageletcomplete(JobId, StageId, NodeId, Attempt, Statistics);
+
+stagefinish(JobId, StageNumber, Attempt, SSet) :-
+ startmessage_agg(JobId, StageId, Attempt, _, _, TSet),
+ stageletcomplete_agg(JobId, StageId, Attempt, SSet),
+ jobstage(JobId, StageNumber, StageId),
+ TSet.size() == SSet.size();
+
+/* Job lifecycle: when the next stage to start has no corresponding
+ * jobstage tuple, all stages have run, so mark the job TERMINATED.
+ * NOTE(review): jobstage is used with three columns elsewhere in this
+ * file (JobId, StageNumber, StageId) but only two here — confirm the
+ * runtime treats the omitted trailing column as a wildcard. */
+update_job_status_TERMINATED job(JobId, AppName, edu.uci.ics.hyracks.api.job.JobStatus.TERMINATED, JobSpec, JobPlan, null) :-
+ job(JobId, AppName, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, JobSpec, JobPlan, _),
+ stagestart#insert(JobId, StageNumber, Attempt),
+ stagefinish(JobId, _, Attempt, SSet),
+ notin jobstage(JobId, StageNumber);
+
+/* Nodes that took part in a finished job, collected for cleanup
+ * fan-out once no further stage exists to start.
+ * NOTE(review): this define has no keys(...) clause, unlike every
+ * other define in this file — confirm that is intentional.  The
+ * two-column use of the three-column jobstage predicate also recurs
+ * here — confirm wildcard semantics. */
+define(jobcleanup_agg, {UUID, Set});
+
+jobcleanup_agg(JobId, set<NodeId>) :-
+ stagestart#insert(JobId, StageNumber, Attempt),
+ stagefinish(JobId, _, Attempt, _),
+ attemptoperatorlocationdecision(JobId, _, NodeId, _, Attempt),
+ notin jobstage(JobId, StageNumber);
+
+jobcleanup(JobId, NodeIdSet) :-
+ jobcleanup_agg(JobId, NodeIdSet);