[NO ISSUE][STO] Perform IO reads in uninterruptible threads
- user model changes: no
- storage format changes: no
- interface changes: no
details:
- Previously, IO reads were performed on the task threads.
- Task threads can be interrupted which can cause deadlocks
due to a JDK synchronization bug.
- After this change, there are 2 IO read threads per IO device.
- Threads will receive IO read requests and process them.
- Such threads are never interrupted and are killed through
the use of a poison pill.
Change-Id: Id28d57a222f42962284b24296cb9b91658e5dc77
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2387
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml
index 9936e1a..40422f4 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml
@@ -16,18 +16,16 @@
! specific language governing permissions and limitations
! under the License.
!-->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>hyracks-storage-common</artifactId>
<name>hyracks-storage-common</name>
-
<parent>
<groupId>org.apache.hyracks</groupId>
<artifactId>hyracks</artifactId>
<version>0.3.4-SNAPSHOT</version>
</parent>
-
<licenses>
<license>
<name>Apache License, Version 2.0</name>
@@ -36,7 +34,6 @@
<comments>A business-friendly OSS license</comments>
</license>
</licenses>
-
<properties>
<root.dir>${basedir}/../..</root.dir>
</properties>
@@ -52,8 +49,13 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-util</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</dependency>
</dependencies>
-</project>
+</project>
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndexCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndexCursor.java
index f704921..00c5dce 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndexCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndexCursor.java
@@ -26,14 +26,20 @@
/**
* Represents an index cursor. The expected use
* cursor = new cursor();
- * while (more predicates){
- * -cursor.open(predicate);
- * -while (cursor.hasNext()){
- * --cursor.next()
+ * try{
+ * -while (more predicates){
+ * --cursor.open(predicate);
+ * --try{
+ * ---while (cursor.hasNext()){
+ * ----cursor.next()
+ * ---}
+ * --} finally{
+ * ---cursor.close();
+ * --}
* -}
- * -cursor.close();
+ * } finally{
+ * -cursor.destroy();
* }
- * cursor.destroy();
* Each created cursor must have destroy called
* Each successfully opened cursor must have close called
*
@@ -47,7 +53,8 @@
* When a cursor object is created, it is in the CLOSED state.
* CLOSED: Legal calls are open() --> OPENED, or destroy() --> DESTROYED, close() --> no effect
* OPENED: The only legal calls are hasNext(), next(), or close() --> CLOSED.
- * DESTROYED: All calls are illegal.
+ * DESTROYED: The only legal call is destroy() which has no effect.
+ *
* Cursors must enforce the cursor state machine
*/
public interface IIndexCursor extends IDestroyable {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index 1443bbc..73969db 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -28,11 +28,14 @@
import java.util.List;
import java.util.Map;
import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -45,8 +48,10 @@
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
import org.apache.hyracks.api.replication.IIOReplicationManager;
import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.storage.common.buffercache.CachedPage.State;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
import org.apache.hyracks.storage.common.file.IFileMapManager;
+import org.apache.hyracks.util.InvokeUtil;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -55,6 +60,7 @@
private static final Logger LOGGER = LogManager.getLogger();
private static final int MAP_FACTOR = 3;
+ private static final CachedPage POISON_PILL = new CachedPage();
private static final int MIN_CLEANED_COUNT_DIFF = 3;
private static final int PIN_MAX_WAIT_TIME = 50;
@@ -66,7 +72,8 @@
private final int pageSize;
private final int maxOpenFiles;
- final IIOManager ioManager;
+ private final ExecutorService executor;
+ private final IIOManager ioManager;
private final CacheBucket[] pageMap;
private final IPageReplacementStrategy pageReplacementStrategy;
private final IPageCleanerPolicy pageCleanerPolicy;
@@ -75,6 +82,7 @@
private final Map<Integer, BufferedFileHandle> fileInfoMap;
private final AsyncFIFOPageQueueManager fifoWriter;
private final Queue<BufferCacheHeaderHelper> headerPageCache = new ConcurrentLinkedQueue<>();
+ private final BlockingQueue<CachedPage> readRequests;
//DEBUG
private Level fileOpsLevel = Level.DEBUG;
@@ -103,19 +111,43 @@
this.pageReplacementStrategy = pageReplacementStrategy;
this.pageCleanerPolicy = pageCleanerPolicy;
this.fileMapManager = fileMapManager;
-
- Executor executor = Executors.newCachedThreadPool(threadFactory);
- fileInfoMap = new HashMap<>();
- cleanerThread = new CleanerThread();
- executor.execute(cleanerThread);
- closed = false;
-
- fifoWriter = new AsyncFIFOPageQueueManager(this);
- if (DEBUG) {
- confiscatedPages = new ArrayList<>();
- confiscatedPagesOwner = new HashMap<>();
- confiscateLock = new ReentrantLock();
- pinnedPageOwner = new ConcurrentHashMap<>();
+ int numReaders = ioManager.getIODevices().size() * 2;
+ readRequests = new ArrayBlockingQueue<>(pageReplacementStrategy.getMaxAllowedNumPages());
+ executor = Executors.newFixedThreadPool(numReaders + 1, threadFactory);
+ try {
+ fileInfoMap = new HashMap<>();
+ cleanerThread = new CleanerThread();
+ executor.execute(cleanerThread);
+ for (int i = 0; i < numReaders; i++) {
+ executor.execute(new ReaderThread(i));
+ }
+ closed = false;
+ fifoWriter = new AsyncFIFOPageQueueManager(this);
+ if (DEBUG) {
+ confiscatedPages = new ArrayList<>();
+ confiscatedPagesOwner = new HashMap<>();
+ confiscateLock = new ReentrantLock();
+ pinnedPageOwner = new ConcurrentHashMap<>();
+ }
+ } catch (Throwable th) {
+ try {
+ throw th;
+ } finally {
+ readRequests.offer(POISON_PILL); // NOSONAR will always succeed since the queue is empty
+ executor.shutdown();
+ try {
+ if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+ LOGGER.log(Level.WARN, "Failure shutting down buffer cache executor service");
+ }
+ } catch (InterruptedException e) {
+ LOGGER.log(Level.WARN, "Interrupted while shutting down buffer cache executor service");
+ Thread.currentThread().interrupt();
+ th.addSuppressed(e);
+ } catch (Throwable e) {
+ LOGGER.log(Level.WARN, "Failure shutting down buffer cache executor service", e);
+ th.addSuppressed(e);
+ }
+ }
}
}
@@ -185,22 +217,27 @@
// Resolve race of multiple threads trying to read the page from
// disk.
synchronized (cPage) {
- if (!cPage.valid) {
+ if (cPage.state != State.VALID) {
try {
- tryRead(cPage);
- cPage.valid = true;
- } catch (Exception e) {
- LOGGER.log(Level.WARN, "Failure while trying to read a page from disk", e);
- throw e;
- } finally {
- if (!cPage.valid) {
- unpin(cPage);
+ // Will attempt to re-read even if previous read failed
+ if (cPage.state == State.INVALID || cPage.state == State.READ_FAILED) {
+ // submit request to read
+ cPage.state = State.READ_REQUESTED;
+ readRequests.put(cPage);
}
+ cPage.awaitRead();
+ } catch (InterruptedException e) {
+ cPage.state = State.INVALID;
+ unpin(cPage);
+ throw HyracksDataException.create(e);
+ } catch (Throwable th) {
+ unpin(cPage);
+ throw HyracksDataException.create(th);
}
}
}
} else {
- cPage.valid = true;
+ cPage.state = State.VALID;
}
pageReplacementStrategy.notifyCachePageAccess(cPage);
if (DEBUG) {
@@ -449,7 +486,7 @@
buffer.append(" ").append(cp.cpid).append(" -> [")
.append(BufferedFileHandle.getFileId(cp.dpid)).append(':')
.append(BufferedFileHandle.getPageId(cp.dpid)).append(", ").append(cp.pinCount.get())
- .append(", ").append(cp.valid ? "valid" : "invalid").append(", ")
+ .append(", ").append(cp.state).append(", ")
.append(cp.confiscated.get() ? "confiscated" : "physical").append(", ")
.append(cp.dirty.get() ? "dirty" : "clean").append("]\n");
cp = cp.next;
@@ -480,7 +517,7 @@
if (c.confiscated() || c.latch.getReadLockCount() != 0 || c.latch.getWriteHoldCount() != 0) {
return false;
}
- if (c.valid) {
+ if (c.state == State.VALID) {
reachableDpids.add(c.dpid);
}
}
@@ -519,6 +556,9 @@
read(cPage);
return;
} catch (HyracksDataException readException) {
+ if (Thread.interrupted()) {
+ LOGGER.log(Level.WARN, "Ignoring interrupt. Reader threads should never be interrupted.");
+ }
if (readException.getErrorCode() == ErrorCode.CANNOT_READ_CLOSED_FILE && i <= MAX_PAGE_READ_ATTEMPTS) {
/**
* if the read failure was due to another thread closing the file channel because
@@ -530,8 +570,7 @@
LOGGER.log(Level.WARN, String.format("Failed to read page. Retrying attempt (%d/%d)", i + 1,
MAX_PAGE_READ_ATTEMPTS), readException);
} catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw HyracksDataException.create(e);
+ LOGGER.log(Level.WARN, "Ignoring interrupt. Reader threads should never be interrupted.");
}
} else {
throw readException;
@@ -670,6 +709,54 @@
}
}
+ private class ReaderThread implements Runnable {
+ private final int num;
+
+ private ReaderThread(int num) {
+ this.num = num;
+ }
+
+ @Override
+ public void run() {
+ Thread.currentThread().setName("Buffer-Cache-Reader-" + num);
+ while (true) {
+ CachedPage next;
+ try {
+ next = readRequests.take();
+ } catch (InterruptedException e) {
+ LOGGER.log(Level.WARN, "Ignoring interrupt. Reader threads should never be interrupted.");
+ break;
+ }
+ if (next == POISON_PILL) {
+ LOGGER.log(Level.INFO, "Exiting");
+ InvokeUtil.doUninterruptibly(() -> readRequests.put(POISON_PILL));
+ if (Thread.interrupted()) {
+ LOGGER.log(Level.ERROR, "Ignoring interrupt. Reader threads should never be interrupted.");
+ }
+ break;
+ }
+ if (next.state != State.READ_REQUESTED) {
+ LOGGER.log(Level.ERROR,
+ "Exiting BufferCache reader thread. Took a page with state = {} out of the queue",
+ next.state);
+ break;
+ }
+ try {
+ tryRead(next);
+ next.state = State.VALID;
+ } catch (HyracksDataException e) {
+ next.readFailure = e;
+ next.state = State.READ_FAILED;
+ LOGGER.log(Level.WARN, "Failed to read a page", e);
+ }
+ synchronized (next) {
+ next.notifyAll();
+ }
+ }
+ }
+
+ }
+
private class CleanerThread implements Runnable {
private volatile boolean shutdownStart = false;
private volatile boolean shutdownComplete = false;
@@ -799,6 +886,16 @@
});
fileInfoMap.clear();
}
+ InvokeUtil.doUninterruptibly(() -> readRequests.put(POISON_PILL));
+ executor.shutdown();
+ try {
+ if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+ LOGGER.log(Level.WARN, "Failure shutting down buffer cache executor service");
+ }
+ } catch (InterruptedException e) {
+ LOGGER.log(Level.WARN, "Interrupted while shutting down buffer cache executor service");
+ Thread.currentThread().interrupt();
+ }
}
@Override
@@ -1343,7 +1440,7 @@
}
try {
cPage.reset(cPage.dpid);
- cPage.valid = true;
+ cPage.state = State.VALID;
cPage.next = bucket.cachedPage;
bucket.cachedPage = cPage;
cPage.pinCount.decrementAndGet();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
index bc0a04e..d7a55af 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
@@ -23,10 +23,19 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
/**
* @author yingyib
*/
public class CachedPage implements ICachedPageInternal {
+ public enum State {
+ INVALID,
+ READ_REQUESTED,
+ READ_FAILED,
+ VALID
+ }
+
final int cpid;
ByteBuffer buffer;
public final AtomicInteger pinCount;
@@ -36,7 +45,7 @@
private final IPageReplacementStrategy pageReplacementStrategy;
volatile long dpid; // disk page id (composed of file id and page id)
CachedPage next;
- volatile boolean valid;
+ volatile State state;
final AtomicBoolean confiscated;
private IQueueInfo queueInfo;
private int multiplier;
@@ -44,6 +53,7 @@
// DEBUG
private static final boolean DEBUG = false;
private final StackTraceElement[] ctorStack;
+ Throwable readFailure;
//Constructor for making dummy entry for FIFO queue
public CachedPage() {
@@ -72,7 +82,7 @@
latch = new ReentrantReadWriteLock(true);
replacementStrategyObject = pageReplacementStrategy.createPerPageStrategyObject(cpid);
dpid = -1;
- valid = false;
+ state = State.INVALID;
confiscated = new AtomicBoolean(false);
queueInfo = null;
ctorStack = DEBUG ? new Throwable().getStackTrace() : null;
@@ -81,7 +91,7 @@
public void reset(long dpid) {
this.dpid = dpid;
dirty.set(false);
- valid = false;
+ state = State.INVALID;
confiscated.set(false);
pageReplacementStrategy.notifyCachePageReset(this);
queueInfo = null;
@@ -205,4 +215,30 @@
public boolean isLargePage() {
return multiplier > 1;
}
+
+ /**
+ * Wait for the page requested to be read to complete the read operation
+ * This method is uninterruptible
+ *
+ * @throws HyracksDataException
+ */
+ public synchronized void awaitRead() throws HyracksDataException {
+ boolean interrupted = false;
+ try {
+ while (state != State.VALID && state != State.READ_FAILED) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ interrupted = true;
+ }
+ }
+ if (state == State.READ_FAILED) {
+ throw HyracksDataException.create(readFailure);
+ }
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
}