ASTERIXDB-1160: Break deadlock between LogManager and LogFlusher
This change introduces a new thread in LogManager to log FLUSH logs.
This will prevent LogFlusher thread from waiting on itself until FLUSH logs are flushed to disk.
Change-Id: I0b414f5ab92e1c68c8aafbaab859c644ba399dcb
Reviewed-on: https://asterix-gerrit.ics.uci.edu/475
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterix-lang-common/pom.xml b/asterix-lang-common/pom.xml
index 974212b..1e22da0 100644
--- a/asterix-lang-common/pom.xml
+++ b/asterix-lang-common/pom.xml
@@ -43,8 +43,8 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.8</source>
+ <target>1.8</target>
<fork>true</fork>
</configuration>
</plugin>
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index f14c146..87d0ad8 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -70,8 +70,10 @@
private LogFlusher logFlusher;
private Future<Object> futureLogFlusher;
private static final long SMALLEST_LOG_FILE_ID = 0;
+ private LinkedBlockingQueue<ILogRecord> flushLogsQ;
+ private final FlushLogsLogger flushLogsLogger;
- public LogManager(TransactionSubsystem txnSubsystem) throws ACIDException {
+ public LogManager(TransactionSubsystem txnSubsystem) {
this.txnSubsystem = txnSubsystem;
logManagerProperties = new LogManagerProperties(this.txnSubsystem.getTransactionProperties(),
this.txnSubsystem.getId());
@@ -82,6 +84,8 @@
logFilePrefix = logManagerProperties.getLogFilePrefix();
flushLSN = new MutableLong();
appendLSN = new AtomicLong();
+ flushLogsQ = new LinkedBlockingQueue<>();
+ flushLogsLogger = new FlushLogsLogger();
initializeLogManager(SMALLEST_LOG_FILE_ID);
}
@@ -100,6 +104,9 @@
getAndInitNewPage();
logFlusher = new LogFlusher(this, emptyQ, flushQ);
futureLogFlusher = txnSubsystem.getAsterixAppRuntimeContextProvider().getThreadExecutor().submit(logFlusher);
+ if (!flushLogsLogger.isAlive()) {
+ txnSubsystem.getAsterixAppRuntimeContextProvider().getThreadExecutor().execute(flushLogsLogger);
+ }
}
@Override
@@ -108,7 +115,15 @@
throw new IllegalStateException();
}
- syncLog(logRecord);
+ if (logRecord.getLogType() == LogType.FLUSH) {
+ flushLogsQ.offer(logRecord);
+ return;
+ }
+ appendToLogTail(logRecord);
+ }
+
+ private void appendToLogTail(ILogRecord logRecord) throws ACIDException {
+ syncAppendToLogTail(logRecord);
if ((logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT)
&& !logRecord.isFlushed()) {
@@ -124,16 +139,15 @@
}
}
- private synchronized void syncLog(ILogRecord logRecord) throws ACIDException {
+ private synchronized void syncAppendToLogTail(ILogRecord logRecord) throws ACIDException {
ITransactionContext txnCtx = null;
if (logRecord.getLogType() != LogType.FLUSH) {
txnCtx = logRecord.getTxnCtx();
if (txnCtx.getTxnState() == ITransactionManager.ABORTED && logRecord.getLogType() != LogType.ABORT) {
- throw new ACIDException("Aborted job(" + txnCtx.getJobId()
- + ") tried to write non-abort type log record.");
+ throw new ACIDException(
+ "Aborted job(" + txnCtx.getJobId() + ") tried to write non-abort type log record.");
}
-
}
if (getLogFileOffset(appendLSN.get()) + logRecord.getLogSize() > logFileSize) {
prepareNextLogFile();
@@ -394,7 +408,7 @@
return lsn / logFileSize;
}
- private boolean createFileIfNotExists(String path) throws IOException {
+ private static boolean createFileIfNotExists(String path) throws IOException {
File file = new File(path);
File parentFile = file.getParentFile();
if (parentFile != null) {
@@ -403,7 +417,7 @@
return file.createNewFile();
}
- private boolean createNewDirectory(String path) throws IOException {
+ private static boolean createNewDirectory(String path) {
return (new File(path)).mkdir();
}
@@ -435,10 +449,33 @@
List<Long> logFileIds = getLogFileIds();
if (logFileIds != null) {
return logFileIds.get(0) * logFileSize;
- }else{
+ } else {
throw new IllegalStateException("Couldn't find any log files.");
}
}
+
+ /**
+ * This class is used to log FLUSH logs.
+ * FLUSH logs are flushed on a different thread to avoid a possible deadlock in LogBuffer batchUnlock which calls PrimaryIndexOpeartionTracker.completeOperation
+ * The deadlock happens when PrimaryIndexOpeartionTracker.completeOperation results in generating a FLUSH log and there are no empty log buffers available to log it.
+ */
+ private class FlushLogsLogger extends Thread {
+ private ILogRecord logRecord;
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ logRecord = flushLogsQ.take();
+ appendToLogTail(logRecord);
+ } catch (ACIDException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ //ignore
+ }
+ }
+ }
+ }
}
class LogFlusher implements Callable<Boolean> {