ASTERIXDB-1160: Break deadlock between LogManager and LogFlusher

This change introduces a new thread in LogManager to log FLUSH logs.
This will prevent LogFlusher thread from waiting on itself until FLUSH logs are flushed to disk.

Change-Id: I0b414f5ab92e1c68c8aafbaab859c644ba399dcb
Reviewed-on: https://asterix-gerrit.ics.uci.edu/475
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterix-lang-common/pom.xml b/asterix-lang-common/pom.xml
index 974212b..1e22da0 100644
--- a/asterix-lang-common/pom.xml
+++ b/asterix-lang-common/pom.xml
@@ -43,8 +43,8 @@
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-compiler-plugin</artifactId>
                 <configuration>
-                    <source>1.7</source>
-                    <target>1.7</target>
+                    <source>1.8</source>
+                    <target>1.8</target>
                     <fork>true</fork>
                 </configuration>
             </plugin>
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index f14c146..87d0ad8 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -70,8 +70,10 @@
     private LogFlusher logFlusher;
     private Future<Object> futureLogFlusher;
     private static final long SMALLEST_LOG_FILE_ID = 0;
+    private LinkedBlockingQueue<ILogRecord> flushLogsQ;
+    private final FlushLogsLogger flushLogsLogger;
 
-    public LogManager(TransactionSubsystem txnSubsystem) throws ACIDException {
+    public LogManager(TransactionSubsystem txnSubsystem) {
         this.txnSubsystem = txnSubsystem;
         logManagerProperties = new LogManagerProperties(this.txnSubsystem.getTransactionProperties(),
                 this.txnSubsystem.getId());
@@ -82,6 +84,8 @@
         logFilePrefix = logManagerProperties.getLogFilePrefix();
         flushLSN = new MutableLong();
         appendLSN = new AtomicLong();
+        flushLogsQ = new LinkedBlockingQueue<>();
+        flushLogsLogger = new FlushLogsLogger();
         initializeLogManager(SMALLEST_LOG_FILE_ID);
     }
 
@@ -100,6 +104,9 @@
         getAndInitNewPage();
         logFlusher = new LogFlusher(this, emptyQ, flushQ);
         futureLogFlusher = txnSubsystem.getAsterixAppRuntimeContextProvider().getThreadExecutor().submit(logFlusher);
+        if (!flushLogsLogger.isAlive()) {
+            txnSubsystem.getAsterixAppRuntimeContextProvider().getThreadExecutor().execute(flushLogsLogger);
+        }
     }
 
     @Override
@@ -108,7 +115,15 @@
             throw new IllegalStateException();
         }
 
-        syncLog(logRecord);
+        if (logRecord.getLogType() == LogType.FLUSH) {
+            flushLogsQ.offer(logRecord);
+            return;
+        }
+        appendToLogTail(logRecord);
+    }
+
+    private void appendToLogTail(ILogRecord logRecord) throws ACIDException {
+        syncAppendToLogTail(logRecord);
 
         if ((logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT)
                 && !logRecord.isFlushed()) {
@@ -124,16 +139,15 @@
         }
     }
 
-    private synchronized void syncLog(ILogRecord logRecord) throws ACIDException {
+    private synchronized void syncAppendToLogTail(ILogRecord logRecord) throws ACIDException {
         ITransactionContext txnCtx = null;
 
         if (logRecord.getLogType() != LogType.FLUSH) {
             txnCtx = logRecord.getTxnCtx();
             if (txnCtx.getTxnState() == ITransactionManager.ABORTED && logRecord.getLogType() != LogType.ABORT) {
-                throw new ACIDException("Aborted job(" + txnCtx.getJobId()
-                        + ") tried to write non-abort type log record.");
+                throw new ACIDException(
+                        "Aborted job(" + txnCtx.getJobId() + ") tried to write non-abort type log record.");
             }
-
         }
         if (getLogFileOffset(appendLSN.get()) + logRecord.getLogSize() > logFileSize) {
             prepareNextLogFile();
@@ -394,7 +408,7 @@
         return lsn / logFileSize;
     }
 
-    private boolean createFileIfNotExists(String path) throws IOException {
+    private static boolean createFileIfNotExists(String path) throws IOException {
         File file = new File(path);
         File parentFile = file.getParentFile();
         if (parentFile != null) {
@@ -403,7 +417,7 @@
         return file.createNewFile();
     }
 
-    private boolean createNewDirectory(String path) throws IOException {
+    private static boolean createNewDirectory(String path) {
         return (new File(path)).mkdir();
     }
 
@@ -435,10 +449,33 @@
         List<Long> logFileIds = getLogFileIds();
         if (logFileIds != null) {
             return logFileIds.get(0) * logFileSize;
-        }else{
+        } else {
             throw new IllegalStateException("Couldn't find any log files.");
         }
     }
+
+    /**
+     * This class is used to log FLUSH logs.
+     * FLUSH logs are flushed on a different thread to avoid a possible deadlock in LogBuffer batchUnlock which calls PrimaryIndexOpeartionTracker.completeOperation
+     * The deadlock happens when PrimaryIndexOpeartionTracker.completeOperation results in generating a FLUSH log and there are no empty log buffers available to log it.
+     */
+    private class FlushLogsLogger extends Thread {
+        private ILogRecord logRecord;
+
+        @Override
+        public void run() {
+            while (true) {
+                try {
+                    logRecord = flushLogsQ.take();
+                    appendToLogTail(logRecord);
+                } catch (ACIDException e) {
+                    e.printStackTrace();
+                } catch (InterruptedException e) {
+                    //ignore
+                }
+            }
+        }
+    }
 }
 
 class LogFlusher implements Callable<Boolean> {