Added start/stop calls to WorkQueue
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@689 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 5e4121e..57714aa 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -105,6 +105,8 @@
private final ICCContext ccContext;
+ private final DeadNodeSweeper sweeper;
+
private long jobCounter;
public ClusterControllerService(CCConfig ccConfig) throws Exception {
@@ -126,6 +128,7 @@
return ipAddressNodeNameMap;
}
};
+ sweeper = new DeadNodeSweeper();
jobCounter = 0;
}
@@ -137,9 +140,10 @@
registry.rebind(IClusterController.class.getName(), this);
webServer.setPort(ccConfig.httpPort);
webServer.start();
+ workQueue.start();
info = new ClusterControllerInfo();
info.setWebPort(webServer.getListeningPort());
- timer.schedule(new DeadNodeSweeper(), 0, ccConfig.heartbeatPeriod);
+ timer.schedule(sweeper, 0, ccConfig.heartbeatPeriod);
LOGGER.log(Level.INFO, "Started ClusterControllerService");
}
@@ -147,6 +151,8 @@
public void stop() throws Exception {
LOGGER.log(Level.INFO, "Stopping ClusterControllerService");
webServer.stop();
+ sweeper.cancel();
+ workQueue.stop();
LOGGER.log(Level.INFO, "Stopped ClusterControllerService");
}
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java
index 136cd5b..9fe54a0 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java
@@ -15,20 +15,51 @@
package edu.uci.ics.hyracks.control.common.work;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
import java.util.logging.Logger;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+
public class WorkQueue {
private static final Logger LOGGER = Logger.getLogger(WorkQueue.class.getName());
private final LinkedBlockingQueue<AbstractWork> queue;
private final WorkerThread thread;
+ private final Semaphore stopSemaphore;
+ private boolean stopped;
public WorkQueue() {
queue = new LinkedBlockingQueue<AbstractWork>();
thread = new WorkerThread();
+ stopSemaphore = new Semaphore(1);
+ }
+
+ public void start() throws HyracksException {
+ stopped = false;
+ try {
+ stopSemaphore.acquire();
+ } catch (InterruptedException e) {
+ throw new HyracksException(e);
+ }
thread.start();
}
+ public void stop() throws HyracksException {
+ synchronized (this) {
+ stopped = true;
+ }
+ schedule(new AbstractWork() {
+ @Override
+ public void run() {
+ }
+ });
+ try {
+ stopSemaphore.acquire();
+ } catch (InterruptedException e) {
+ throw new HyracksException(e);
+ }
+ }
+
public void schedule(AbstractWork event) {
if (LOGGER.isLoggable(event.logLevel())) {
LOGGER.info("Scheduling: " + event);
@@ -48,18 +79,27 @@
@Override
public void run() {
- Runnable r;
- while (true) {
- try {
- r = queue.take();
- } catch (InterruptedException e) {
- continue;
+ try {
+ Runnable r;
+ while (true) {
+ synchronized (WorkQueue.this) {
+ if (stopped) {
+ return;
+ }
+ }
+ try {
+ r = queue.take();
+ } catch (InterruptedException e) {
+ continue;
+ }
+ try {
+ r.run();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
}
- try {
- r.run();
- } catch (Exception e) {
- e.printStackTrace();
- }
+ } finally {
+ stopSemaphore.release();
}
}
}
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 9a9aee2..a59b17e 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -99,6 +99,8 @@
private NodeParameters nodeParameters;
+ private HeartbeatTask heartbeatTask;
+
private final ServerContext serverCtx;
private final Map<String, NCApplicationContext> applications;
@@ -158,9 +160,12 @@
this.nodeParameters = cc.registerNode(new NodeRegistration(this, id, ncConfig, connectionManager
.getNetworkAddress(), osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean
.getAvailableProcessors()));
+ queue.start();
+
+ heartbeatTask = new HeartbeatTask(cc);
// Schedule heartbeat generator.
- timer.schedule(new HeartbeatTask(cc), 0, nodeParameters.getHeartbeatPeriod());
+ timer.schedule(heartbeatTask, 0, nodeParameters.getHeartbeatPeriod());
if (nodeParameters.getProfileDumpPeriod() > 0) {
// Schedule profile dump generator.
@@ -174,7 +179,9 @@
public void stop() throws Exception {
LOGGER.log(Level.INFO, "Stopping NodeControllerService");
partitionManager.close();
+ heartbeatTask.cancel();
connectionManager.stop();
+ queue.stop();
LOGGER.log(Level.INFO, "Stopped NodeControllerService");
}