Ensure LogManager Doesn't Exceed the Size of the Log Page Queues
Change-Id: If6427576a31090a057ee6a3d25e35eef5cdd86f8
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1342
Reviewed-by: Michael Blow <mblow@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index 814bcfc..57d5c39 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -67,6 +67,7 @@
private final MutableLong flushLSN;
private LinkedBlockingQueue<LogBuffer> emptyQ;
private LinkedBlockingQueue<LogBuffer> flushQ;
+ private LinkedBlockingQueue<LogBuffer> stashQ;
protected final AtomicLong appendLSN;
private FileChannel appendChannel;
protected LogBuffer appendPage;
@@ -97,8 +98,9 @@
}
private void initializeLogManager(long nextLogFileId) {
- emptyQ = new LinkedBlockingQueue<LogBuffer>(numLogPages);
- flushQ = new LinkedBlockingQueue<LogBuffer>(numLogPages);
+ emptyQ = new LinkedBlockingQueue<>(numLogPages);
+ flushQ = new LinkedBlockingQueue<>(numLogPages);
+ stashQ = new LinkedBlockingQueue<>(numLogPages);
for (int i = 0; i < numLogPages; i++) {
emptyQ.offer(new LogBuffer(txnSubsystem, logPageSize, flushLSN));
}
@@ -109,7 +111,7 @@
}
appendChannel = getFileChannel(appendLSN.get(), false);
getAndInitNewPage(INITIAL_LOG_SIZE);
- logFlusher = new LogFlusher(this, emptyQ, flushQ);
+ logFlusher = new LogFlusher(this, emptyQ, flushQ, stashQ);
futureLogFlusher = txnSubsystem.getAsterixAppRuntimeContextProvider().getThreadExecutor().submit(logFlusher);
if (!flushLogsLogger.isAlive()) {
txnSubsystem.getAsterixAppRuntimeContextProvider().getThreadExecutor().execute(flushLogsLogger);
@@ -182,8 +184,18 @@
protected void getAndInitNewPage(int logSize) {
if (logSize > logPageSize) {
+ // before creating a new page, we need to stash a normal sized page since our queues have fixed capacity
+ appendPage = null;
+ while (appendPage == null) {
+ try {
+ appendPage = emptyQ.take();
+ stashQ.add(appendPage);
+ } catch (InterruptedException e) {
+ //ignore
+ }
+ }
// for now, alloc a new buffer for each large page
- // TODO: pool large pages
+ // TODO: pool large pages??
appendPage = new LogBuffer(txnSubsystem, logSize, flushLSN);
appendPage.setFileChannel(appendChannel);
flushQ.offer(appendPage);
@@ -301,10 +313,6 @@
}
}
- public MutableLong getFlushLSN() {
- return flushLSN;
- }
-
private long initializeLogAnchor(long nextLogFileId) {
long fileId = 0;
long offset = 0;
@@ -440,7 +448,7 @@
}
});
if (logFileNames != null && logFileNames.length != 0) {
- logFileIds = new ArrayList<Long>();
+ logFileIds = new ArrayList<>();
for (String fileName : logFileNames) {
logFileIds.add(Long.parseLong(fileName.substring(logFilePrefix.length() + 1)));
}
@@ -621,14 +629,17 @@
private final LogManager logMgr;//for debugging
private final LinkedBlockingQueue<LogBuffer> emptyQ;
private final LinkedBlockingQueue<LogBuffer> flushQ;
+ private final LinkedBlockingQueue<LogBuffer> stashQ;
private LogBuffer flushPage;
private final AtomicBoolean isStarted;
private final AtomicBoolean terminateFlag;
- public LogFlusher(LogManager logMgr, LinkedBlockingQueue<LogBuffer> emptyQ, LinkedBlockingQueue<LogBuffer> flushQ) {
+ public LogFlusher(LogManager logMgr, LinkedBlockingQueue<LogBuffer> emptyQ, LinkedBlockingQueue<LogBuffer> flushQ,
+ LinkedBlockingQueue<LogBuffer> stashQ) {
this.logMgr = logMgr;
this.emptyQ = emptyQ;
this.flushQ = flushQ;
+ this.stashQ = stashQ;
flushPage = null;
isStarted = new AtomicBoolean(false);
terminateFlag = new AtomicBoolean(false);
@@ -680,9 +691,7 @@
}
}
flushPage.flush();
- if (flushPage.getLogPageSize() == logMgr.getLogPageSize()) {
- emptyQ.offer(flushPage);
- }
+ emptyQ.offer(flushPage.getLogPageSize() == logMgr.getLogPageSize() ? flushPage : stashQ.remove());
}
} catch (Exception e) {
if (LOGGER.isLoggable(Level.INFO)) {