Added start/stop calls to WorkQueue

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@689 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 5e4121e..57714aa 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -105,6 +105,8 @@
 
     private final ICCContext ccContext;
 
+    private final DeadNodeSweeper sweeper;
+
     private long jobCounter;
 
     public ClusterControllerService(CCConfig ccConfig) throws Exception {
@@ -126,6 +128,7 @@
                 return ipAddressNodeNameMap;
             }
         };
+        sweeper = new DeadNodeSweeper();
         jobCounter = 0;
     }
 
@@ -137,9 +140,10 @@
         registry.rebind(IClusterController.class.getName(), this);
         webServer.setPort(ccConfig.httpPort);
         webServer.start();
+        workQueue.start();
         info = new ClusterControllerInfo();
         info.setWebPort(webServer.getListeningPort());
-        timer.schedule(new DeadNodeSweeper(), 0, ccConfig.heartbeatPeriod);
+        timer.schedule(sweeper, 0, ccConfig.heartbeatPeriod);
         LOGGER.log(Level.INFO, "Started ClusterControllerService");
     }
 
@@ -147,6 +151,8 @@
     public void stop() throws Exception {
         LOGGER.log(Level.INFO, "Stopping ClusterControllerService");
         webServer.stop();
+        sweeper.cancel();
+        workQueue.stop();
         LOGGER.log(Level.INFO, "Stopped ClusterControllerService");
     }
 
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java
index 136cd5b..9fe54a0 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java
@@ -15,20 +15,51 @@
 package edu.uci.ics.hyracks.control.common.work;
 
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
 import java.util.logging.Logger;
 
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+
 public class WorkQueue {
     private static final Logger LOGGER = Logger.getLogger(WorkQueue.class.getName());
 
     private final LinkedBlockingQueue<AbstractWork> queue;
     private final WorkerThread thread;
+    private final Semaphore stopSemaphore;
+    private boolean stopped;
 
     public WorkQueue() {
         queue = new LinkedBlockingQueue<AbstractWork>();
         thread = new WorkerThread();
+        stopSemaphore = new Semaphore(1);
+    }
+
+    public void start() throws HyracksException {
+        stopped = false;
+        try {
+            stopSemaphore.acquire();
+        } catch (InterruptedException e) {
+            throw new HyracksException(e);
+        }
         thread.start();
     }
 
+    public void stop() throws HyracksException {
+        synchronized (this) {
+            stopped = true;
+        }
+        schedule(new AbstractWork() {
+            @Override
+            public void run() {
+            }
+        });
+        try {
+            stopSemaphore.acquire();
+        } catch (InterruptedException e) {
+            throw new HyracksException(e);
+        }
+    }
+
     public void schedule(AbstractWork event) {
         if (LOGGER.isLoggable(event.logLevel())) {
             LOGGER.info("Scheduling: " + event);
@@ -48,18 +79,27 @@
 
         @Override
         public void run() {
-            Runnable r;
-            while (true) {
-                try {
-                    r = queue.take();
-                } catch (InterruptedException e) {
-                    continue;
+            try {
+                Runnable r;
+                while (true) {
+                    synchronized (WorkQueue.this) {
+                        if (stopped) {
+                            return;
+                        }
+                    }
+                    try {
+                        r = queue.take();
+                    } catch (InterruptedException e) {
+                        continue;
+                    }
+                    try {
+                        r.run();
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
                 }
-                try {
-                    r.run();
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
+            } finally {
+                stopSemaphore.release();
             }
         }
     }
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 9a9aee2..a59b17e 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -99,6 +99,8 @@
 
     private NodeParameters nodeParameters;
 
+    private HeartbeatTask heartbeatTask;
+
     private final ServerContext serverCtx;
 
     private final Map<String, NCApplicationContext> applications;
@@ -158,9 +160,12 @@
         this.nodeParameters = cc.registerNode(new NodeRegistration(this, id, ncConfig, connectionManager
                 .getNetworkAddress(), osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean
                 .getAvailableProcessors()));
+        queue.start();
+
+        heartbeatTask = new HeartbeatTask(cc);
 
         // Schedule heartbeat generator.
-        timer.schedule(new HeartbeatTask(cc), 0, nodeParameters.getHeartbeatPeriod());
+        timer.schedule(heartbeatTask, 0, nodeParameters.getHeartbeatPeriod());
 
         if (nodeParameters.getProfileDumpPeriod() > 0) {
             // Schedule profile dump generator.
@@ -174,7 +179,9 @@
     public void stop() throws Exception {
         LOGGER.log(Level.INFO, "Stopping NodeControllerService");
         partitionManager.close();
+        heartbeatTask.cancel();
         connectionManager.stop();
+        queue.stop();
         LOGGER.log(Level.INFO, "Stopped NodeControllerService");
     }