changes to fix issue 620
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadExecutor.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadExecutor.java
index 14975ff..edd4b2a 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadExecutor.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadExecutor.java
@@ -14,12 +14,15 @@
*/
package edu.uci.ics.asterix.common.api;
+import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
public class AsterixThreadExecutor implements Executor {
public final static AsterixThreadExecutor INSTANCE = new AsterixThreadExecutor();
- private final Executor executor = Executors.newCachedThreadPool(AsterixThreadFactory.INSTANCE);
+ private final ExecutorService executorService = Executors.newCachedThreadPool(AsterixThreadFactory.INSTANCE);
private AsterixThreadExecutor() {
@@ -27,6 +30,10 @@
@Override
public void execute(Runnable command) {
- executor.execute(command);
+ executorService.execute(command);
+ }
+
+ public Future<Object> submit(Callable command) {
+ return (Future<Object>) executorService.submit(command);
}
}
diff --git a/asterix-installer/src/main/resources/conf/asterix-configuration.xml b/asterix-installer/src/main/resources/conf/asterix-configuration.xml
index deeb5b0..084a3f8 100644
--- a/asterix-installer/src/main/resources/conf/asterix-configuration.xml
+++ b/asterix-installer/src/main/resources/conf/asterix-configuration.xml
@@ -126,21 +126,6 @@
</property>
<property>
- <name>txn.log.disksectorsize</name>
- <value>4096</value>
- <description>The size of a disk sector. (Default = "4096")
- </description>
- </property>
-
- <property>
- <name>txn.log.groupcommitinterval</name>
- <value>40</value>
- <description>The group commit wait time in milliseconds. (Default =
- "40" // 40ms)
- </description>
- </property>
-
- <property>
<name>txn.log.checkpoint.lsnthreshold</name>
<value>67108864</value>
<description>The size of the window that the maximum LSN is allowed to
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index ad9ff2a..4f0bb59 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -24,7 +24,11 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -58,6 +62,7 @@
private FileChannel appendChannel;
private LogPage appendPage;
private LogFlusher logFlusher;
+ private Future<Object> futureLogFlusher;
public LogManager(TransactionSubsystem txnSubsystem) throws ACIDException {
this.txnSubsystem = txnSubsystem;
@@ -86,8 +91,7 @@
appendChannel = getFileChannel(appendLSN, false);
getAndInitNewPage();
logFlusher = new LogFlusher(this, emptyQ, flushQ);
- logFlusher.setDaemon(true);
- AsterixThreadExecutor.INSTANCE.execute(logFlusher);
+ futureLogFlusher = AsterixThreadExecutor.INSTANCE.submit(logFlusher);
}
@Override
@@ -174,6 +178,7 @@
@Override
public void stop(boolean dumpState, OutputStream os) {
+ terminateLogFlusher();
if (dumpState) {
// #. dump Configurable Variables
dumpConfVars(os);
@@ -267,15 +272,32 @@
}
private void terminateLogFlusher() {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Terminating LogFlusher thread ...");
+ }
logFlusher.terminate();
try {
- logFlusher.join();
- } catch (InterruptedException e) {
- throw new IllegalStateException(e);
+ futureLogFlusher.get();
+ } catch (ExecutionException | InterruptedException e) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("---------- warning(begin): LogFlusher thread is terminated abnormally --------");
+ e.printStackTrace();
+ LOGGER.info("---------- warning(end) : LogFlusher thread is terminated abnormally --------");
+ }
+ }
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("LogFlusher thread is terminated.");
}
}
private void deleteAllLogFiles() {
+ if (appendChannel != null) {
+ try {
+ appendChannel.close();
+ } catch (IOException e) {
+ throw new IllegalStateException("Failed to close a fileChannel of a log file");
+ }
+ }
List<Long> logFileIds = getLogFileIds();
for (Long id : logFileIds) {
File file = new File(getLogFilePath(id));
@@ -364,43 +386,69 @@
}
}
-class LogFlusher extends Thread {
+class LogFlusher implements Callable<Boolean> {
private final static LogPage POISON_PILL = new LogPage(null, ILogRecord.JOB_COMMIT_LOG_SIZE, null);
private final LogManager logMgr;//for debugging
private final LinkedBlockingQueue<LogPage> emptyQ;
private final LinkedBlockingQueue<LogPage> flushQ;
private LogPage flushPage;
+ private final AtomicBoolean isStarted;
+ private final AtomicBoolean terminateFlag;
public LogFlusher(LogManager logMgr, LinkedBlockingQueue<LogPage> emptyQ, LinkedBlockingQueue<LogPage> flushQ) {
this.logMgr = logMgr;
this.emptyQ = emptyQ;
this.flushQ = flushQ;
flushPage = null;
+ isStarted = new AtomicBoolean(false);
+ terminateFlag = new AtomicBoolean(false);
+
}
public void terminate() {
+ //make sure the LogFlusher thread started before terminating it.
+ synchronized (isStarted) {
+ while(!isStarted.get()) {
+ try {
+ isStarted.wait();
+ } catch (InterruptedException e) {
+ //ignore
+ }
+ }
+ }
+
+ terminateFlag.set(true);
if (flushPage != null) {
synchronized (flushPage) {
flushPage.isStop(true);
flushPage.notify();
}
}
+ //[Notice]
+ //The return value doesn't need to be checked
+ //since terminateFlag will trigger termination if the flushQ is full.
flushQ.offer(POISON_PILL);
}
@Override
- public void run() {
+ public Boolean call() {
+ synchronized(isStarted) {
+ isStarted.set(true);
+ isStarted.notify();
+ }
while (true) {
flushPage = null;
try {
flushPage = flushQ.take();
- if (flushPage == POISON_PILL) {
- break;
+ if (flushPage == POISON_PILL || terminateFlag.get()) {
+ return true;
}
- flushPage.flush();
} catch (InterruptedException e) {
- //ignore
+ if (flushPage == null) {
+ continue;
+ }
}
+ flushPage.flush();
emptyQ.offer(flushPage);
}
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
index b954c6e..edfec69 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
@@ -144,7 +144,7 @@
}
this.wait();
} catch (InterruptedException e) {
- throw new IllegalStateException(e);
+ continue;
}
}
endOffset = appendOffset;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
index a83604c..01b38c2 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -59,6 +59,7 @@
if (LOGGER.isLoggable(Level.SEVERE)) {
LOGGER.severe(msg);
}
+ ae.printStackTrace();
throw new ACIDException(msg, ae);
} finally {
txnSubsystem.getLockManager().releaseLocks(txnCtx);