Renamed hyracks-runtime to hyracks-control-nc
git-svn-id: https://hyracks.googlecode.com/svn/trunk/hyracks@53 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-nc/.classpath b/hyracks-control-nc/.classpath
new file mode 100644
index 0000000..2fb3f21
--- /dev/null
+++ b/hyracks-control-nc/.classpath
@@ -0,0 +1,9 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<classpath>
+ <classpathentry kind="src" path="src/main/java"/>
+ <classpathentry kind="src" path="src/test/java"/>
+ <classpathentry kind="src" path="src/main/resources"/>
+ <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
+ <classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
+ <classpathentry kind="output" path="target/classes"/>
+</classpath>
diff --git a/hyracks-control-nc/.project b/hyracks-control-nc/.project
new file mode 100644
index 0000000..83d4a09
--- /dev/null
+++ b/hyracks-control-nc/.project
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+ <name>hyracks-control-nc</name>
+ <comment></comment>
+ <projects>
+ </projects>
+ <buildSpec>
+ <buildCommand>
+ <name>org.eclipse.jdt.core.javabuilder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ <buildCommand>
+ <name>org.maven.ide.eclipse.maven2Builder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ </buildSpec>
+ <natures>
+ <nature>org.maven.ide.eclipse.maven2Nature</nature>
+ <nature>org.eclipse.jdt.core.javanature</nature>
+ </natures>
+</projectDescription>
diff --git a/hyracks-control-nc/pom.xml b/hyracks-control-nc/pom.xml
new file mode 100644
index 0000000..0a59c1f
--- /dev/null
+++ b/hyracks-control-nc/pom.xml
@@ -0,0 +1,93 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-control-nc</artifactId>
+ <version>0.1.0</version>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <forkMode>pertest</forkMode>
+ <argLine>-enableassertions</argLine>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ <version>8.0.0.M0</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.8.1</version>
+ <type>jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>args4j</groupId>
+ <artifactId>args4j</artifactId>
+ <version>2.0.12</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.dcache</groupId>
+ <artifactId>dcache-client</artifactId>
+ <version>0.0.1</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.wicket</groupId>
+ <artifactId>wicket</artifactId>
+ <version>1.4.7</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>jol</groupId>
+ <artifactId>jol</artifactId>
+ <version>1.0.0</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-api</artifactId>
+ <version>0.1.0</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-control-common</artifactId>
+ <version>0.1.0</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+ <reporting>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <version>2.0.1</version>
+ </plugin>
+ </plugins>
+ </reporting>
+</project>
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
new file mode 100644
index 0000000..eb7ac72
--- /dev/null
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
+
+public class Joblet {
+ private static final long serialVersionUID = 1L;
+
+ private final NodeControllerService nodeController;
+
+ private final UUID jobId;
+
+ private final Map<UUID, Stagelet> stageletMap;
+
+ private final Map<OperatorDescriptorId, Map<Integer, IOperatorEnvironment>> envMap;
+
+ public Joblet(NodeControllerService nodeController, UUID jobId) throws Exception {
+ this.nodeController = nodeController;
+ this.jobId = jobId;
+ stageletMap = new HashMap<UUID, Stagelet>();
+ envMap = new HashMap<OperatorDescriptorId, Map<Integer, IOperatorEnvironment>>();
+ }
+
+ public UUID getJobId() {
+ return jobId;
+ }
+
+ public IOperatorEnvironment getEnvironment(IOperatorDescriptor hod, int partition) {
+ if (!envMap.containsKey(hod.getOperatorId())) {
+ envMap.put(hod.getOperatorId(), new HashMap<Integer, IOperatorEnvironment>());
+ }
+ Map<Integer, IOperatorEnvironment> opEnvMap = envMap.get(hod.getOperatorId());
+ if (!opEnvMap.containsKey(partition)) {
+ opEnvMap.put(partition, new OperatorEnvironmentImpl());
+ }
+ return opEnvMap.get(partition);
+ }
+
+ private static final class OperatorEnvironmentImpl implements IOperatorEnvironment {
+ private final Map<String, Object> map;
+
+ public OperatorEnvironmentImpl() {
+ map = new HashMap<String, Object>();
+ }
+
+ @Override
+ public Object get(String name) {
+ return map.get(name);
+ }
+
+ @Override
+ public void set(String name, Object value) {
+ map.put(name, value);
+ }
+ }
+
+ public void setStagelet(UUID stageId, Stagelet stagelet) {
+ stageletMap.put(stageId, stagelet);
+ }
+
+ public Stagelet getStagelet(UUID stageId) throws Exception {
+ return stageletMap.get(stageId);
+ }
+
+ public Executor getExecutor() {
+ return nodeController.getExecutor();
+ }
+
+ public synchronized void notifyStageletComplete(UUID stageId, int attempt, StageletStatistics stats) throws Exception {
+ stageletMap.remove(stageId);
+ nodeController.notifyStageComplete(jobId, stageId, attempt, stats);
+ }
+
+ public void notifyStageletFailed(UUID stageId, int attempt) throws Exception {
+ stageletMap.remove(stageId);
+ nodeController.notifyStageFailed(jobId, stageId, attempt);
+ }
+
+ public NodeControllerService getNodeController() {
+ return nodeController;
+ }
+}
\ No newline at end of file
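
Note: the Joblet above lazily builds a two-level map so that every (operator, partition) pair gets exactly one IOperatorEnvironment for the lifetime of the job on this node. A minimal, self-contained sketch of that pattern follows; it is illustrative only, uses plain JDK types rather than the Hyracks interfaces, and all names in it are made up.

    import java.util.HashMap;
    import java.util.Map;

    public class OperatorEnvironmentSketch {
        // operator id -> partition -> named state shared by that partition's activities
        private final Map<String, Map<Integer, Map<String, Object>>> envMap =
                new HashMap<String, Map<Integer, Map<String, Object>>>();

        public Map<String, Object> getEnvironment(String operatorId, int partition) {
            Map<Integer, Map<String, Object>> opEnvMap = envMap.get(operatorId);
            if (opEnvMap == null) {
                opEnvMap = new HashMap<Integer, Map<String, Object>>();
                envMap.put(operatorId, opEnvMap); // first request for this operator
            }
            Map<String, Object> env = opEnvMap.get(partition);
            if (env == null) {
                env = new HashMap<String, Object>();
                opEnvMap.put(partition, env); // first request for this partition
            }
            return env;
        }

        public static void main(String[] args) {
            OperatorEnvironmentSketch joblet = new OperatorEnvironmentSketch();
            // An earlier activity leaves state behind; a later activity of the same
            // operator instance reads it back through the same environment.
            joblet.getEnvironment("sorter", 0).put("runCount", 3);
            System.out.println(joblet.getEnvironment("sorter", 0).get("runCount"));
        }
    }
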
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NCDriver.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NCDriver.java
new file mode 100644
index 0000000..af1f3dc
--- /dev/null
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NCDriver.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc;
+
+import org.kohsuke.args4j.CmdLineParser;
+
+import edu.uci.ics.dcache.client.DCacheClient;
+import edu.uci.ics.dcache.client.DCacheClientConfig;
+import edu.uci.ics.hyracks.control.common.api.NCConfig;
+
+public class NCDriver {
+ public static void main(String args[]) throws Exception {
+ NCConfig ncConfig = new NCConfig();
+ CmdLineParser cp = new CmdLineParser(ncConfig);
+ try {
+ cp.parseArgument(args);
+ } catch (Exception e) {
+ System.err.println(e.getMessage());
+ cp.printUsage(System.err);
+ return;
+ }
+
+ DCacheClientConfig dccConfig = new DCacheClientConfig();
+ dccConfig.servers = ncConfig.dcacheClientServers;
+ dccConfig.serverLocal = ncConfig.dcacheClientServerLocal;
+ dccConfig.path = ncConfig.dcacheClientPath;
+
+ DCacheClient.get().init(dccConfig);
+
+ final NodeControllerService nService = new NodeControllerService(ncConfig);
+ nService.start();
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ try {
+ nService.stop();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ while (true) {
+ Thread.sleep(10000);
+ }
+ }
+}
\ No newline at end of file
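
Note: NCDriver.main() above follows a simple pattern: bind command-line arguments onto an NCConfig bean with args4j, start the service, register a shutdown hook that stops it, and park the main thread forever. A self-contained sketch of that pattern is shown below; the option name and the config bean are invented for illustration and are not the actual NCConfig fields.

    import org.kohsuke.args4j.CmdLineParser;
    import org.kohsuke.args4j.Option;

    public class DriverSketch {
        public static class Config {
            // Hypothetical option; the real NCConfig defines its own flags.
            @Option(name = "-node-id", usage = "logical node id", required = true)
            public String nodeId;
        }

        public static void main(String[] args) throws Exception {
            Config config = new Config();
            CmdLineParser parser = new CmdLineParser(config);
            try {
                parser.parseArgument(args);
            } catch (Exception e) {
                System.err.println(e.getMessage());
                parser.printUsage(System.err);
                return;
            }
            System.out.println("Starting node " + config.nodeId);
            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    System.out.println("Stopping node"); // stand-in for nService.stop()
                }
            });
            while (true) {
                Thread.sleep(10000); // keep the JVM alive; the hook does the cleanup
            }
        }
    }
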
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
new file mode 100644
index 0000000..14c5339
--- /dev/null
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -0,0 +1,460 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc;
+
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.rmi.registry.LocateRegistry;
+import java.rmi.registry.Registry;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import edu.uci.ics.hyracks.api.comm.IConnectionDemultiplexer;
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.Direction;
+import edu.uci.ics.hyracks.api.dataflow.IActivityNode;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IEndpointDataWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.OperatorInstanceId;
+import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
+import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
+import edu.uci.ics.hyracks.control.common.NodeCapability;
+import edu.uci.ics.hyracks.control.common.NodeParameters;
+import edu.uci.ics.hyracks.control.common.api.IClusterController;
+import edu.uci.ics.hyracks.control.common.api.INodeController;
+import edu.uci.ics.hyracks.control.common.api.NCConfig;
+import edu.uci.ics.hyracks.control.common.comm.Endpoint;
+import edu.uci.ics.hyracks.control.common.job.JobPlan;
+import edu.uci.ics.hyracks.control.nc.comm.ConnectionManager;
+import edu.uci.ics.hyracks.control.nc.comm.DemuxDataReceiveListenerFactory;
+import edu.uci.ics.hyracks.control.nc.runtime.HyracksContext;
+import edu.uci.ics.hyracks.control.nc.runtime.OperatorRunnable;
+
+public class NodeControllerService extends AbstractRemoteService implements INodeController {
+ private static final long serialVersionUID = 1L;
+
+ private NCConfig ncConfig;
+
+ private final String id;
+
+ private final IHyracksContext ctx;
+
+ private final NodeCapability nodeCapability;
+
+ private final ConnectionManager connectionManager;
+
+ private final Timer timer;
+
+ private IClusterController ccs;
+
+ private Map<UUID, Joblet> jobletMap;
+
+ private Executor executor;
+
+ private NodeParameters nodeParameters;
+
+ public NodeControllerService(NCConfig ncConfig) throws Exception {
+ this.ncConfig = ncConfig;
+ id = ncConfig.nodeId;
+ this.ctx = new HyracksContext(ncConfig.frameSize);
+ if (id == null) {
+ throw new Exception("id not set");
+ }
+ nodeCapability = computeNodeCapability();
+ connectionManager = new ConnectionManager(ctx, getIpAddress(ncConfig));
+ jobletMap = new HashMap<UUID, Joblet>();
+ executor = Executors.newCachedThreadPool();
+ timer = new Timer(true);
+ }
+
+ private static Logger LOGGER = Logger.getLogger(NodeControllerService.class.getName());
+
+ @Override
+ public void start() throws Exception {
+ LOGGER.log(Level.INFO, "Starting NodeControllerService");
+ connectionManager.start();
+ Registry registry = LocateRegistry.getRegistry(ncConfig.ccHost, ncConfig.ccPort);
+ IClusterController cc = (IClusterController) registry.lookup(IClusterController.class.getName());
+ this.nodeParameters = cc.registerNode(this);
+
+ // Schedule heartbeat generator.
+ timer.schedule(new HeartbeatTask(cc), 0, nodeParameters.getHeartbeatPeriod());
+
+ LOGGER.log(Level.INFO, "Started NodeControllerService");
+ }
+
+ @Override
+ public void stop() throws Exception {
+ LOGGER.log(Level.INFO, "Stopping NodeControllerService");
+ connectionManager.stop();
+ LOGGER.log(Level.INFO, "Stopped NodeControllerService");
+ }
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public NodeCapability getNodeCapability() throws Exception {
+ return nodeCapability;
+ }
+
+ public ConnectionManager getConnectionManager() {
+ return connectionManager;
+ }
+
+ private static NodeCapability computeNodeCapability() {
+ NodeCapability nc = new NodeCapability();
+ nc.setCPUCount(Runtime.getRuntime().availableProcessors());
+ return nc;
+ }
+
+ private static InetAddress getIpAddress(NCConfig ncConfig) throws Exception {
+ String ipaddrStr = ncConfig.dataIPAddress;
+ ipaddrStr = ipaddrStr.trim();
+ Pattern pattern = Pattern.compile("(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})");
+ Matcher m = pattern.matcher(ipaddrStr);
+ if (!m.matches()) {
+ throw new Exception(MessageFormat.format(
+ "Connection Manager IP Address String {0} is not a valid IP Address.", ipaddrStr));
+ }
+ byte[] ipBytes = new byte[4];
+ ipBytes[0] = (byte) Integer.parseInt(m.group(1));
+ ipBytes[1] = (byte) Integer.parseInt(m.group(2));
+ ipBytes[2] = (byte) Integer.parseInt(m.group(3));
+ ipBytes[3] = (byte) Integer.parseInt(m.group(4));
+ return InetAddress.getByAddress(ipBytes);
+ }
+
+ @Override
+ public Map<PortInstanceId, Endpoint> initializeJobletPhase1(UUID jobId, final JobPlan plan, UUID stageId,
+ int attempt, Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions)
+ throws Exception {
+ try {
+ LOGGER.log(Level.INFO, String.valueOf(jobId) + "[" + id + ":" + stageId + "]: Initializing Joblet Phase 1");
+
+ IRecordDescriptorProvider rdp = new IRecordDescriptorProvider() {
+ @Override
+ public RecordDescriptor getOutputRecordDescriptor(OperatorDescriptorId opId, int outputIndex) {
+ return plan.getJobSpecification().getOperatorOutputRecordDescriptor(opId, outputIndex);
+ }
+
+ @Override
+ public RecordDescriptor getInputRecordDescriptor(OperatorDescriptorId opId, int inputIndex) {
+ return plan.getJobSpecification().getOperatorInputRecordDescriptor(opId, inputIndex);
+ }
+ };
+
+ final Joblet joblet = getLocalJoblet(jobId);
+
+ Stagelet stagelet = new Stagelet(joblet, stageId, attempt, id);
+ joblet.setStagelet(stageId, stagelet);
+
+ final Map<PortInstanceId, Endpoint> portMap = new HashMap<PortInstanceId, Endpoint>();
+ Map<OperatorInstanceId, OperatorRunnable> honMap = stagelet.getOperatorMap();
+
+ List<Endpoint> endpointList = new ArrayList<Endpoint>();
+
+ for (ActivityNodeId hanId : tasks.keySet()) {
+ IActivityNode han = plan.getActivityNodeMap().get(hanId);
+ if (LOGGER.isLoggable(Level.FINEST)) {
+ LOGGER.finest("Initializing " + hanId + " -> " + han);
+ }
+ IOperatorDescriptor op = han.getOwner();
+ List<IConnectorDescriptor> inputs = plan.getTaskInputs(hanId);
+ for (int i : tasks.get(hanId)) {
+ IOperatorNodePushable hon = han.createPushRuntime(ctx, joblet.getEnvironment(op, i), rdp, i,
+ opPartitions.get(op.getOperatorId()).size());
+ OperatorRunnable or = new OperatorRunnable(ctx, hon);
+ stagelet.setOperator(op.getOperatorId(), i, or);
+ if (inputs != null) {
+ for (int j = 0; j < inputs.size(); ++j) {
+ if (j >= 1) {
+ throw new IllegalStateException();
+ }
+ IConnectorDescriptor conn = inputs.get(j);
+ OperatorDescriptorId producerOpId = plan.getJobSpecification().getProducer(conn)
+ .getOperatorId();
+ OperatorDescriptorId consumerOpId = plan.getJobSpecification().getConsumer(conn)
+ .getOperatorId();
+ Endpoint endpoint = new Endpoint(connectionManager.getNetworkAddress(), i);
+ endpointList.add(endpoint);
+ DemuxDataReceiveListenerFactory drlf = new DemuxDataReceiveListenerFactory(ctx, jobId,
+ stageId);
+ connectionManager.acceptConnection(endpoint.getEndpointId(), drlf);
+ PortInstanceId piId = new PortInstanceId(op.getOperatorId(), Direction.INPUT, plan
+ .getTaskInputMap().get(hanId).get(j), i);
+ if (LOGGER.isLoggable(Level.FINEST)) {
+ LOGGER.finest("Created endpoint " + piId + " -> " + endpoint);
+ }
+ portMap.put(piId, endpoint);
+ IFrameReader reader = createReader(conn, drlf, i, plan, stagelet,
+ opPartitions.get(producerOpId).size(), opPartitions.get(consumerOpId).size());
+ or.setFrameReader(reader);
+ }
+ }
+ honMap.put(new OperatorInstanceId(op.getOperatorId(), i), or);
+ }
+ }
+
+ stagelet.setEndpointList(endpointList);
+
+ return portMap;
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+ private IFrameReader createReader(final IConnectorDescriptor conn, IConnectionDemultiplexer demux,
+ final int receiverIndex, JobPlan plan, final Stagelet stagelet, int nProducerCount, int nConsumerCount)
+ throws HyracksDataException {
+ final IFrameReader reader = conn.createReceiveSideReader(ctx, plan.getJobSpecification()
+ .getConnectorRecordDescriptor(conn), demux, receiverIndex, nProducerCount,
+ nConsumerCount);
+
+ return plan.getJobFlags().contains(JobFlag.COLLECT_FRAME_COUNTS) ? new IFrameReader() {
+ private int frameCount;
+
+ @Override
+ public void open() throws HyracksDataException {
+ frameCount = 0;
+ reader.open();
+ }
+
+ @Override
+ public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ boolean status = reader.nextFrame(buffer);
+ if (status) {
+ ++frameCount;
+ }
+ return status;
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ reader.close();
+ stagelet.getStatistics()
+ .getStatisticsMap()
+ .put("framecount." + conn.getConnectorId().getId() + ".receiver." + receiverIndex,
+ String.valueOf(frameCount));
+ }
+ } : reader;
+ }
+
+ @Override
+ public void initializeJobletPhase2(UUID jobId, final JobPlan plan, UUID stageId,
+ Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions,
+ final Map<PortInstanceId, Endpoint> globalPortMap) throws Exception {
+ try {
+ LOGGER.log(Level.INFO, String.valueOf(jobId) + "[" + id + ":" + stageId + "]: Initializing Joblet Phase 2");
+ final Joblet ji = getLocalJoblet(jobId);
+ Stagelet si = (Stagelet) ji.getStagelet(stageId);
+ final Map<OperatorInstanceId, OperatorRunnable> honMap = si.getOperatorMap();
+
+ final Stagelet stagelet = (Stagelet) ji.getStagelet(stageId);
+
+ final JobSpecification spec = plan.getJobSpecification();
+
+ for (ActivityNodeId hanId : tasks.keySet()) {
+ IActivityNode han = plan.getActivityNodeMap().get(hanId);
+ IOperatorDescriptor op = han.getOwner();
+ List<IConnectorDescriptor> outputs = plan.getTaskOutputs(hanId);
+ for (int i : tasks.get(hanId)) {
+ OperatorRunnable or = honMap.get(new OperatorInstanceId(op.getOperatorId(), i));
+ if (outputs != null) {
+ for (int j = 0; j < outputs.size(); ++j) {
+ final IConnectorDescriptor conn = outputs.get(j);
+ OperatorDescriptorId producerOpId = plan.getJobSpecification().getProducer(conn)
+ .getOperatorId();
+ OperatorDescriptorId consumerOpId = plan.getJobSpecification().getConsumer(conn)
+ .getOperatorId();
+ final int senderIndex = i;
+ IEndpointDataWriterFactory edwFactory = new IEndpointDataWriterFactory() {
+ @Override
+ public IFrameWriter createFrameWriter(int index) throws HyracksDataException {
+ PortInstanceId piId = new PortInstanceId(spec.getConsumer(conn).getOperatorId(),
+ Direction.INPUT, spec.getConsumerInputIndex(conn), index);
+ Endpoint ep = globalPortMap.get(piId);
+ if (ep == null) {
+ LOGGER.info("Got null Endpoint for " + piId);
+ throw new NullPointerException();
+ }
+ if (LOGGER.isLoggable(Level.FINEST)) {
+ LOGGER.finest("Probed endpoint " + piId + " -> " + ep);
+ }
+ return createWriter(connectionManager.connect(ep.getNetworkAddress(),
+ ep.getEndpointId(), senderIndex), plan, conn, senderIndex, index, stagelet);
+ }
+ };
+ or.setFrameWriter(j, conn.createSendSideWriter(ctx, plan.getJobSpecification()
+ .getConnectorRecordDescriptor(conn), edwFactory, i,
+ opPartitions.get(producerOpId).size(), opPartitions.get(consumerOpId).size()), spec
+ .getConnectorRecordDescriptor(conn));
+ }
+ }
+ stagelet.installRunnable(new OperatorInstanceId(op.getOperatorId(), i));
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+ private IFrameWriter createWriter(final IFrameWriter writer, JobPlan plan, final IConnectorDescriptor conn,
+ final int senderIndex, final int receiverIndex, final Stagelet stagelet) throws HyracksDataException {
+ return plan.getJobFlags().contains(JobFlag.COLLECT_FRAME_COUNTS) ? new IFrameWriter() {
+ private int frameCount;
+
+ @Override
+ public void open() throws HyracksDataException {
+ frameCount = 0;
+ writer.open();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ ++frameCount;
+ writer.nextFrame(buffer);
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ writer.close();
+ stagelet.getStatistics()
+ .getStatisticsMap()
+ .put("framecount." + conn.getConnectorId().getId() + ".sender." + senderIndex + "."
+ + receiverIndex, String.valueOf(frameCount));
+ }
+ } : writer;
+ }
+
+ @Override
+ public void commitJobletInitialization(UUID jobId, UUID stageId) throws Exception {
+ final Joblet ji = getLocalJoblet(jobId);
+ Stagelet si = (Stagelet) ji.getStagelet(stageId);
+ for (Endpoint e : si.getEndpointList()) {
+ connectionManager.unacceptConnection(e.getEndpointId());
+ }
+ si.setEndpointList(null);
+ }
+
+ private synchronized Joblet getLocalJoblet(UUID jobId) throws Exception {
+ Joblet ji = jobletMap.get(jobId);
+ if (ji == null) {
+ ji = new Joblet(this, jobId);
+ jobletMap.put(jobId, ji);
+ }
+ return ji;
+ }
+
+ public Executor getExecutor() {
+ return executor;
+ }
+
+ @Override
+ public synchronized void cleanUpJob(UUID jobId) throws Exception {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Cleaning up after job: " + jobId);
+ }
+ jobletMap.remove(jobId);
+ connectionManager.dumpStats();
+ }
+
+ @Override
+ public void startStage(UUID jobId, UUID stageId) throws Exception {
+ Joblet ji = jobletMap.get(jobId);
+ if (ji != null) {
+ Stagelet s = ji.getStagelet(stageId);
+ if (s != null) {
+ s.start();
+ }
+ }
+ }
+
+ public void notifyStageComplete(UUID jobId, UUID stageId, int attempt, StageletStatistics stats) throws Exception {
+ ccs.notifyStageletComplete(jobId, stageId, attempt, id, stats);
+ }
+
+ public void notifyStageFailed(UUID jobId, UUID stageId, int attempt) throws Exception {
+ ccs.notifyStageletFailure(jobId, stageId, attempt, id);
+ }
+
+ @Override
+ public void notifyRegistration(IClusterController ccs) throws Exception {
+ this.ccs = ccs;
+ }
+
+ @Override
+ public NCConfig getConfiguration() throws Exception {
+ return ncConfig;
+ }
+
+ private class HeartbeatTask extends TimerTask {
+ private IClusterController cc;
+
+ public HeartbeatTask(IClusterController cc) {
+ this.cc = cc;
+ }
+
+ @Override
+ public void run() {
+ try {
+ cc.nodeHeartbeat(id);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ @Override
+ public synchronized void abortJoblet(UUID jobId, UUID stageId) throws Exception {
+ Joblet ji = jobletMap.get(jobId);
+ if (ji != null) {
+ Stagelet stagelet = ji.getStagelet(stageId);
+ if (stagelet != null) {
+ stagelet.abort();
+ connectionManager.abortConnections(jobId, stageId);
+ }
+ }
+ }
+}
\ No newline at end of file
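
Note: createReader() and createWriter() above wrap the raw frame stream in a counting decorator only when the job requested JobFlag.COLLECT_FRAME_COUNTS, and publish the count into the stagelet statistics map on close. The sketch below isolates that decorator pattern with a stand-in Reader interface; it is not the Hyracks IFrameReader, and the statistics key is invented.

    import java.nio.ByteBuffer;
    import java.util.HashMap;
    import java.util.Map;

    public class CountingReaderSketch {
        // Stand-in for IFrameReader; not the Hyracks interface.
        interface Reader {
            boolean nextFrame(ByteBuffer buffer);
            void close();
        }

        static Reader wrap(final Reader reader, final boolean collectCounts,
                final String statKey, final Map<String, String> stats) {
            if (!collectCounts) {
                return reader; // no overhead unless the job asked for counts
            }
            return new Reader() {
                private int frameCount;

                public boolean nextFrame(ByteBuffer buffer) {
                    boolean status = reader.nextFrame(buffer);
                    if (status) {
                        ++frameCount; // count only frames actually delivered
                    }
                    return status;
                }

                public void close() {
                    reader.close();
                    stats.put(statKey, String.valueOf(frameCount)); // publish on close
                }
            };
        }

        public static void main(String[] args) {
            final Map<String, String> stats = new HashMap<String, String>();
            Reader raw = new Reader() {
                private int framesLeft = 2;

                public boolean nextFrame(ByteBuffer buffer) {
                    return framesLeft-- > 0;
                }

                public void close() {
                }
            };
            Reader counted = wrap(raw, true, "framecount.conn0.receiver.0", stats);
            while (counted.nextFrame(ByteBuffer.allocate(4))) {
                // drain
            }
            counted.close();
            System.out.println(stats); // {framecount.conn0.receiver.0=2}
        }
    }
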
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
new file mode 100644
index 0000000..696ce29
--- /dev/null
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc;
+
+import java.rmi.RemoteException;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.OperatorInstanceId;
+import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
+import edu.uci.ics.hyracks.control.common.comm.Endpoint;
+import edu.uci.ics.hyracks.control.nc.runtime.OperatorRunnable;
+
+public class Stagelet {
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOGGER = Logger.getLogger(Stagelet.class.getName());
+
+ private final Joblet joblet;
+
+ private final UUID stageId;
+
+ private final int attempt;
+
+ private final Map<OperatorInstanceId, OperatorRunnable> honMap;
+
+ private List<Endpoint> endpointList;
+
+ private boolean started;
+
+ private volatile boolean abort;
+
+ private final Set<OperatorInstanceId> pendingOperators;
+
+ private final StageletStatistics stats;
+
+ public Stagelet(Joblet joblet, UUID stageId, int attempt, String nodeId) throws RemoteException {
+ this.joblet = joblet;
+ this.stageId = stageId;
+ this.attempt = attempt;
+ pendingOperators = new HashSet<OperatorInstanceId>();
+ started = false;
+ honMap = new HashMap<OperatorInstanceId, OperatorRunnable>();
+ stats = new StageletStatistics();
+ stats.setNodeId(nodeId);
+ }
+
+ public void setOperator(OperatorDescriptorId odId, int partition, OperatorRunnable hon) {
+ honMap.put(new OperatorInstanceId(odId, partition), hon);
+ }
+
+ public Map<OperatorInstanceId, OperatorRunnable> getOperatorMap() {
+ return honMap;
+ }
+
+ public void setEndpointList(List<Endpoint> endpointList) {
+ this.endpointList = endpointList;
+ }
+
+ public List<Endpoint> getEndpointList() {
+ return endpointList;
+ }
+
+ public synchronized void start() throws Exception {
+ if (started) {
+ throw new Exception("Joblet already started");
+ }
+ started = true;
+ stats.setStartTime(new Date());
+ notifyAll();
+ }
+
+ public synchronized void abort() {
+ this.abort = true;
+ for (OperatorRunnable r : honMap.values()) {
+ r.abort();
+ }
+ }
+
+ public void installRunnable(final OperatorInstanceId opIId) {
+ pendingOperators.add(opIId);
+ final OperatorRunnable hon = honMap.get(opIId);
+ joblet.getExecutor().execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ waitUntilStarted();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ return;
+ }
+ if (abort) {
+ return;
+ }
+ try {
+ LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
+ + opIId.getPartition() + ": STARTING");
+ } catch (Exception e) {
+ e.printStackTrace();
+ // notifyOperatorFailure(opIId);
+ }
+ try {
+ hon.run();
+ notifyOperatorCompletion(opIId);
+ } catch (Exception e) {
+ e.printStackTrace();
+ // notifyOperatorFailure(opIId);
+ }
+ try {
+ LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
+ + opIId.getPartition() + ": TERMINATED");
+ } catch (Exception e) {
+ e.printStackTrace();
+ // notifyOperatorFailure(opIId);
+ }
+ }
+ });
+ }
+
+ protected synchronized void notifyOperatorCompletion(OperatorInstanceId opIId) {
+ pendingOperators.remove(opIId);
+ if (pendingOperators.isEmpty()) {
+ stats.setEndTime(new Date());
+ try {
+ joblet.notifyStageletComplete(stageId, attempt, stats);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ protected synchronized void notifyOperatorFailure(OperatorInstanceId opIId) {
+ abort();
+ try {
+ joblet.notifyStageletFailed(stageId, attempt);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private synchronized void waitUntilStarted() throws InterruptedException {
+ while (!started && !abort) {
+ wait();
+ }
+ }
+
+ public StageletStatistics getStatistics() {
+ return stats;
+ }
+}
\ No newline at end of file
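
Note: Stagelet gates its operator runnables on a wait/notify latch: installRunnable() submits work that parks in waitUntilStarted() until start() flips the flag and calls notifyAll(). A minimal sketch of that gate follows; it is illustrative only (in this sketch abort() also wakes parked runnables so they can exit promptly), and the names are made up.

    public class StartGateSketch {
        private boolean started;
        private volatile boolean abort;

        public synchronized void start() {
            started = true;
            notifyAll(); // release every runnable parked in waitUntilStarted()
        }

        public synchronized void abort() {
            abort = true;
            notifyAll(); // in this sketch, aborting also wakes parked runnables
        }

        private synchronized void waitUntilStarted() throws InterruptedException {
            while (!started && !abort) {
                wait();
            }
        }

        public Runnable install(final Runnable work) {
            return new Runnable() {
                public void run() {
                    try {
                        waitUntilStarted();
                    } catch (InterruptedException e) {
                        return;
                    }
                    if (!abort) {
                        work.run();
                    }
                }
            };
        }

        public static void main(String[] args) throws Exception {
            StartGateSketch gate = new StartGateSketch();
            Thread worker = new Thread(gate.install(new Runnable() {
                public void run() {
                    System.out.println("operator running");
                }
            }));
            worker.start();   // parks until the gate opens
            Thread.sleep(100);
            gate.start();     // all installed runnables are released together
            worker.join();
        }
    }
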
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/comm/ConnectionEntry.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/comm/ConnectionEntry.java
new file mode 100644
index 0000000..20d41d6
--- /dev/null
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/comm/ConnectionEntry.java
@@ -0,0 +1,190 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.comm;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.UUID;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.IConnectionEntry;
+import edu.uci.ics.hyracks.api.comm.IDataReceiveListener;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+
+public class ConnectionEntry implements IConnectionEntry {
+ private static final Logger LOGGER = Logger.getLogger(ConnectionEntry.class.getName());
+
+ private SocketChannel socketChannel;
+
+ private final ByteBuffer readBuffer;
+
+ private final ByteBuffer writeBuffer;
+
+ private IDataReceiveListener recvListener;
+
+ private Object attachment;
+
+ private final SelectionKey key;
+
+ private UUID jobId;
+
+ private UUID stageId;
+
+ private boolean aborted;
+
+ public ConnectionEntry(IHyracksContext ctx, SocketChannel socketChannel, SelectionKey key) {
+ this.socketChannel = socketChannel;
+ readBuffer = ctx.getResourceManager().allocateFrame();
+ readBuffer.clear();
+ writeBuffer = ctx.getResourceManager().allocateFrame();
+ writeBuffer.clear();
+ this.key = key;
+ }
+
+ public SocketChannel getSocketChannel() {
+ return socketChannel;
+ }
+
+ public boolean dispatch(SelectionKey key) throws IOException {
+ if (aborted) {
+ recvListener.dataReceived(this);
+ } else {
+ if (key.isReadable()) {
+ if (LOGGER.isLoggable(Level.FINER)) {
+ LOGGER.finer("Before read: " + readBuffer.position() + " " + readBuffer.limit());
+ }
+ int bytesRead = socketChannel.read(readBuffer);
+ if (bytesRead < 0) {
+ recvListener.eos(this);
+ return true;
+ }
+ if (LOGGER.isLoggable(Level.FINER)) {
+ LOGGER.finer("After read: " + readBuffer.position() + " " + readBuffer.limit());
+ }
+ recvListener.dataReceived(this);
+ } else if (key.isWritable()) {
+ synchronized (this) {
+ writeBuffer.flip();
+ if (LOGGER.isLoggable(Level.FINER)) {
+ LOGGER.finer("Before write: " + writeBuffer.position() + " " + writeBuffer.limit());
+ }
+ int bytesWritten = socketChannel.write(writeBuffer);
+ if (bytesWritten < 0) {
+ return true;
+ }
+ if (LOGGER.isLoggable(Level.FINER)) {
+ LOGGER.finer("After write: " + writeBuffer.position() + " " + writeBuffer.limit());
+ }
+ if (writeBuffer.remaining() <= 0) {
+ int ops = key.interestOps();
+ key.interestOps(ops & ~SelectionKey.OP_WRITE);
+ }
+ writeBuffer.compact();
+ notifyAll();
+ }
+ } else {
+ LOGGER.warning("Spurious event triggered: " + key.readyOps());
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public ByteBuffer getReadBuffer() {
+ return readBuffer;
+ }
+
+ @Override
+ public synchronized void write(ByteBuffer buffer) {
+ while (buffer.remaining() > 0) {
+ while (writeBuffer.remaining() <= 0) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ }
+ }
+ int oldLimit = buffer.limit();
+ buffer.limit(Math.min(oldLimit, writeBuffer.remaining()));
+ writeBuffer.put(buffer);
+ buffer.limit(oldLimit);
+ int ops = key.interestOps();
+ key.interestOps(ops | SelectionKey.OP_WRITE);
+ key.selector().wakeup();
+ }
+ }
+
+ @Override
+ public void setDataReceiveListener(IDataReceiveListener listener) {
+ this.recvListener = listener;
+ }
+
+ @Override
+ public void attach(Object attachment) {
+ this.attachment = attachment;
+ }
+
+ @Override
+ public Object getAttachment() {
+ return attachment;
+ }
+
+ @Override
+ public void close() {
+ try {
+ socketChannel.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public SelectionKey getSelectionKey() {
+ return key;
+ }
+
+ @Override
+ public UUID getJobId() {
+ return jobId;
+ }
+
+ @Override
+ public void setJobId(UUID jobId) {
+ this.jobId = jobId;
+ }
+
+ @Override
+ public UUID getStageId() {
+ return stageId;
+ }
+
+ @Override
+ public void setStageId(UUID stageId) {
+ this.stageId = stageId;
+ }
+
+ @Override
+ public void abort() {
+ aborted = true;
+ }
+
+ @Override
+ public boolean aborted() {
+ return aborted;
+ }
+}
\ No newline at end of file
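
Note: ConnectionEntry.write() above copies the caller's buffer into the connection's write buffer in bounded chunks: it temporarily lowers the source limit so put() never overflows the destination, restores the limit, and blocks when the write buffer is full until the selector thread drains it. The bounded-copy step in isolation, using only JDK classes and illustrative names:

    import java.nio.ByteBuffer;

    public class BoundedCopySketch {
        // Copy as much of src as currently fits into dst; return bytes copied.
        static int copyBounded(ByteBuffer src, ByteBuffer dst) {
            int oldLimit = src.limit();
            int toCopy = Math.min(src.remaining(), dst.remaining());
            src.limit(src.position() + toCopy); // expose only what fits in dst
            dst.put(src);
            src.limit(oldLimit);                // restore the caller's view
            return toCopy;
        }

        public static void main(String[] args) {
            ByteBuffer src = ByteBuffer.wrap(new byte[] { 1, 2, 3, 4, 5 });
            ByteBuffer dst = ByteBuffer.allocate(3);
            System.out.println(copyBounded(src, dst)); // 3
            System.out.println(src.remaining());       // 2 bytes left for the next round
        }
    }
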
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/comm/ConnectionManager.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/comm/ConnectionManager.java
new file mode 100644
index 0000000..452f25e
--- /dev/null
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/comm/ConnectionManager.java
@@ -0,0 +1,395 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.comm;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousCloseException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.FrameConstants;
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
+import edu.uci.ics.hyracks.api.comm.IConnectionEntry;
+import edu.uci.ics.hyracks.api.comm.IDataReceiveListener;
+import edu.uci.ics.hyracks.api.comm.IDataReceiveListenerFactory;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.control.common.comm.NetworkAddress;
+
+public class ConnectionManager {
+ private static final Logger LOGGER = Logger.getLogger(ConnectionManager.class.getName());
+
+ private static final int INITIAL_MESSAGE_LEN = 20;
+
+ private NetworkAddress networkAddress;
+
+ private ServerSocketChannel serverSocketChannel;
+
+ private final IHyracksContext ctx;
+
+ private final Map<UUID, IDataReceiveListenerFactory> pendingConnectionReceivers;
+
+ private final ConnectionListenerThread connectionListenerThread;
+
+ private final DataListenerThread dataListenerThread;
+
+ private final IDataReceiveListener initialDataReceiveListener;
+
+ private final Set<IConnectionEntry> connections;
+
+ private volatile boolean stopped;
+
+ private ByteBuffer emptyFrame;
+
+ public ConnectionManager(IHyracksContext ctx, InetAddress address) throws IOException {
+ this.ctx = ctx;
+ serverSocketChannel = ServerSocketChannel.open();
+ ServerSocket serverSocket = serverSocketChannel.socket();
+ serverSocket.bind(new InetSocketAddress(address, 0));
+
+ networkAddress = new NetworkAddress(serverSocket.getInetAddress(), serverSocket.getLocalPort());
+
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Connection manager listening on " + serverSocket.getInetAddress() + ":"
+ + serverSocket.getLocalPort());
+ }
+
+ pendingConnectionReceivers = new HashMap<UUID, IDataReceiveListenerFactory>();
+ dataListenerThread = new DataListenerThread();
+ connectionListenerThread = new ConnectionListenerThread();
+ initialDataReceiveListener = new InitialDataReceiveListener();
+ emptyFrame = ctx.getResourceManager().allocateFrame();
+ emptyFrame.putInt(FrameHelper.getTupleCountOffset(ctx), 0);
+ connections = new HashSet<IConnectionEntry>();
+ }
+
+ public synchronized void dumpStats() {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Number of pendingConnectionReceivers: " + pendingConnectionReceivers.size());
+ LOGGER.info("Number of selectable keys: " + dataListenerThread.selector.keys().size());
+ }
+ }
+
+ public NetworkAddress getNetworkAddress() {
+ return networkAddress;
+ }
+
+ public void start() {
+ stopped = false;
+ connectionListenerThread.start();
+ dataListenerThread.start();
+ }
+
+ public void stop() {
+ try {
+ stopped = true;
+ serverSocketChannel.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public IFrameWriter connect(NetworkAddress address, UUID id, int senderId) throws HyracksDataException {
+ try {
+ SocketChannel channel = SocketChannel
+ .open(new InetSocketAddress(address.getIpAddress(), address.getPort()));
+ byte[] initialFrame = new byte[INITIAL_MESSAGE_LEN];
+ ByteBuffer buffer = ByteBuffer.wrap(initialFrame);
+ buffer.clear();
+ buffer.putLong(id.getMostSignificantBits());
+ buffer.putLong(id.getLeastSignificantBits());
+ buffer.putInt(senderId);
+ buffer.flip();
+ int bytesWritten = 0;
+ while (bytesWritten < INITIAL_MESSAGE_LEN) {
+ int n = channel.write(buffer);
+ if (n < 0) {
+ throw new HyracksDataException("Stream closed prematurely");
+ }
+ bytesWritten += n;
+ }
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Send Initial message: " + id + ":" + senderId);
+ }
+ buffer.clear();
+ buffer.limit(FrameConstants.SIZE_LEN);
+ int bytesRead = 0;
+ while (bytesRead < FrameConstants.SIZE_LEN) {
+ int n = channel.read(buffer);
+ if (n < 0) {
+ throw new HyracksDataException("Stream closed prematurely");
+ }
+ bytesRead += n;
+ }
+ buffer.flip();
+ int frameLen = buffer.getInt();
+ if (frameLen != FrameConstants.SIZE_LEN) {
+ throw new IllegalStateException("Received illegal framelen = " + frameLen);
+ }
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Got Ack message: " + id + ":" + senderId);
+ }
+ return new NetworkFrameWriter(channel);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ public synchronized void acceptConnection(UUID id, IDataReceiveListenerFactory receiver) {
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.info("Connection manager accepting " + id);
+ }
+ pendingConnectionReceivers.put(id, receiver);
+ }
+
+ public synchronized void unacceptConnection(UUID id) {
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.info("Connection manager unaccepting " + id);
+ }
+ pendingConnectionReceivers.remove(id);
+ }
+
+ public synchronized void abortConnections(UUID jobId, UUID stageId) {
+ List<IConnectionEntry> abortConnections = new ArrayList<IConnectionEntry>();
+ synchronized (this) {
+ for (IConnectionEntry ce : connections) {
+ if (ce.getJobId().equals(jobId) && ce.getStageId().equals(stageId)) {
+ abortConnections.add(ce);
+ }
+ }
+ }
+ dataListenerThread.addPendingAbortConnections(abortConnections);
+ }
+
+ private final class NetworkFrameWriter implements IFrameWriter {
+ private SocketChannel channel;
+
+ NetworkFrameWriter(SocketChannel channel) {
+ this.channel = channel;
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ try {
+ synchronized (emptyFrame) {
+ emptyFrame.position(0);
+ emptyFrame.limit(emptyFrame.capacity());
+ channel.write(emptyFrame);
+ }
+ channel.close();
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ try {
+ if (LOGGER.isLoggable(Level.FINER)) {
+ int frameLen = buffer.getInt(buffer.position());
+ LOGGER.finer("ConnectionManager.NetworkFrameWriter: frameLen = " + frameLen);
+ }
+ while (buffer.remaining() > 0) {
+ channel.write(buffer);
+ }
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ }
+ }
+
+ private final class ConnectionListenerThread extends Thread {
+ public ConnectionListenerThread() {
+ super("Hyracks Connection Listener Thread");
+ }
+
+ @Override
+ public void run() {
+ while (!stopped) {
+ try {
+ SocketChannel sc = serverSocketChannel.accept();
+ dataListenerThread.addSocketChannel(sc);
+ } catch (AsynchronousCloseException e) {
+ // do nothing
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+ private final class DataListenerThread extends Thread {
+ private Selector selector;
+
+ private List<SocketChannel> pendingNewSockets;
+ private List<IConnectionEntry> pendingAbortConnections;
+
+ public DataListenerThread() {
+ super("Hyracks Data Listener Thread");
+ try {
+ selector = Selector.open();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ pendingNewSockets = new ArrayList<SocketChannel>();
+ pendingAbortConnections = new ArrayList<IConnectionEntry>();
+ }
+
+ synchronized void addSocketChannel(SocketChannel sc) throws IOException {
+ pendingNewSockets.add(sc);
+ selector.wakeup();
+ }
+
+ synchronized void addPendingAbortConnections(List<IConnectionEntry> abortConnections) {
+ pendingAbortConnections.addAll(abortConnections);
+ selector.wakeup();
+ }
+
+ @Override
+ public void run() {
+ while (!stopped) {
+ try {
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Starting Select");
+ }
+ int n = selector.select();
+ synchronized (this) {
+ if (!pendingNewSockets.isEmpty()) {
+ for (SocketChannel sc : pendingNewSockets) {
+ sc.configureBlocking(false);
+ SelectionKey scKey = sc.register(selector, SelectionKey.OP_READ);
+ ConnectionEntry entry = new ConnectionEntry(ctx, sc, scKey);
+ entry.setDataReceiveListener(initialDataReceiveListener);
+ scKey.attach(entry);
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Woke up selector");
+ }
+ }
+ pendingNewSockets.clear();
+ }
+ if (!pendingAbortConnections.isEmpty()) {
+ for (IConnectionEntry ce : pendingAbortConnections) {
+ SelectionKey key = ce.getSelectionKey();
+ ce.abort();
+ ((ConnectionEntry) ce).dispatch(key);
+ key.cancel();
+ ce.close();
+ synchronized (ConnectionManager.this) {
+ connections.remove(ce);
+ }
+ }
+ pendingAbortConnections.clear();
+ }
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Selector: " + n);
+ }
+ if (n > 0) {
+ for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();) {
+ SelectionKey key = i.next();
+ i.remove();
+ ConnectionEntry entry = (ConnectionEntry) key.attachment();
+ boolean close = false;
+ try {
+ close = entry.dispatch(key);
+ } catch (IOException e) {
+ e.printStackTrace();
+ close = true;
+ }
+ if (close) {
+ key.cancel();
+ entry.close();
+ synchronized (ConnectionManager.this) {
+ connections.remove(entry);
+ }
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+ private class InitialDataReceiveListener implements IDataReceiveListener {
+ @Override
+ public void dataReceived(IConnectionEntry entry) throws IOException {
+ ByteBuffer buffer = entry.getReadBuffer();
+ buffer.flip();
+ IDataReceiveListener newListener = null;
+ if (buffer.remaining() >= INITIAL_MESSAGE_LEN) {
+ long msb = buffer.getLong();
+ long lsb = buffer.getLong();
+ UUID endpointID = new UUID(msb, lsb);
+ int senderId = buffer.getInt();
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Initial Frame received: " + endpointID + ":" + senderId);
+ }
+ IDataReceiveListenerFactory connectionReceiver;
+ synchronized (ConnectionManager.this) {
+ connectionReceiver = pendingConnectionReceivers.get(endpointID);
+ if (connectionReceiver == null) {
+ entry.close();
+ return;
+ }
+ }
+
+ newListener = connectionReceiver.getDataReceiveListener(endpointID, entry, senderId);
+ entry.setDataReceiveListener(newListener);
+ entry.setJobId(connectionReceiver.getJobId());
+ entry.setStageId(connectionReceiver.getStageId());
+ synchronized (ConnectionManager.this) {
+ connections.add(entry);
+ }
+ byte[] ack = new byte[4];
+ ByteBuffer ackBuffer = ByteBuffer.wrap(ack);
+ ackBuffer.clear();
+ ackBuffer.putInt(FrameConstants.SIZE_LEN);
+ ackBuffer.flip();
+ entry.write(ackBuffer);
+ }
+ buffer.compact();
+ if (newListener != null && buffer.remaining() > 0) {
+ newListener.dataReceived(entry);
+ }
+ }
+
+ @Override
+ public void eos(IConnectionEntry entry) {
+ }
+ }
+}
\ No newline at end of file
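
Note: the sender side of ConnectionManager.connect() above opens a blocking socket and writes a fixed 20-byte handshake (the endpoint UUID as two longs, then the sender index) before any data frames; the receiver's InitialDataReceiveListener parses the same layout and answers with a 4-byte ack. A standalone sketch of that encoding, using only JDK classes:

    import java.nio.ByteBuffer;
    import java.util.UUID;

    public class HandshakeSketch {
        static final int INITIAL_MESSAGE_LEN = 20; // 8 (msb) + 8 (lsb) + 4 (sender id)

        static ByteBuffer encode(UUID endpointId, int senderId) {
            ByteBuffer buffer = ByteBuffer.allocate(INITIAL_MESSAGE_LEN);
            buffer.putLong(endpointId.getMostSignificantBits());
            buffer.putLong(endpointId.getLeastSignificantBits());
            buffer.putInt(senderId);
            buffer.flip(); // ready for channel.write()
            return buffer;
        }

        public static void main(String[] args) {
            UUID endpointId = UUID.randomUUID();
            ByteBuffer frame = encode(endpointId, 7);
            // Receiver side: rebuild the endpoint id and sender index from the bytes.
            UUID decoded = new UUID(frame.getLong(), frame.getLong());
            int senderId = frame.getInt();
            System.out.println(decoded.equals(endpointId) + " sender=" + senderId);
        }
    }
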
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/comm/DemuxDataReceiveListenerFactory.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/comm/DemuxDataReceiveListenerFactory.java
new file mode 100644
index 0000000..6b4c041
--- /dev/null
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/comm/DemuxDataReceiveListenerFactory.java
@@ -0,0 +1,151 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.comm;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.UUID;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.IConnectionDemultiplexer;
+import edu.uci.ics.hyracks.api.comm.IConnectionEntry;
+import edu.uci.ics.hyracks.api.comm.IDataReceiveListener;
+import edu.uci.ics.hyracks.api.comm.IDataReceiveListenerFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class DemuxDataReceiveListenerFactory implements IDataReceiveListenerFactory, IConnectionDemultiplexer,
+ IDataReceiveListener {
+ private static final Logger LOGGER = Logger.getLogger(DemuxDataReceiveListenerFactory.class.getName());
+
+ private final IHyracksContext ctx;
+ private final BitSet readyBits;
+ private IConnectionEntry senders[];
+ private int openSenderCount;
+ private UUID jobId;
+ private UUID stageId;
+
+ public DemuxDataReceiveListenerFactory(IHyracksContext ctx, UUID jobId, UUID stageId) {
+ this.ctx = ctx;
+ this.jobId = jobId;
+ this.stageId = stageId;
+ readyBits = new BitSet();
+ senders = null;
+ openSenderCount = 0;
+ }
+
+ @Override
+ public IDataReceiveListener getDataReceiveListener(UUID endpointUUID, IConnectionEntry entry, int senderIndex) {
+ entry.attach(senderIndex);
+ addSender(senderIndex, entry);
+ return this;
+ }
+
+ @Override
+ public synchronized void dataReceived(IConnectionEntry entry) throws IOException {
+ int senderIndex = (Integer) entry.getAttachment();
+ ByteBuffer buffer = entry.getReadBuffer();
+ buffer.flip();
+ int dataLen = buffer.remaining();
+ if (dataLen >= ctx.getFrameSize() || entry.aborted()) {
+ if (LOGGER.isLoggable(Level.FINEST)) {
+ LOGGER.finest("NonDeterministicDataReceiveListener: frame received: sender = " + senderIndex);
+ }
+ SelectionKey key = entry.getSelectionKey();
+ if (key.isValid()) {
+ int ops = key.interestOps();
+ key.interestOps(ops & ~SelectionKey.OP_READ);
+ }
+ readyBits.set(senderIndex);
+ notifyAll();
+ return;
+ }
+ buffer.compact();
+ }
+
+ @Override
+ public void eos(IConnectionEntry entry) {
+ }
+
+ private synchronized void addSender(int senderIndex, IConnectionEntry entry) {
+ readyBits.clear(senderIndex);
+ if (senders == null) {
+ senders = new IConnectionEntry[senderIndex + 1];
+ } else if (senders.length <= senderIndex) {
+ senders = Arrays.copyOf(senders, senderIndex + 1);
+ }
+ senders[senderIndex] = entry;
+ ++openSenderCount;
+ }
+
+ @Override
+ public synchronized IConnectionEntry findNextReadyEntry(int lastReadSender) {
+ while (openSenderCount > 0 && readyBits.isEmpty()) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ }
+ }
+ lastReadSender = readyBits.nextSetBit(lastReadSender);
+ if (lastReadSender < 0) {
+ lastReadSender = readyBits.nextSetBit(0);
+ }
+ return senders[lastReadSender];
+ }
+
+ @Override
+ public synchronized void unreadyEntry(int index) {
+ readyBits.clear(index);
+ IConnectionEntry entry = senders[index];
+ SelectionKey key = entry.getSelectionKey();
+ if (key.isValid()) {
+ int ops = key.interestOps();
+ key.interestOps(ops | SelectionKey.OP_READ);
+ key.selector().wakeup();
+ }
+ }
+
+ @Override
+ public synchronized int closeEntry(int index) throws HyracksDataException {
+ IConnectionEntry entry = senders[index];
+ SelectionKey key = entry.getSelectionKey();
+ key.cancel();
+ try {
+ entry.close();
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ return --openSenderCount;
+ }
+
+ @Override
+ public synchronized int getSenderCount() {
+ return senders.length;
+ }
+
+ @Override
+ public UUID getJobId() {
+ return jobId;
+ }
+
+ @Override
+ public UUID getStageId() {
+ return stageId;
+ }
+}
\ No newline at end of file
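
Note: findNextReadyEntry() above implements a simple fairness rule with a BitSet: resume scanning for a ready sender at the index that was served last and wrap to 0 if the scan runs off the end. The selection step in isolation, with illustrative names:

    import java.util.BitSet;

    public class ReadySenderSketch {
        static int pickNext(BitSet readyBits, int lastReadSender) {
            int next = readyBits.nextSetBit(lastReadSender);
            if (next < 0) {
                next = readyBits.nextSetBit(0); // wrap around
            }
            return next; // -1 only if no sender is ready at all
        }

        public static void main(String[] args) {
            BitSet ready = new BitSet();
            ready.set(1);
            ready.set(4);
            System.out.println(pickNext(ready, 2)); // 4
            System.out.println(pickNext(ready, 5)); // wraps around to 1
        }
    }
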
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/HyracksContext.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/HyracksContext.java
new file mode 100644
index 0000000..f61ff8b
--- /dev/null
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/HyracksContext.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.runtime;
+
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.resources.IResourceManager;
+
+public class HyracksContext implements IHyracksContext {
+ private final IResourceManager resourceManager;
+ private final int frameSize;
+
+ public HyracksContext(int frameSize) {
+ resourceManager = new ResourceManager(this);
+ this.frameSize = frameSize;
+ }
+
+ @Override
+ public IResourceManager getResourceManager() {
+ return resourceManager;
+ }
+
+ @Override
+ public int getFrameSize() {
+ return frameSize;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java
new file mode 100644
index 0000000..39d98b1
--- /dev/null
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.runtime;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class OperatorRunnable implements Runnable {
+ private IOperatorNodePushable opNode;
+ private IFrameReader reader;
+ private ByteBuffer buffer;
+ private volatile boolean abort;
+
+ public OperatorRunnable(IHyracksContext ctx, IOperatorNodePushable opNode) {
+ this.opNode = opNode;
+ buffer = ctx.getResourceManager().allocateFrame();
+ }
+
+ public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+ opNode.setFrameWriter(index, writer, recordDesc);
+ }
+
+ public void setFrameReader(IFrameReader reader) {
+ this.reader = reader;
+ }
+
+ public void abort() {
+ abort = true;
+ }
+
+ @Override
+ public void run() {
+ try {
+ opNode.open();
+ if (reader != null) {
+ reader.open();
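+ // Pump frames: fill the buffer from the reader, flip it for the operator to consume,
+ // then compact it so it can be refilled on the next iteration.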
+ while (reader.nextFrame(buffer)) {
+ if (abort) {
+ break;
+ }
+ buffer.flip();
+ opNode.nextFrame(buffer);
+ buffer.compact();
+ }
+ reader.close();
+ }
+ opNode.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/ResourceManager.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/ResourceManager.java
new file mode 100644
index 0000000..72202e3
--- /dev/null
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/ResourceManager.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.runtime;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.resources.IResourceManager;
+
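+/**
+ * Default IResourceManager: hands out heap ByteBuffers sized to the context's frame size
+ * and creates temporary files via File.createTempFile.
+ */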
+public final class ResourceManager implements IResourceManager {
+ private final IHyracksContext ctx;
+
+ public ResourceManager(IHyracksContext ctx) {
+ this.ctx = ctx;
+ }
+
+ @Override
+ public ByteBuffer allocateFrame() {
+ return ByteBuffer.allocate(ctx.getFrameSize());
+ }
+
+ @Override
+ public File createFile(String prefix, String suffix) throws IOException {
+ return File.createTempFile(prefix, suffix);
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/resources/edu/uci/ics/hyracks/control/cc/scheduler.olg b/hyracks-control-nc/src/main/resources/edu/uci/ics/hyracks/control/cc/scheduler.olg
new file mode 100644
index 0000000..b230e0e
--- /dev/null
+++ b/hyracks-control-nc/src/main/resources/edu/uci/ics/hyracks/control/cc/scheduler.olg
@@ -0,0 +1,310 @@
+program hyrackscc;
+
+import java.util.UUID;
+import java.util.Set;
+
+import jol.types.basic.Tuple;
+import jol.types.basic.TupleSet;
+
+import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.control.common.job.JobPlan;
+
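+/* Assign every activity to a stage: an activity starts in stage 0, is pushed to a later
+   stage than any activity it is blocked on, and is placed in the same stage as the
+   activities it is connected to. */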
+define(activitystage_temp, keys(), {UUID, OperatorDescriptorId, ActivityNodeId, Integer});
+
+activitystage_INITIAL activitystage_temp(JobId, OperatorId, ActivityId, 0) :-
+ activitynode(JobId, OperatorId, ActivityId, _);
+
+activitystage_BLOCKED activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber) :-
+ activitystage_temp(JobId, OperatorId1, ActivityId1, StageNumber1),
+ activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber2),
+ activityblocked(JobId, OperatorId1, ActivityId1, OperatorId2, ActivityId2),
+ StageNumber2 <= StageNumber1
+ {
+ StageNumber := StageNumber1 + 1;
+ };
+
+activitystage_PIPELINED_1 activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber) :-
+ activitystage_temp(JobId, OperatorId1, ActivityId1, StageNumber1),
+ activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber2),
+ activityconnection(JobId, OperatorId1, Operator1Port, edu.uci.ics.hyracks.api.dataflow.Direction.OUTPUT, ActivityId1, _),
+ activityconnection(JobId, OperatorId2, Operator2Port, edu.uci.ics.hyracks.api.dataflow.Direction.INPUT, ActivityId2, _),
+ connectordescriptor(JobId, _, OperatorId1, Operator1Port, OperatorId2, Operator2Port, _),
+ StageNumber1 != StageNumber2
+ {
+ StageNumber := java.lang.Math.max(StageNumber1, StageNumber2);
+ };
+
+activitystage_PIPELINED_2 activitystage_temp(JobId, OperatorId1, ActivityId1, StageNumber) :-
+ activitystage_temp(JobId, OperatorId1, ActivityId1, StageNumber1),
+ activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber2),
+ activityconnection(JobId, OperatorId1, Operator1Port, edu.uci.ics.hyracks.api.dataflow.Direction.OUTPUT, ActivityId1, _),
+ activityconnection(JobId, OperatorId2, Operator2Port, edu.uci.ics.hyracks.api.dataflow.Direction.INPUT, ActivityId2, _),
+ connectordescriptor(JobId, _, OperatorId1, Operator1Port, OperatorId2, Operator2Port, _),
+ StageNumber1 != StageNumber2
+ {
+ StageNumber := java.lang.Math.max(StageNumber1, StageNumber2);
+ };
+
+watch(activitystage_temp, a);
+
+watch(activityconnection, a);
+watch(activityblocked, a);
+watch(operatordescriptor, a);
+watch(connectordescriptor, a);
+
+watch(activitystage, a);
+watch(activitystage, i);
+watch(activitystage, d);
+
+define(activitystage, keys(0, 1, 2), {UUID, OperatorDescriptorId, ActivityNodeId, Integer});
+
+activitystage(JobId, OperatorId, ActivityId, max<StageNumber>) :-
+ activitystage_temp(JobId, OperatorId, ActivityId, StageNumber);
+
+define(jobstage, keys(0, 1), {UUID, Integer, UUID});
+
+jobstage(JobId, StageNumber, StageId) :-
+ activitystage(JobId, _, _, StageNumber)
+ {
+ StageId := java.util.UUID.randomUUID();
+ };
+
+watch(jobstage, a);
+
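+/* A job's attempt counter starts at 0 when the job is started and is incremented once
+   the abort of the current attempt has completed. */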
+define(jobattempt, keys(), {UUID, Integer});
+
+jobattempt(JobId, 0) :-
+ job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.INITIALIZED, _, _, _),
+ jobstart(JobId, _);
+
+jobattempt(JobId, NextAttempt) :-
+ jobattempt(JobId, Attempt),
+ stagestart(JobId, _, Attempt),
+ abortcomplete(JobId, _, Attempt)
+ {
+ NextAttempt := Attempt + 1;
+ };
+
+define(stagestart, keys(), {UUID, Integer, Integer});
+define(stagefinish, keys(0, 1, 2), {UUID, Integer, Integer, Set});
+
+watch(jobstart, i);
+
+stagestart_INITIAL stagestart(JobId, 0, Attempt) :-
+ jobattempt#insert(JobId, Attempt);
+
+update_job_status_RUNNING job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, JobSpec, JobPlan, null) :-
+ job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.INITIALIZED, JobSpec, JobPlan, _),
+ jobstart(JobId, _);
+
+stagestart_NEXT stagestart(JobId, NextStageNumber, Attempt) :-
+ stagestart(JobId, StageNumber, Attempt),
+ stagefinish#insert(StageId, StageNumber, Attempt, _)
+ {
+ NextStageNumber := StageNumber + 1;
+ };
+
+watch(stagestart, a);
+watch(stagestart, d);
+
+define(operatorlocationcandidates, keys(), {UUID, OperatorDescriptorId, String, Integer, Integer});
+
+operatorlocationcandidates(JobId, OperatorId, NodeId, Partition, Benefit) :-
+ operatorlocation(JobId, OperatorId, NodeId, Partition, Benefit),
+ availablenodes(NodeId);
+
+watch(availablenodes, a);
+watch(availablenodes, i);
+watch(availablenodes, d);
+
+define(availablenodecount, keys(0), {Integer, Integer});
+
+watch(availablenodecount, a);
+watch(availablenodecount, i);
+watch(availablenodecount, d);
+
+availablenodecount(0, count<NodeId>) :-
+ availablenodes(NodeId);
+
+watch(rankedavailablenodes, a);
+watch(rankedavailablenodes, i);
+watch(rankedavailablenodes, d);
+
+watch(operatorlocationcandidates, a);
+watch(operatorlocationcandidates, i);
+watch(operatorlocationcandidates, d);
+
+define(maxoperatorlocationbenefit, keys(0, 1, 2), {UUID, OperatorDescriptorId, Integer, Integer});
+
+maxoperatorlocationbenefit(JobId, OperatorId, Partition, max<Benefit>) :-
+ operatorlocationcandidates(JobId, OperatorId, _, Partition, Benefit);
+
+watch(maxoperatorlocationbenefit, a);
+watch(maxoperatorlocationbenefit, i);
+watch(maxoperatorlocationbenefit, d);
+
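+/* Node selection for an attempt: prefer candidate nodes with maximal benefit, and also
+   map each operator clone round-robin onto the ranked available nodes. */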
+define(attemptoperatorlocationdecision, keys(0, 1, 3, 4), {UUID, OperatorDescriptorId, String, Integer, Integer});
+
+watch(attemptoperatorlocationdecision, a);
+watch(attemptoperatorlocationdecision, i);
+watch(attemptoperatorlocationdecision, d);
+
+attemptoperatorlocationdecision(JobId, OperatorId, NodeId, Partition, Attempt) :-
+ jobattempt#insert(JobId, Attempt),
+ operatorlocationcandidates(JobId, OperatorId, NodeId, Partition, Benefit),
+ maxoperatorlocationbenefit(JobId, OperatorId, Partition, Benefit);
+
+attemptoperatorlocationdecision(JobId, OperatorId, NodeId, Partition, Attempt) :-
+ jobattempt#insert(JobId, Attempt),
+ operatorclonecountexpansiontotalorder(JobId, OperatorId, Partition, CloneRank),
+ rankedavailablenodes(NodeId, NodeRank),
+ availablenodecount(_, NodeCount),
+ NodeRank == CloneRank % NodeCount;
+
+define(operatorclonecount_temp, keys(), {UUID, OperatorDescriptorId, Integer, Integer});
+
+operatorclonecount_temp(JobId, OperatorId, NPartitions, 0) :-
+ operatorclonecount(JobId, OperatorId, NPartitions);
+
+define(operatorclonecountexpansiontotalorder, keys(0, 1, 2), {UUID, OperatorDescriptorId, Integer, Integer});
+
+operatorclonecountexpansiontotalorder(JobId, OperatorId, Partition, Rank) :-
+ expandpartitioncountconstraint(operatorclonecount_temp(JobId, OperatorId, Partition, Rank));
+
+watch(operatorclonecountexpansiontotalorder, a);
+watch(operatorclonecountexpansiontotalorder, i);
+watch(operatorclonecountexpansiontotalorder, d);
+
+watch(operatorclonecount, a);
+watch(operatorclonecount, i);
+watch(operatorclonecount, d);
+
+define(activitystart, keys(), {UUID, OperatorDescriptorId, ActivityNodeId, Integer, Integer, UUID, String, Integer});
+
+activitystart(JobId, OperatorId, ActivityId, StageNumber, Attempt, StageId, NodeId, Partition) :-
+ stagestart#insert(JobId, StageNumber, Attempt),
+ operatordescriptor(JobId, OperatorId, _, _),
+ activitystage(JobId, OperatorId, ActivityId, StageNumber),
+ jobstage(JobId, StageNumber, StageId),
+ attemptoperatorlocationdecision(JobId, OperatorId, NodeId, Partition, Attempt);
+
+watch(activitystart, a);
+
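+/* Group the activities started on a node into a stagelet, then fold the per-node
+   stagelets of a stage attempt into a single start message. */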
+define(stageletstart, keys(0, 1, 3, 4), {UUID, UUID, JobPlan, String, Integer, Set});
+
+stageletstart(JobId, StageId, JobPlan, NodeId, Attempt, set<ActivityInfo>) :-
+ activitystart#insert(JobId, _, ActivityId, StageNumber, Attempt, StageId, NodeId, Partition),
+ job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, _, JobPlan, _)
+ {
+ ActivityInfo := [ActivityId, Partition];
+ };
+
+watch(stageletstart, a);
+watch(stageletstart, i);
+
+define(startmessage_agg, keys(0, 1, 2), {UUID, UUID, Integer, JobPlan, Set});
+
+startmessage_agg(JobId, StageId, Attempt, JobPlan, set<Tuple>) :-
+ stageletstart#insert(JobId, StageId, JobPlan, NodeId, Attempt, ActivityInfoSet),
+ availablenodes(NodeId),
+ ActivityInfoSet.size() != 0
+ {
+ Tuple := [NodeId, ActivityInfoSet];
+ };
+
+startmessage(JobId, StageId, Attempt, JobPlan, TSet) :-
+ startmessage_agg(JobId, StageId, Attempt, JobPlan, TSet);
+
+watch(startmessage, a);
+watch(startmessage, i);
+
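+/* A stagelet failure, or the loss of a node whose stagelet has not completed, aborts the
+   other stagelets of the same stage attempt and produces the corresponding abort messages. */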
+define(stageletabort, keys(0, 1, 3, 4), {UUID, UUID, JobPlan, String, Integer, Set});
+
+stageletabort(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet) :-
+ stageletfailure(JobId, StageId, NodeId, Attempt),
+ stageletstart(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet);
+
+stageletabort(JobId, StageId, JobPlan, NodeIdOther, Attempt, ActivityIdSet) :-
+ stageletstart(JobId, StageId, JobPlan, NodeId, Attempt, _),
+ stageletstart(JobId, StageId, _, NodeIdOther, Attempt, ActivityIdSet),
+ failednodes#insert(NodeId),
+ notin stageletcomplete(JobId, StageId, NodeId, Attempt, _);
+
+watch(stageletabort, a);
+watch(stageletabort, i);
+watch(stageletabort, d);
+
+define(stageabort, keys(0, 1, 2), {UUID, UUID, Integer, Set});
+
+stageabort(JobId, StageId, Attempt, set<NodeId>) :-
+ stageletabort#insert(JobId, StageId, _, NodeId, Attempt, _);
+
+define(abortmessage_agg, keys(0, 1, 2), {UUID, UUID, Integer, JobPlan, Set});
+
+abortmessage_agg(JobId, StageId, Attempt, JobPlan, set<Tuple>) :-
+ stageletabort#insert(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet),
+ availablenodes(NodeId)
+ {
+ Tuple := [NodeId, ActivityIdSet];
+ };
+
+abortmessage(JobId, StageId, Attempt, JobPlan, TSet) :-
+ abortmessage_agg(JobId, StageId, Attempt, JobPlan, TSet),
+ TSet.size() != 0;
+
+watch(abortmessage, a);
+watch(abortmessage, i);
+
+define(stageletabortcomplete, keys(), {UUID, UUID, String, Integer});
+
+stageletabortcomplete(JobId, StageId, NodeId, Attempt) :-
+ abortnotify(JobId, StageId, NodeId, Attempt);
+
+stageletabortcomplete(JobId, StageId, NodeId, Attempt) :-
+ stageletabort(JobId, StageId, _, NodeId, Attempt, _),
+ notin availablenodes(NodeId);
+
+define(stageletabortcomplete_agg, keys(0, 1, 2), {UUID, UUID, Integer, Set});
+
+stageletabortcomplete_agg(JobId, StageId, Attempt, set<NodeId>) :-
+ stageletabortcomplete(JobId, StageId, NodeId, Attempt);
+
+define(abortcomplete, keys(), {UUID, UUID, Integer});
+
+abortcomplete(JobId, StageId, Attempt) :-
+ stageletabortcomplete_agg(JobId, StageId, Attempt, NodeIdSet1),
+ stageabort(JobId, StageId, Attempt, NodeIdSet2),
+ NodeIdSet1.size() == NodeIdSet2.size();
+
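+/* A stage attempt finishes once every node that was sent a start message reports
+   completion; when no further stage exists the job is marked TERMINATED and cleaned up. */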
+define(stageletcomplete_agg, keys(0, 1, 2), {UUID, UUID, Integer, Set});
+
+stageletcomplete_agg(JobId, StageId, Attempt, set<Statistics>) :-
+ stageletcomplete(JobId, StageId, NodeId, Attempt, Statistics);
+
+stagefinish(JobId, StageNumber, Attempt, SSet) :-
+ startmessage_agg(JobId, StageId, Attempt, _, TSet),
+ stageletcomplete_agg(JobId, StageId, Attempt, SSet),
+ jobstage(JobId, StageNumber, StageId),
+ TSet.size() == SSet.size();
+
+update_job_status_TERMINATED job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.TERMINATED, JobSpec, JobPlan, null) :-
+ job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, JobSpec, JobPlan, _),
+ stagestart#insert(JobId, StageNumber, Attempt),
+ stagefinish(JobId, _, Attempt, SSet),
+ notin jobstage(JobId, StageNumber);
+
+define(jobcleanup_agg, {UUID, Set});
+
+jobcleanup_agg(JobId, set<NodeId>) :-
+ stagestart#insert(JobId, StageNumber, Attempt),
+ stagefinish(JobId, _, Attempt, _),
+ attemptoperatorlocationdecision(JobId, _, NodeId, _, Attempt),
+ notin jobstage(JobId, StageNumber);
+
+jobcleanup(JobId, NodeIdSet) :-
+ jobcleanup_agg(JobId, NodeIdSet);
\ No newline at end of file