Refactored controllers. Added a JobManager implementation based on JOL
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks-next@14 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-core/.classpath b/hyracks-core/.classpath
index 370f43c..2fb3f21 100644
--- a/hyracks-core/.classpath
+++ b/hyracks-core/.classpath
@@ -2,6 +2,7 @@
<classpath>
<classpathentry kind="src" path="src/main/java"/>
<classpathentry kind="src" path="src/test/java"/>
+ <classpathentry kind="src" path="src/main/resources"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
<classpathentry kind="output" path="target/classes"/>
diff --git a/hyracks-core/pom.xml b/hyracks-core/pom.xml
index 5f91b97..0222956 100644
--- a/hyracks-core/pom.xml
+++ b/hyracks-core/pom.xml
@@ -129,6 +129,13 @@
<type>jar</type>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>jol</groupId>
+ <artifactId>jol</artifactId>
+ <version>1.0.0</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
<reporting>
<plugins>
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/IClusterController.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/IClusterController.java
index af86093..19e81f1 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/IClusterController.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/IClusterController.java
@@ -17,7 +17,6 @@
import java.rmi.Remote;
import java.util.EnumSet;
import java.util.Map;
-import java.util.Set;
import java.util.UUID;
import edu.uci.ics.hyracks.api.job.JobFlag;
@@ -27,15 +26,15 @@
import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
public interface IClusterController extends Remote {
- public void registerNode(INodeController nodeController) throws Exception;
+ public NodeParameters registerNode(INodeController nodeController) throws Exception;
public void unregisterNode(INodeController nodeController) throws Exception;
- public INodeController lookupNode(String id) throws Exception;
-
public void notifyStageletComplete(UUID jobId, UUID stageId, String nodeId, StageletStatistics statistics)
throws Exception;
+ public void nodeHeartbeat(String id) throws Exception;
+
/*
* Client Job Control methods.
*/
@@ -48,6 +47,6 @@
public void start(UUID jobId) throws Exception;
public JobStatistics waitForCompletion(UUID jobId) throws Exception;
-
- public Map<String,INodeController> getRegistry() throws Exception;
+
+ public Map<String, INodeController> getRegistry() throws Exception;
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/NodeParameters.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/NodeParameters.java
new file mode 100644
index 0000000..3856fea
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/NodeParameters.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.controller;
+
+import java.io.Serializable;
+
+public class NodeParameters implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private int heartbeatPeriod;
+
+ public int getHeartbeatPeriod() {
+ return heartbeatPeriod;
+ }
+
+ public void setHeartbeatPeriod(int heartbeatPeriod) {
+ this.heartbeatPeriod = heartbeatPeriod;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/ConnectorDescriptorId.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/ConnectorDescriptorId.java
index c7f3175..86d8118 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/ConnectorDescriptorId.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/ConnectorDescriptorId.java
@@ -46,4 +46,9 @@
ConnectorDescriptorId other = (ConnectorDescriptorId) obj;
return id.equals(other.id);
}
+
+ @Override
+ public String toString() {
+ return "CDID:" + id;
+ }
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/Direction.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/Direction.java
new file mode 100644
index 0000000..c884998
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/Direction.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow;
+
+public enum Direction {
+ INPUT,
+ OUTPUT,
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/PortInstanceId.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/PortInstanceId.java
index d8057d9..844266d 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/PortInstanceId.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/PortInstanceId.java
@@ -17,11 +17,6 @@
import java.io.Serializable;
public final class PortInstanceId implements Serializable {
- public enum Direction {
- INPUT,
- OUTPUT,
- }
-
private static final long serialVersionUID = 1L;
private OperatorDescriptorId odId;
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/resources/IResource.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/resources/IResource.java
new file mode 100644
index 0000000..2d76c26
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/resources/IResource.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.resources;
+
+import java.io.Serializable;
+
+public interface IResource extends Serializable {
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/resources/ISpaceSharedResource.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/resources/ISpaceSharedResource.java
new file mode 100644
index 0000000..3a6f930
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/resources/ISpaceSharedResource.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.resources;
+
+public interface ISpaceSharedResource {
+
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/resources/ITimeSharedResource.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/resources/ITimeSharedResource.java
new file mode 100644
index 0000000..1fb0465
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/resources/ITimeSharedResource.java
@@ -0,0 +1,18 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.resources;
+
+public interface ITimeSharedResource {
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/config/CCConfig.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/config/CCConfig.java
index 950569d..081c3f2 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/config/CCConfig.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/config/CCConfig.java
@@ -22,4 +22,13 @@
@Option(name = "-http-port", usage = "Sets the http port for the admin console")
public int httpPort;
+
+ @Option(name = "-heartbeat-period", usage = "Sets the time duration between two heartbeats from each node controller in seconds (default: 10)")
+ public int heartbeatPeriod = 10;
+
+ @Option(name = "-max-heartbeat-lapse-periods", usage = "Sets the maximum number of missed heartbeats before a node is marked as dead (default: 5)")
+ public int maxHeartbeatLapsePeriods = 5;
+
+ @Option(name = "-use-jol", usage = "Forces Hyracks to use the JOL based scheduler (default: false)")
+ public boolean useJOL = false;
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/ClusterControllerService.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java
similarity index 67%
rename from hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/ClusterControllerService.java
rename to hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java
index 0461499..9b2fc26 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/ClusterControllerService.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java
@@ -12,15 +12,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.controller;
+package edu.uci.ics.hyracks.controller.clustercontroller;
import java.io.IOException;
import java.io.PrintWriter;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
@@ -31,6 +36,8 @@
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import jol.core.Runtime;
+
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
@@ -38,13 +45,14 @@
import edu.uci.ics.hyracks.api.controller.IClusterController;
import edu.uci.ics.hyracks.api.controller.INodeController;
+import edu.uci.ics.hyracks.api.controller.NodeParameters;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
import edu.uci.ics.hyracks.config.CCConfig;
-import edu.uci.ics.hyracks.job.JobManager;
+import edu.uci.ics.hyracks.controller.AbstractRemoteService;
import edu.uci.ics.hyracks.web.WebServer;
public class ClusterControllerService extends AbstractRemoteService implements IClusterController {
@@ -54,20 +62,32 @@
private static Logger LOGGER = Logger.getLogger(ClusterControllerService.class.getName());
- private final Map<String, INodeController> nodeRegistry;
+ private final Map<String, NodeControllerState> nodeRegistry;
private WebServer webServer;
- private final JobManager jobManager;
+ private final IJobManager jobManager;
private final Executor taskExecutor;
+ private final Timer timer;
+
+ private Runtime jolRuntime;
+
public ClusterControllerService(CCConfig ccConfig) throws Exception {
this.ccConfig = ccConfig;
- nodeRegistry = new LinkedHashMap<String, INodeController>();
- jobManager = new JobManager(this);
+ nodeRegistry = new LinkedHashMap<String, NodeControllerState>();
+ if (ccConfig.useJOL) {
+ jolRuntime = (Runtime) Runtime.create(Runtime.DEBUG_WATCH, System.err);
+ jobManager = new JOLJobManagerImpl(jolRuntime);
+ } else {
+ jobManager = new JobManagerImpl(this);
+ }
taskExecutor = Executors.newCachedThreadPool();
- webServer = new WebServer(new Handler[] { getAdminConsoleHandler() });
+ webServer = new WebServer(new Handler[] {
+ getAdminConsoleHandler()
+ });
+ this.timer = new Timer(true);
}
@Override
@@ -77,6 +97,7 @@
registry.rebind(IClusterController.class.getName(), this);
webServer.setPort(ccConfig.httpPort);
webServer.start();
+ timer.schedule(new DeadNodeSweeper(), 0, ccConfig.heartbeatPeriod * 1000);
LOGGER.log(Level.INFO, "Started ClusterControllerService");
}
@@ -98,16 +119,20 @@
}
@Override
- public void registerNode(INodeController nodeController) throws Exception {
+ public NodeParameters registerNode(INodeController nodeController) throws Exception {
String id = nodeController.getId();
+ NodeControllerState state = new NodeControllerState(nodeController);
synchronized (this) {
if (nodeRegistry.containsKey(id)) {
throw new Exception("Node with this name already registered.");
}
- nodeRegistry.put(id, nodeController);
+ nodeRegistry.put(id, state);
}
nodeController.notifyRegistration(this);
LOGGER.log(Level.INFO, "Registered INodeController: id = " + id);
+ NodeParameters params = new NodeParameters();
+ params.setHeartbeatPeriod(ccConfig.heartbeatPeriod);
+ return params;
}
@Override
@@ -119,8 +144,7 @@
LOGGER.log(Level.INFO, "Unregistered INodeController");
}
- @Override
- public INodeController lookupNode(String id) throws Exception {
+ public synchronized NodeControllerState lookupNode(String id) throws Exception {
return nodeRegistry.get(id);
}
@@ -129,12 +153,12 @@
}
public synchronized void notifyJobComplete(final UUID jobId) {
- for (final INodeController nc : nodeRegistry.values()) {
+ for (final NodeControllerState ns : nodeRegistry.values()) {
taskExecutor.execute(new Runnable() {
@Override
public void run() {
try {
- nc.cleanUpJob(jobId);
+ ns.getNodeController().cleanUpJob(jobId);
} catch (Exception e) {
}
}
@@ -145,7 +169,7 @@
@Override
public void notifyStageletComplete(UUID jobId, UUID stageId, String nodeId, StageletStatistics statistics)
- throws Exception {
+ throws Exception {
jobManager.notifyStageletComplete(jobId, stageId, nodeId, statistics);
}
@@ -169,7 +193,7 @@
handler.setHandler(new AbstractHandler() {
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request,
- HttpServletResponse response) throws IOException, ServletException {
+ HttpServletResponse response) throws IOException, ServletException {
if (!"/".equals(target)) {
return;
}
@@ -182,12 +206,12 @@
writer.println("<h2>Node Controllers</h2>");
writer.println("<table><tr><td>Node Id</td><td>Host</td></tr>");
synchronized (ClusterControllerService.this) {
- for (Map.Entry<String, INodeController> e : nodeRegistry.entrySet()) {
+ for (Map.Entry<String, NodeControllerState> e : nodeRegistry.entrySet()) {
try {
- INodeController n = e.getValue();
writer.print("<tr><td>");
writer.print(e.getKey());
writer.print("</td><td>");
+ writer.print(e.getValue().getLastHeartbeatDuration());
writer.print("</td></tr>");
} catch (Exception ex) {
}
@@ -203,6 +227,44 @@
@Override
public Map<String, INodeController> getRegistry() throws Exception {
- return nodeRegistry;
+ Map<String, INodeController> map = new HashMap<String, INodeController>();
+ for (Map.Entry<String, NodeControllerState> e : nodeRegistry.entrySet()) {
+ map.put(e.getKey(), e.getValue().getNodeController());
+ }
+ return map;
+ }
+
+ @Override
+ public synchronized void nodeHeartbeat(String id) throws Exception {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Heartbeat from: " + id);
+ }
+ NodeControllerState ns = nodeRegistry.get(id);
+ if (ns != null) {
+ ns.notifyHeartbeat();
+ }
+ }
+
+ private void killNode(String nodeId) {
+ nodeRegistry.remove(nodeId);
+ jobManager.notifyNodeFailure(nodeId);
+ }
+
+ private class DeadNodeSweeper extends TimerTask {
+ @Override
+ public void run() {
+ Set<String> deadNodes = new HashSet<String>();
+ synchronized (ClusterControllerService.this) {
+ for (Map.Entry<String, NodeControllerState> e : nodeRegistry.entrySet()) {
+ NodeControllerState state = e.getValue();
+ if (state.incrementLastHeartbeatDuration() >= ccConfig.maxHeartbeatLapsePeriods) {
+ deadNodes.add(e.getKey());
+ }
+ }
+ for (String deadNode : deadNodes) {
+ killNode(deadNode);
+ }
+ }
+ }
}
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/IJobManager.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/IJobManager.java
new file mode 100644
index 0000000..7892b5c
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/IJobManager.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.controller.clustercontroller;
+
+import java.util.EnumSet;
+import java.util.UUID;
+
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
+import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
+
+public interface IJobManager {
+ public UUID createJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception;
+
+ public void start(UUID jobId) throws Exception;
+
+ public void advanceJob(JobControl jobControlImpl) throws Exception;
+
+ public void notifyStageletComplete(UUID jobId, UUID stageId, String nodeId, StageletStatistics statistics)
+ throws Exception;
+
+ public JobStatus getJobStatus(UUID jobId);
+
+ public JobStatistics waitForCompletion(UUID jobId) throws Exception;
+
+ public void notifyNodeFailure(String nodeId);
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/IJobPlanner.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/IJobPlanner.java
new file mode 100644
index 0000000..e9bffba
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/IJobPlanner.java
@@ -0,0 +1,9 @@
+package edu.uci.ics.hyracks.controller.clustercontroller;
+
+import java.util.Set;
+
+import edu.uci.ics.hyracks.api.job.JobStage;
+
+public interface IJobPlanner {
+ public Set<String> plan(JobControl jobControl, JobStage stage) throws Exception;
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
new file mode 100644
index 0000000..6e7cf74
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
@@ -0,0 +1,307 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.controller.clustercontroller;
+
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.UUID;
+
+import jol.core.Runtime;
+import jol.types.basic.BasicTupleSet;
+import jol.types.basic.Tuple;
+import jol.types.table.BasicTable;
+import jol.types.table.Key;
+import jol.types.table.TableName;
+import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.Direction;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IActivityNode;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
+import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
+
+public class JOLJobManagerImpl implements IJobManager {
+ public static final String JOL_SCOPE = "hyrackscc";
+
+ private static final String SCHEDULER_OLG_FILE = "edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg";
+
+ private final Runtime jolRuntime;
+
+ private final JobTable jobTable;
+
+ private final OperatorDescriptorTable odTable;
+
+ private final ConnectorDescriptorTable cdTable;
+
+ private final ActivityNodeTable anTable;
+
+ private final ActivityConnectionTable acTable;
+
+ private final ActivityBlockedTable abTable;
+
+ public JOLJobManagerImpl(Runtime jolRuntime) throws Exception {
+ this.jolRuntime = jolRuntime;
+ this.jobTable = new JobTable(jolRuntime);
+ this.odTable = new OperatorDescriptorTable(jolRuntime);
+ this.cdTable = new ConnectorDescriptorTable(jolRuntime);
+ this.anTable = new ActivityNodeTable(jolRuntime);
+ this.acTable = new ActivityConnectionTable(jolRuntime);
+ this.abTable = new ActivityBlockedTable(jolRuntime);
+
+ jolRuntime.catalog().register(jobTable);
+ jolRuntime.catalog().register(odTable);
+ jolRuntime.catalog().register(cdTable);
+ jolRuntime.catalog().register(anTable);
+ jolRuntime.catalog().register(acTable);
+ jolRuntime.catalog().register(abTable);
+
+ jolRuntime.install(JOL_SCOPE, ClassLoader.getSystemResource(SCHEDULER_OLG_FILE));
+ jolRuntime.evaluate();
+ }
+
+ @Override
+ public void advanceJob(JobControl jobControlImpl) throws Exception {
+
+ }
+
+ @Override
+ public UUID createJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
+ final UUID jobId = UUID.randomUUID();
+
+ BasicTupleSet jobTuples = new BasicTupleSet(JobTable.createInitialJobTuple(jobId, jobFlags));
+
+ final BasicTupleSet anTuples = new BasicTupleSet();
+ final BasicTupleSet acTuples = new BasicTupleSet();
+ final BasicTupleSet abTuples = new BasicTupleSet();
+ IActivityGraphBuilder gBuilder = new IActivityGraphBuilder() {
+ @Override
+ public void addTask(IActivityNode task) {
+ anTuples.add(ActivityNodeTable.createTuple(jobId, task));
+ }
+
+ @Override
+ public void addTargetEdge(int operatorOutputIndex, IActivityNode task, int taskOutputIndex) {
+ acTuples.add(ActivityConnectionTable.createTuple(jobId, task, Direction.OUTPUT, operatorOutputIndex,
+ taskOutputIndex));
+ }
+
+ @Override
+ public void addSourceEdge(int operatorInputIndex, IActivityNode task, int taskInputIndex) {
+ acTuples.add(ActivityConnectionTable.createTuple(jobId, task, Direction.INPUT, operatorInputIndex,
+ taskInputIndex));
+ }
+
+ @Override
+ public void addBlockingEdge(IActivityNode blocker, IActivityNode blocked) {
+ abTuples.add(ActivityBlockedTable.createTuple(jobId, blocker, blocked));
+ }
+ };
+
+ BasicTupleSet odTuples = new BasicTupleSet();
+ for (Map.Entry<OperatorDescriptorId, IOperatorDescriptor> e : jobSpec.getOperatorMap().entrySet()) {
+ IOperatorDescriptor od = e.getValue();
+ odTuples.add(OperatorDescriptorTable.createTuple(jobId, od));
+ od.contributeTaskGraph(gBuilder);
+ }
+
+ BasicTupleSet cdTuples = new BasicTupleSet();
+ for (Map.Entry<ConnectorDescriptorId, IConnectorDescriptor> e : jobSpec.getConnectorMap().entrySet()) {
+ cdTuples.add(ConnectorDescriptorTable.createTuple(jobId, jobSpec, e.getValue()));
+ }
+
+ jolRuntime.schedule(JOL_SCOPE, JobTable.TABLE_NAME, jobTuples, null);
+ jolRuntime.schedule(JOL_SCOPE, OperatorDescriptorTable.TABLE_NAME, odTuples, null);
+ jolRuntime.schedule(JOL_SCOPE, ConnectorDescriptorTable.TABLE_NAME, cdTuples, null);
+ jolRuntime.schedule(JOL_SCOPE, ActivityNodeTable.TABLE_NAME, anTuples, null);
+ jolRuntime.schedule(JOL_SCOPE, ActivityConnectionTable.TABLE_NAME, acTuples, null);
+ jolRuntime.schedule(JOL_SCOPE, ActivityBlockedTable.TABLE_NAME, abTuples, null);
+
+ jolRuntime.evaluate();
+
+ return jobId;
+ }
+
+ @Override
+ public JobStatus getJobStatus(UUID jobId) {
+ return null;
+ }
+
+ @Override
+ public void notifyNodeFailure(String nodeId) {
+
+ }
+
+ @Override
+ public void notifyStageletComplete(UUID jobId, UUID stageId, String nodeId, StageletStatistics statistics)
+ throws Exception {
+ }
+
+ @Override
+ public void start(UUID jobId) throws Exception {
+
+ }
+
+ @Override
+ public JobStatistics waitForCompletion(UUID jobId) throws Exception {
+ return null;
+ }
+
+ /*
+ * declare(job, keys(0), {JobId, Flags, Status})
+ */
+ private static class JobTable extends BasicTable {
+ private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "job");
+
+ private static Key PRIMARY_KEY = new Key(0);
+
+ private static final Class[] SCHEMA = new Class[] {
+ UUID.class, EnumSet.class, JobStatus.class, PerJobCounter.class
+ };
+
+ public JobTable(Runtime context) {
+ super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+ }
+
+ static Tuple createInitialJobTuple(UUID jobId, EnumSet<JobFlag> jobFlags) {
+ return new Tuple(jobId, jobFlags, JobStatus.INITIALIZED, new PerJobCounter());
+ }
+ }
+
+ /*
+ * declare(operatordescriptor, keys(0, 1), {JobId, ODId, OperatorDescriptor})
+ */
+ private static class OperatorDescriptorTable extends BasicTable {
+ private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "operatordescriptor");
+
+ private static Key PRIMARY_KEY = new Key(0, 1);
+
+ private static final Class[] SCHEMA = new Class[] {
+ UUID.class, OperatorDescriptorId.class, IOperatorDescriptor.class
+ };
+
+ public OperatorDescriptorTable(Runtime context) {
+ super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+ }
+
+ static Tuple createTuple(UUID jobId, IOperatorDescriptor od) {
+ return new Tuple(jobId, od.getOperatorId(), od);
+ }
+ }
+
+ /*
+ * declare(connectordescriptor, keys(0, 1), {JobId, CDId, SrcODId, SrcPort, DestODId, DestPort, ConnectorDescriptor})
+ */
+ private static class ConnectorDescriptorTable extends BasicTable {
+ private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "connectordescriptor");
+
+ private static Key PRIMARY_KEY = new Key(0, 1);
+
+ private static final Class[] SCHEMA = new Class[] {
+ UUID.class,
+ ConnectorDescriptorId.class,
+ OperatorDescriptorId.class,
+ Integer.class,
+ OperatorDescriptorId.class,
+ Integer.class,
+ IConnectorDescriptor.class
+ };
+
+ public ConnectorDescriptorTable(Runtime context) {
+ super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+ }
+
+ static Tuple createTuple(UUID jobId, JobSpecification jobSpec, IConnectorDescriptor conn) {
+ IOperatorDescriptor srcOD = jobSpec.getProducer(conn);
+ int srcPort = jobSpec.getProducerOutputIndex(conn);
+ IOperatorDescriptor destOD = jobSpec.getConsumer(conn);
+ int destPort = jobSpec.getConsumerInputIndex(conn);
+ Tuple cdTuple = new Tuple(jobId, conn.getConnectorId(), srcOD.getOperatorId(), srcPort, destOD
+ .getOperatorId(), destPort, conn);
+ return cdTuple;
+ }
+ }
+
+ /*
+ * declare(activitynode, keys(0), {JobId, OperatorId, ActivityId, ActivityNode})
+ */
+ private static class ActivityNodeTable extends BasicTable {
+ private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "activitynode");
+
+ private static Key PRIMARY_KEY = new Key(0, 1, 2);
+
+ private static final Class[] SCHEMA = new Class[] {
+ UUID.class, OperatorDescriptorId.class, ActivityNodeId.class, IActivityNode.class
+ };
+
+ public ActivityNodeTable(Runtime context) {
+ super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+ }
+
+ static Tuple createTuple(UUID jobId, IActivityNode aNode) {
+ return new Tuple(jobId, aNode.getActivityNodeId().getOperatorDescriptorId(), aNode.getActivityNodeId(),
+ aNode);
+ }
+ }
+
+ /*
+ * declare(activityconnection, keys(0, 1, 2, 3), {JobId, OperatorId, Integer, Direction, ActivityNodeId, Integer})
+ */
+ private static class ActivityConnectionTable extends BasicTable {
+ private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "activityconnection");
+
+ private static Key PRIMARY_KEY = new Key(0);
+
+ private static final Class[] SCHEMA = new Class[] {
+ UUID.class, OperatorDescriptorId.class, Integer.class, Direction.class, ActivityNodeId.class, Integer.class
+ };
+
+ public ActivityConnectionTable(Runtime context) {
+ super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+ }
+
+ static Tuple createTuple(UUID jobId, IActivityNode aNode, Direction direction, int odPort, int activityPort) {
+ return new Tuple(jobId, aNode.getActivityNodeId().getOperatorDescriptorId(), odPort, direction, aNode
+ .getActivityNodeId(), activityPort);
+ }
+ }
+
+ /*
+ * declare(activityblocked, keys(0, 1, 2, 3), {JobId, OperatorId, BlockerActivityId, BlockedActivityId})
+ */
+ private static class ActivityBlockedTable extends BasicTable {
+ private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "activityblocked");
+
+ private static Key PRIMARY_KEY = new Key(0);
+
+ private static final Class[] SCHEMA = new Class[] {
+ UUID.class, OperatorDescriptorId.class, ActivityNodeId.class, ActivityNodeId.class
+ };
+
+ public ActivityBlockedTable(Runtime context) {
+ super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+ }
+
+ static Tuple createTuple(UUID jobId, IActivityNode blocker, IActivityNode blocked) {
+ OperatorDescriptorId odId = blocker.getActivityNodeId().getOperatorDescriptorId();
+ return new Tuple(jobId, odId, blocker.getActivityNodeId(), blocked.getActivityNodeId());
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/JobControl.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobControl.java
similarity index 94%
rename from hyracks-core/src/main/java/edu/uci/ics/hyracks/job/JobControl.java
rename to hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobControl.java
index 3259649..a7e2fc1 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/JobControl.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobControl.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.job;
+package edu.uci.ics.hyracks.controller.clustercontroller;
import java.rmi.RemoteException;
import java.util.HashMap;
@@ -30,7 +30,7 @@
public class JobControl {
private static final long serialVersionUID = 1L;
- private final JobManager jobManager;
+ private final IJobManager jobManager;
private final JobPlan jobPlan;
@@ -44,7 +44,7 @@
private JobStatistics jobStatistics;
- public JobControl(JobManager jobManager, JobPlan jobPlan) throws RemoteException {
+ public JobControl(IJobManager jobManager, JobPlan jobPlan) throws RemoteException {
this.jobManager = jobManager;
this.jobPlan = jobPlan;
jobId = UUID.randomUUID();
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/JobManager.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobManagerImpl.java
similarity index 79%
rename from hyracks-core/src/main/java/edu/uci/ics/hyracks/job/JobManager.java
rename to hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobManagerImpl.java
index f976bb9..828a6db 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/JobManager.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobManagerImpl.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.job;
+package edu.uci.ics.hyracks.controller.clustercontroller;
import java.util.ArrayList;
import java.util.Date;
@@ -29,13 +29,7 @@
import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.comm.Endpoint;
-import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
-import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
-import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
import edu.uci.ics.hyracks.api.controller.INodeController;
-import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobPlan;
@@ -44,19 +38,19 @@
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
-import edu.uci.ics.hyracks.controller.ClusterControllerService;
-import edu.uci.ics.hyracks.dataflow.base.IOperatorDescriptorVisitor;
-import edu.uci.ics.hyracks.dataflow.util.PlanUtils;
-public class JobManager {
- private static final Logger LOGGER = Logger.getLogger(JobManager.class.getName());
+public class JobManagerImpl implements IJobManager {
+ private static final Logger LOGGER = Logger.getLogger(JobManagerImpl.class.getName());
private ClusterControllerService ccs;
private final Map<UUID, JobControl> jobMap;
- public JobManager(ClusterControllerService ccs) {
+ private final IJobPlanner planner;
+
+ public JobManagerImpl(ClusterControllerService ccs) {
this.ccs = ccs;
jobMap = new HashMap<UUID, JobControl>();
+ planner = new NaiveJobPlannerImpl();
}
public synchronized UUID createJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
@@ -119,7 +113,7 @@
stage.setStarted();
Set<String> candidateNodes = deploy(jc, stage);
for (String nodeId : candidateNodes) {
- ccs.lookupNode(nodeId).startStage(jc.getJobId(), stage.getId());
+ ccs.lookupNode(nodeId).getNodeController().startStage(jc.getJobId(), stage.getId());
}
}
@@ -148,15 +142,15 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Deploying: " + stage);
}
- Set<String> candidateNodes = plan(jc, stage);
+ Set<String> participatingNodes = plan(jc, stage);
StageProgress stageProgress = new StageProgress(stage.getId());
- stageProgress.addPendingNodes(candidateNodes);
+ stageProgress.addPendingNodes(participatingNodes);
Map<PortInstanceId, Endpoint> globalPortMap = runRemote(new Phase1Installer(jc, stage),
- new PortMapMergingAccumulator(), candidateNodes);
- runRemote(new Phase2Installer(jc, stage, globalPortMap), null, candidateNodes);
- runRemote(new Phase3Installer(jc, stage), null, candidateNodes);
+ new PortMapMergingAccumulator(), participatingNodes);
+ runRemote(new Phase2Installer(jc, stage, globalPortMap), null, participatingNodes);
+ runRemote(new Phase3Installer(jc, stage), null, participatingNodes);
jc.setStageProgress(stage.getId(), stageProgress);
- return candidateNodes;
+ return participatingNodes;
}
private interface RemoteOp<T> {
@@ -255,7 +249,7 @@
final Semaphore installComplete = new Semaphore(candidateNodes.size());
final List<Exception> errors = new Vector<Exception>();
for (final String nodeId : candidateNodes) {
- final INodeController node = ccs.lookupNode(nodeId);
+ final INodeController node = ccs.lookupNode(nodeId).getNodeController();
installComplete.acquire();
Runnable remoteRunner = new Runnable() {
@@ -287,43 +281,11 @@
private Set<String> plan(JobControl jc, JobStage stage) throws Exception {
LOGGER.log(Level.INFO, String.valueOf(jc.getJobId()) + ": Planning");
-
- final Set<OperatorDescriptorId> opSet = new HashSet<OperatorDescriptorId>();
- for (ActivityNodeId t : stage.getTasks()) {
- opSet.add(jc.getJobPlan().getActivityNodeMap().get(t).getOwner().getOperatorId());
- }
-
- final Set<String> candidateNodes = new HashSet<String>();
-
- IOperatorDescriptorVisitor visitor = new IOperatorDescriptorVisitor() {
- @Override
- public void visit(IOperatorDescriptor op) throws Exception {
- if (!opSet.contains(op.getOperatorId())) {
- return;
- }
- String[] partitions = op.getPartitions();
- if (partitions == null) {
- PartitionConstraint pc = op.getPartitionConstraint();
- LocationConstraint[] lcs = pc.getLocationConstraints();
- String[] assignment = new String[lcs.length];
- for (int i = 0; i < lcs.length; ++i) {
- String nodeId = ((AbsoluteLocationConstraint) lcs[i]).getLocationId();
- assignment[i] = nodeId;
- }
- op.setPartitions(assignment);
- partitions = assignment;
- }
- for (String p : partitions) {
- candidateNodes.add(p);
- }
- }
- };
-
- PlanUtils.visit(jc.getJobPlan().getJobSpecification(), visitor);
+ Set<String> participatingNodes = planner.plan(jc, stage);
if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info(stage + " Candidate nodes: " + candidateNodes);
+ LOGGER.info(stage + " Participating nodes: " + participatingNodes);
}
- return candidateNodes;
+ return participatingNodes;
}
public synchronized void notifyStageletComplete(UUID jobId, UUID stageId, String nodeId,
@@ -349,4 +311,11 @@
}
return null;
}
+
+ public synchronized void notifyNodeFailure(String nodeId) {
+ for(Map.Entry<UUID, JobControl> e : jobMap.entrySet()) {
+ JobControl jc = e.getValue();
+
+ }
+ }
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/JobPlanBuilder.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobPlanBuilder.java
similarity index 99%
rename from hyracks-core/src/main/java/edu/uci/ics/hyracks/job/JobPlanBuilder.java
rename to hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobPlanBuilder.java
index 61cf70f..723736c 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/JobPlanBuilder.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobPlanBuilder.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.job;
+package edu.uci.ics.hyracks.controller.clustercontroller;
import java.util.ArrayList;
import java.util.EnumSet;
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/NaiveJobPlannerImpl.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/NaiveJobPlannerImpl.java
new file mode 100644
index 0000000..f3ccfee
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/NaiveJobPlannerImpl.java
@@ -0,0 +1,53 @@
+package edu.uci.ics.hyracks.controller.clustercontroller;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
+import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.job.JobStage;
+import edu.uci.ics.hyracks.dataflow.base.IOperatorDescriptorVisitor;
+import edu.uci.ics.hyracks.dataflow.util.PlanUtils;
+
+public class NaiveJobPlannerImpl implements IJobPlanner {
+ @Override
+ public Set<String> plan(JobControl jc, JobStage stage) throws Exception {
+ final Set<OperatorDescriptorId> opSet = new HashSet<OperatorDescriptorId>();
+ for (ActivityNodeId t : stage.getTasks()) {
+ opSet.add(jc.getJobPlan().getActivityNodeMap().get(t).getOwner().getOperatorId());
+ }
+
+ final Set<String> candidateNodes = new HashSet<String>();
+
+ IOperatorDescriptorVisitor visitor = new IOperatorDescriptorVisitor() {
+ @Override
+ public void visit(IOperatorDescriptor op) throws Exception {
+ if (!opSet.contains(op.getOperatorId())) {
+ return;
+ }
+ String[] partitions = op.getPartitions();
+ if (partitions == null) {
+ PartitionConstraint pc = op.getPartitionConstraint();
+ LocationConstraint[] lcs = pc.getLocationConstraints();
+ String[] assignment = new String[lcs.length];
+ for (int i = 0; i < lcs.length; ++i) {
+ String nodeId = ((AbsoluteLocationConstraint) lcs[i]).getLocationId();
+ assignment[i] = nodeId;
+ }
+ op.setPartitions(assignment);
+ partitions = assignment;
+ }
+ for (String p : partitions) {
+ candidateNodes.add(p);
+ }
+ }
+ };
+
+ PlanUtils.visit(jc.getJobPlan().getJobSpecification(), visitor);
+ return candidateNodes;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/NodeControllerState.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/NodeControllerState.java
new file mode 100644
index 0000000..e209515
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/NodeControllerState.java
@@ -0,0 +1,29 @@
+package edu.uci.ics.hyracks.controller.clustercontroller;
+
+import edu.uci.ics.hyracks.api.controller.INodeController;
+
+public class NodeControllerState {
+ private final INodeController nodeController;
+
+ private int lastHeartbeatDuration;
+
+ public NodeControllerState(INodeController nodeController) {
+ this.nodeController = nodeController;
+ }
+
+ void notifyHeartbeat() {
+ lastHeartbeatDuration = 0;
+ }
+
+ int incrementLastHeartbeatDuration() {
+ return lastHeartbeatDuration++;
+ }
+
+ int getLastHeartbeatDuration() {
+ return lastHeartbeatDuration;
+ }
+
+ public INodeController getNodeController() {
+ return nodeController;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/PerJobCounter.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/PerJobCounter.java
new file mode 100644
index 0000000..4644daf
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/PerJobCounter.java
@@ -0,0 +1,12 @@
+package edu.uci.ics.hyracks.controller.clustercontroller;
+
+public class PerJobCounter {
+ private int stageCounter;
+
+ public PerJobCounter() {
+ }
+
+ public int getStageCounterAndIncrement() {
+ return stageCounter++;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/StageProgress.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/StageProgress.java
similarity index 96%
rename from hyracks-core/src/main/java/edu/uci/ics/hyracks/job/StageProgress.java
rename to hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/StageProgress.java
index 3357012..69f7380 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/StageProgress.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/StageProgress.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.job;
+package edu.uci.ics.hyracks.controller.clustercontroller;
import java.util.HashSet;
import java.util.Set;
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/Joblet.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Joblet.java
similarity index 96%
rename from hyracks-core/src/main/java/edu/uci/ics/hyracks/job/Joblet.java
rename to hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Joblet.java
index 5f38e56..055216d 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/Joblet.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Joblet.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.job;
+package edu.uci.ics.hyracks.controller.nodecontroller;
import java.util.HashMap;
import java.util.Map;
@@ -23,7 +23,6 @@
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
-import edu.uci.ics.hyracks.controller.NodeControllerService;
public class Joblet {
private static final long serialVersionUID = 1L;
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/NodeControllerService.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
similarity index 92%
rename from hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/NodeControllerService.java
rename to hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
index dbdf1c9..f8f6806 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/NodeControllerService.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.controller;
+package edu.uci.ics.hyracks.controller.nodecontroller;
import java.net.InetAddress;
import java.nio.ByteBuffer;
@@ -23,6 +23,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
@@ -38,7 +40,9 @@
import edu.uci.ics.hyracks.api.controller.IClusterController;
import edu.uci.ics.hyracks.api.controller.INodeController;
import edu.uci.ics.hyracks.api.controller.NodeCapability;
+import edu.uci.ics.hyracks.api.controller.NodeParameters;
import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.Direction;
import edu.uci.ics.hyracks.api.dataflow.IActivityNode;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IEndpointDataWriterFactory;
@@ -56,8 +60,7 @@
import edu.uci.ics.hyracks.comm.DemuxDataReceiveListenerFactory;
import edu.uci.ics.hyracks.config.NCConfig;
import edu.uci.ics.hyracks.context.HyracksContext;
-import edu.uci.ics.hyracks.job.Joblet;
-import edu.uci.ics.hyracks.job.Stagelet;
+import edu.uci.ics.hyracks.controller.AbstractRemoteService;
import edu.uci.ics.hyracks.runtime.OperatorRunnable;
public class NodeControllerService extends AbstractRemoteService implements INodeController {
@@ -73,12 +76,16 @@
private final ConnectionManager connectionManager;
+ private final Timer timer;
+
private IClusterController ccs;
private Map<UUID, Joblet> jobletMap;
private Executor executor;
+ private NodeParameters nodeParameters;
+
public NodeControllerService(NCConfig ncConfig) throws Exception {
this.ncConfig = ncConfig;
id = ncConfig.nodeId;
@@ -90,6 +97,7 @@
connectionManager = new ConnectionManager(ctx, getIpAddress(ncConfig));
jobletMap = new HashMap<UUID, Joblet>();
executor = Executors.newCachedThreadPool();
+ timer = new Timer(true);
}
private static Logger LOGGER = Logger.getLogger(NodeControllerService.class.getName());
@@ -100,7 +108,11 @@
connectionManager.start();
Registry registry = LocateRegistry.getRegistry(ncConfig.ccHost, ncConfig.ccPort);
IClusterController cc = (IClusterController) registry.lookup(IClusterController.class.getName());
- cc.registerNode(this);
+ this.nodeParameters = cc.registerNode(this);
+
+ // Schedule heartbeat generator.
+ timer.schedule(new HeartbeatTask(cc), 0, nodeParameters.getHeartbeatPeriod() * 1000);
+
LOGGER.log(Level.INFO, "Started NodeControllerService");
}
@@ -189,7 +201,7 @@
DemuxDataReceiveListenerFactory drlf = new DemuxDataReceiveListenerFactory(ctx);
connectionManager.acceptConnection(endpoint.getEndpointId(), drlf);
PortInstanceId piId = new PortInstanceId(op.getOperatorId(),
- PortInstanceId.Direction.INPUT, plan.getTaskInputMap().get(hanId).get(j), i);
+ Direction.INPUT, plan.getTaskInputMap().get(hanId).get(j), i);
if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.finest("Created endpoint " + piId + " -> " + endpoint);
}
@@ -270,7 +282,7 @@
@Override
public IFrameWriter createFrameWriter(int index) throws HyracksDataException {
PortInstanceId piId = new PortInstanceId(spec.getConsumer(conn).getOperatorId(),
- PortInstanceId.Direction.INPUT, spec.getConsumerInputIndex(conn), index);
+ Direction.INPUT, spec.getConsumerInputIndex(conn), index);
Endpoint ep = globalPortMap.get(piId);
if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.finest("Probed endpoint " + piId + " -> " + ep);
@@ -367,8 +379,25 @@
this.ccs = ccs;
}
- @Override
- public NCConfig getConfiguration() throws Exception {
- return ncConfig;
- }
+ @Override
+ public NCConfig getConfiguration() throws Exception {
+ return ncConfig;
+ }
+
+ private class HeartbeatTask extends TimerTask {
+ private IClusterController cc;
+
+ public HeartbeatTask(IClusterController cc) {
+ this.cc = cc;
+ }
+
+ @Override
+ public void run() {
+ try {
+ cc.nodeHeartbeat(id);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/Stagelet.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java
similarity index 98%
rename from hyracks-core/src/main/java/edu/uci/ics/hyracks/job/Stagelet.java
rename to hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java
index e82fa5b..8acab28 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/Stagelet.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.job;
+package edu.uci.ics.hyracks.controller.nodecontroller;
import java.rmi.RemoteException;
import java.util.Date;
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/driver/CCDriver.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/driver/CCDriver.java
index 425f967..bb1ba84 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/driver/CCDriver.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/driver/CCDriver.java
@@ -17,7 +17,7 @@
import org.kohsuke.args4j.CmdLineParser;
import edu.uci.ics.hyracks.config.CCConfig;
-import edu.uci.ics.hyracks.controller.ClusterControllerService;
+import edu.uci.ics.hyracks.controller.clustercontroller.ClusterControllerService;
public class CCDriver {
public static void main(String args[]) throws Exception {
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/driver/NCDriver.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/driver/NCDriver.java
index 657b304..3c856ab 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/driver/NCDriver.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/driver/NCDriver.java
@@ -19,7 +19,7 @@
import edu.uci.ics.dcache.client.DCacheClient;
import edu.uci.ics.dcache.client.DCacheClientConfig;
import edu.uci.ics.hyracks.config.NCConfig;
-import edu.uci.ics.hyracks.controller.NodeControllerService;
+import edu.uci.ics.hyracks.controller.nodecontroller.NodeControllerService;
public class NCDriver {
public static void main(String args[]) throws Exception {
diff --git a/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg b/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
new file mode 100644
index 0000000..abba642
--- /dev/null
+++ b/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
@@ -0,0 +1,50 @@
+program hyrackscc;
+
+import java.util.UUID;
+import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+
+define(testOp, keys(0, 1), {UUID, OperatorDescriptorId, IOperatorDescriptor});
+define(testConn, keys(0, 1), {UUID, ConnectorDescriptorId, OperatorDescriptorId, Integer, OperatorDescriptorId, Integer});
+
+testOp(JobId, Opid, Op) :-
+ operatordescriptor(JobId, Opid, Op)
+ {
+ java.lang.System.err.println(Opid);
+ };
+
+testConn(JobId, Cid, SOpid, SPort, DOpid, DPort) :-
+ connectordescriptor(JobId, Cid, SOpid, SPort, DOpid, DPort, _)
+ {
+ java.lang.System.err.println(Cid.toString() + " " + SOpid.toString() + ":" + SPort.toString() + " -> " + DOpid.toString() + ":" + DPort.toString());
+ };
+
+define(activitystage, keys(0, 1, 2, 3, 4), {UUID, OperatorDescriptorId, ActivityNodeId, OperatorDescriptorId, ActivityNodeId});
+
+activitystage(JobId, OperatorId, ActivityId, OperatorId, ActivityId) :-
+ activitynode(JobId, OperatorId, ActivityId, _)
+ {
+ java.lang.System.err.println("INITIAL: " + JobId.toString() + " " + OperatorId.toString() + " " + ActivityId.toString());
+ };
+
+activitystage(JobId, OperatorId2, ActivityId2, OperatorId1, ActivityId1) :-
+ activitystage(JobId, OperatorId1, ActivityId1, OperatorId2, ActivityId2)
+ {
+ java.lang.System.err.println("CHANGE1: " + JobId.toString() + " " + OperatorId2.toString() + " " + ActivityId2.toString() + " " + OperatorId1.toString() + " " + ActivityId1.toString());
+ };
+
+activitystage(JobId, OperatorId1, ActivityId1, OperatorId3, ActivityId3) :-
+ activitystage(JobId, OperatorId1, ActivityId1, OperatorId2, ActivityId2),
+ activitynode(JobId, OperatorId3, ActivityId3, _),
+ activityconnection(JobId, OperatorId2, Operator2Port, _, ActivityId2, _),
+ activityconnection(JobId, OperatorId3, Operator3Port, _, ActivityId3, _),
+ connectordescriptor(JobId, _, OperatorId2, Operator2Port, OperatorId3, Operator3Port, _)
+ {
+ java.lang.System.err.println("CHANGE2: " + JobId.toString() + " " + OperatorId1.toString() + " " + ActivityId1.toString() + " " + OperatorId3.toString() + " " + ActivityId3.toString());
+ };
+
+watch(activitynode, a);
+watch(activitystage, a);
diff --git a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
index 94bc57a..8505531 100644
--- a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -24,8 +24,8 @@
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.config.CCConfig;
import edu.uci.ics.hyracks.config.NCConfig;
-import edu.uci.ics.hyracks.controller.ClusterControllerService;
-import edu.uci.ics.hyracks.controller.NodeControllerService;
+import edu.uci.ics.hyracks.controller.clustercontroller.ClusterControllerService;
+import edu.uci.ics.hyracks.controller.nodecontroller.NodeControllerService;
public abstract class AbstractIntegrationTest {
public static final String NC1_ID = "nc1";
@@ -39,6 +39,7 @@
public static void init() throws Exception {
CCConfig ccConfig = new CCConfig();
ccConfig.port = 39001;
+ ccConfig.useJOL = true;
cc = new ClusterControllerService(ccConfig);
cc.start();