diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index a99fc22..74f5305 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -889,7 +889,8 @@
 
         StorageProperties storageProperties = metadataProvider.getStorageProperties();
         DatasetFormatInfo datasetFormatInfo = dd.getDatasetFormatInfo(storageProperties.getStorageFormat(),
-                storageProperties.getColumnMaxTupleCount(), storageProperties.getColumnFreeSpaceTolerance());
+                storageProperties.getColumnMaxTupleCount(), storageProperties.getColumnFreeSpaceTolerance(),
+                storageProperties.getColumnMaxLeafNodeSize());
         try {
             //TODO(DB): also check for database existence?
 
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/metadata/metadata.001.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/metadata/metadata.001.ddl.sqlpp
index 3ba53c8..7cb27a1 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/metadata/metadata.001.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/metadata/metadata.001.ddl.sqlpp
@@ -42,7 +42,7 @@
 
 CREATE DATASET ColumnDataset4(ColumnType)
 PRIMARY KEY id WITH {
-    "storage-format": {"format" : "column", "free-space-tolerance": 0.11, "max-tuple-count": 10}
+    "storage-format": {"format" : "column", "free-space-tolerance": 0.11, "max-tuple-count": 10, "max-leaf-node-size": "5MB"}
 };
 
 CREATE DATASET RowDataset(ColumnType)
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index 716e0b4..1bb6ef9 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -61,6 +61,7 @@
     "ssl\.enabled" : false,
     "storage.buffercache.pagesize" : 32768,
     "storage.column.free.space.tolerance" : 0.15,
+    "storage.column.max.leaf.node.size" : 10485760,
     "storage.column.max.tuple.count" : 15000,
     "storage.compression.block" : "snappy",
     "storage.format" : "row",
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index 93fea8f..c2883ae 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -61,6 +61,7 @@
     "ssl\.enabled" : false,
     "storage.buffercache.pagesize" : 32768,
     "storage.column.free.space.tolerance" : 0.15,
+    "storage.column.max.leaf.node.size" : 10485760,
     "storage.column.max.tuple.count" : 15000,
     "storage.compression.block" : "snappy",
     "storage.format" : "row",
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index e68b30a..8c54bb8 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -61,6 +61,7 @@
     "ssl\.enabled" : false,
     "storage.buffercache.pagesize" : 32768,
     "storage.column.free.space.tolerance" : 0.15,
+    "storage.column.max.leaf.node.size" : 10485760,
     "storage.column.max.tuple.count" : 15000,
     "storage.compression.block" : "snappy",
     "storage.format" : "row",
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/metadata/metadata.002.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/metadata/metadata.002.adm
index 4b0eed3..102b113 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/metadata/metadata.002.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/metadata/metadata.002.adm
@@ -1,5 +1,5 @@
-{ "DatasetName": "ColumnDataset1", "DatasetFormat": { "Format": "COLUMN", "MaxTupleCount": 15000, "FreeSpaceTolerance": 0.15 } }
-{ "DatasetName": "ColumnDataset2", "DatasetFormat": { "Format": "COLUMN", "MaxTupleCount": 10, "FreeSpaceTolerance": 0.15 } }
-{ "DatasetName": "ColumnDataset3", "DatasetFormat": { "Format": "COLUMN", "MaxTupleCount": 15000, "FreeSpaceTolerance": 0.11 } }
-{ "DatasetName": "ColumnDataset4", "DatasetFormat": { "Format": "COLUMN", "MaxTupleCount": 10, "FreeSpaceTolerance": 0.11 } }
+{ "DatasetName": "ColumnDataset1", "DatasetFormat": { "Format": "COLUMN", "MaxTupleCount": 15000, "FreeSpaceTolerance": 0.15, "MaxLeafNodeSize": 10485760 } }
+{ "DatasetName": "ColumnDataset2", "DatasetFormat": { "Format": "COLUMN", "MaxTupleCount": 10, "FreeSpaceTolerance": 0.15, "MaxLeafNodeSize": 10485760 } }
+{ "DatasetName": "ColumnDataset3", "DatasetFormat": { "Format": "COLUMN", "MaxTupleCount": 15000, "FreeSpaceTolerance": 0.11, "MaxLeafNodeSize": 10485760 } }
+{ "DatasetName": "ColumnDataset4", "DatasetFormat": { "Format": "COLUMN", "MaxTupleCount": 10, "FreeSpaceTolerance": 0.11, "MaxLeafNodeSize": 5242880 } }
 { "DatasetName": "RowDataset", "DatasetFormat": { "Format": "ROW" } }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml b/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml
index 622d91f..14076d4 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml
@@ -16413,6 +16413,11 @@
         <output-dir compare="Text">analyze-dataset</output-dir>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="column">
+      <compilation-unit name="metadata">
+        <output-dir compare="Text">metadata</output-dir>
+      </compilation-unit>
+    </test-case>
   </test-group>
   <test-group name="copy-from">
     <test-case FilePath="copy-from">
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/ColumnManagerFactory.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/ColumnManagerFactory.java
index 359c3e1..cfd5143 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/ColumnManagerFactory.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/ColumnManagerFactory.java
@@ -46,12 +46,14 @@
     private final int pageSize;
     private final int maxTupleCount;
     private final double tolerance;
+    private final int maxLeafNodeSize;
 
     public ColumnManagerFactory(ARecordType datasetType, ARecordType metaType, List<List<String>> primaryKeys,
-            List<Integer> keySourceIndicator, int pageSize, int maxTupleCount, double tolerance) {
+            List<Integer> keySourceIndicator, int pageSize, int maxTupleCount, double tolerance, int maxLeafNodeSize) {
         this.pageSize = pageSize;
         this.maxTupleCount = maxTupleCount;
         this.tolerance = tolerance;
+        this.maxLeafNodeSize = maxLeafNodeSize;
 
         this.datasetType = datasetType;
         if (containsSplitKeys(keySourceIndicator)) {
@@ -70,17 +72,17 @@
 
     @Override
     public AbstractColumnTupleReaderWriterFactory getLoadColumnTupleReaderWriterFactory() {
-        return new LoadColumnTupleReaderWriterFactory(pageSize, maxTupleCount, tolerance);
+        return new LoadColumnTupleReaderWriterFactory(pageSize, maxTupleCount, tolerance, maxLeafNodeSize);
     }
 
     @Override
     public AbstractColumnTupleReaderWriterFactory getFlushColumnTupleReaderWriterFactory() {
-        return new FlushColumnTupleReaderWriterFactory(pageSize, maxTupleCount, tolerance);
+        return new FlushColumnTupleReaderWriterFactory(pageSize, maxTupleCount, tolerance, maxLeafNodeSize);
     }
 
     @Override
     public AbstractColumnTupleReaderWriterFactory createMergeColumnTupleReaderWriterFactory() {
-        return new MergeColumnTupleReaderWriterFactory(pageSize, maxTupleCount, tolerance);
+        return new MergeColumnTupleReaderWriterFactory(pageSize, maxTupleCount, tolerance, maxLeafNodeSize);
     }
 
     @Override
@@ -94,6 +96,7 @@
         json.put("pageSize", pageSize);
         json.put("maxTupleCount", maxTupleCount);
         json.put("tolerance", tolerance);
+        json.put("maxLeafNodeSize", maxLeafNodeSize);
 
         ArrayNode primaryKeysArray = json.putArray("primaryKeys");
         for (List<String> primaryKey : primaryKeys) {
@@ -121,7 +124,8 @@
 
         int pageSize = json.get("pageSize").asInt();
         int maxTupleCount = json.get("maxTupleCount").asInt();
-        float tolerance = (float) json.get("tolerance").asDouble();
+        double tolerance = json.get("tolerance").asDouble();
+        int maxLeafNodeSize = json.get("maxLeafNodeSize").asInt();
 
         List<List<String>> primaryKeys = new ArrayList<>();
         ArrayNode primaryKeysNode = (ArrayNode) json.get("primaryKeys");
@@ -141,7 +145,7 @@
         }
 
         return new ColumnManagerFactory(datasetType, metaType, primaryKeys, keySourceIndicator, pageSize, maxTupleCount,
-                tolerance);
+                tolerance, maxLeafNodeSize);
     }
 
     private static boolean containsSplitKeys(List<Integer> keySourceIndicator) {
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/ColumnTransformer.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/ColumnTransformer.java
index d1e439d..d40e00c 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/ColumnTransformer.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/ColumnTransformer.java
@@ -43,11 +43,26 @@
     private final ObjectSchemaNode root;
     private AbstractSchemaNestedNode currentParent;
     private int primaryKeysLength;
+    /**
+     * Hack-alert! This tracks the total length of all strings (as they're not as encodable as numerics)
+     * The total length can be used by {@link FlushColumnTupleWriter} to stop writing tuples to the current mega
+     * leaf node to avoid having a single column that spans to megabytes of pages.
+     */
+    private int stringLengths;
 
     public ColumnTransformer(FlushColumnMetadata columnMetadata, ObjectSchemaNode root) {
         this.columnMetadata = columnMetadata;
         this.root = root;
         nonTaggedValue = new VoidPointable();
+        stringLengths = 0;
+    }
+
+    public int getStringLengths() {
+        return stringLengths;
+    }
+
+    public void resetStringLengths() {
+        stringLengths = 0;
     }
 
     /**
@@ -153,6 +168,8 @@
         }
         if (node.isPrimaryKey()) {
             primaryKeysLength += writer.getEstimatedSize();
+        } else if (node.getTypeTag() == ATypeTag.STRING) {
+            stringLengths += pointable.getLength();
         }
         columnMetadata.exitNode(arg);
         return null;
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleReaderWriterFactory.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleReaderWriterFactory.java
index 7107333..ae3559d 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleReaderWriterFactory.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleReaderWriterFactory.java
@@ -28,8 +28,9 @@
 public class FlushColumnTupleReaderWriterFactory extends AbstractColumnTupleReaderWriterFactory {
     private static final long serialVersionUID = -9197679192729634493L;
 
-    public FlushColumnTupleReaderWriterFactory(int pageSize, int maxNumberOfTuples, double tolerance) {
-        super(pageSize, maxNumberOfTuples, tolerance);
+    public FlushColumnTupleReaderWriterFactory(int pageSize, int maxNumberOfTuples, double tolerance,
+            int maxLeafNodeSize) {
+        super(pageSize, maxNumberOfTuples, tolerance, maxLeafNodeSize);
     }
 
     @Override
@@ -37,9 +38,11 @@
         FlushColumnMetadata flushColumnMetadata = (FlushColumnMetadata) columnMetadata;
         if (flushColumnMetadata.getMetaType() == null) {
             //no meta
-            return new FlushColumnTupleWriter(flushColumnMetadata, pageSize, maxNumberOfTuples, tolerance);
+            return new FlushColumnTupleWriter(flushColumnMetadata, pageSize, maxNumberOfTuples, tolerance,
+                    maxLeafNodeSize);
         }
-        return new FlushColumnTupleWithMetaWriter(flushColumnMetadata, pageSize, maxNumberOfTuples, tolerance);
+        return new FlushColumnTupleWithMetaWriter(flushColumnMetadata, pageSize, maxNumberOfTuples, tolerance,
+                maxLeafNodeSize);
     }
 
     @Override
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWithMetaWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWithMetaWriter.java
index 9b2b7b8..b51b395 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWithMetaWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWithMetaWriter.java
@@ -28,8 +28,8 @@
     private final RecordLazyVisitablePointable metaPointable;
 
     public FlushColumnTupleWithMetaWriter(FlushColumnMetadata columnMetadata, int pageSize, int maxNumberOfTuples,
-            double tolerance) {
-        super(columnMetadata, pageSize, maxNumberOfTuples, tolerance);
+            double tolerance, int maxLeafNodeSize) {
+        super(columnMetadata, pageSize, maxNumberOfTuples, tolerance, maxLeafNodeSize);
         metaColumnTransformer = new ColumnTransformer(columnMetadata, columnMetadata.getMetaRoot());
         metaPointable = new TypedRecordLazyVisitablePointable(columnMetadata.getMetaType());
     }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
index c5c1753..41cad49 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
@@ -40,16 +40,18 @@
     private final RecordLazyVisitablePointable pointable;
     private final int maxNumberOfTuples;
     private final IColumnValuesWriter[] primaryKeyWriters;
+    private final int maxLeafNodeSize;
 
     protected int primaryKeysEstimatedSize;
 
     public FlushColumnTupleWriter(FlushColumnMetadata columnMetadata, int pageSize, int maxNumberOfTuples,
-            double tolerance) {
+            double tolerance, int maxLeafNodeSize) {
         this.columnMetadata = columnMetadata;
         transformer = new ColumnTransformer(columnMetadata, columnMetadata.getRoot());
         finalizer = new BatchFinalizerVisitor(columnMetadata);
         writer = new ColumnBatchWriter(columnMetadata.getMultiPageOpRef(), pageSize, tolerance);
         this.maxNumberOfTuples = maxNumberOfTuples;
+        this.maxLeafNodeSize = maxLeafNodeSize;
         pointable = new TypedRecordLazyVisitablePointable(columnMetadata.getDatasetType());
 
         int numberOfPrimaryKeys = columnMetadata.getNumberOfPrimaryKeys();
@@ -87,8 +89,16 @@
         return primaryKeysEstimatedSize + filterSize;
     }
 
+    /**
+     * TODO a better approach should be adopted
+     *
+     * @return the configured max number of tuples or zero if strings exceeded the maximum size
+     */
     @Override
     public final int getMaxNumberOfTuples() {
+        if (transformer.getStringLengths() >= maxLeafNodeSize) {
+            return 0;
+        }
         return maxNumberOfTuples;
     }
 
@@ -113,6 +123,7 @@
     @Override
     public final int flush(ByteBuffer pageZero) throws HyracksDataException {
         writer.setPageZeroBuffer(pageZero, getNumberOfColumns(), columnMetadata.getNumberOfPrimaryKeys());
+        transformer.resetStringLengths();
         return finalizer.finalizeBatch(writer, columnMetadata);
     }
 
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleReaderWriterFactory.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleReaderWriterFactory.java
index 5ac41fa..dec2ec3 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleReaderWriterFactory.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleReaderWriterFactory.java
@@ -26,12 +26,14 @@
 public class LoadColumnTupleReaderWriterFactory extends FlushColumnTupleReaderWriterFactory {
     private static final long serialVersionUID = -7583574057314353873L;
 
-    public LoadColumnTupleReaderWriterFactory(int pageSize, int maxNumberOfTuples, double tolerance) {
-        super(pageSize, maxNumberOfTuples, tolerance);
+    public LoadColumnTupleReaderWriterFactory(int pageSize, int maxNumberOfTuples, double tolerance,
+            int maxLeafNodeSize) {
+        super(pageSize, maxNumberOfTuples, tolerance, maxLeafNodeSize);
     }
 
     @Override
     public AbstractColumnTupleWriter createColumnWriter(IColumnMetadata columnMetadata) {
-        return new LoadColumnTupleWriter((FlushColumnMetadata) columnMetadata, pageSize, maxNumberOfTuples, tolerance);
+        return new LoadColumnTupleWriter((FlushColumnMetadata) columnMetadata, pageSize, maxNumberOfTuples, tolerance,
+                maxLeafNodeSize);
     }
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleWriter.java
index 6b840df..e47b210 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleWriter.java
@@ -25,8 +25,8 @@
 
 public class LoadColumnTupleWriter extends FlushColumnTupleWriter {
     public LoadColumnTupleWriter(FlushColumnMetadata columnMetadata, int pageSize, int maxNumberOfTuples,
-            double tolerance) {
-        super(columnMetadata, pageSize, maxNumberOfTuples, tolerance);
+            double tolerance, int maxLeafNodeSize) {
+        super(columnMetadata, pageSize, maxNumberOfTuples, tolerance, maxLeafNodeSize);
     }
 
     @Override
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleReaderWriterFactory.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleReaderWriterFactory.java
index 3d89933..ae1c8d2 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleReaderWriterFactory.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleReaderWriterFactory.java
@@ -28,14 +28,15 @@
 public class MergeColumnTupleReaderWriterFactory extends AbstractColumnTupleReaderWriterFactory {
     private static final long serialVersionUID = -2131401304338796428L;
 
-    public MergeColumnTupleReaderWriterFactory(int pageSize, int maxNumberOfTuples, double tolerance) {
-        super(pageSize, maxNumberOfTuples, tolerance);
+    public MergeColumnTupleReaderWriterFactory(int pageSize, int maxNumberOfTuples, double tolerance,
+            int maxMegaLeafNodeSize) {
+        super(pageSize, maxNumberOfTuples, tolerance, maxMegaLeafNodeSize);
     }
 
     @Override
     public AbstractColumnTupleWriter createColumnWriter(IColumnMetadata columnMetadata) {
         MergeColumnWriteMetadata mergeWriteMetadata = (MergeColumnWriteMetadata) columnMetadata;
-        return new MergeColumnTupleWriter(mergeWriteMetadata, pageSize, maxNumberOfTuples, tolerance);
+        return new MergeColumnTupleWriter(mergeWriteMetadata, pageSize, maxNumberOfTuples, tolerance, maxLeafNodeSize);
     }
 
     @Override
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
index 0df4aca..d3c102a 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
@@ -40,6 +40,7 @@
 
 public class MergeColumnTupleWriter extends AbstractColumnTupleWriter {
     private final MergeColumnWriteMetadata columnMetadata;
+    private final int maxLeafNodeSize;
     private final MergeColumnTupleReference[] componentsTuples;
     private final RunLengthIntArray writtenComponents;
 
@@ -51,20 +52,24 @@
     private int numberOfAntiMatter;
 
     public MergeColumnTupleWriter(MergeColumnWriteMetadata columnMetadata, int pageSize, int maxNumberOfTuples,
-            double tolerance) {
+            double tolerance, int maxLeafNodeSize) {
         this.columnMetadata = columnMetadata;
+        this.maxLeafNodeSize = maxLeafNodeSize;
         List<IColumnTupleIterator> componentsTuplesList = columnMetadata.getComponentsTuples();
         this.componentsTuples = new MergeColumnTupleReference[componentsTuplesList.size()];
+        int totalLength = 0;
+        int totalNumberOfTuples = 0;
         for (int i = 0; i < componentsTuplesList.size(); i++) {
             MergeColumnTupleReference mergeTuple = (MergeColumnTupleReference) componentsTuplesList.get(i);
             this.componentsTuples[i] = mergeTuple;
             mergeTuple.registerEndOfPageCallBack(this::writeAllColumns);
+            totalNumberOfTuples += mergeTuple.getTupleCount();
+            totalLength += mergeTuple.getMergingLength();
         }
+        this.maxNumberOfTuples = getMaxNumberOfTuples(maxNumberOfTuples, totalNumberOfTuples, totalLength);
         this.writtenComponents = new RunLengthIntArray();
-        this.maxNumberOfTuples = maxNumberOfTuples;
         writer = new ColumnBatchWriter(columnMetadata.getMultiPageOpRef(), pageSize, tolerance);
         writtenComponents.reset();
-
         primaryKeyWriters = new IColumnValuesWriter[columnMetadata.getNumberOfPrimaryKeys()];
         for (int i = 0; i < primaryKeyWriters.length; i++) {
             primaryKeyWriters[i] = columnMetadata.getWriter(i);
@@ -108,7 +113,6 @@
     @Override
     public void writeTuple(ITupleReference tuple) throws HyracksDataException {
         MergeColumnTupleReference columnTuple = (MergeColumnTupleReference) tuple;
-        // +1 to avoid having -0, where the '-' is an antimatter indicator
         int componentIndex = columnTuple.getComponentIndex();
         int skipCount = columnTuple.getAndResetSkipCount();
         if (skipCount > 0) {
@@ -226,4 +230,13 @@
     private static int clearAntimatterIndicator(int componentIndex) {
         return -componentIndex - 1;
     }
+
+    private int getMaxNumberOfTuples(int maxNumberOfTuples, int totalNumberOfTuples, int totalLength) {
+        int numberOfTuplesUsingMaxSize = Integer.MAX_VALUE;
+        if (totalLength > maxLeafNodeSize && totalNumberOfTuples > 0) {
+            int bytesPerTuple = (int) Math.ceil(totalLength / (double) totalNumberOfTuples);
+            numberOfTuplesUsingMaxSize = maxLeafNodeSize / bytesPerTuple;
+        }
+        return Math.min(maxNumberOfTuples, numberOfTuplesUsingMaxSize);
+    }
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/MergeColumnTupleReference.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/MergeColumnTupleReference.java
index 7657009..33126e8 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/MergeColumnTupleReference.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/MergeColumnTupleReference.java
@@ -38,6 +38,7 @@
     private final IColumnValuesReader[] columnReaders;
     private int skipCount;
     private IEndOfPageCallBack endOfPageCallBack;
+    private int mergingLength;
 
     public MergeColumnTupleReference(int componentIndex, ColumnBTreeReadLeafFrame frame,
             MergeColumnReadMetadata columnMetadata, IColumnReadMultiPageOp multiPageOp) {
@@ -64,6 +65,7 @@
         pageZero.position(pageZero.position() + numberOfColumns * AbstractColumnFilterWriter.FILTER_SIZE);
         // skip count is always start from zero as no "search" is conducted during a merge
         this.skipCount = 0;
+        mergingLength = 0;
         return true;
     }
 
@@ -79,6 +81,7 @@
         columnStream.reset(buffersProvider);
         IColumnValuesReader reader = columnReaders[ordinal];
         reader.reset(columnStream, numberOfTuples);
+        mergingLength += buffersProvider.getLength();
     }
 
     @Override
@@ -116,6 +119,10 @@
         this.endOfPageCallBack = endOfPageCallBack;
     }
 
+    public int getMergingLength() {
+        return mergingLength;
+    }
+
     private static IEndOfPageCallBack createNoOpCallBack() {
         return columnTuple -> {
             if (!columnTuple.isEmpty()) {
@@ -124,5 +131,4 @@
             }
         };
     }
-
 }
diff --git a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
index 36ebab9..6b52eb7 100644
--- a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
+++ b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
@@ -62,6 +62,7 @@
 import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.hyracks.util.StorageUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -83,6 +84,10 @@
      * Cap the maximum number of tuples stored per AMAX page
      */
     public static final int MAX_NUMBER_OF_TUPLES = 100;
+    /**
+     * Max size of the mega leaf node
+     */
+    public static final int MAX_LEAF_NODE_SIZE = StorageUtil.getIntSizeInBytes(512, StorageUtil.StorageUnit.KILOBYTE);
 
     /* ***************************************
      * Test static instances
@@ -145,8 +150,8 @@
     protected List<DummyPage> transform(int fileId, FlushColumnMetadata columnMetadata, List<IValueReference> records,
             int numberOfTuplesToWrite) throws IOException {
         IColumnWriteMultiPageOp multiPageOp = columnMetadata.getMultiPageOpRef().getValue();
-        FlushColumnTupleWriter writer =
-                new FlushColumnTupleWriter(columnMetadata, PAGE_SIZE, MAX_NUMBER_OF_TUPLES, TOLERANCE);
+        FlushColumnTupleWriter writer = new FlushColumnTupleWriter(columnMetadata, PAGE_SIZE, MAX_NUMBER_OF_TUPLES,
+                TOLERANCE, MAX_LEAF_NODE_SIZE);
 
         try {
             return writeTuples(fileId, writer, records, numberOfTuplesToWrite, multiPageOp);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
index 2e22a30..a1ca571 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
@@ -66,6 +66,7 @@
         STORAGE_GLOBAL_CLEANUP_TIMEOUT(POSITIVE_INTEGER, (int) TimeUnit.MINUTES.toSeconds(10)),
         STORAGE_COLUMN_MAX_TUPLE_COUNT(NONNEGATIVE_INTEGER, 15000),
         STORAGE_COLUMN_FREE_SPACE_TOLERANCE(DOUBLE, 0.15d),
+        STORAGE_COLUMN_MAX_LEAF_NODE_SIZE(INTEGER_BYTE_UNIT, StorageUtil.getIntSizeInBytes(10, MEGABYTE)),
         STORAGE_FORMAT(STRING, "row"),
         STORAGE_PARTITIONING(STRING, "dynamic"),
         STORAGE_PARTITIONS_COUNT(INTEGER, 8);
@@ -91,6 +92,7 @@
                 case STORAGE_FORMAT:
                 case STORAGE_COLUMN_MAX_TUPLE_COUNT:
                 case STORAGE_COLUMN_FREE_SPACE_TOLERANCE:
+                case STORAGE_COLUMN_MAX_LEAF_NODE_SIZE:
                     return Section.COMMON;
                 default:
                     return Section.NC;
@@ -147,6 +149,8 @@
                 case STORAGE_COLUMN_FREE_SPACE_TOLERANCE:
                     return "The percentage of the maximum tolerable empty space for a physical mega leaf page (e.g.,"
                             + " 0.15 means a physical page with 15% or less empty space is tolerable)";
+                case STORAGE_COLUMN_MAX_LEAF_NODE_SIZE:
+                    return "The maximum mega leaf node to write during flush and merge operations (default: 10MB)";
                 case STORAGE_FORMAT:
                     return "The default storage format (either row or column)";
                 case STORAGE_PARTITIONING:
@@ -300,6 +304,10 @@
         return accessor.getDouble(Option.STORAGE_COLUMN_FREE_SPACE_TOLERANCE);
     }
 
+    public int getColumnMaxLeafNodeSize() {
+        return accessor.getInt(Option.STORAGE_COLUMN_MAX_LEAF_NODE_SIZE);
+    }
+
     public String getStorageFormat() {
         return accessor.getString(Option.STORAGE_FORMAT);
     }
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java
index e4afd6b..a8d3113 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java
@@ -37,6 +37,7 @@
 import org.apache.asterix.object.base.AdmObjectNode;
 import org.apache.asterix.object.base.IAdmNode;
 import org.apache.asterix.runtime.compression.CompressionManager;
+import org.apache.hyracks.util.StorageUtil;
 
 public class DatasetDecl extends AbstractStatement {
 
@@ -153,7 +154,7 @@
     }
 
     public DatasetFormatInfo getDatasetFormatInfo(String defaultFormat, int defaultMaxTupleCount,
-            double defaultFreeSpaceTolerance) {
+            double defaultFreeSpaceTolerance, int defaultMaxLeafNodeSize) {
         if (datasetType != DatasetType.INTERNAL) {
             return DatasetFormatInfo.SYSTEM_DEFAULT;
         }
@@ -172,8 +173,12 @@
         double freeSpaceTolerance = datasetFormatNode.getOptionalDouble(
                 DatasetDeclParametersUtil.DATASET_FORMAT_FREE_SPACE_TOLERANCE_PARAMETER_NAME,
                 defaultFreeSpaceTolerance);
+        String maxLeafNodeSizeString =
+                datasetFormatNode.getOptionalString(DatasetDeclParametersUtil.DATASET_FORMAT_FREE_MAX_LEAF_NODE_SIZE);
+        int maxLeafNodeSize = maxLeafNodeSizeString == null ? defaultMaxLeafNodeSize
+                : (int) StorageUtil.getByteValue(maxLeafNodeSizeString);
 
-        return new DatasetFormatInfo(datasetFormat, maxTupleCount, freeSpaceTolerance);
+        return new DatasetFormatInfo(datasetFormat, maxTupleCount, freeSpaceTolerance, maxLeafNodeSize);
     }
 
     public Map<String, String> getHints() {
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/DatasetDeclParametersUtil.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/DatasetDeclParametersUtil.java
index b884a80..7bb6f11 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/DatasetDeclParametersUtil.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/DatasetDeclParametersUtil.java
@@ -71,6 +71,7 @@
     public static final String DATASET_FORMAT_FORMAT_PARAMETER_NAME = "format";
     public static final String DATASET_FORMAT_MAX_TUPLE_COUNT_PARAMETER_NAME = "max-tuple-count";
     public static final String DATASET_FORMAT_FREE_SPACE_TOLERANCE_PARAMETER_NAME = "free-space-tolerance";
+    public static final String DATASET_FORMAT_FREE_MAX_LEAF_NODE_SIZE = "max-leaf-node-size";
 
     /* ***********************************************
      * Private members
@@ -179,10 +180,12 @@
     }
 
     private static ARecordType getDatasetFormatType() {
-        final String[] formatFieldNames = { DATASET_FORMAT_FORMAT_PARAMETER_NAME,
-                DATASET_FORMAT_MAX_TUPLE_COUNT_PARAMETER_NAME, DATASET_FORMAT_FREE_SPACE_TOLERANCE_PARAMETER_NAME };
+        final String[] formatFieldNames =
+                { DATASET_FORMAT_FORMAT_PARAMETER_NAME, DATASET_FORMAT_MAX_TUPLE_COUNT_PARAMETER_NAME,
+                        DATASET_FORMAT_FREE_SPACE_TOLERANCE_PARAMETER_NAME, DATASET_FORMAT_FREE_MAX_LEAF_NODE_SIZE };
         final IAType[] formatFieldTypes = { BuiltinType.ASTRING, AUnionType.createUnknownableType(BuiltinType.AINT64),
-                AUnionType.createUnknownableType(BuiltinType.ADOUBLE) };
+                AUnionType.createUnknownableType(BuiltinType.ADOUBLE),
+                AUnionType.createUnknownableType(BuiltinType.ASTRING) };
         return new ARecordType(DATASET_FORMAT_PARAMETER_NAME, formatFieldNames, formatFieldTypes, false);
     }
 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
index cecf3d3..c35c03a 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
@@ -188,6 +188,7 @@
     public static final String DATASET_ARECORD_DATASET_FORMAT_FORMAT_FIELD_NAME = "Format";
     public static final String DATASET_ARECORD_DATASET_MAX_TUPLE_COUNT_FIELD_NAME = "MaxTupleCount";
     public static final String DATASET_ARECORD_DATASET_FREE_SPACE_TOLERANCE_FIELD_NAME = "FreeSpaceTolerance";
+    public static final String DATASET_ARECORD_DATASET_MAX_LEAF_NODE_SIZE_FIELD_NAME = "MaxLeafNodeSize";
 
     //------------------------------------------ Field ------------------------------------------//
     public static final int FIELD_ARECORD_FIELDNAME_FIELD_INDEX = 0;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/DatasetFormatInfo.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/DatasetFormatInfo.java
index 23b90cd..4e1deb9 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/DatasetFormatInfo.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/DatasetFormatInfo.java
@@ -32,15 +32,17 @@
     private final DatasetFormat format;
     private final int maxTupleCount;
     private final double freeSpaceTolerance;
+    private final int maxLeafNodeSize;
 
     private DatasetFormatInfo() {
-        this(DatasetFormat.ROW, -1, 0.0d);
+        this(DatasetFormat.ROW, -1, 0.0d, 0);
     }
 
-    public DatasetFormatInfo(DatasetFormat format, int maxTupleCount, double freeSpaceTolerance) {
+    public DatasetFormatInfo(DatasetFormat format, int maxTupleCount, double freeSpaceTolerance, int maxLeafNodeSize) {
         this.format = format;
         this.maxTupleCount = maxTupleCount;
         this.freeSpaceTolerance = freeSpaceTolerance;
+        this.maxLeafNodeSize = maxLeafNodeSize;
     }
 
     public DatasetFormat getFormat() {
@@ -55,6 +57,10 @@
         return freeSpaceTolerance;
     }
 
+    public int getMaxLeafNodeSize() {
+        return maxLeafNodeSize;
+    }
+
     @Override
     public String toString() {
         return "(format:" + format + ", maxTupleCount:" + maxTupleCount + ')';
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java
index ab4b585..e3c15db 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java
@@ -121,7 +121,8 @@
                     IColumnManagerFactory columnManagerFactory =
                             new ColumnManagerFactory(recordType, metaType, dataset.getPrimaryKeys(), keySourceIndicator,
                                     mdProvider.getStorageProperties().getBufferCachePageSize(),
-                                    datasetFormatInfo.getMaxTupleCount(), datasetFormatInfo.getFreeSpaceTolerance());
+                                    datasetFormatInfo.getMaxTupleCount(), datasetFormatInfo.getFreeSpaceTolerance(),
+                                    datasetFormatInfo.getMaxLeafNodeSize());
                     return new LSMColumnBTreeLocalResourceFactory(storageManager, typeTraits, cmpFactories,
                             filterTypeTraits, filterCmpFactories, filterFields, opTrackerFactory, ioOpCallbackFactory,
                             pageWriteCallbackFactory, metadataPageManagerFactory, vbcProvider, ioSchedulerProvider,
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
index 83c0d5b..7613dd3 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
@@ -500,8 +500,14 @@
         ADouble freeSpaceToleranceDouble = (ADouble) datasetFormatRecord.getValueByPos(freeSpaceToleranceIndex);
         double freeSpaceTolerance = freeSpaceToleranceDouble.getDoubleValue();
 
+        // MaxTupleCount
+        int maxLeafNodeSizeIndex = datasetFormatType
+                .getFieldIndex(MetadataRecordTypes.DATASET_ARECORD_DATASET_MAX_LEAF_NODE_SIZE_FIELD_NAME);
+        AInt64 maxLeafNodeSizeInt = (AInt64) datasetFormatRecord.getValueByPos(maxLeafNodeSizeIndex);
+        int maxLeafNodeSize = (int) maxLeafNodeSizeInt.getLongValue();
+
         // Columnar
-        return new DatasetFormatInfo(format, maxTupleCount, freeSpaceTolerance);
+        return new DatasetFormatInfo(format, maxTupleCount, freeSpaceTolerance, maxLeafNodeSize);
     }
 
     @Override
@@ -772,6 +778,7 @@
 
         // Columnar settings
         if (info.getFormat() == DatasetConfig.DatasetFormat.COLUMN) {
+            // Max tuple count
             fieldName.reset();
             aString.setValue(MetadataRecordTypes.DATASET_ARECORD_DATASET_MAX_TUPLE_COUNT_FIELD_NAME);
             stringSerde.serialize(aString, fieldName.getDataOutput());
@@ -780,6 +787,7 @@
             int64Serde.serialize(aInt64, fieldValue.getDataOutput());
             datasetFormatObject.addField(fieldName, fieldValue);
 
+            // free space tolerance
             fieldName.reset();
             aString.setValue(MetadataRecordTypes.DATASET_ARECORD_DATASET_FREE_SPACE_TOLERANCE_FIELD_NAME);
             stringSerde.serialize(aString, fieldName.getDataOutput());
@@ -787,6 +795,16 @@
             aDouble.setValue(info.getFreeSpaceTolerance());
             doubleSerde.serialize(aDouble, fieldValue.getDataOutput());
             datasetFormatObject.addField(fieldName, fieldValue);
+
+            // max leaf node size
+            fieldName.reset();
+            aString.setValue(MetadataRecordTypes.DATASET_ARECORD_DATASET_MAX_LEAF_NODE_SIZE_FIELD_NAME);
+            stringSerde.serialize(aString, fieldName.getDataOutput());
+            fieldValue.reset();
+            aInt64.setValue(info.getMaxLeafNodeSize());
+            int64Serde.serialize(aInt64, fieldValue.getDataOutput());
+            datasetFormatObject.addField(fieldName, fieldValue);
+
         }
 
         fieldName.reset();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleReaderWriterFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleReaderWriterFactory.java
index 79d902c..6ba04ba 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleReaderWriterFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleReaderWriterFactory.java
@@ -31,6 +31,7 @@
     protected final int pageSize;
     protected final int maxNumberOfTuples;
     protected final double tolerance;
+    protected final int maxLeafNodeSize;
 
     /**
      * Tuple reader/writer factory
@@ -38,11 +39,14 @@
      * @param pageSize          {@link IBufferCache} page size
      * @param maxNumberOfTuples maximum number of tuples stored per a mega leaf page
      * @param tolerance         percentage of tolerated empty space
+     * @param maxLeafNodeSize   the maximum size a mega leaf node can occupy
      */
-    protected AbstractColumnTupleReaderWriterFactory(int pageSize, int maxNumberOfTuples, double tolerance) {
+    protected AbstractColumnTupleReaderWriterFactory(int pageSize, int maxNumberOfTuples, double tolerance,
+            int maxLeafNodeSize) {
         this.pageSize = pageSize;
         this.maxNumberOfTuples = maxNumberOfTuples;
         this.tolerance = tolerance;
+        this.maxLeafNodeSize = maxLeafNodeSize;
     }
 
     /**
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
index 3e72584..51e8c09 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
@@ -56,6 +56,7 @@
     private int numberOfPagesInCurrentLeafNode;
     private int maxNumberOfPagesForAColumn;
     private int maxNumberOfPagesInALeafNode;
+    private int maxTupleCount;
 
     public ColumnBTreeBulkloader(float fillFactor, boolean verifyInput, IPageWriteCallback callback, ITreeIndex index,
             ITreeIndexFrame leafFrame) throws HyracksDataException {
@@ -74,6 +75,7 @@
         maxNumberOfPagesForAColumn = 0;
         maxNumberOfPagesInALeafNode = 0;
         numberOfLeafNodes = 1;
+        maxTupleCount = 0;
     }
 
     @Override
@@ -140,7 +142,7 @@
         //Where Page0 and columns pages will be written
         super.end();
 
-        log("Finished");
+        log("Finished", numberOfTempConfiscatedPages);
     }
 
     @Override
@@ -172,16 +174,18 @@
             write(c);
         }
 
+        // For logging
+        maxNumberOfPagesInALeafNode = Math.max(maxNumberOfPagesInALeafNode, numberOfPagesInCurrentLeafNode);
+        maxTupleCount = Math.max(maxTupleCount, tupleCount);
+        // Starts with 1 for page0
+        numberOfPagesInCurrentLeafNode = 1;
+        numberOfLeafNodes++;
+
+        // Clear for next page
         pagesToWrite.clear();
         splitKey.setRightPage(leafFrontier.pageId);
         setLowKey = true;
         tupleCount = 0;
-
-        // For logging
-        maxNumberOfPagesInALeafNode = Math.max(maxNumberOfPagesInALeafNode, numberOfPagesInCurrentLeafNode);
-        // Starts with 1 for page0
-        numberOfPagesInCurrentLeafNode = 1;
-        numberOfLeafNodes++;
     }
 
     @Override
@@ -219,7 +223,7 @@
         super.abort();
 
         // For logging
-        log("Aborted");
+        log("Aborted", tempConfiscatedPages.size());
     }
 
     private void setSplitKey(ISplitKey splitKey, ITupleReference tuple) {
@@ -228,16 +232,15 @@
         tupleWriter.writeTupleFields(tuple, 0, cmp.getKeyFieldCount(), splitKey.getBuffer().array(), 0);
     }
 
-    private void log(String status) {
+    private void log(String status, int numberOfTempConfiscatedPages) {
         if (!LOGGER.isDebugEnabled()) {
             return;
         }
 
-        int numberOfTempConfiscatedPages = tempConfiscatedPages.size();
         LOGGER.debug(
-                "{} columnar bulkloader used leafNodes: {}, tempPagesAllocated: {}, maxPagesPerColumn: {}, and maxLeafNodePages: {}",
-                status, numberOfLeafNodes, numberOfTempConfiscatedPages, maxNumberOfPagesForAColumn,
-                maxNumberOfPagesInALeafNode);
+                "{} columnar bulkloader wrote maximum {} and last {} and used leafNodes: {}, tempPagesAllocated: {}, maxPagesPerColumn: {}, and maxLeafNodePages: {}",
+                status, maxTupleCount, tupleCount, numberOfLeafNodes, numberOfTempConfiscatedPages,
+                maxNumberOfPagesForAColumn, maxNumberOfPagesInALeafNode);
     }
 
     /*
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
index 3923025..99dabfc 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
@@ -209,7 +209,7 @@
 
     protected abstract void onNext() throws HyracksDataException;
 
-    protected final int getTupleCount() {
+    public final int getTupleCount() {
         return frame.getTupleCount();
     }
 
