diff --git a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
index 8fd9f6b..9d57dde 100644
--- a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
+++ b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
@@ -209,7 +209,7 @@
         //Reserved for the number of pages
         int requiredFreeSpace = HEADER_SIZE;
         //Columns' Offsets
-        requiredFreeSpace += columnWriter.getColumnOffsetsSize();
+        requiredFreeSpace += columnWriter.getColumnOffsetsSize(true);
         //Occupied space from previous writes
         requiredFreeSpace += columnWriter.getOccupiedSpace();
         //New tuple required space
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
index 92b3892..14f2399 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
@@ -62,8 +62,8 @@
      *
      * @return the size needed to store columns' offsets
      */
-    public final int getColumnOffsetsSize() {
-        return Integer.BYTES * getNumberOfColumns(true);
+    public final int getColumnOffsetsSize(boolean includeCurrentTupleColumns) {
+        return Integer.BYTES * getNumberOfColumns(includeCurrentTupleColumns);
     }
 
     /**
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
index ee61fa6..114ac05 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
@@ -40,9 +40,12 @@
 import org.apache.hyracks.storage.common.buffercache.IPageWriteCallback;
 import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheWriteContext;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.util.JSONUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
 public final class ColumnBTreeBulkloader extends BTreeNSMBulkLoader implements IColumnWriteMultiPageOp {
     private static final Logger LOGGER = LogManager.getLogger();
     private final List<CachedPage> columnsPages;
@@ -60,6 +63,7 @@
     private int maxNumberOfPagesForAColumn;
     private int maxNumberOfPagesInALeafNode;
     private int maxTupleCount;
+    private int lastRequiredFreeSpace;
 
     public ColumnBTreeBulkloader(float fillFactor, boolean verifyInput, IPageWriteCallback callback, ITreeIndex index,
             ITreeIndexFrame leafFrame, IBufferCacheWriteContext writeContext) throws HyracksDataException {
@@ -80,6 +84,7 @@
         maxNumberOfPagesInALeafNode = 0;
         numberOfLeafNodes = 1;
         maxTupleCount = 0;
+        lastRequiredFreeSpace = 0;
     }
 
     @Override
@@ -109,13 +114,14 @@
         int requiredFreeSpace = AbstractColumnBTreeLeafFrame.HEADER_SIZE;
         //Columns' Offsets
         columnWriter.updateColumnMetadataForCurrentTuple(tuple);
-        requiredFreeSpace += columnWriter.getColumnOffsetsSize();
+        requiredFreeSpace += columnWriter.getColumnOffsetsSize(true);
         //Occupied space from previous writes
         requiredFreeSpace += columnWriter.getOccupiedSpace();
         //min and max tuples' sizes
         requiredFreeSpace += lowKey.getTuple().getTupleSize() + getSplitKeySize(tuple);
         //New tuple required space
         requiredFreeSpace += columnWriter.bytesRequired(tuple);
+        lastRequiredFreeSpace = requiredFreeSpace;
         return bufferCache.getPageSize() <= requiredFreeSpace;
     }
 
@@ -132,7 +138,12 @@
     public void end() throws HyracksDataException {
         if (tupleCount > 0) {
             splitKey.getTuple().resetByTupleOffset(splitKey.getBuffer().array(), 0);
-            columnarFrame.flush(columnWriter, tupleCount, lowKey.getTuple(), splitKey.getTuple());
+            try {
+                columnarFrame.flush(columnWriter, tupleCount, lowKey.getTuple(), splitKey.getTuple());
+            } catch (Exception e) {
+                logState(e);
+                throw e;
+            }
         }
         columnWriter.close();
         //We are done, return any temporary confiscated pages
@@ -156,7 +167,12 @@
         splitKey.setLeftPage(leafFrontier.pageId);
         if (tupleCount > 0) {
             //We need to flush columns to confiscate all columns pages first before calling propagateBulk
-            columnarFrame.flush(columnWriter, tupleCount, lowKey.getTuple(), splitKey.getTuple());
+            try {
+                columnarFrame.flush(columnWriter, tupleCount, lowKey.getTuple(), splitKey.getTuple());
+            } catch (Exception e) {
+                logState(e);
+                throw e;
+            }
         }
 
         propagateBulk(1, pagesToWrite);
@@ -285,4 +301,26 @@
         tempConfiscatedPages.add(page);
         return page.getBuffer();
     }
+
+    private void logState(Exception e) {
+        try {
+            ObjectNode state = JSONUtil.createObject();
+            // number of tuples processed for the current leaf
+            state.put("currentLeafTupleCount", tupleCount);
+            // number of columns
+            state.put("currentLeafColumnCount", columnWriter.getNumberOfColumns(false));
+            // number of columns including current tuple
+            state.put("currentColumnCount", columnWriter.getNumberOfColumns(true));
+            state.put("lastRequiredFreeSpace", lastRequiredFreeSpace);
+            state.put("splitKeyTupleSize", splitKey.getTuple().getTupleSize());
+            state.put("splitKeyTupleSizeByTupleWriter", tupleWriter.bytesRequired(splitKey.getTuple()));
+            state.put("lowKeyTupleSize", lowKey.getTuple().getTupleSize());
+            ObjectNode bufNode = state.putObject("leafBufferDetails");
+            columnarFrame.dumpBuffer(bufNode);
+            LOGGER.error("pageZero flush failed {}", state, e);
+        } catch (Exception ex) {
+            e.addSuppressed(ex);
+        }
+    }
+
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeWriteLeafFrame.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeWriteLeafFrame.java
index bced7ef..1fc5712 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeWriteLeafFrame.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeWriteLeafFrame.java
@@ -23,6 +23,8 @@
 import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
 
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
 public class ColumnBTreeWriteLeafFrame extends AbstractColumnBTreeLeafFrame {
     private final AbstractColumnTupleWriter columnTupleWriter;
 
@@ -63,7 +65,7 @@
         int numberOfColumns = columnWriter.getNumberOfColumns(false);
         buf.putInt(TUPLE_COUNT_OFFSET, numberOfTuples);
         buf.putInt(NUMBER_OF_COLUMNS_OFFSET, numberOfColumns);
-        buf.putInt(SIZE_OF_COLUMNS_OFFSETS_OFFSET, columnWriter.getColumnOffsetsSize());
+        buf.putInt(SIZE_OF_COLUMNS_OFFSETS_OFFSET, columnWriter.getColumnOffsetsSize(false));
     }
 
     public AbstractColumnTupleWriter getColumnTupleWriter() {
@@ -73,4 +75,16 @@
     void setNextLeaf(int pageId) {
         buf.putInt(NEXT_LEAF_OFFSET, pageId);
     }
+
+    public void dumpBuffer(ObjectNode bufNode) {
+        bufNode.put("tupleCount", buf.getInt(TUPLE_COUNT_OFFSET));
+        bufNode.put("level", buf.get(Constants.LEVEL_OFFSET));
+        bufNode.put("numberOfColumns", buf.getInt(NUMBER_OF_COLUMNS_OFFSET));
+        bufNode.put("leftMostKeyOffset", buf.getInt(LEFT_MOST_KEY_OFFSET));
+        bufNode.put("rightMostKeyOffset", buf.getInt(RIGHT_MOST_KEY_OFFSET));
+        bufNode.put("sizeOfColumns", buf.getInt(SIZE_OF_COLUMNS_OFFSETS_OFFSET));
+        bufNode.put("megaLeafNodeLength", buf.getInt(MEGA_LEAF_NODE_LENGTH));
+        bufNode.put("flagOffset", buf.get(FLAG_OFFSET));
+        bufNode.put("nextLeafOffset", buf.getInt(NEXT_LEAF_OFFSET));
+    }
 }
