start WorkQueue at the end of CC startup
- should avoid issues like https://code.google.com/p/asterixdb/issues/detail?id=758
Change-Id: I209217ac1e923e7d2a22b6944c873236c62ef13b
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/38
Reviewed-by: Zachary Heilbron <zheilbron@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Raman Grover <ramang@uci.edu>
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 8184fe6..b665a4f 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -222,7 +222,6 @@
clientIPC.start();
webServer.setPort(ccConfig.httpPort);
webServer.start();
- workQueue.start();
info = new ClusterControllerInfo(ccConfig.clientNetIpAddress, ccConfig.clientNetPort,
webServer.getListeningPort());
timer.schedule(sweeper, 0, ccConfig.heartbeatPeriod);
@@ -230,6 +229,7 @@
startApplication();
datasetDirectoryService.init(executor);
+ workQueue.start();
LOGGER.log(Level.INFO, "Started ClusterControllerService");
}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java
index 58e12cf..251ebb1 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -24,31 +24,37 @@
public class WorkQueue {
private static final Logger LOGGER = Logger.getLogger(WorkQueue.class.getName());
+ private static final Level COUNT_LOGGING_LEVEL = Level.FINEST;
private final LinkedBlockingQueue<AbstractWork> queue;
private final WorkerThread thread;
private final Semaphore stopSemaphore;
private boolean stopped;
- private final AtomicInteger enqueueCount;
- private final AtomicInteger dequeueCount;
+ private AtomicInteger enqueueCount;
+ private AtomicInteger dequeueCount;
public WorkQueue() {
queue = new LinkedBlockingQueue<AbstractWork>();
thread = new WorkerThread();
stopSemaphore = new Semaphore(1);
- enqueueCount = new AtomicInteger();
- dequeueCount = new AtomicInteger();
+ stopped = true;
+ if (LOGGER.isLoggable(COUNT_LOGGING_LEVEL)) {
+ enqueueCount = new AtomicInteger(0);
+ dequeueCount = new AtomicInteger(0);
+ }
}
public void start() throws HyracksException {
- stopped = false;
- enqueueCount.set(0);
- dequeueCount.set(0);
try {
stopSemaphore.acquire();
} catch (InterruptedException e) {
throw new HyracksException(e);
}
+ if (LOGGER.isLoggable(COUNT_LOGGING_LEVEL)) {
+ enqueueCount.set(0);
+ dequeueCount.set(0);
+ }
+ stopped = false;
thread.start();
}
@@ -69,9 +75,8 @@
}
public void schedule(AbstractWork event) {
- enqueueCount.incrementAndGet();
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest("Enqueue: " + enqueueCount);
+ if (LOGGER.isLoggable(COUNT_LOGGING_LEVEL)) {
+ LOGGER.log(COUNT_LOGGING_LEVEL, "Enqueue (" + hashCode() + "): " + enqueueCount.incrementAndGet());
}
if (LOGGER.isLoggable(Level.FINER)) {
LOGGER.finer("Scheduling: " + event);
@@ -105,9 +110,10 @@
} catch (InterruptedException e) {
continue;
}
- dequeueCount.incrementAndGet();
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest("Dequeue: " + dequeueCount + "/" + enqueueCount);
+ if (LOGGER.isLoggable(COUNT_LOGGING_LEVEL)) {
+ LOGGER.log(COUNT_LOGGING_LEVEL,
+ "Dequeue (" + WorkQueue.this.hashCode() + "): " + dequeueCount.incrementAndGet() + "/"
+ + enqueueCount);
}
try {
if (LOGGER.isLoggable(r.logLevel())) {