Ensure LogManager Doesn't Exceed the Size of the Log Page Queues

Change-Id: If6427576a31090a057ee6a3d25e35eef5cdd86f8
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1342
Reviewed-by: Michael Blow <mblow@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index 814bcfc..57d5c39 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -67,6 +67,7 @@
     private final MutableLong flushLSN;
     private LinkedBlockingQueue<LogBuffer> emptyQ;
     private LinkedBlockingQueue<LogBuffer> flushQ;
+    private LinkedBlockingQueue<LogBuffer> stashQ;
     protected final AtomicLong appendLSN;
     private FileChannel appendChannel;
     protected LogBuffer appendPage;
@@ -97,8 +98,9 @@
     }
 
     private void initializeLogManager(long nextLogFileId) {
-        emptyQ = new LinkedBlockingQueue<LogBuffer>(numLogPages);
-        flushQ = new LinkedBlockingQueue<LogBuffer>(numLogPages);
+        emptyQ = new LinkedBlockingQueue<>(numLogPages);
+        flushQ = new LinkedBlockingQueue<>(numLogPages);
+        stashQ = new LinkedBlockingQueue<>(numLogPages);
         for (int i = 0; i < numLogPages; i++) {
             emptyQ.offer(new LogBuffer(txnSubsystem, logPageSize, flushLSN));
         }
@@ -109,7 +111,7 @@
         }
         appendChannel = getFileChannel(appendLSN.get(), false);
         getAndInitNewPage(INITIAL_LOG_SIZE);
-        logFlusher = new LogFlusher(this, emptyQ, flushQ);
+        logFlusher = new LogFlusher(this, emptyQ, flushQ, stashQ);
         futureLogFlusher = txnSubsystem.getAsterixAppRuntimeContextProvider().getThreadExecutor().submit(logFlusher);
         if (!flushLogsLogger.isAlive()) {
             txnSubsystem.getAsterixAppRuntimeContextProvider().getThreadExecutor().execute(flushLogsLogger);
@@ -182,8 +184,18 @@
 
     protected void getAndInitNewPage(int logSize) {
         if (logSize > logPageSize) {
+            // before creating a new page, we need to stash a normal sized page since our queues have fixed capacity
+            appendPage = null;
+            while (appendPage == null) {
+                try {
+                    appendPage = emptyQ.take();
+                    stashQ.add(appendPage);
+                } catch (InterruptedException e) {
+                    //ignore
+                }
+            }
             // for now, alloc a new buffer for each large page
-            // TODO: pool large pages
+            // TODO: pool large pages??
             appendPage = new LogBuffer(txnSubsystem, logSize, flushLSN);
             appendPage.setFileChannel(appendChannel);
             flushQ.offer(appendPage);
@@ -301,10 +313,6 @@
         }
     }
 
-    public MutableLong getFlushLSN() {
-        return flushLSN;
-    }
-
     private long initializeLogAnchor(long nextLogFileId) {
         long fileId = 0;
         long offset = 0;
@@ -440,7 +448,7 @@
                 }
             });
             if (logFileNames != null && logFileNames.length != 0) {
-                logFileIds = new ArrayList<Long>();
+                logFileIds = new ArrayList<>();
                 for (String fileName : logFileNames) {
                     logFileIds.add(Long.parseLong(fileName.substring(logFilePrefix.length() + 1)));
                 }
@@ -621,14 +629,17 @@
     private final LogManager logMgr;//for debugging
     private final LinkedBlockingQueue<LogBuffer> emptyQ;
     private final LinkedBlockingQueue<LogBuffer> flushQ;
+    private final LinkedBlockingQueue<LogBuffer> stashQ;
     private LogBuffer flushPage;
     private final AtomicBoolean isStarted;
     private final AtomicBoolean terminateFlag;
 
-    public LogFlusher(LogManager logMgr, LinkedBlockingQueue<LogBuffer> emptyQ, LinkedBlockingQueue<LogBuffer> flushQ) {
+    public LogFlusher(LogManager logMgr, LinkedBlockingQueue<LogBuffer> emptyQ, LinkedBlockingQueue<LogBuffer> flushQ,
+            LinkedBlockingQueue<LogBuffer> stashQ) {
         this.logMgr = logMgr;
         this.emptyQ = emptyQ;
         this.flushQ = flushQ;
+        this.stashQ = stashQ;
         flushPage = null;
         isStarted = new AtomicBoolean(false);
         terminateFlag = new AtomicBoolean(false);
@@ -680,9 +691,7 @@
                     }
                 }
                 flushPage.flush();
-                if (flushPage.getLogPageSize() == logMgr.getLogPageSize()) {
-                    emptyQ.offer(flushPage);
-                }
+                emptyQ.offer(flushPage.getLogPageSize() == logMgr.getLogPageSize() ? flushPage : stashQ.remove());
             }
         } catch (Exception e) {
             if (LOGGER.isLoggable(Level.INFO)) {