[NO ISSUE] Report all BufferCache write failures.

- user model changes: no
- storage format changes: no
- interface changes: yes
  + IPageWriteFailureCallback: used to notify async
    IO caller when something goes wrong.

Details:
- Before this change, it is possible for failures to
  be lost and for bulkload operations to not be
  aware of failure to write some pages. This can be
  dangerous.
- To avoid this, when sending a page to be written
  a PageWriteFailureCallback is associated with the
  page to notify the caller that a failure took place.

Change-Id: I97fd3dccff85dab84d644359be6f66b15ee708ef
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2787
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Luo Chen <cluo8@uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomFilter.java b/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomFilter.java
index 6c16bd1..2dce1cd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomFilter.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomFilter.java
@@ -29,6 +29,7 @@
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 import org.apache.hyracks.storage.common.buffercache.IFIFOPageQueue;
+import org.apache.hyracks.storage.common.buffercache.PageWriteFailureCallback;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
 
 public class BloomFilter {
@@ -275,7 +276,7 @@
         return new BloomFilterBuilder(numElements, numHashes, numBitsPerElement);
     }
 
-    public class BloomFilterBuilder implements IIndexBulkLoader {
+    public class BloomFilterBuilder extends PageWriteFailureCallback implements IIndexBulkLoader {
         private final long[] hashes = BloomFilter.createHashArray();
         private final long estimatedNumElements;
         private final int numHashes;
@@ -286,6 +287,7 @@
         private final ICachedPage[] pages;
         private ICachedPage metaDataPage = null;
 
+        @SuppressWarnings("squid:S1181") // Catch Throwable Must return all confiscated pages
         public BloomFilterBuilder(long estimatedNumElemenets, int numHashes, int numBitsPerElement)
                 throws HyracksDataException {
             if (!isActivated) {
@@ -303,11 +305,22 @@
             actualNumElements = 0;
             pages = new ICachedPage[numPages];
             int currentPageId = 1;
-            while (currentPageId <= numPages) {
-                ICachedPage page = bufferCache.confiscatePage(BufferedFileHandle.getDiskPageId(fileId, currentPageId));
-                initPage(page.getBuffer().array());
-                pages[currentPageId - 1] = page;
-                ++currentPageId;
+            try {
+                while (currentPageId <= numPages) {
+                    ICachedPage page =
+                            bufferCache.confiscatePage(BufferedFileHandle.getDiskPageId(fileId, currentPageId));
+                    initPage(page.getBuffer().array());
+                    pages[currentPageId - 1] = page;
+                    ++currentPageId;
+                }
+            } catch (Throwable th) {
+                // return confiscated pages
+                for (int i = 0; i < currentPageId; i++) {
+                    if (pages[i] != null) {
+                        bufferCache.returnPage(pages[i]);
+                    }
+                }
+                throw th;
             }
         }
 
@@ -364,11 +377,14 @@
         @Override
         public void end() throws HyracksDataException {
             allocateAndInitMetaDataPage();
-            queue.put(metaDataPage);
+            queue.put(metaDataPage, this);
             for (ICachedPage p : pages) {
-                queue.put(p);
+                queue.put(p, this);
             }
             bufferCache.finishQueue();
+            if (hasFailed()) {
+                throw HyracksDataException.create(getFailure());
+            }
             BloomFilter.this.numBits = numBits;
             BloomFilter.this.numHashes = numHashes;
             BloomFilter.this.numElements = actualNumElements;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
index fb8770e..4fc8af9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
@@ -1012,9 +1012,7 @@
             try {
                 int tupleSize = Math.max(leafFrame.getBytesRequiredToWriteTuple(tuple),
                         interiorFrame.getBytesRequiredToWriteTuple(tuple));
-
                 NodeFrontier leafFrontier = nodeFrontiers.get(0);
-
                 int spaceNeeded = tupleWriter.bytesRequired(tuple) + slotSize;
                 int spaceUsed = leafFrame.getBuffer().capacity() - leafFrame.getTotalFreeSpace();
 
@@ -1045,12 +1043,11 @@
 
                         ((IBTreeLeafFrame) leafFrame).setNextLeaf(leafFrontier.pageId);
 
-                        queue.put(leafFrontier.page);
+                        queue.put(leafFrontier.page, this);
                         for (ICachedPage c : pagesToWrite) {
-                            queue.put(c);
+                            queue.put(c, this);
                         }
                         pagesToWrite.clear();
-
                         splitKey.setRightPage(leafFrontier.pageId);
                     }
                     if (tupleSize > maxTupleSize) {
@@ -1155,7 +1152,7 @@
                 ICachedPage lastLeaf = nodeFrontiers.get(level).page;
                 int lastLeafPage = nodeFrontiers.get(level).pageId;
                 lastLeaf.setDiskPageId(BufferedFileHandle.getDiskPageId(getFileId(), nodeFrontiers.get(level).pageId));
-                queue.put(lastLeaf);
+                queue.put(lastLeaf, this);
                 nodeFrontiers.get(level).page = null;
                 persistFrontiers(level + 1, lastLeafPage);
                 return;
@@ -1170,9 +1167,8 @@
             ((IBTreeInteriorFrame) interiorFrame).setRightmostChildPageId(rightPage);
             int finalPageId = freePageManager.takePage(metaFrame);
             frontier.page.setDiskPageId(BufferedFileHandle.getDiskPageId(getFileId(), finalPageId));
-            queue.put(frontier.page);
+            queue.put(frontier.page, this);
             frontier.pageId = finalPageId;
-
             persistFrontiers(level + 1, finalPageId);
         }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IPageManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IPageManager.java
index b7987f8..f8a929d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IPageManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IPageManager.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.storage.am.common.api;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback;
 
 public interface IPageManager {
 
@@ -42,12 +43,14 @@
      * 3. When we need to have a persisted state.
      *
      * Note: This method will not force indexes to disk driver using fsync
+     *
      * @throws HyracksDataException
      */
-    void close() throws HyracksDataException;
+    void close(IPageWriteFailureCallback callback) throws HyracksDataException;
 
     /**
      * Create a metadata frame to be used for reading and writing to metadata pages
+     *
      * @return a new metadata frame
      */
     ITreeIndexMetadataFrame createMetadataFrame();
@@ -87,6 +90,7 @@
     /**
      * Get the location of a block of free pages to use for index operations
      * This is used for records that are larger than a normal page
+     *
      * @param frame
      *            A metadata frame to use to wrap metadata pages
      * @return The starting page location, or -1 if a block of free pages could be found or allocated
@@ -107,6 +111,7 @@
 
     /**
      * Add a page back to the pool of free pages within an index file
+     *
      * @param frame
      *            A metadata frame to use to wrap metadata pages
      * @param page
@@ -129,6 +134,7 @@
 
     /**
      * Check whether the index is empty or not.
+     *
      * @param frame
      *            interior frame
      * @param rootPage
@@ -140,6 +146,7 @@
 
     /**
      * Get the root page of the id
+     *
      * @return the root page
      * @throws HyracksDataException
      */
@@ -147,6 +154,7 @@
 
     /**
      * Get the first page to start the bulk load
+     *
      * @return
      * @throws HyracksDataException
      */
@@ -154,6 +162,7 @@
 
     /**
      * Set the root page id and finalize the bulk load operation
+     *
      * @param rootPage
      * @throws HyracksDataException
      */
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
index 5c389d2..97e7ed7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
@@ -32,6 +32,7 @@
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 import org.apache.hyracks.storage.common.buffercache.IFIFOPageQueue;
+import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
 
 public class AppendOnlyLinkedMetadataPageManager implements IMetadataPageManager {
@@ -207,7 +208,7 @@
     }
 
     @Override
-    public void close() throws HyracksDataException {
+    public void close(IPageWriteFailureCallback callback) throws HyracksDataException {
         if (ready) {
             IFIFOPageQueue queue = bufferCache.createFIFOQueue();
             ITreeIndexMetadataFrame metaFrame = frameFactory.createFrame();
@@ -220,7 +221,9 @@
             }
             int finalMetaPage = getMaxPageId(metaFrame) + 1;
             confiscatedPage.setDiskPageId(BufferedFileHandle.getDiskPageId(fileId, finalMetaPage));
-            queue.put(confiscatedPage);
+            // WARNING: flushing the metadata page should be done after releasing the write latch; otherwise, the page
+            // won't be flushed to disk because it won't be dirty until the write latch has been released.
+            queue.put(confiscatedPage, callback);
             bufferCache.finishQueue();
             metadataPage = getMetadataPageId();
             ready = false;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java
index 951d824..e348e24 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java
@@ -28,8 +28,14 @@
 import org.apache.hyracks.storage.am.common.api.ITreeIndexMetadataFrameFactory;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
 
+/**
+ * @deprecated
+ *             This class must not be used. Instead, use {@link AppendOnlyLinkedMetadataPageManager}
+ */
+@Deprecated
 public class LinkedMetaDataPageManager implements IMetadataPageManager {
     private final IBufferCache bufferCache;
     private int fileId = -1;
@@ -238,7 +244,7 @@
     }
 
     @Override
-    public void close() throws HyracksDataException {
+    public void close(IPageWriteFailureCallback callback) throws HyracksDataException {
         if (ready) {
             ICachedPage metaNode =
                     bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, getMetadataPageId()), false);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
index 905c99d..b77f14f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
@@ -26,19 +26,19 @@
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.api.IPageManager;
 import org.apache.hyracks.storage.am.common.api.ITreeIndex;
-import org.apache.hyracks.storage.am.common.api.ITreeIndexAccessor;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexMetadataFrame;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
 import org.apache.hyracks.storage.common.IIndexBulkLoader;
 import org.apache.hyracks.storage.common.MultiComparator;
+import org.apache.hyracks.storage.common.buffercache.HaltOnFailureCallback;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 import org.apache.hyracks.storage.common.buffercache.IFIFOPageQueue;
+import org.apache.hyracks.storage.common.buffercache.PageWriteFailureCallback;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
 
 public abstract class AbstractTreeIndex implements ITreeIndex {
@@ -95,7 +95,7 @@
             freePageManager.open(fileId);
             freePageManager.init(interiorFrameFactory, leafFrameFactory);
             setRootPage();
-            freePageManager.close();
+            freePageManager.close(HaltOnFailureCallback.INSTANCE);
             failed = false;
         } finally {
             bufferCache.closeFile(fileId);
@@ -132,7 +132,7 @@
         if (!isActive) {
             throw HyracksDataException.create(ErrorCode.CANNOT_DEACTIVATE_INACTIVE_INDEX);
         }
-        freePageManager.close();
+        freePageManager.close(HaltOnFailureCallback.INSTANCE);
         bufferCache.closeFile(fileId);
         isActive = false;
     }
@@ -227,7 +227,7 @@
         return fieldCount;
     }
 
-    public abstract class AbstractTreeIndexBulkLoader implements IIndexBulkLoader {
+    public abstract class AbstractTreeIndexBulkLoader extends PageWriteFailureCallback implements IIndexBulkLoader {
         protected final MultiComparator cmp;
         protected final int slotSize;
         protected final int leafMaxBytes;
@@ -297,6 +297,9 @@
         @Override
         public void end() throws HyracksDataException {
             bufferCache.finishQueue();
+            if (hasFailed()) {
+                throw HyracksDataException.create(getFailure());
+            }
             freePageManager.setRootPageId(rootPage);
         }
 
@@ -317,31 +320,6 @@
         public void setLeafFrame(ITreeIndexFrame leafFrame) {
             this.leafFrame = leafFrame;
         }
-
-    }
-
-    public class TreeIndexInsertBulkLoader implements IIndexBulkLoader {
-        ITreeIndexAccessor accessor;
-
-        public TreeIndexInsertBulkLoader() throws HyracksDataException {
-            accessor = (ITreeIndexAccessor) createAccessor(NoOpIndexAccessParameters.INSTANCE);
-        }
-
-        @Override
-        public void add(ITupleReference tuple) throws HyracksDataException {
-            accessor.insert(tuple);
-        }
-
-        @Override
-        public void end() throws HyracksDataException {
-            // do nothing
-        }
-
-        @Override
-        public void abort() {
-
-        }
-
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
index a504f7e..8be75be 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
@@ -60,6 +60,7 @@
 import org.apache.hyracks.storage.common.ISearchOperationCallback;
 import org.apache.hyracks.storage.common.ISearchPredicate;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 import org.apache.hyracks.util.trace.ITracer;
 
 /**
@@ -462,12 +463,15 @@
                     if (isTransaction) {
                         // Since this is a transaction component, validate and
                         // deactivate. it could later be added or deleted
-                        component.markAsValid(durable);
-                        ioOpCallback.afterFinalize(loadOp);
+                        try {
+                            component.markAsValid(durable, loadOp);
+                        } finally {
+                            ioOpCallback.afterFinalize(loadOp);
+                        }
                         component.deactivate();
                     } else {
                         ioOpCallback.afterFinalize(loadOp);
-                        getHarness().addBulkLoadedComponent(component);
+                        getHarness().addBulkLoadedComponent(loadOp);
                     }
                 }
             } finally {
@@ -490,6 +494,21 @@
                 ioOpCallback.completed(loadOp);
             }
         }
+
+        @Override
+        public void writeFailed(ICachedPage page, Throwable failure) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean hasFailed() {
+            return componentBulkLoader.hasFailed();
+        }
+
+        @Override
+        public Throwable getFailure() {
+            return componentBulkLoader.getFailure();
+        }
     }
 
     // The accessor for disk only indexes don't use modification callback and always carry the target index version with them
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
index b727a39..5bcf30d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
@@ -65,6 +65,7 @@
 import org.apache.hyracks.storage.common.ISearchOperationCallback;
 import org.apache.hyracks.storage.common.ISearchPredicate;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 import org.apache.hyracks.util.trace.ITracer;
 
 public class ExternalBTreeWithBuddy extends AbstractLSMIndex implements ITreeIndex, ITwoPCIndex {
@@ -544,12 +545,15 @@
                     if (isTransaction) {
                         // Since this is a transaction component, validate and
                         // deactivate. it could later be added or deleted
-                        component.markAsValid(durable);
-                        ioOpCallback.afterFinalize(loadOp);
+                        try {
+                            component.markAsValid(durable, loadOp);
+                        } finally {
+                            ioOpCallback.afterFinalize(loadOp);
+                        }
                         component.deactivate();
                     } else {
                         ioOpCallback.afterFinalize(loadOp);
-                        getHarness().addBulkLoadedComponent(component);
+                        getHarness().addBulkLoadedComponent(loadOp);
                     }
                 }
             } finally {
@@ -574,6 +578,21 @@
                 ioOpCallback.completed(loadOp);
             }
         }
+
+        @Override
+        public void writeFailed(ICachedPage page, Throwable failure) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean hasFailed() {
+            return componentBulkLoader.hasFailed();
+        }
+
+        @Override
+        public Throwable getFailure() {
+            return componentBulkLoader.getFailure();
+        }
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBloomFilterDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBloomFilterDiskComponent.java
index 329a54b..6b6e5e0 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBloomFilterDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBloomFilterDiskComponent.java
@@ -30,6 +30,7 @@
 import org.apache.hyracks.storage.am.lsm.common.impls.IChainedComponentBulkLoader;
 import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback;
 
 public abstract class AbstractLSMWithBloomFilterDiskComponent extends AbstractLSMDiskComponent {
     public AbstractLSMWithBloomFilterDiskComponent(AbstractLSMIndex lsmIndex, IMetadataPageManager mdPageManager,
@@ -42,11 +43,11 @@
     public abstract IBufferCache getBloomFilterBufferCache();
 
     @Override
-    public void markAsValid(boolean persist) throws HyracksDataException {
+    public void markAsValid(boolean persist, IPageWriteFailureCallback callback) throws HyracksDataException {
         // The order of forcing the dirty page to be flushed is critical. The
         // bloom filter must be always done first.
         ComponentUtils.markAsValid(getBloomFilterBufferCache(), getBloomFilter(), persist);
-        super.markAsValid(persist);
+        super.markAsValid(persist, callback);
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBuddyDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBuddyDiskComponent.java
index cace9e5..f7feb78 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBuddyDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBuddyDiskComponent.java
@@ -26,6 +26,7 @@
 import org.apache.hyracks.storage.am.lsm.common.impls.IndexWithBuddyBulkLoader;
 import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils;
 import org.apache.hyracks.storage.common.IIndexBulkLoader;
+import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback;
 
 public abstract class AbstractLSMWithBuddyDiskComponent extends AbstractLSMWithBloomFilterDiskComponent {
 
@@ -37,9 +38,9 @@
     public abstract AbstractTreeIndex getBuddyIndex();
 
     @Override
-    public void markAsValid(boolean persist) throws HyracksDataException {
-        super.markAsValid(persist);
-        ComponentUtils.markAsValid(getBuddyIndex(), persist);
+    public void markAsValid(boolean persist, IPageWriteFailureCallback callback) throws HyracksDataException {
+        super.markAsValid(persist, callback);
+        ComponentUtils.markAsValid(getBuddyIndex(), persist, callback);
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
index 1500f37..543779c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
@@ -26,6 +26,7 @@
 import org.apache.hyracks.storage.am.lsm.common.impls.ChainedLSMDiskComponentBulkLoader;
 import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata;
 import org.apache.hyracks.storage.am.lsm.common.impls.IChainedComponentBulkLoader;
+import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback;
 
 public interface ILSMDiskComponent extends ILSMComponent {
 
@@ -68,9 +69,11 @@
      *
      * @param persist
      *            whether the call should force data to disk before returning
+     * @param callback
+     *            callback for when a page write operation fails
      * @throws HyracksDataException
      */
-    void markAsValid(boolean persist) throws HyracksDataException;
+    void markAsValid(boolean persist, IPageWriteFailureCallback callback) throws HyracksDataException;
 
     /**
      * Activates the component
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
index c4a0352..9e8c568 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@ -150,12 +150,12 @@
     /**
      * Add bulk loaded component
      *
-     * @param index
-     *            the new component
+     * @param ioOperation
+     *            the io operation that added the new component
      * @throws HyracksDataException
      * @throws IndexException
      */
-    void addBulkLoadedComponent(ILSMDiskComponent index) throws HyracksDataException;
+    void addBulkLoadedComponent(ILSMIOOperation ioOperation) throws HyracksDataException;
 
     /**
      * Get index operation tracker
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
index 3245455..0e13933 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
@@ -26,8 +26,9 @@
 import org.apache.hyracks.api.io.IODeviceHandle;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
+import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback;
 
-public interface ILSMIOOperation extends Callable<LSMIOOperationStatus> {
+public interface ILSMIOOperation extends Callable<LSMIOOperationStatus>, IPageWriteFailureCallback {
 
     /**
      * Represents the io operation type
@@ -94,6 +95,7 @@
     /**
      * @return the failure in the io operation if any, null otherwise
      */
+    @Override
     Throwable getFailure();
 
     /**
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/freepage/VirtualFreePageManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/freepage/VirtualFreePageManager.java
index 08c75dc..9d62c0d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/freepage/VirtualFreePageManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/freepage/VirtualFreePageManager.java
@@ -28,6 +28,7 @@
 import org.apache.hyracks.storage.am.common.api.ITreeIndexMetadataFrame;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
 
 public class VirtualFreePageManager implements IPageManager {
@@ -88,7 +89,7 @@
     }
 
     @Override
-    public void close() {
+    public void close(IPageWriteFailureCallback callback) {
         // Method doesn't make sense for this free page manager.
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java
index fc9a362..3d76755 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java
@@ -30,6 +30,7 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.api.IoOperationCompleteListener;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 
 public abstract class AbstractIoOperation implements ILSMIOOperation {
 
@@ -37,7 +38,7 @@
     protected final FileReference target;
     protected final ILSMIOOperationCallback callback;
     protected final String indexIdentifier;
-    private Throwable failure;
+    private volatile Throwable failure;
     private LSMIOOperationStatus status = LSMIOOperationStatus.SUCCESS;
     private ILSMDiskComponent newComponent;
     private boolean completed = false;
@@ -146,4 +147,14 @@
             completeListeners.add(listener);
         }
     }
+
+    @Override
+    public void writeFailed(ICachedPage page, Throwable failure) {
+        setFailure(failure);
+    }
+
+    @Override
+    public boolean hasFailed() {
+        return status == LSMIOOperationStatus.FAILURE;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
index aa312fb..a88a19a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
@@ -30,6 +30,7 @@
 import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils;
 import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
 import org.apache.hyracks.storage.common.MultiComparator;
+import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -156,8 +157,8 @@
      * @throws HyracksDataException
      */
     @Override
-    public void markAsValid(boolean persist) throws HyracksDataException {
-        ComponentUtils.markAsValid(getMetadataHolder(), persist);
+    public void markAsValid(boolean persist, IPageWriteFailureCallback callback) throws HyracksDataException {
+        ComponentUtils.markAsValid(getMetadataHolder(), persist, callback);
         if (LOGGER.isInfoEnabled()) {
             LOGGER.log(Level.INFO, "Marked as valid component with id: " + getId());
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BloomFilterBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BloomFilterBulkLoader.java
index 29ca388..a9c70e0 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BloomFilterBulkLoader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BloomFilterBulkLoader.java
@@ -21,6 +21,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.common.IIndexBulkLoader;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 
 public class BloomFilterBulkLoader implements IChainedComponentBulkLoader {
 
@@ -65,4 +66,19 @@
             endedBloomFilterLoad = true;
         }
     }
+
+    @Override
+    public void writeFailed(ICachedPage page, Throwable failure) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean hasFailed() {
+        return bulkLoader.hasFailed();
+    }
+
+    @Override
+    public Throwable getFailure() {
+        return bulkLoader.getFailure();
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java
index 6e0606a..ab59b59 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java
@@ -27,6 +27,7 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 import org.apache.hyracks.util.annotations.CriticalPath;
 
 /**
@@ -96,8 +97,9 @@
     public void cleanupArtifacts() throws HyracksDataException {
         if (!cleanedUpArtifacts) {
             cleanedUpArtifacts = true;
-            for (IChainedComponentBulkLoader lsmOperation : bulkloaderChain) {
-                lsmOperation.cleanupArtifacts();
+            final int bulkloadersCount = bulkloaderChain.size();
+            for (int i = 0; i < bulkloadersCount; i++) {
+                bulkloaderChain.get(i).cleanupArtifacts();;
             }
         }
         diskComponent.deactivateAndDestroy();
@@ -106,8 +108,9 @@
     @Override
     public void end() throws HyracksDataException {
         if (!cleanedUpArtifacts) {
-            for (IChainedComponentBulkLoader lsmOperation : bulkloaderChain) {
-                lsmOperation.end();
+            final int bulkloadersCount = bulkloaderChain.size();
+            for (int i = 0; i < bulkloadersCount; i++) {
+                bulkloaderChain.get(i).end();
             }
             if (isEmptyComponent && cleanupEmptyComponent) {
                 cleanupArtifacts();
@@ -118,8 +121,9 @@
     @Override
     public void abort() throws HyracksDataException {
         operation.setStatus(LSMIOOperationStatus.FAILURE);
-        for (IChainedComponentBulkLoader lsmOperation : bulkloaderChain) {
-            lsmOperation.abort();
+        final int bulkloadersCount = bulkloaderChain.size();
+        for (int i = 0; i < bulkloadersCount; i++) {
+            bulkloaderChain.get(i).abort();
         }
     }
 
@@ -127,4 +131,31 @@
     public ILSMIOOperation getOperation() {
         return operation;
     }
+
+    @Override
+    public void writeFailed(ICachedPage page, Throwable failure) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean hasFailed() {
+        final int bulkloadersCount = bulkloaderChain.size();
+        for (int i = 0; i < bulkloadersCount; i++) {
+            if (bulkloaderChain.get(i).hasFailed()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public Throwable getFailure() {
+        final int bulkloadersCount = bulkloaderChain.size();
+        for (int i = 0; i < bulkloadersCount; i++) {
+            if (bulkloaderChain.get(i).hasFailed()) {
+                return bulkloaderChain.get(i).getFailure();
+            }
+        }
+        return null;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
index 4c2ddb6..a2d9fdd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
@@ -31,6 +31,7 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback;
 
 public class EmptyComponent implements ILSMDiskComponent {
     public static final EmptyComponent INSTANCE = new EmptyComponent();
@@ -105,7 +106,7 @@
     }
 
     @Override
-    public void markAsValid(boolean persist) throws HyracksDataException {
+    public void markAsValid(boolean persist, IPageWriteFailureCallback callback) throws HyracksDataException {
         // No Op
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java
index ab70ba1..854e541 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java
@@ -192,9 +192,18 @@
         }
     }
 
+    @SuppressWarnings("squid:S1181")
     @Override
-    public void addBulkLoadedComponent(ILSMDiskComponent c) throws HyracksDataException {
-        c.markAsValid(lsmIndex.isDurable());
+    public void addBulkLoadedComponent(ILSMIOOperation ioOperation) throws HyracksDataException {
+        ILSMDiskComponent c = ioOperation.getNewComponent();
+        try {
+            c.markAsValid(lsmIndex.isDurable(), ioOperation);
+        } catch (Throwable th) {
+            ioOperation.setFailure(th);
+        }
+        if (ioOperation.hasFailed()) {
+            throw HyracksDataException.create(ioOperation.getFailure());
+        }
         synchronized (opTracker) {
             lsmIndex.addDiskComponent(c);
             if (replicationEnabled) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FilterBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FilterBulkLoader.java
index 43d2b0d..880f5be 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FilterBulkLoader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FilterBulkLoader.java
@@ -26,6 +26,7 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager;
 import org.apache.hyracks.storage.common.MultiComparator;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 
 public class FilterBulkLoader implements IChainedComponentBulkLoader {
 
@@ -79,4 +80,19 @@
         filterTuple.reset(tuple);
         filter.update(filterTuple, filterCmp, NoOpOperationCallback.INSTANCE);
     }
+
+    @Override
+    public void writeFailed(ICachedPage page, Throwable failure) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean hasFailed() {
+        return false;
+    }
+
+    @Override
+    public Throwable getFailure() {
+        return null;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IChainedComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IChainedComponentBulkLoader.java
index 90ef127..1361c79 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IChainedComponentBulkLoader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IChainedComponentBulkLoader.java
@@ -20,8 +20,9 @@
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback;
 
-public interface IChainedComponentBulkLoader {
+public interface IChainedComponentBulkLoader extends IPageWriteFailureCallback {
     /**
      * Adds a tuple to the bulkloaded component
      *
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IndexWithBuddyBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IndexWithBuddyBulkLoader.java
index 4fb2919..394126d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IndexWithBuddyBulkLoader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IndexWithBuddyBulkLoader.java
@@ -22,6 +22,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.common.IIndexBulkLoader;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 
 public class IndexWithBuddyBulkLoader implements IChainedComponentBulkLoader {
 
@@ -69,4 +70,24 @@
         bulkLoader.abort();
         buddyBTreeBulkLoader.abort();
     }
+
+    @Override
+    public void writeFailed(ICachedPage page, Throwable failure) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean hasFailed() {
+        return bulkLoader.hasFailed() || buddyBTreeBulkLoader.hasFailed();
+    }
+
+    @Override
+    public Throwable getFailure() {
+        if (bulkLoader.hasFailed()) {
+            return bulkLoader.getFailure();
+        } else if (buddyBTreeBulkLoader.hasFailed()) {
+            return buddyBTreeBulkLoader.getFailure();
+        }
+        return null;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 4d840b0..3eea0a9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -537,7 +537,7 @@
             operation.setNewComponent(newComponent);
             operation.getCallback().afterOperation(operation);
             if (newComponent != null) {
-                newComponent.markAsValid(lsmIndex.isDurable());
+                newComponent.markAsValid(lsmIndex.isDurable(), operation);
             }
         } catch (Throwable e) { // NOSONAR Must catch all
             operation.setStatus(LSMIOOperationStatus.FAILURE);
@@ -613,9 +613,18 @@
         return operation;
     }
 
+    @SuppressWarnings("squid:S1181")
     @Override
-    public void addBulkLoadedComponent(ILSMDiskComponent c) throws HyracksDataException {
-        c.markAsValid(lsmIndex.isDurable());
+    public void addBulkLoadedComponent(ILSMIOOperation ioOperation) throws HyracksDataException {
+        ILSMDiskComponent c = ioOperation.getNewComponent();
+        try {
+            c.markAsValid(lsmIndex.isDurable(), ioOperation);
+        } catch (Throwable th) {
+            ioOperation.setFailure(th);
+        }
+        if (ioOperation.hasFailed()) {
+            throw HyracksDataException.create(ioOperation.getFailure());
+        }
         synchronized (opTracker) {
             lsmIndex.addDiskComponent(c);
             if (replicationEnabled) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexBulkLoader.java
index 84857f4..977697b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexBulkLoader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexBulkLoader.java
@@ -23,6 +23,7 @@
 import org.apache.hyracks.storage.am.common.impls.AbstractTreeIndex.AbstractTreeIndexBulkLoader;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMTreeTupleWriter;
 import org.apache.hyracks.storage.common.IIndexBulkLoader;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 
 public class LSMIndexBulkLoader implements IChainedComponentBulkLoader {
     private final IIndexBulkLoader bulkLoader;
@@ -64,4 +65,19 @@
     public void abort() throws HyracksDataException {
         bulkLoader.abort();
     }
+
+    @Override
+    public void writeFailed(ICachedPage page, Throwable failure) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean hasFailed() {
+        return bulkLoader.hasFailed();
+    }
+
+    @Override
+    public Throwable getFailure() {
+        return bulkLoader.getFailure();
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
index 10074f9..3a43ba7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
@@ -25,6 +25,7 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
 import org.apache.hyracks.storage.common.IIndexBulkLoader;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 
 public class LSMIndexDiskComponentBulkLoader implements IIndexBulkLoader {
     private final AbstractLSMIndex lsmIndex;
@@ -79,7 +80,7 @@
             }
             if (opCtx.getIoOperation().getStatus() == LSMIOOperationStatus.SUCCESS
                     && opCtx.getIoOperation().getNewComponent().getComponentSize() > 0) {
-                lsmIndex.getHarness().addBulkLoadedComponent(opCtx.getIoOperation().getNewComponent());
+                lsmIndex.getHarness().addBulkLoadedComponent(opCtx.getIoOperation());
             }
         } finally {
             lsmIndex.getIOOperationCallback().completed(opCtx.getIoOperation());
@@ -100,4 +101,19 @@
         }
     }
 
+    @Override
+    public void writeFailed(ICachedPage page, Throwable failure) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean hasFailed() {
+        return opCtx.getIoOperation().hasFailed();
+    }
+
+    @Override
+    public Throwable getFailure() {
+        return opCtx.getIoOperation().getFailure();
+    }
+
 }
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIoOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIoOperation.java
index f57c4ef..5ee1503 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIoOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIoOperation.java
@@ -29,6 +29,7 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.api.IoOperationCompleteListener;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 
 public class NoOpIoOperation implements ILSMIOOperation {
     public static final NoOpIoOperation INSTANCE = new NoOpIoOperation();
@@ -126,4 +127,14 @@
         return null;
     }
 
+    @Override
+    public void writeFailed(ICachedPage page, Throwable failure) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean hasFailed() {
+        return false;
+    }
+
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java
index f1172f3..3345e3a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java
@@ -29,6 +29,7 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.api.IoOperationCompleteListener;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 import org.apache.hyracks.util.trace.ITracer;
 import org.apache.hyracks.util.trace.ITracer.Scope;
 import org.apache.hyracks.util.trace.TraceUtils;
@@ -162,4 +163,14 @@
     public Map<String, Object> getParameters() {
         return ioOp.getParameters();
     }
+
+    @Override
+    public void writeFailed(ICachedPage page, Throwable failure) {
+        ioOp.writeFailed(page, failure);
+    }
+
+    @Override
+    public boolean hasFailed() {
+        return ioOp.hasFailed();
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java
index 4b7f338..1ff9fa8 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java
@@ -33,6 +33,7 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -190,12 +191,14 @@
         }
     }
 
-    public static void markAsValid(ITreeIndex treeIndex, boolean forceToDisk) throws HyracksDataException {
+    public static void markAsValid(ITreeIndex treeIndex, boolean forceToDisk, IPageWriteFailureCallback callback)
+            throws HyracksDataException {
         int fileId = treeIndex.getFileId();
         IBufferCache bufferCache = treeIndex.getBufferCache();
-        treeIndex.getPageManager().close();
-        // WARNING: flushing the metadata page should be done after releasing the write latch; otherwise, the page
-        // won't be flushed to disk because it won't be dirty until the write latch has been released.
+        treeIndex.getPageManager().close(callback);
+        if (callback.hasFailed()) {
+            throw HyracksDataException.create(callback.getFailure());
+        }
         // Force modified metadata page to disk.
         // If the index is not durable, then the flush is not necessary.
         if (forceToDisk) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
index b030e83..41e72cd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
@@ -34,6 +34,7 @@
 import org.apache.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndex;
 import org.apache.hyracks.storage.common.IIndexBulkLoader;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback;
 
 public class LSMInvertedIndexDiskComponent extends AbstractLSMWithBuddyDiskComponent {
 
@@ -102,15 +103,19 @@
     }
 
     @Override
-    public void markAsValid(boolean persist) throws HyracksDataException {
+    public void markAsValid(boolean persist, IPageWriteFailureCallback callback) throws HyracksDataException {
         ComponentUtils.markAsValid(getBloomFilterBufferCache(), getBloomFilter(), persist);
 
         // Flush inverted index second.
         invIndex.getBufferCache().force((invIndex).getInvListsFileId(), true);
-        ComponentUtils.markAsValid(getMetadataHolder(), persist);
-
-        // Flush deleted keys BTree.
-        ComponentUtils.markAsValid(getBuddyIndex(), persist);
+        ComponentUtils.markAsValid(getMetadataHolder(), persist, callback);
+        if (!callback.hasFailed()) {
+            // Flush deleted keys BTree.
+            ComponentUtils.markAsValid(getBuddyIndex(), persist, callback);
+        }
+        if (callback.hasFailed()) {
+            throw HyracksDataException.create(callback.getFailure());
+        }
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
index c3c9c21..0b504a6 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
@@ -61,6 +61,7 @@
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 import org.apache.hyracks.storage.common.buffercache.IFIFOPageQueue;
+import org.apache.hyracks.storage.common.buffercache.PageWriteFailureCallback;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
 
 /**
@@ -231,7 +232,8 @@
         listCursor.open(initState, null);
     }
 
-    public abstract class AbstractOnDiskInvertedIndexBulkLoader implements IIndexBulkLoader {
+    public abstract class AbstractOnDiskInvertedIndexBulkLoader extends PageWriteFailureCallback
+            implements IIndexBulkLoader {
         protected final ArrayTupleBuilder btreeTupleBuilder;
         protected final ArrayTupleReference btreeTupleReference;
         protected final IIndexBulkLoader btreeBulkloader;
@@ -272,7 +274,7 @@
         }
 
         protected void pinNextPage() throws HyracksDataException {
-            queue.put(currentPage);
+            queue.put(currentPage, this);
             currentPageId++;
             currentPage = bufferCache.confiscatePage(BufferedFileHandle.getDiskPageId(fileId, currentPageId));
         }
@@ -352,10 +354,13 @@
             btreeBulkloader.end();
 
             if (currentPage != null) {
-                queue.put(currentPage);
+                queue.put(currentPage, this);
             }
             invListsMaxPageId = currentPageId;
             bufferCache.finishQueue();
+            if (hasFailed()) {
+                throw HyracksDataException.create(getFailure());
+            }
         }
 
         @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java
index f902153..5e8bd0b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java
@@ -59,6 +59,7 @@
 import org.apache.hyracks.storage.common.ISearchOperationCallback;
 import org.apache.hyracks.storage.common.ISearchPredicate;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 import org.apache.hyracks.util.trace.ITracer;
 
 /**
@@ -499,12 +500,15 @@
                     if (isTransaction) {
                         // Since this is a transaction component, validate and
                         // deactivate. it could later be added or deleted
-                        component.markAsValid(durable);
-                        ioOpCallback.afterFinalize(loadOp);
+                        try {
+                            component.markAsValid(durable, loadOp);
+                        } finally {
+                            ioOpCallback.afterFinalize(loadOp);
+                        }
                         component.deactivate();
                     } else {
                         ioOpCallback.afterFinalize(loadOp);
-                        getHarness().addBulkLoadedComponent(component);
+                        getHarness().addBulkLoadedComponent(loadOp);
                     }
                 }
             } finally {
@@ -529,6 +533,21 @@
                 ioOpCallback.completed(loadOp);
             }
         }
+
+        @Override
+        public void writeFailed(ICachedPage page, Throwable failure) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean hasFailed() {
+            return loadOp.hasFailed();
+        }
+
+        @Override
+        public Throwable getFailure() {
+            return loadOp.getFailure();
+        }
     }
 
     // The only change the the schedule merge is the method used to create the
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
index 0e455c5..f12f423 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
@@ -941,11 +941,10 @@
                     propagateBulk(1, false, pagesToWrite);
 
                     leafFrontier.pageId = freePageManager.takePage(metaFrame);
-                    queue.put(leafFrontier.page);
+                    queue.put(leafFrontier.page, this);
                     for (ICachedPage c : pagesToWrite) {
-                        queue.put(c);
+                        queue.put(c, this);
                     }
-
                     pagesToWrite.clear();
                     leafFrontier.page = bufferCache
                             .confiscatePage(BufferedFileHandle.getDiskPageId(getFileId(), leafFrontier.pageId));
@@ -975,7 +974,7 @@
             }
 
             for (ICachedPage c : pagesToWrite) {
-                queue.put(c);
+                queue.put(c, this);
             }
             finish();
             super.end();
@@ -1011,7 +1010,7 @@
                     ((RTreeNSMFrame) lowerFrame).adjustMBR();
                     interiorFrameTupleWriter.writeTupleFields(((RTreeNSMFrame) lowerFrame).getMBRTuples(), 0, mbr, 0);
                 }
-                queue.put(n.page);
+                queue.put(n.page, this);
                 n.page = null;
                 prevPageId = n.pageId;
             }
@@ -1021,7 +1020,6 @@
 
         protected void propagateBulk(int level, boolean toRoot, List<ICachedPage> pagesToWrite)
                 throws HyracksDataException {
-            boolean propagated = false;
 
             if (level == 1) {
                 lowerFrame = leafFrame;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml
index 67d32dd..a76fe48 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml
@@ -54,6 +54,11 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-util</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-api</artifactId>
     </dependency>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndexBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndexBulkLoader.java
index d6c954e..138705f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndexBulkLoader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndexBulkLoader.java
@@ -20,8 +20,9 @@
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback;
 
-public interface IIndexBulkLoader {
+public interface IIndexBulkLoader extends IPageWriteFailureCallback {
     /**
      * Append a tuple to the index in the context of a bulk load.
      *
@@ -36,6 +37,7 @@
 
     /**
      * Finalize the bulk loading operation in the given context and release all resources.
+     * After this method is called, caller can't add more tuples nor abort
      *
      * @throws HyracksDataException
      *             If the BufferCache throws while un/pinning or un/latching.
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AsyncFIFOPageQueueManager.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AsyncFIFOPageQueueManager.java
index dbead1e..589d697 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AsyncFIFOPageQueueManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AsyncFIFOPageQueueManager.java
@@ -24,15 +24,19 @@
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.util.ExitUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 public class AsyncFIFOPageQueueManager implements Runnable {
-    private final static boolean DEBUG = false;
+    private static final boolean DEBUG = false;
+    private static final Logger LOGGER = LogManager.getLogger();
 
-    protected LinkedBlockingQueue<ICachedPage> queue = new LinkedBlockingQueue<ICachedPage>();
+    protected LinkedBlockingQueue<ICachedPage> queue = new LinkedBlockingQueue<>();
     volatile Thread writerThread;
     protected AtomicBoolean poisoned = new AtomicBoolean(false);
     protected BufferCache bufferCache;
-    volatile protected PageQueue pageQueue;
+    protected volatile PageQueue pageQueue;
 
     public AsyncFIFOPageQueueManager(BufferCache bufferCache) {
         this.bufferCache = bufferCache;
@@ -57,17 +61,27 @@
             return writer;
         }
 
+        @SuppressWarnings("squid:S2142")
         @Override
-        public void put(ICachedPage page) throws HyracksDataException {
+        public void put(ICachedPage page, IPageWriteFailureCallback callback) throws HyracksDataException {
+            failIfPreviousPageFailed(callback);
+            page.setFailureCallback(callback);
             try {
                 if (!poisoned.get()) {
                     queue.put(page);
                 } else {
-                    throw new HyracksDataException("Queue is closing");
+                    LOGGER.error("An attempt to write a page found buffer cache closed");
+                    ExitUtil.halt(ExitUtil.EC_ABNORMAL_TERMINATION);
                 }
             } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw HyracksDataException.create(e);
+                LOGGER.error("IO Operation interrupted", e);
+                ExitUtil.halt(ExitUtil.EC_ABNORMAL_TERMINATION);
+            }
+        }
+
+        private void failIfPreviousPageFailed(IPageWriteFailureCallback callback) throws HyracksDataException {
+            if (callback.hasFailed()) {
+                throw HyracksDataException.create(callback.getFailure());
             }
         }
     }
@@ -136,18 +150,21 @@
         }
     }
 
+    @SuppressWarnings("squid:S2142")
     @Override
     public void run() {
-        if (DEBUG)
-            System.out.println("[FIFO] Writer started");
+        if (DEBUG) {
+            LOGGER.info("[FIFO] Writer started");
+        }
         boolean die = false;
         while (!die) {
             ICachedPage entry;
             try {
                 entry = queue.take();
             } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                break;
+                LOGGER.error("BufferCache Write Queue was interrupted", e);
+                ExitUtil.halt(ExitUtil.EC_ABNORMAL_TERMINATION);
+                return; // Keep compiler happy
             }
             if (entry.getQueueInfo() != null && entry.getQueueInfo().hasWaiters()) {
                 synchronized (entry) {
@@ -158,17 +175,11 @@
                     continue;
                 }
             }
-
-            if (DEBUG)
-                System.out.println("[FIFO] Write " + BufferedFileHandle.getFileId(((CachedPage) entry).dpid) + ","
+            if (DEBUG) {
+                LOGGER.info("[FIFO] Write " + BufferedFileHandle.getFileId(((CachedPage) entry).dpid) + ","
                         + BufferedFileHandle.getPageId(((CachedPage) entry).dpid));
-
-            try {
-                pageQueue.getWriter().write(entry, bufferCache);
-            } catch (HyracksDataException e) {
-                //TODO: What do we do, if we could not write the page?
-                e.printStackTrace();
             }
+            pageQueue.getWriter().write(entry, bufferCache);
         }
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
index 02eb8bf..6ec12aa 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
@@ -23,10 +23,14 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 /**
  * @author yingyib
  */
 public class CachedPage implements ICachedPageInternal {
+    private static final Logger LOGGER = LogManager.getLogger();
     final int cpid;
     ByteBuffer buffer;
     public final AtomicInteger pinCount;
@@ -44,6 +48,7 @@
     // DEBUG
     private static final boolean DEBUG = false;
     private final StackTraceElement[] ctorStack;
+    private IPageWriteFailureCallback failureCallback;
 
     //Constructor for making dummy entry for FIFO queue
     public CachedPage() {
@@ -85,6 +90,7 @@
         confiscated.set(false);
         pageReplacementStrategy.notifyCachePageReset(this);
         queueInfo = null;
+        failureCallback = null;
     }
 
     public void invalidate() {
@@ -103,11 +109,7 @@
 
     @Override
     public boolean isGoodVictim() {
-        if (confiscated.get()) {
-            return false; // i am not a good victim because i cant flush!
-        } else {
-            return pinCount.get() == 0;
-        }
+        return !confiscated.get() && pinCount.get() == 0;
     }
 
     @Override
@@ -205,4 +207,21 @@
     public boolean isLargePage() {
         return multiplier > 1;
     }
+
+    @Override
+    public void setFailureCallback(IPageWriteFailureCallback failureCallback) {
+        if (this.failureCallback != null) {
+            throw new IllegalStateException("failureCallback is already set");
+        }
+        this.failureCallback = failureCallback;
+    }
+
+    @Override
+    public void writeFailed(Exception e) {
+        if (failureCallback != null) {
+            failureCallback.writeFailed(this, e);
+        } else {
+            LOGGER.error("An IO Failure took place but the failure callback is not set", e);
+        }
+    }
 }
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java
index 856edbc..3d3ce3c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java
@@ -15,35 +15,37 @@
 
 package org.apache.hyracks.storage.common.buffercache;
 
-import org.apache.hyracks.api.exceptions.ErrorCode;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.util.ExitUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 public class FIFOLocalWriter implements IFIFOPageWriter {
+    private static final Logger LOGGER = LogManager.getLogger();
     public static final FIFOLocalWriter INSTANCE = new FIFOLocalWriter();
-    private static boolean DEBUG = false;
+    private static final boolean DEBUG = false;
 
     private FIFOLocalWriter() {
     }
 
+    @SuppressWarnings("squid:S1181") // System must halt on all IO errors
     @Override
-    public void write(ICachedPage page, BufferCache bufferCache) throws HyracksDataException {
+    public void write(ICachedPage page, BufferCache bufferCache) {
         CachedPage cPage = (CachedPage) page;
         try {
             bufferCache.write(cPage);
-        } catch (HyracksDataException e) {
-            if (e.getErrorCode() != ErrorCode.FILE_DOES_NOT_EXIST) {
-                throw HyracksDataException.create(e);
-            }
+        } catch (Exception e) {
+            page.writeFailed(e);
+            LOGGER.warn("Failed to write page {}", cPage, e);
+        } catch (Throwable th) {
+            // Halt
+            LOGGER.error("FIFOLocalWriter has encountered a fatal error", th);
+            ExitUtil.halt(ExitUtil.EC_ABNORMAL_TERMINATION);
         } finally {
             bufferCache.returnPage(cPage);
             if (DEBUG) {
-                System.out.println("[FIFO] Return page: " + cPage.cpid + "," + cPage.dpid);
+                LOGGER.error("[FIFO] Return page: {}, {}", cPage.cpid, cPage.dpid);
             }
         }
     }
 
-    @Override
-    public void sync(int fileId, BufferCache bufferCache) throws HyracksDataException {
-        bufferCache.force(fileId, true);
-    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/HaltOnFailureCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/HaltOnFailureCallback.java
new file mode 100644
index 0000000..0b748e1
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/HaltOnFailureCallback.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.buffercache;
+
+import org.apache.hyracks.util.ExitUtil;
+
+public class HaltOnFailureCallback implements IPageWriteFailureCallback {
+    public static final HaltOnFailureCallback INSTANCE = new HaltOnFailureCallback();
+
+    private HaltOnFailureCallback() {
+    }
+
+    @Override
+    public void writeFailed(ICachedPage page, Throwable failure) {
+        ExitUtil.halt(ExitUtil.EC_ABNORMAL_TERMINATION);
+    }
+
+    @Override
+    public boolean hasFailed() {
+        return false;
+    }
+
+    @Override
+    public Throwable getFailure() {
+        return null;
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPage.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPage.java
index 16837b9..cfbb145 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPage.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPage.java
@@ -44,10 +44,14 @@
 
     void setDiskPageId(long dpid);
 
+    void setFailureCallback(IPageWriteFailureCallback callback);
+
     /**
      * Check if a page is a large page
      *
      * @return true if the page is large, false otherwise
      */
     boolean isLargePage();
+
+    void writeFailed(Exception e);
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPageInternal.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPageInternal.java
index c500286..d900852 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPageInternal.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPageInternal.java
@@ -19,13 +19,16 @@
 package org.apache.hyracks.storage.common.buffercache;
 
 public interface ICachedPageInternal extends ICachedPage {
-    public int getCachedPageId();
+    int getCachedPageId();
 
-    public long getDiskPageId();
+    long getDiskPageId();
 
-    public Object getReplacementStrategyObject();
+    Object getReplacementStrategyObject();
 
-    public boolean isGoodVictim();
+    /**
+     * @return true if can be evicted, false otherwise
+     */
+    boolean isGoodVictim();
 
     void setFrameSizeMultiplier(int multiplier);
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageQueue.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageQueue.java
index 6c03671..189c402 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageQueue.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageQueue.java
@@ -19,5 +19,20 @@
 
 @FunctionalInterface
 public interface IFIFOPageQueue {
-    void put(ICachedPage page) throws HyracksDataException;
+
+    /**
+     * Put a page in the write queue
+     *
+     * @param page
+     *            the page to be written
+     * @param callback
+     *            callback in case of a failure
+     * @throws HyracksDataException
+     *             if the callback has already failed. This indicates a failure writing a previous page
+     *             in the same operation.
+     *             Note: having this failure at this place removes the need to check for failures with
+     *             every add() call in the bulk loader and so, we check per page given to disk rather
+     *             than per tuple given to loader. At the same time, it allows the bulk load to fail early.
+     */
+    void put(ICachedPage page, IPageWriteFailureCallback callback) throws HyracksDataException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageWriter.java
index 567c01e..26fd414 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageWriter.java
@@ -15,10 +15,7 @@
 
 package org.apache.hyracks.storage.common.buffercache;
 
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
+@FunctionalInterface
 public interface IFIFOPageWriter {
-    public void write(ICachedPage page, BufferCache bufferCache) throws HyracksDataException;
-
-    void sync(int fileId, BufferCache bufferCache) throws HyracksDataException;
+    void write(ICachedPage page, BufferCache bufferCache);
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IPageWriteFailureCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IPageWriteFailureCallback.java
new file mode 100644
index 0000000..da9cb6a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IPageWriteFailureCallback.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.buffercache;
+
+public interface IPageWriteFailureCallback {
+
+    /**
+     * Notify that an async write operation has failed
+     *
+     * @param page
+     * @param failure
+     */
+    void writeFailed(ICachedPage page, Throwable failure);
+
+    /**
+     * @return true if the callback has received any failure
+     */
+    boolean hasFailed();
+
+    /**
+     * @return a failure writing to disk or null if no failure has been seen
+     *         This doesn't guarantee which failure is returned but that if one or more failures occurred
+     *         while trying to write to disk, one of those failures is returned. All other failures are expected
+     *         to be logged.
+     */
+    Throwable getFailure();
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/PageWriteFailureCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/PageWriteFailureCallback.java
new file mode 100644
index 0000000..c11e596
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/PageWriteFailureCallback.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.buffercache;
+
+public class PageWriteFailureCallback implements IPageWriteFailureCallback {
+
+    private volatile Throwable failure;
+
+    @Override
+    public final void writeFailed(ICachedPage page, Throwable failure) {
+        if (this.failure == null) {
+            this.failure = failure;
+        }
+    }
+
+    @Override
+    public final boolean hasFailed() {
+        return failure != null;
+    }
+
+    @Override
+    public final Throwable getFailure() {
+        return failure;
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/VirtualPage.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/VirtualPage.java
index d7ec4e9..b2a1ff2 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/VirtualPage.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/VirtualPage.java
@@ -147,4 +147,14 @@
         str.append("}");
         return str.toString();
     }
+
+    @Override
+    public void setFailureCallback(IPageWriteFailureCallback callback) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void writeFailed(Exception e) {
+        throw new UnsupportedOperationException();
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/org/apache/hyracks/storage/am/btree/DiskBTreeSearchCursorTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/org/apache/hyracks/storage/am/btree/DiskBTreeSearchCursorTest.java
index c77bea0..853a2ab 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/org/apache/hyracks/storage/am/btree/DiskBTreeSearchCursorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/org/apache/hyracks/storage/am/btree/DiskBTreeSearchCursorTest.java
@@ -183,6 +183,9 @@
             bulkloader.add(tuple);
         }
         bulkloader.end();
+        if (bulkloader.hasFailed()) {
+            throw HyracksDataException.create(bulkloader.getFailure());
+        }
     }
 
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java
index f94914c..d0d02a9 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java
@@ -33,7 +33,6 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -41,6 +40,7 @@
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.storage.common.buffercache.CachedPage;
+import org.apache.hyracks.storage.common.buffercache.HaltOnFailureCallback;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
@@ -96,7 +96,7 @@
             long dpid = BufferedFileHandle.getDiskPageId(fileId, i);
             ICachedPage page = bufferCache.confiscatePage(dpid);
             page.getBuffer().putInt(0, i);
-            bufferCache.createFIFOQueue().put(page);
+            bufferCache.createFIFOQueue().put(page, HaltOnFailureCallback.INSTANCE);
         }
         bufferCache.finishQueue();
         bufferCache.closeFile(fileId);