[NO ISSUE][*DB] LogFlusher fixes
Change-Id: I19e150f2560573738938967f389a397ad7150a4d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2106
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java
index 8e67603..6bdce73 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java
@@ -34,8 +34,9 @@
/**
* flush content of buffer to disk
+ * @param stopping
*/
- void flush();
+ void flush(boolean stopping);
/**
* @param logSize
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InterruptUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InterruptUtil.java
new file mode 100644
index 0000000..4c65c66
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InterruptUtil.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.utils;
+
+public class InterruptUtil {
+ /**
+ * Executes the passed interruptible, retrying if the operation is interrupted. Once the interruptible
+ * completes, the current thread will be re-interrupted, if the original operation was interrupted.
+ */
+ public static void doUninterruptibly(Interruptible interruptible) {
+ boolean interrupted = false;
+ try {
+ while (true) {
+ try {
+ interruptible.run();
+ break;
+ } catch (InterruptedException e) { // NOSONAR- we will re-interrupt the thread during unwind
+ interrupted = true;
+ }
+ }
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ /**
+ * Executes the passed interruptible, retrying if the operation is interrupted. Once the interruptible
+ * completes, the current thread will be re-interrupted, if the original operation was interrupted.
+ */
+ public static void doExUninterruptibly(ThrowingInterruptible interruptible) throws Exception {
+ boolean interrupted = false;
+ try {
+ while (true) {
+ try {
+ interruptible.run();
+ break;
+ } catch (InterruptedException e) { // NOSONAR- we will re-interrupt the thread during unwind
+ interrupted = true;
+ }
+ }
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ /**
+ * Executes the passed interruptible, retrying if the operation is interrupted.
+ *
+ * @return true if the original operation was interrupted, otherwise false
+ */
+ public static boolean doUninterruptiblyGet(Interruptible interruptible) {
+ boolean interrupted = false;
+ while (true) {
+ try {
+ interruptible.run();
+ break;
+ } catch (InterruptedException e) { // NOSONAR- contract states caller must handle
+ interrupted = true;
+ }
+ }
+ return interrupted;
+ }
+
+ /**
+ * Executes the passed interruptible, retrying if the operation is interrupted. If the operation throws an
+ * exception after being previously interrupted, the current thread will be re-interrupted.
+ *
+ * @return true if the original operation was interrupted, otherwise false
+ */
+ public static boolean doExUninterruptiblyGet(ThrowingInterruptible interruptible) throws Exception {
+ boolean interrupted = false;
+ boolean success = false;
+ while (true) {
+ try {
+ interruptible.run();
+ success = true;
+ break;
+ } catch (InterruptedException e) { // NOSONAR- contract states caller must handle
+ interrupted = true;
+ } finally {
+ if (!success && interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ return interrupted;
+ }
+
+ @FunctionalInterface
+ public interface Interruptible {
+ void run() throws InterruptedException;
+ }
+
+ @FunctionalInterface
+ public interface ThrowingInterruptible {
+ void run() throws Exception; // NOSONAR
+ }
+}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
index 081cf02..668eab1 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
@@ -105,15 +105,15 @@
if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT
|| logRecord.getLogType() == LogType.WAIT) {
logRecord.isFlushed(false);
- syncCommitQ.offer(logRecord);
+ syncCommitQ.add(logRecord);
}
if (logRecord.getLogType() == LogType.FLUSH) {
logRecord.isFlushed(false);
- flushQ.offer(logRecord);
+ flushQ.add(logRecord);
}
} else if (logRecord.getLogSource() == LogSource.REMOTE
&& (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT)) {
- remoteJobsQ.offer(logRecord);
+ remoteJobsQ.add(logRecord);
}
this.notify();
}
@@ -168,29 +168,30 @@
////////////////////////////////////
@Override
- public void flush() {
+ public void flush(boolean stopping) {
+ boolean interrupted = false;
try {
int endOffset;
while (!full.get()) {
- synchronized (this) {
- if (appendOffset - flushOffset == 0 && !full.get()) {
- try {
+ try {
+ synchronized (this) {
+ if (appendOffset - flushOffset == 0 && !full.get()) {
if (IS_DEBUG_MODE) {
LOGGER.info("flush()| appendOffset: " + appendOffset + ", flushOffset: " + flushOffset
+ ", full: " + full.get());
}
- if (stop) {
+ if (stopping || stop) {
fileChannel.close();
return;
}
- this.wait();
- } catch (InterruptedException e) {
- continue;
+ wait();
}
+ endOffset = appendOffset;
}
- endOffset = appendOffset;
- }
internalFlush(flushOffset, endOffset);
+ } catch (InterruptedException e) {
+ interrupted = true;
+ }
}
internalFlush(flushOffset, appendOffset);
if (isLastPage) {
@@ -198,6 +199,10 @@
}
} catch (IOException e) {
throw new IllegalStateException(e);
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
}
}
@@ -230,7 +235,7 @@
if (endOffset > beginOffset) {
logBufferTailReader.initializeScan(beginOffset, endOffset);
- ITransactionContext txnCtx = null;
+ ITransactionContext txnCtx;
LogRecord logRecord = logBufferTailReader.next();
while (logRecord != null) {
@@ -327,8 +332,9 @@
}
@Override
- public void stop() {
- this.stop = true;
+ public synchronized void stop() {
+ stop = true;
+ notifyAll();
}
@Override
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index e5e91e8..5f9369d 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -33,7 +33,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -51,6 +51,7 @@
import org.apache.asterix.common.transactions.LogType;
import org.apache.asterix.common.transactions.MutableLong;
import org.apache.asterix.common.transactions.TxnLogFile;
+import org.apache.asterix.common.utils.InterruptUtil;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
public class LogManager implements ILogManager, ILifeCycleComponent {
@@ -162,7 +163,7 @@
}
}
- /**
+ /*
* To eliminate the case where the modulo of the next appendLSN = 0 (the next
* appendLSN = the first LSN of the next log file), we do not allow a log to be
* written at the last offset of the current file.
@@ -616,13 +617,11 @@
* The deadlock happens when PrimaryIndexOpeartionTracker.completeOperation results in generating a FLUSH log and there are no empty log buffers available to log it.
*/
private class FlushLogsLogger extends Thread {
- private ILogRecord logRecord;
-
@Override
public void run() {
while (true) {
try {
- logRecord = flushLogsQ.take();
+ ILogRecord logRecord = flushLogsQ.take();
appendToLogTail(logRecord);
} catch (ACIDException e) {
e.printStackTrace();
@@ -641,77 +640,57 @@
private final LinkedBlockingQueue<ILogBuffer> emptyQ;
private final LinkedBlockingQueue<ILogBuffer> flushQ;
private final LinkedBlockingQueue<ILogBuffer> stashQ;
- private ILogBuffer flushPage;
- private final AtomicBoolean isStarted;
- private final AtomicBoolean terminateFlag;
+ private volatile ILogBuffer flushPage;
+ private volatile boolean stopping;
+ private final Semaphore started;
- public LogFlusher(LogManager logMgr, LinkedBlockingQueue<ILogBuffer> emptyQ, LinkedBlockingQueue<ILogBuffer> flushQ,
+ LogFlusher(LogManager logMgr, LinkedBlockingQueue<ILogBuffer> emptyQ, LinkedBlockingQueue<ILogBuffer> flushQ,
LinkedBlockingQueue<ILogBuffer> stashQ) {
this.logMgr = logMgr;
this.emptyQ = emptyQ;
this.flushQ = flushQ;
this.stashQ = stashQ;
- flushPage = null;
- isStarted = new AtomicBoolean(false);
- terminateFlag = new AtomicBoolean(false);
-
+ this.started = new Semaphore(0);
}
public void terminate() {
- //make sure the LogFlusher thread started before terminating it.
- synchronized (isStarted) {
- while (!isStarted.get()) {
- try {
- isStarted.wait();
- } catch (InterruptedException e) {
- //ignore
- }
- }
- }
+ // make sure the LogFlusher thread started before terminating it.
+ InterruptUtil.doUninterruptibly(started::acquire);
- terminateFlag.set(true);
- if (flushPage != null) {
- synchronized (flushPage) {
- flushPage.stop();
- flushPage.notify();
- }
+ stopping = true;
+
+ // we must tell any active flush, if any, to stop
+ final ILogBuffer currentFlushPage = this.flushPage;
+ if (currentFlushPage != null) {
+ currentFlushPage.stop();
}
- //[Notice]
- //The return value doesn't need to be checked
- //since terminateFlag will trigger termination if the flushQ is full.
- flushQ.offer(POISON_PILL);
+ // finally we put a POISON_PILL onto the flushQ to indicate to the flusher it is time to exit
+ InterruptUtil.doUninterruptibly(() -> flushQ.put(POISON_PILL));
}
@Override
- public Boolean call() {
- synchronized (isStarted) {
- isStarted.set(true);
- isStarted.notify();
- }
+ public Boolean call() throws InterruptedException {
+ started.release();
+ boolean interrupted = false;
try {
while (true) {
flushPage = null;
- try {
- flushPage = flushQ.take();
- if (flushPage == POISON_PILL || terminateFlag.get()) {
- return true;
- }
- } catch (InterruptedException e) {
- if (flushPage == null) {
- continue;
- }
+ interrupted = InterruptUtil.doUninterruptiblyGet(() -> flushPage = flushQ.take()) || interrupted;
+ if (flushPage == POISON_PILL) {
+ return true;
}
- flushPage.flush();
- emptyQ.offer(flushPage.getLogPageSize() == logMgr.getLogPageSize() ? flushPage : stashQ.remove());
+ flushPage.flush(stopping);
+
+ // TODO(mblow): recycle large pages
+ emptyQ.add(flushPage.getLogPageSize() == logMgr.getLogPageSize() ? flushPage : stashQ.remove());
}
} catch (Exception e) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("-------------------------------------------------------------------------");
- LOGGER.info("LogFlusher is terminating abnormally. System is in unusalbe state.");
- LOGGER.info("-------------------------------------------------------------------------");
- }
- e.printStackTrace();
+ LOGGER.log(Level.SEVERE, "LogFlusher is terminating abnormally. System is in unusable state.", e);
throw e;
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
}
}
}