[NO ISSUE][STO] Move the IO threads from BufferCache to IOManager

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
Move the IO threads from BufferCache to IOManager to cover all
IO uses that go through the IOManager.

Change-Id: Ic02b456826ae7abc2619a7eec3f90b48717b0adb
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2417
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 4d26d25..6c4d068 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -429,7 +429,7 @@
 
     public IHyracksTaskContext createTestContext(JobId jobId, int partition, boolean withMessaging)
             throws HyracksDataException {
-        IHyracksTaskContext ctx = TestUtils.create(KB32);
+        IHyracksTaskContext ctx = TestUtils.create(KB32, ExecutionTestUtil.integrationUtil.ncs[0].getIoManager());
         if (withMessaging) {
             TaskUtil.put(HyracksConstants.KEY_MESSAGE, new VSizeFrame(ctx), ctx);
         }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 25c71df..306a2a5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -32,9 +32,9 @@
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.CleanupUtils;
 import org.apache.hyracks.api.util.ExceptionUtils;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.storage.am.common.util.ResourceReleaseUtils;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -178,7 +178,7 @@
     }
 
     private Throwable finish(Throwable failure) {
-        Throwable th = ResourceReleaseUtils.close(recordReader, null);
+        Throwable th = CleanupUtils.close(recordReader, null);
         th = DataflowUtils.close(tupleForwarder, th);
         closeSignal();
         setState(State.STOPPED);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
index 471d23f..fb9a4a6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
@@ -25,7 +25,7 @@
 
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.util.ResourceReleaseUtils;
+import org.apache.hyracks.api.util.CleanupUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -103,11 +103,11 @@
 
     @Override
     public synchronized void close() throws IOException {
-        Throwable failure = ResourceReleaseUtils.close(connectionStream, null);
+        Throwable failure = CleanupUtils.close(connectionStream, null);
         connectionStream = null;
-        failure = ResourceReleaseUtils.close(socket, failure);
+        failure = CleanupUtils.close(socket, failure);
         socket = null;
-        failure = ResourceReleaseUtils.close(server, failure);
+        failure = CleanupUtils.close(server, failure);
         server = null;
         if (failure != null) {
             throw HyracksDataException.create(failure);
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOFuture.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IAsyncRequest.java
similarity index 80%
rename from hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOFuture.java
rename to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IAsyncRequest.java
index 3de2ca1..3efb2e1 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOFuture.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IAsyncRequest.java
@@ -18,10 +18,9 @@
  */
 package org.apache.hyracks.api.io;
 
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+@FunctionalInterface
+public interface IAsyncRequest {
 
-public interface IIOFuture {
-    public int synchronize() throws HyracksDataException, InterruptedException;
+    void await() throws InterruptedException;
 
-    public boolean isComplete();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
index b0cc07a..ff1e47f 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
@@ -18,13 +18,13 @@
  */
 package org.apache.hyracks.api.io;
 
+import java.io.Closeable;
 import java.nio.ByteBuffer;
 import java.util.List;
-import java.util.concurrent.Executor;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-public interface IIOManager {
+public interface IIOManager extends Closeable {
     public enum FileReadWriteMode {
         READ_ONLY,
         READ_WRITE
@@ -47,16 +47,16 @@
 
     public int syncRead(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException;
 
-    public IIOFuture asyncWrite(IFileHandle fHandle, long offset, ByteBuffer data);
+    IAsyncRequest asyncWrite(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException;
 
-    public IIOFuture asyncRead(IFileHandle fHandle, long offset, ByteBuffer data);
+    IAsyncRequest asyncWrite(IFileHandle fHandle, long offset, ByteBuffer[] dataArray) throws HyracksDataException;
+
+    IAsyncRequest asyncRead(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException;
 
     public void close(IFileHandle fHandle) throws HyracksDataException;
 
     public void sync(IFileHandle fileHandle, boolean metadata) throws HyracksDataException;
 
-    public void setExecutor(Executor executor);
-
     public long getSize(IFileHandle fileHandle);
 
     public void deleteWorkspaceFiles() throws HyracksDataException;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java
index a67c133..008cffd 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java
@@ -41,8 +41,8 @@
                     } catch (Throwable th) { // NOSONAR. Had to be done to satisfy contracts
                         try {
                             LOGGER.log(Level.WARN, "Failure destroying a destroyable resource", th);
-                        } catch (Throwable ignore) {
-                            // Do nothing
+                        } catch (Throwable ignore) { // NOSONAR: Ignore catching Throwable
+                            // NOSONAR Ignore logging failure
                         }
                         root = ExceptionUtils.suppress(root, th);
                     }
@@ -66,11 +66,11 @@
         if (writer != null) {
             try {
                 writer.close();
-            } catch (Throwable th) { // NOSONAR Will be re-thrown
+            } catch (Throwable th) { // NOSONAR Will be suppressed
                 try {
                     LOGGER.log(Level.WARN, "Failure closing a closeable resource", th);
-                } catch (Throwable loggingFailure) {
-                    // Do nothing
+                } catch (Throwable loggingFailure) { // NOSONAR: Ignore catching Throwable
+                    // NOSONAR: Ignore logging failure
                 }
                 root = ExceptionUtils.suppress(root, th);
             }
@@ -90,13 +90,39 @@
     public static void fail(IFrameWriter writer, Throwable root) {
         try {
             writer.fail();
-        } catch (Throwable th) { // NOSONAR Will be re-thrown
+        } catch (Throwable th) { // NOSONAR Will be suppressed
             try {
                 LOGGER.log(Level.WARN, "Failure failing " + writer.getClass().getSimpleName(), th);
-            } catch (Throwable loggingFailure) {
-                // Do nothing
+            } catch (Throwable loggingFailure) { // NOSONAR: Ignore catching Throwable
+                // NOSONAR ignore logging failure
             }
             root.addSuppressed(th);
         }
     }
+
+    /**
+     * Close the AutoCloseable and suppress any Throwable thrown by the close call.
+     * This method must NEVER throw any Throwable
+     *
+     * @param closable
+     *            the resource to close
+     * @param root
+     *            the first exception encountered during release of resources
+     * @return the root Throwable if not null or a new Throwable if any was thrown, otherwise, it returns null
+     */
+    public static Throwable close(AutoCloseable closable, Throwable root) {
+        if (closable != null) {
+            try {
+                closable.close();
+            } catch (Throwable th) { // NOSONAR Will be suppressed
+                try {
+                    LOGGER.log(Level.WARN, "Failure closing a closeable resource", th);
+                } catch (Throwable loggingFailure) { // NOSONAR: Ignore catching Throwable
+                    // NOSONAR ignore logging failure
+                }
+                root = ExceptionUtils.suppress(root, th); // NOSONAR
+            }
+        }
+        return root;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 0ccef1d..0b7254d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -63,6 +63,7 @@
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
 import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
 import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.api.util.CleanupUtils;
 import org.apache.hyracks.api.util.InvokeUtil;
 import org.apache.hyracks.control.common.base.IClusterController;
 import org.apache.hyracks.control.common.config.ConfigManager;
@@ -218,21 +219,26 @@
         Thread.currentThread().setUncaughtExceptionHandler(getLifeCycleComponentManager());
         ioManager =
                 new IOManager(IODeviceHandle.getDevices(ncConfig.getIODevices()), application.getFileDeviceResolver());
-
-        workQueue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread.
-        jobletMap = new ConcurrentHashMap<>();
-        deployedJobSpecActivityClusterGraphMap = new Hashtable<>();
-        timer = new Timer(true);
-        serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER,
-                new File(new File(NodeControllerService.class.getName()), id));
-        memoryMXBean = ManagementFactory.getMemoryMXBean();
-        gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans();
-        threadMXBean = ManagementFactory.getThreadMXBean();
-        runtimeMXBean = ManagementFactory.getRuntimeMXBean();
-        osMXBean = ManagementFactory.getOperatingSystemMXBean();
-        getNodeControllerInfosAcceptor = new MutableObject<>();
-        memoryManager = new MemoryManager((long) (memoryMXBean.getHeapMemoryUsage().getMax() * MEMORY_FUDGE_FACTOR));
-        ioCounter = IOCounterFactory.INSTANCE.getIOCounter();
+        try {
+            workQueue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread.
+            jobletMap = new ConcurrentHashMap<>();
+            deployedJobSpecActivityClusterGraphMap = new Hashtable<>();
+            timer = new Timer(true);
+            serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER,
+                    new File(new File(NodeControllerService.class.getName()), id));
+            memoryMXBean = ManagementFactory.getMemoryMXBean();
+            gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans();
+            threadMXBean = ManagementFactory.getThreadMXBean();
+            runtimeMXBean = ManagementFactory.getRuntimeMXBean();
+            osMXBean = ManagementFactory.getOperatingSystemMXBean();
+            getNodeControllerInfosAcceptor = new MutableObject<>();
+            memoryManager =
+                    new MemoryManager((long) (memoryMXBean.getHeapMemoryUsage().getMax() * MEMORY_FUDGE_FACTOR));
+            ioCounter = IOCounterFactory.INSTANCE.getIOCounter();
+        } catch (Throwable th) { // NOSONAR will be re-thrown
+            CleanupUtils.close(ioManager, th);
+            throw th;
+        }
     }
 
     public IOManager getIoManager() {
@@ -271,7 +277,6 @@
     }
 
     private void init() throws Exception {
-        ioManager.setExecutor(executor);
         datasetPartitionManager = new DatasetPartitionManager(this, executor, ncConfig.getResultManagerMemory(),
                 ncConfig.getResultTTL(), ncConfig.getResultSweepThreshold());
         datasetNetworkManager = new DatasetNetworkManager(ncConfig.getResultListenAddress(),
@@ -526,7 +531,7 @@
                 });
             }
             ipc.stop();
-
+            ioManager.close();
             LOGGER.log(Level.INFO, "Stopped NodeControllerService");
         } else {
             LOGGER.log(Level.ERROR, "Duplicate shutdown call; original: " + Arrays.toString(shutdownCallStack),
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index 2742aaa..a527e92 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -30,43 +30,49 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IFileDeviceResolver;
 import org.apache.hyracks.api.io.IFileHandle;
-import org.apache.hyracks.api.io.IIOFuture;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.io.IODeviceHandle;
+import org.apache.hyracks.api.util.InvokeUtil;
 import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.control.nc.io.IoRequest.State;
 import org.apache.hyracks.util.file.FileUtil;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 public class IOManager implements IIOManager {
     /*
      * Constants
      */
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final int IO_REQUEST_QUEUE_SIZE = 100; // TODO: Make configurable
     private static final String WORKSPACE_FILE_SUFFIX = ".waf";
     private static final FilenameFilter WORKSPACE_FILES_FILTER = (dir, name) -> name.endsWith(WORKSPACE_FILE_SUFFIX);
     /*
      * Finals
      */
+    private final ExecutorService executor;
+    private final BlockingQueue<IoRequest> submittedRequests;
+    private final BlockingQueue<IoRequest> freeRequests;
     private final List<IODeviceHandle> ioDevices;
     private final List<IODeviceHandle> workspaces;
     /*
      * Mutables
      */
-    private Executor executor;
     private int workspaceIndex;
     private IFileDeviceResolver deviceComputer;
 
-    public IOManager(List<IODeviceHandle> devices, Executor executor, IFileDeviceResolver deviceComputer)
-            throws HyracksDataException {
-        this(devices, deviceComputer);
-        this.executor = executor;
-    }
-
     public IOManager(List<IODeviceHandle> devices, IFileDeviceResolver deviceComputer) throws HyracksDataException {
         this.ioDevices = Collections.unmodifiableList(devices);
         checkDeviceValidity(devices);
@@ -86,6 +92,21 @@
         }
         workspaceIndex = 0;
         this.deviceComputer = deviceComputer;
+        submittedRequests = new ArrayBlockingQueue<>(IO_REQUEST_QUEUE_SIZE);
+        freeRequests = new ArrayBlockingQueue<>(IO_REQUEST_QUEUE_SIZE);
+        int numIoThreads = ioDevices.size() * 2;
+        executor = Executors.newFixedThreadPool(numIoThreads);
+        for (int i = 0; i < numIoThreads; i++) {
+            executor.execute(new IoRequestHandler(i, submittedRequests));
+        }
+    }
+
+    public IoRequest getOrAllocRequest() {
+        IoRequest request = freeRequests.poll();
+        if (request == null) {
+            request = new IoRequest(this, submittedRequests, freeRequests);
+        }
+        return request;
     }
 
     private void checkDeviceValidity(List<IODeviceHandle> devices) throws HyracksDataException {
@@ -106,11 +127,6 @@
     }
 
     @Override
-    public void setExecutor(Executor executor) {
-        this.executor = executor;
-    }
-
-    @Override
     public List<IODeviceHandle> getIODevices() {
         return ioDevices;
     }
@@ -129,6 +145,39 @@
 
     @Override
     public int syncWrite(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException {
+        IoRequest req = asyncWrite(fHandle, offset, data);
+        InvokeUtil.doUninterruptibly(req);
+        try {
+            if (req.getState() == State.OPERATION_SUCCEEDED) {
+                return req.getWrite();
+            } else if (req.getState() == State.OPERATION_FAILED) {
+                throw req.getFailure();
+            } else {
+                throw new IllegalStateException("Write request completed with state " + req.getState());
+            }
+        } finally {
+            req.recycle();
+        }
+    }
+
+    @Override
+    public long syncWrite(IFileHandle fHandle, long offset, ByteBuffer[] dataArray) throws HyracksDataException {
+        IoRequest req = asyncWrite(fHandle, offset, dataArray);
+        InvokeUtil.doUninterruptibly(req);
+        try {
+            if (req.getState() == State.OPERATION_SUCCEEDED) {
+                return req.getWrites();
+            } else if (req.getState() == State.OPERATION_FAILED) {
+                throw req.getFailure();
+            } else {
+                throw new IllegalStateException("Write request completed with state " + req.getState());
+            }
+        } finally {
+            req.recycle();
+        }
+    }
+
+    public int doSyncWrite(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException {
         try {
             if (fHandle == null) {
                 throw new IllegalStateException("Trying to write to a deleted file.");
@@ -152,8 +201,7 @@
         }
     }
 
-    @Override
-    public long syncWrite(IFileHandle fHandle, long offset, ByteBuffer[] dataArray) throws HyracksDataException {
+    public long doSyncWrite(IFileHandle fHandle, long offset, ByteBuffer[] dataArray) throws HyracksDataException {
         try {
             if (fHandle == null) {
                 throw new IllegalStateException("Trying to write to a deleted file.");
@@ -197,6 +245,22 @@
      */
     @Override
     public int syncRead(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException {
+        IoRequest req = asyncRead(fHandle, offset, data);
+        InvokeUtil.doUninterruptibly(req);
+        try {
+            if (req.getState() == State.OPERATION_SUCCEEDED) {
+                return req.getRead();
+            } else if (req.getState() == State.OPERATION_FAILED) {
+                throw req.getFailure();
+            } else {
+                throw new IllegalStateException("Reqd request completed with state " + req.getState());
+            }
+        } finally {
+            req.recycle();
+        }
+    }
+
+    public int doSyncRead(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException {
         try {
             int n = 0;
             int remaining = data.remaining();
@@ -223,16 +287,38 @@
     }
 
     @Override
-    public IIOFuture asyncWrite(IFileHandle fHandle, long offset, ByteBuffer data) {
-        AsyncWriteRequest req = new AsyncWriteRequest((FileHandle) fHandle, offset, data);
-        executor.execute(req);
+    public IoRequest asyncWrite(IFileHandle fHandle, long offset, ByteBuffer[] dataArray) throws HyracksDataException {
+        IoRequest req = getOrAllocRequest();
+        try {
+            req.write(fHandle, offset, dataArray);
+        } catch (HyracksDataException e) {
+            req.recycle();
+            throw e;
+        }
         return req;
     }
 
     @Override
-    public IIOFuture asyncRead(IFileHandle fHandle, long offset, ByteBuffer data) {
-        AsyncReadRequest req = new AsyncReadRequest((FileHandle) fHandle, offset, data);
-        executor.execute(req);
+    public IoRequest asyncWrite(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException {
+        IoRequest req = getOrAllocRequest();
+        try {
+            req.write(fHandle, offset, data);
+        } catch (HyracksDataException e) {
+            req.recycle();
+            throw e;
+        }
+        return req;
+    }
+
+    @Override
+    public IoRequest asyncRead(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException {
+        IoRequest req = getOrAllocRequest();
+        try {
+            req.read(fHandle, offset, data);
+        } catch (HyracksDataException e) {
+            req.recycle();
+            throw e;
+        }
         return req;
     }
 
@@ -259,80 +345,6 @@
         return dev.createFileRef(waPath + File.separator + waf.getName());
     }
 
-    private abstract class AsyncRequest implements IIOFuture, Runnable {
-        protected final FileHandle fHandle;
-        protected final long offset;
-        protected final ByteBuffer data;
-        private boolean complete;
-        private HyracksDataException exception;
-        private int result;
-
-        private AsyncRequest(FileHandle fHandle, long offset, ByteBuffer data) {
-            this.fHandle = fHandle;
-            this.offset = offset;
-            this.data = data;
-            complete = false;
-            exception = null;
-        }
-
-        @Override
-        public void run() {
-            HyracksDataException hde = null;
-            int res = -1;
-            try {
-                res = performOperation();
-            } catch (HyracksDataException e) {
-                hde = e;
-            }
-            synchronized (this) {
-                exception = hde;
-                result = res;
-                complete = true;
-                notifyAll();
-            }
-        }
-
-        protected abstract int performOperation() throws HyracksDataException;
-
-        @Override
-        public synchronized int synchronize() throws HyracksDataException, InterruptedException {
-            while (!complete) {
-                wait();
-            }
-            if (exception != null) {
-                throw exception;
-            }
-            return result;
-        }
-
-        @Override
-        public synchronized boolean isComplete() {
-            return complete;
-        }
-    }
-
-    private class AsyncReadRequest extends AsyncRequest {
-        private AsyncReadRequest(FileHandle fHandle, long offset, ByteBuffer data) {
-            super(fHandle, offset, data);
-        }
-
-        @Override
-        protected int performOperation() throws HyracksDataException {
-            return syncRead(fHandle, offset, data);
-        }
-    }
-
-    private class AsyncWriteRequest extends AsyncRequest {
-        private AsyncWriteRequest(FileHandle fHandle, long offset, ByteBuffer data) {
-            super(fHandle, offset, data);
-        }
-
-        @Override
-        protected int performOperation() throws HyracksDataException {
-            return syncWrite(fHandle, offset, data);
-        }
-    }
-
     @Override
     public void sync(IFileHandle fileHandle, boolean metadata) throws HyracksDataException {
         try {
@@ -390,4 +402,18 @@
         }
         return null;
     }
+
+    @Override
+    public void close() throws IOException {
+        InvokeUtil.doUninterruptibly(() -> submittedRequests.put(IoRequestHandler.POISON_PILL));
+        executor.shutdown();
+        try {
+            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
+                LOGGER.log(Level.WARN, "Failure shutting down {} executor service", getClass().getSimpleName());
+            }
+        } catch (InterruptedException e) {
+            LOGGER.log(Level.WARN, "Interrupted while shutting down {} executor service", getClass().getSimpleName());
+            Thread.currentThread().interrupt();
+        }
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IoRequest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IoRequest.java
new file mode 100644
index 0000000..4235d19
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IoRequest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.io;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IAsyncRequest;
+import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.api.util.InvokeUtil.InterruptibleAction;
+
+public class IoRequest implements IAsyncRequest, InterruptibleAction {
+
+    public enum State {
+        INITIAL,
+        READ_REQUESTED,
+        WRITE_REQUESTED,
+        OPERATION_FAILED,
+        OPERATION_SUCCEEDED
+    }
+
+    private final IOManager ioManager;
+    private final BlockingQueue<IoRequest> submittedRequests;
+    private final BlockingQueue<IoRequest> freeRequests;
+    private State state;
+    private IFileHandle fHandle;
+    private long offset;
+    private ByteBuffer data;
+    private ByteBuffer[] dataArray;
+    private HyracksDataException failure;
+    private int read;
+    private int write;
+    private long writes;
+
+    public IoRequest(IOManager ioManager, BlockingQueue<IoRequest> submittedRequests,
+            BlockingQueue<IoRequest> freeRequests) {
+        this.ioManager = ioManager;
+        this.submittedRequests = submittedRequests;
+        this.freeRequests = freeRequests;
+        reset();
+    }
+
+    public void reset() {
+        state = State.INITIAL;
+        fHandle = null;
+        data = null;
+        dataArray = null;
+        failure = null;
+    }
+
+    public void read(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException {
+        if (state != State.INITIAL) {
+            throw new IllegalStateException("Can't request a read operation through a " + state + " request");
+        }
+        state = State.READ_REQUESTED;
+        this.fHandle = fHandle;
+        this.offset = offset;
+        this.data = data;
+        queue();
+    }
+
+    public void write(IFileHandle fHandle, long offset, ByteBuffer[] dataArray) throws HyracksDataException {
+        if (state != State.INITIAL) {
+            throw new IllegalStateException("Can't request a write operation through a " + state + " request");
+        }
+        state = State.WRITE_REQUESTED;
+        this.fHandle = fHandle;
+        this.offset = offset;
+        this.dataArray = dataArray;
+        queue();
+    }
+
+    public void write(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException {
+        if (state != State.INITIAL) {
+            throw new IllegalStateException("Can't request a write operation through a " + state + " request");
+        }
+        state = State.WRITE_REQUESTED;
+        this.fHandle = fHandle;
+        this.offset = offset;
+        this.data = data;
+        queue();
+    }
+
+    private void queue() throws HyracksDataException {
+        try {
+            submittedRequests.put(this);
+        } catch (InterruptedException e) { // NOSONAR: The call below will re-interrupt
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
+    public void await() throws InterruptedException {
+        synchronized (this) {
+            while (state != State.OPERATION_FAILED && state != State.OPERATION_SUCCEEDED) {
+                wait();
+            }
+        }
+    }
+
+    synchronized void handle() {
+        try {
+            if (state == State.READ_REQUESTED) {
+                read = ioManager.doSyncRead(fHandle, offset, data);
+            } else if (state == State.WRITE_REQUESTED) {
+                if (data != null) {
+                    // single buffer
+                    write = ioManager.doSyncWrite(fHandle, offset, data);
+                } else {
+                    // multiple buffers
+                    writes = ioManager.doSyncWrite(fHandle, offset, dataArray);
+                }
+            } else {
+                throw new IllegalStateException("IO Request with state = " + state);
+            }
+            state = State.OPERATION_SUCCEEDED;
+        } catch (Throwable th) { // NOSONAR: This method must never throw anything
+            state = State.OPERATION_FAILED;
+            failure = HyracksDataException.create(th);
+        }
+        notifyAll();
+    }
+
+    public State getState() {
+        return state;
+    }
+
+    void recycle() {
+        reset();
+        freeRequests.offer(this);
+    }
+
+    public int getRead() {
+        return read;
+    }
+
+    public int getWrite() {
+        return write;
+    }
+
+    public long getWrites() {
+        return writes;
+    }
+
+    @Override
+    public void run() throws InterruptedException {
+        await();
+    }
+
+    public HyracksDataException getFailure() {
+        return failure;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IoRequestHandler.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IoRequestHandler.java
new file mode 100644
index 0000000..84fd24b
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IoRequestHandler.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.io;
+
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.hyracks.api.util.InvokeUtil;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class IoRequestHandler implements Runnable {
+    private static final Logger LOGGER = LogManager.getLogger();
+    public static final IoRequest POISON_PILL = new IoRequest(null, null, null);
+    private final int num;
+    private final BlockingQueue<IoRequest> queue;
+
+    public IoRequestHandler(int num, BlockingQueue<IoRequest> queue) {
+        this.num = num;
+        this.queue = queue;
+    }
+
+    @Override
+    public void run() {
+        Thread.currentThread().setName(getClass().getSimpleName() + "-" + num);
+        while (true) { // NOSONAR: Suppress 1 continue and 1 break
+            IoRequest next;
+            try {
+                next = queue.take();
+            } catch (InterruptedException e) { // NOSONAR: This is not supposed to be ever interrupted
+                LOGGER.log(Level.WARN, "Ignoring interrupt. IO threads should never be interrupted.");
+                continue;
+            }
+            if (next == POISON_PILL) {
+                LOGGER.log(Level.INFO, "Exiting");
+                InvokeUtil.doUninterruptibly(() -> queue.put(POISON_PILL));
+                if (Thread.interrupted()) {
+                    LOGGER.log(Level.ERROR, "Ignoring interrupt. IO threads should never be interrupted.");
+                }
+                break;
+            }
+            next.handle();
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/util/ResourceReleaseUtils.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/util/ResourceReleaseUtils.java
index b597b60..7861df0 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/util/ResourceReleaseUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/util/ResourceReleaseUtils.java
@@ -61,32 +61,6 @@
     }
 
     /**
-     * Close the AutoCloseable and suppress any Throwable thrown by the close call.
-     * This method must NEVER throw any Throwable
-     *
-     * @param closable
-     *            the resource to close
-     * @param root
-     *            the first exception encountered during release of resources
-     * @return the root Throwable if not null or a new Throwable if any was thrown, otherwise, it returns null
-     */
-    public static Throwable close(AutoCloseable closable, Throwable root) {
-        if (closable != null) {
-            try {
-                closable.close();
-            } catch (Throwable th) { // NOSONAR Will be re-thrown
-                try {
-                    LOGGER.log(Level.WARN, "Failure closing a closeable resource", th);
-                } catch (Throwable loggingFailure) {
-                    // Do nothing
-                }
-                root = ExceptionUtils.suppress(root, th);
-            }
-        }
-        return root;
-    }
-
-    /**
      * Close the IIndexDataflowHelper and suppress any Throwable thrown by the close call.
      * This method must NEVER throw any Throwable
      *
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index 7f915c6..1443bbc 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -28,14 +28,11 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -47,9 +44,7 @@
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 import org.apache.hyracks.api.replication.IIOReplicationManager;
-import org.apache.hyracks.api.util.InvokeUtil;
 import org.apache.hyracks.api.util.IoUtil;
-import org.apache.hyracks.storage.common.buffercache.CachedPage.State;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
 import org.apache.hyracks.storage.common.file.IFileMapManager;
 import org.apache.logging.log4j.Level;
@@ -60,7 +55,6 @@
 
     private static final Logger LOGGER = LogManager.getLogger();
     private static final int MAP_FACTOR = 3;
-    private static final CachedPage POISON_PILL = new CachedPage();
 
     private static final int MIN_CLEANED_COUNT_DIFF = 3;
     private static final int PIN_MAX_WAIT_TIME = 50;
@@ -72,8 +66,7 @@
 
     private final int pageSize;
     private final int maxOpenFiles;
-    private final ExecutorService executor;
-    private final IIOManager ioManager;
+    final IIOManager ioManager;
     private final CacheBucket[] pageMap;
     private final IPageReplacementStrategy pageReplacementStrategy;
     private final IPageCleanerPolicy pageCleanerPolicy;
@@ -82,7 +75,6 @@
     private final Map<Integer, BufferedFileHandle> fileInfoMap;
     private final AsyncFIFOPageQueueManager fifoWriter;
     private final Queue<BufferCacheHeaderHelper> headerPageCache = new ConcurrentLinkedQueue<>();
-    private final BlockingQueue<CachedPage> readRequests;
 
     //DEBUG
     private Level fileOpsLevel = Level.DEBUG;
@@ -111,43 +103,19 @@
         this.pageReplacementStrategy = pageReplacementStrategy;
         this.pageCleanerPolicy = pageCleanerPolicy;
         this.fileMapManager = fileMapManager;
-        int numReaders = ioManager.getIODevices().size() * 2;
-        readRequests = new ArrayBlockingQueue<>(pageReplacementStrategy.getMaxAllowedNumPages());
-        executor = Executors.newFixedThreadPool(numReaders + 1, threadFactory);
-        try {
-            fileInfoMap = new HashMap<>();
-            cleanerThread = new CleanerThread();
-            executor.execute(cleanerThread);
-            for (int i = 0; i < numReaders; i++) {
-                executor.execute(new ReaderThread(i));
-            }
-            closed = false;
-            fifoWriter = new AsyncFIFOPageQueueManager(this);
-            if (DEBUG) {
-                confiscatedPages = new ArrayList<>();
-                confiscatedPagesOwner = new HashMap<>();
-                confiscateLock = new ReentrantLock();
-                pinnedPageOwner = new ConcurrentHashMap<>();
-            }
-        } catch (Throwable th) {
-            try {
-                throw th;
-            } finally {
-                readRequests.offer(POISON_PILL); // NOSONAR will always succeed since the queue is empty
-                executor.shutdown();
-                try {
-                    if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
-                        LOGGER.log(Level.WARN, "Failure shutting down buffer cache executor service");
-                    }
-                } catch (InterruptedException e) {
-                    LOGGER.log(Level.WARN, "Interrupted while shutting down buffer cache executor service");
-                    Thread.currentThread().interrupt();
-                    th.addSuppressed(e);
-                } catch (Throwable e) {
-                    LOGGER.log(Level.WARN, "Failure shutting down buffer cache executor service", e);
-                    th.addSuppressed(e);
-                }
-            }
+
+        Executor executor = Executors.newCachedThreadPool(threadFactory);
+        fileInfoMap = new HashMap<>();
+        cleanerThread = new CleanerThread();
+        executor.execute(cleanerThread);
+        closed = false;
+
+        fifoWriter = new AsyncFIFOPageQueueManager(this);
+        if (DEBUG) {
+            confiscatedPages = new ArrayList<>();
+            confiscatedPagesOwner = new HashMap<>();
+            confiscateLock = new ReentrantLock();
+            pinnedPageOwner = new ConcurrentHashMap<>();
         }
     }
 
@@ -201,48 +169,38 @@
             pinSanityCheck(dpid);
         }
         CachedPage cPage = findPage(dpid);
-        if (cPage.state != State.VALID) {
-            synchronized (cPage) {
-                if (!newPage) {
-                    if (DEBUG) {
-                        confiscateLock.lock();
-                        try {
-                            for (CachedPage c : confiscatedPages) {
-                                if (c.dpid == dpid && c.confiscated.get()) {
-                                    throw new IllegalStateException();
-                                }
-                            }
-                        } finally {
-                            confiscateLock.unlock();
+        if (!newPage) {
+            if (DEBUG) {
+                confiscateLock.lock();
+                try {
+                    for (CachedPage c : confiscatedPages) {
+                        if (c.dpid == dpid && c.confiscated.get()) {
+                            throw new IllegalStateException();
                         }
                     }
-                    // Resolve race of multiple threads trying to read the page from
-                    // disk.
-
-                    if (cPage.state != State.VALID) {
-                        try {
-                            // Will attempt to re-read even if previous read failed
-                            if (cPage.state == State.INVALID || cPage.state == State.READ_FAILED) {
-                                // submit request to read
-                                cPage.state = State.READ_REQUESTED;
-                                readRequests.put(cPage);
-                            }
-                            cPage.awaitRead();
-                        } catch (InterruptedException e) {
-                            cPage.state = State.INVALID;
-                            unpin(cPage);
-                            throw HyracksDataException.create(e);
-                        } catch (Throwable th) {
-                            unpin(cPage);
-                            throw HyracksDataException.create(th);
-                        }
-                    }
-
-                } else {
-                    cPage.state = State.VALID;
-                    cPage.notifyAll();
+                } finally {
+                    confiscateLock.unlock();
                 }
             }
+            // Resolve race of multiple threads trying to read the page from
+            // disk.
+            synchronized (cPage) {
+                if (!cPage.valid) {
+                    try {
+                        tryRead(cPage);
+                        cPage.valid = true;
+                    } catch (Exception e) {
+                        LOGGER.log(Level.WARN, "Failure while trying to read a page from disk", e);
+                        throw e;
+                    } finally {
+                        if (!cPage.valid) {
+                            unpin(cPage);
+                        }
+                    }
+                }
+            }
+        } else {
+            cPage.valid = true;
         }
         pageReplacementStrategy.notifyCachePageAccess(cPage);
         if (DEBUG) {
@@ -491,7 +449,7 @@
                     buffer.append("      ").append(cp.cpid).append(" -> [")
                             .append(BufferedFileHandle.getFileId(cp.dpid)).append(':')
                             .append(BufferedFileHandle.getPageId(cp.dpid)).append(", ").append(cp.pinCount.get())
-                            .append(", ").append(cp.state).append(", ")
+                            .append(", ").append(cp.valid ? "valid" : "invalid").append(", ")
                             .append(cp.confiscated.get() ? "confiscated" : "physical").append(", ")
                             .append(cp.dirty.get() ? "dirty" : "clean").append("]\n");
                     cp = cp.next;
@@ -522,7 +480,7 @@
                 if (c.confiscated() || c.latch.getReadLockCount() != 0 || c.latch.getWriteHoldCount() != 0) {
                     return false;
                 }
-                if (c.state == State.VALID) {
+                if (c.valid) {
                     reachableDpids.add(c.dpid);
                 }
             }
@@ -561,9 +519,6 @@
                 read(cPage);
                 return;
             } catch (HyracksDataException readException) {
-                if (Thread.interrupted()) {
-                    LOGGER.log(Level.WARN, "Ignoring interrupt. Reader threads should never be interrupted.");
-                }
                 if (readException.getErrorCode() == ErrorCode.CANNOT_READ_CLOSED_FILE && i <= MAX_PAGE_READ_ATTEMPTS) {
                     /**
                      * if the read failure was due to another thread closing the file channel because
@@ -575,7 +530,8 @@
                         LOGGER.log(Level.WARN, String.format("Failed to read page. Retrying attempt (%d/%d)", i + 1,
                                 MAX_PAGE_READ_ATTEMPTS), readException);
                     } catch (InterruptedException e) {
-                        LOGGER.log(Level.WARN, "Ignoring interrupt. Reader threads should never be interrupted.");
+                        Thread.currentThread().interrupt();
+                        throw HyracksDataException.create(e);
                     }
                 } else {
                     throw readException;
@@ -714,56 +670,6 @@
         }
     }
 
-    private class ReaderThread implements Runnable {
-        private final int num;
-
-        private ReaderThread(int num) {
-            this.num = num;
-        }
-
-        @Override
-        public void run() {
-            Thread.currentThread().setName("Buffer-Cache-Reader-" + num);
-            while (true) {
-                CachedPage next;
-                try {
-                    next = readRequests.take();
-                } catch (InterruptedException e) {
-                    LOGGER.log(Level.WARN, "Ignoring interrupt. Reader threads should never be interrupted.");
-                    break;
-                }
-                if (next == POISON_PILL) {
-                    LOGGER.log(Level.INFO, "Exiting");
-                    InvokeUtil.doUninterruptibly(() -> readRequests.put(POISON_PILL));
-                    if (Thread.interrupted()) {
-                        LOGGER.log(Level.ERROR, "Ignoring interrupt. Reader threads should never be interrupted.");
-                    }
-                    break;
-                }
-                synchronized (next) {
-                    if (next.state != State.VALID) {
-                        if (next.state != State.READ_REQUESTED) {
-                            LOGGER.log(Level.ERROR,
-                                    "Exiting BufferCache reader thread. Took a page with state = {} out of the queue",
-                                    next.state);
-                            break;
-                        }
-                        try {
-                            tryRead(next);
-                            next.state = State.VALID;
-                        } catch (HyracksDataException e) {
-                            next.readFailure = e;
-                            next.state = State.READ_FAILED;
-                            LOGGER.log(Level.WARN, "Failed to read a page", e);
-                        }
-                        next.notifyAll();
-                    }
-                }
-            }
-        }
-
-    }
-
     private class CleanerThread implements Runnable {
         private volatile boolean shutdownStart = false;
         private volatile boolean shutdownComplete = false;
@@ -893,16 +799,6 @@
             });
             fileInfoMap.clear();
         }
-        InvokeUtil.doUninterruptibly(() -> readRequests.put(POISON_PILL));
-        executor.shutdown();
-        try {
-            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
-                LOGGER.log(Level.WARN, "Failure shutting down buffer cache executor service");
-            }
-        } catch (InterruptedException e) {
-            LOGGER.log(Level.WARN, "Interrupted while shutting down buffer cache executor service");
-            Thread.currentThread().interrupt();
-        }
     }
 
     @Override
@@ -1447,7 +1343,7 @@
             }
             try {
                 cPage.reset(cPage.dpid);
-                cPage.state = State.VALID;
+                cPage.valid = true;
                 cPage.next = bucket.cachedPage;
                 bucket.cachedPage = cPage;
                 cPage.pinCount.decrementAndGet();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
index d7a55af..02eb8bf 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
@@ -23,19 +23,10 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
 /**
  * @author yingyib
  */
 public class CachedPage implements ICachedPageInternal {
-    public enum State {
-        INVALID,
-        READ_REQUESTED,
-        READ_FAILED,
-        VALID
-    }
-
     final int cpid;
     ByteBuffer buffer;
     public final AtomicInteger pinCount;
@@ -45,7 +36,7 @@
     private final IPageReplacementStrategy pageReplacementStrategy;
     volatile long dpid; // disk page id (composed of file id and page id)
     CachedPage next;
-    volatile State state;
+    volatile boolean valid;
     final AtomicBoolean confiscated;
     private IQueueInfo queueInfo;
     private int multiplier;
@@ -53,7 +44,6 @@
     // DEBUG
     private static final boolean DEBUG = false;
     private final StackTraceElement[] ctorStack;
-    Throwable readFailure;
 
     //Constructor for making dummy entry for FIFO queue
     public CachedPage() {
@@ -82,7 +72,7 @@
         latch = new ReentrantReadWriteLock(true);
         replacementStrategyObject = pageReplacementStrategy.createPerPageStrategyObject(cpid);
         dpid = -1;
-        state = State.INVALID;
+        valid = false;
         confiscated = new AtomicBoolean(false);
         queueInfo = null;
         ctorStack = DEBUG ? new Throwable().getStackTrace() : null;
@@ -91,7 +81,7 @@
     public void reset(long dpid) {
         this.dpid = dpid;
         dirty.set(false);
-        state = State.INVALID;
+        valid = false;
         confiscated.set(false);
         pageReplacementStrategy.notifyCachePageReset(this);
         queueInfo = null;
@@ -215,30 +205,4 @@
     public boolean isLargePage() {
         return multiplier > 1;
     }
-
-    /**
-     * Wait for the page requested to be read to complete the read operation
-     * This method is uninterrubtible
-     *
-     * @throws HyracksDataException
-     */
-    public synchronized void awaitRead() throws HyracksDataException {
-        boolean interrupted = false;
-        try {
-            while (state != State.VALID && state != State.READ_FAILED) {
-                try {
-                    wait();
-                } catch (InterruptedException e) {
-                    interrupted = true;
-                }
-            }
-            if (state == State.READ_FAILED) {
-                throw HyracksDataException.create(readFailure);
-            }
-        } finally {
-            if (interrupted) {
-                Thread.currentThread().interrupt();
-            }
-        }
-    }
-}
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
index 3060b25..673a27f 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
@@ -21,7 +21,6 @@
 import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 
 import org.apache.hyracks.api.application.INCServiceContext;
@@ -110,7 +109,7 @@
             List<IODeviceHandle> devices = new ArrayList<>();
             devices.add(new IODeviceHandle(new File(System.getProperty("user.dir") + File.separator + "target"),
                     "iodev_test_wa"));
-            ioManager = new IOManager(devices, Executors.newCachedThreadPool(), new DefaultDeviceResolver());
+            ioManager = new IOManager(devices, new DefaultDeviceResolver());
         }
         return ioManager;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
index d7578af..ebfaeb8 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
@@ -26,7 +26,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.Executors;
 
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.client.NodeControllerInfo;
@@ -40,6 +39,7 @@
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.io.IODeviceHandle;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.util.CleanupUtils;
 import org.apache.hyracks.control.nc.io.DefaultDeviceResolver;
 import org.apache.hyracks.control.nc.io.IOManager;
 import org.apache.logging.log4j.core.Appender;
@@ -49,8 +49,20 @@
 
 public class TestUtils {
     public static IHyracksTaskContext create(int frameSize) {
+        IOManager ioManager = null;
         try {
-            IOManager ioManager = createIoManager();
+            ioManager = createIoManager();
+            return create(frameSize, ioManager);
+        } catch (Exception e) {
+            if (ioManager != null) {
+                CleanupUtils.close(ioManager, e);
+            }
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static IHyracksTaskContext create(int frameSize, IOManager ioManager) {
+        try {
             INCServiceContext serviceCtx = new TestNCServiceContext(ioManager, null);
             TestJobletContext jobletCtx = new TestJobletContext(frameSize, serviceCtx, new JobId(0));
             TaskAttemptId tid = new TaskAttemptId(new TaskId(new ActivityId(new OperatorDescriptorId(0), 0), 0), 0);
@@ -64,7 +76,7 @@
     private static IOManager createIoManager() throws HyracksException {
         List<IODeviceHandle> devices = new ArrayList<>();
         devices.add(new IODeviceHandle(new File(System.getProperty("java.io.tmpdir")), "."));
-        return new IOManager(devices, Executors.newCachedThreadPool(), new DefaultDeviceResolver());
+        return new IOManager(devices, new DefaultDeviceResolver());
     }
 
     public static void compareWithResult(File expectedFile, File actualFile) throws Exception {
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/org/apache/hyracks/storage/am/btree/BTreeStatsTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/org/apache/hyracks/storage/am/btree/BTreeStatsTest.java
index 2243ee3..ea22355 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/org/apache/hyracks/storage/am/btree/BTreeStatsTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/org/apache/hyracks/storage/am/btree/BTreeStatsTest.java
@@ -73,85 +73,66 @@
 
     @Test
     public void test01() throws Exception {
-
         TestStorageManagerComponentHolder.init(PAGE_SIZE, NUM_PAGES, MAX_OPEN_FILES);
         IBufferCache bufferCache = harness.getBufferCache();
-
         // declare fields
         int fieldCount = 2;
         ITypeTraits[] typeTraits = new ITypeTraits[fieldCount];
         typeTraits[0] = IntegerPointable.TYPE_TRAITS;
         typeTraits[1] = IntegerPointable.TYPE_TRAITS;
-
         // declare keys
         int keyFieldCount = 1;
         IBinaryComparatorFactory[] cmpFactories = new IBinaryComparatorFactory[keyFieldCount];
         cmpFactories[0] = PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY);
-
         BTreeTypeAwareTupleWriterFactory tupleWriterFactory = new BTreeTypeAwareTupleWriterFactory(typeTraits, false);
         ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(tupleWriterFactory);
         ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(tupleWriterFactory);
         ITreeIndexMetadataFrameFactory metaFrameFactory = new LIFOMetaDataFrameFactory();
-
         IBTreeLeafFrame leafFrame = (IBTreeLeafFrame) leafFrameFactory.createFrame();
         IBTreeInteriorFrame interiorFrame = (IBTreeInteriorFrame) interiorFrameFactory.createFrame();
         ITreeIndexMetadataFrame metaFrame = metaFrameFactory.createFrame();
-
         IMetadataPageManager freePageManager = new LinkedMetaDataPageManager(bufferCache, metaFrameFactory);
-
         BTree btree = new BTree(bufferCache, freePageManager, interiorFrameFactory, leafFrameFactory, cmpFactories,
                 fieldCount, harness.getFileReference());
         btree.create();
         btree.activate();
-
         Random rnd = new Random();
         rnd.setSeed(50);
-
         long start = System.currentTimeMillis();
-
         if (LOGGER.isInfoEnabled()) {
             LOGGER.info("INSERTING INTO TREE");
         }
-
         IFrame frame = new VSizeFrame(ctx);
         FrameTupleAppender appender = new FrameTupleAppender();
         ArrayTupleBuilder tb = new ArrayTupleBuilder(fieldCount);
         DataOutput dos = tb.getDataOutput();
-
         ISerializerDeserializer[] recDescSers =
                 { IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE };
         RecordDescriptor recDesc = new RecordDescriptor(recDescSers);
         IFrameTupleAccessor accessor = new FrameTupleAccessor(recDesc);
         accessor.reset(frame.getBuffer());
         FrameTupleReference tuple = new FrameTupleReference();
-
         IndexAccessParameters actx =
                 new IndexAccessParameters(TestOperationCallback.INSTANCE, TestOperationCallback.INSTANCE);
         ITreeIndexAccessor indexAccessor = btree.createAccessor(actx);
         // 10000
         for (int i = 0; i < 100000; i++) {
-
             int f0 = rnd.nextInt() % 100000;
             int f1 = 5;
-
             tb.reset();
             IntegerSerializerDeserializer.INSTANCE.serialize(f0, dos);
             tb.addFieldEndOffset();
             IntegerSerializerDeserializer.INSTANCE.serialize(f1, dos);
             tb.addFieldEndOffset();
-
             appender.reset(frame, true);
             appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
-
             tuple.reset(accessor, 0);
-
             if (LOGGER.isInfoEnabled()) {
                 if (i % 10000 == 0) {
                     long end = System.currentTimeMillis();
                     LOGGER.info("INSERTING " + i + " : " + f0 + " " + f1 + " " + (end - start));
                 }
             }
-
             try {
                 indexAccessor.insert(tuple);
             } catch (HyracksDataException e) {
@@ -161,18 +142,15 @@
                 }
             }
         }
-
         TreeIndexStatsGatherer statsGatherer = new TreeIndexStatsGatherer(bufferCache, freePageManager,
                 harness.getFileReference(), btree.getRootPageId());
         TreeIndexStats stats = statsGatherer.gatherStats(leafFrame, interiorFrame, metaFrame);
         if (LOGGER.isInfoEnabled()) {
             LOGGER.info("\n" + stats.toString());
         }
-
         TreeIndexBufferCacheWarmup bufferCacheWarmup =
                 new TreeIndexBufferCacheWarmup(bufferCache, freePageManager, harness.getFileReference());
         bufferCacheWarmup.warmup(leafFrame, metaFrame, new int[] { 1, 2 }, new int[] { 2, 5 });
-
         btree.deactivate();
         btree.destroy();
         bufferCache.close();
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/LSMIndexFileManagerTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/LSMIndexFileManagerTest.java
index 56618e7..8c1124f 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/LSMIndexFileManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/LSMIndexFileManagerTest.java
@@ -31,7 +31,6 @@
 import java.util.Date;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.Executors;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
@@ -247,7 +246,7 @@
             String iodevPath = System.getProperty("java.io.tmpdir") + sep + "test_iodev" + i;
             devices.add(new IODeviceHandle(new File(iodevPath), "wa"));
         }
-        return new IOManager(devices, Executors.newCachedThreadPool(), new DefaultDeviceResolver());
+        return new IOManager(devices, new DefaultDeviceResolver());
     }
 
     private FileReference simulateMerge(ILSMIndexFileManager fileManager, FileReference a, FileReference b)