changed the way to stop logFlusher thread.
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index e378789..90d0d68 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -371,18 +371,17 @@
}
class LogFlusher extends Thread {
+ private final static LogPage POISON_PILL = new LogPage(null, ILogRecord.COMMIT_LOG_SIZE, null);
private final LogManager logMgr;
private final LinkedBlockingQueue<LogPage> emptyQ;
private final LinkedBlockingQueue<LogPage> flushQ;
private LogPage flushPage;
- private boolean stop;
public LogFlusher(LogManager logMgr, LinkedBlockingQueue<LogPage> emptyQ, LinkedBlockingQueue<LogPage> flushQ) {
this.logMgr = logMgr;
this.emptyQ = emptyQ;
this.flushQ = flushQ;
flushPage = null;
- stop = false;
}
public void terminate() {
@@ -392,8 +391,7 @@
flushPage.notify();
}
}
- stop = true;
- this.interrupt();
+ flushQ.offer(POISON_PILL);
}
@Override
@@ -402,16 +400,12 @@
flushPage = null;
try {
flushPage = flushQ.take();
+ if (flushPage == POISON_PILL) {
+ break;
+ }
flushPage.flush();
- if (stop) {
- break;
- }
} catch (InterruptedException e) {
- if (stop) {
- break;
- } else {
- throw new IllegalStateException(e);
- }
+ //ignore
}
emptyQ.offer(flushPage);
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
index d51a95b..45c3e65 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
@@ -138,11 +138,11 @@
LOGGER.info("flush()| appendOffset: " + appendOffset + ", flushOffset: " + flushOffset
+ ", full: " + full.get());
}
- this.wait();
if (stop) {
fileChannel.close();
break;
}
+ this.wait();
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}