start WorkQueue at the end of CC startup
- should avoid issues like https://code.google.com/p/asterixdb/issues/detail?id=758

Change-Id: I209217ac1e923e7d2a22b6944c873236c62ef13b
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/38
Reviewed-by: Zachary Heilbron <zheilbron@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Raman Grover <ramang@uci.edu>
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 8184fe6..b665a4f 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -222,7 +222,6 @@
         clientIPC.start();
         webServer.setPort(ccConfig.httpPort);
         webServer.start();
-        workQueue.start();
         info = new ClusterControllerInfo(ccConfig.clientNetIpAddress, ccConfig.clientNetPort,
                 webServer.getListeningPort());
         timer.schedule(sweeper, 0, ccConfig.heartbeatPeriod);
@@ -230,6 +229,7 @@
         startApplication();
 
         datasetDirectoryService.init(executor);
+        workQueue.start();
         LOGGER.log(Level.INFO, "Started ClusterControllerService");
     }
 
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java
index 58e12cf..251ebb1 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -24,31 +24,37 @@
 
 public class WorkQueue {
     private static final Logger LOGGER = Logger.getLogger(WorkQueue.class.getName());
+    private static final Level COUNT_LOGGING_LEVEL = Level.FINEST;
 
     private final LinkedBlockingQueue<AbstractWork> queue;
     private final WorkerThread thread;
     private final Semaphore stopSemaphore;
     private boolean stopped;
-    private final AtomicInteger enqueueCount;
-    private final AtomicInteger dequeueCount;
+    private AtomicInteger enqueueCount;
+    private AtomicInteger dequeueCount;
 
     public WorkQueue() {
         queue = new LinkedBlockingQueue<AbstractWork>();
         thread = new WorkerThread();
         stopSemaphore = new Semaphore(1);
-        enqueueCount = new AtomicInteger();
-        dequeueCount = new AtomicInteger();
+        stopped = true;
+        if (LOGGER.isLoggable(COUNT_LOGGING_LEVEL)) {
+            enqueueCount = new AtomicInteger(0);
+            dequeueCount = new AtomicInteger(0);
+        }
     }
 
     public void start() throws HyracksException {
-        stopped = false;
-        enqueueCount.set(0);
-        dequeueCount.set(0);
         try {
             stopSemaphore.acquire();
         } catch (InterruptedException e) {
             throw new HyracksException(e);
         }
+        if (LOGGER.isLoggable(COUNT_LOGGING_LEVEL)) {
+            enqueueCount.set(0);
+            dequeueCount.set(0);
+        }
+        stopped = false;
         thread.start();
     }
 
@@ -69,9 +75,8 @@
     }
 
     public void schedule(AbstractWork event) {
-        enqueueCount.incrementAndGet();
-        if (LOGGER.isLoggable(Level.FINEST)) {
-            LOGGER.finest("Enqueue: " + enqueueCount);
+        if (LOGGER.isLoggable(COUNT_LOGGING_LEVEL)) {
+            LOGGER.log(COUNT_LOGGING_LEVEL, "Enqueue (" + hashCode() + "): " + enqueueCount.incrementAndGet());
         }
         if (LOGGER.isLoggable(Level.FINER)) {
             LOGGER.finer("Scheduling: " + event);
@@ -105,9 +110,10 @@
                     } catch (InterruptedException e) {
                         continue;
                     }
-                    dequeueCount.incrementAndGet();
-                    if (LOGGER.isLoggable(Level.FINEST)) {
-                        LOGGER.finest("Dequeue: " + dequeueCount + "/" + enqueueCount);
+                    if (LOGGER.isLoggable(COUNT_LOGGING_LEVEL)) {
+                        LOGGER.log(COUNT_LOGGING_LEVEL,
+                                "Dequeue (" + WorkQueue.this.hashCode() + "): " + dequeueCount.incrementAndGet() + "/"
+                                        + enqueueCount);
                     }
                     try {
                         if (LOGGER.isLoggable(r.logLevel())) {