[ASTERIXDB-3617][STO] Supporting sparse page zero writer

- user model changes: no
- storage format changes: yes
- interface changes: yes

Details:
Introducing sparse pageZero writer for
fitting columns offset + filter more efficiently
in case the batch of documents contains only
a subset of columns.

Ext-ref: MB-66306
Change-Id: Ide78a7c33dee69c24963921ee891f89c724cb262
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19986
Reviewed-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Ritik Raj <ritik.raj@couchbase.com>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/io/flush/ASTERIXDB-3597/ASTERIXDB-3597.003.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/io/flush/ASTERIXDB-3597/ASTERIXDB-3597.003.query.sqlpp
index ff64a7e..8a7a3e9 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/io/flush/ASTERIXDB-3597/ASTERIXDB-3597.003.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/io/flush/ASTERIXDB-3597/ASTERIXDB-3597.003.query.sqlpp
@@ -21,4 +21,5 @@
 
 SELECT *
 FROM ds
-WHERE id >= "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb";
\ No newline at end of file
+WHERE id >= "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
+ORDER BY id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/io/merge/ASTERIXDB-3597/ASTERIXDB-3597.004.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/io/merge/ASTERIXDB-3597/ASTERIXDB-3597.004.query.sqlpp
index ff64a7e..8a7a3e9 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/io/merge/ASTERIXDB-3597/ASTERIXDB-3597.004.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/io/merge/ASTERIXDB-3597/ASTERIXDB-3597.004.query.sqlpp
@@ -21,4 +21,5 @@
 
 SELECT *
 FROM ds
-WHERE id >= "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb";
\ No newline at end of file
+WHERE id >= "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
+ORDER BY id;
\ No newline at end of file
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/ByteBufferOutputStream.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/ByteBufferOutputStream.java
index 8817ae6..5ac4f24 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/ByteBufferOutputStream.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/ByteBufferOutputStream.java
@@ -35,6 +35,10 @@
         return buffer.position() - startOffset;
     }
 
+    public int getPosition() {
+        return buffer.position();
+    }
+
     @Override
     public void write(int b) throws IOException {
         buffer.put((byte) b);
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/filter/FilterAccessorProvider.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/filter/FilterAccessorProvider.java
index db1862c..e14fce3 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/filter/FilterAccessorProvider.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/filter/FilterAccessorProvider.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.column.filter;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -42,11 +41,12 @@
 import org.apache.asterix.column.metadata.schema.visitor.SchemaClipperVisitor;
 import org.apache.asterix.column.values.IColumnValuesReader;
 import org.apache.asterix.column.values.IColumnValuesReaderFactory;
-import org.apache.asterix.column.values.writer.filters.AbstractColumnFilterWriter;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroReader;
 
 public class FilterAccessorProvider {
     private final ObjectSchemaNode root;
@@ -138,16 +138,17 @@
         return filterColumnReaders;
     }
 
-    public static void setFilterValues(List<IColumnRangeFilterValueAccessor> filterValueAccessors, ByteBuffer pageZero,
-            int numberOfColumns) {
+    public static void setFilterValues(List<IColumnRangeFilterValueAccessor> filterValueAccessors,
+            ColumnBTreeReadLeafFrame frame) {
+        IColumnPageZeroReader pageZeroReader = frame.getColumnPageZeroReader();
         for (int i = 0; i < filterValueAccessors.size(); i++) {
             ColumnRangeFilterValueAccessor accessor = (ColumnRangeFilterValueAccessor) filterValueAccessors.get(i);
             int columnIndex = accessor.getColumnIndex();
             long normalizedValue;
-            if (columnIndex < numberOfColumns) {
-                int filterOffset = pageZero.position() + columnIndex * AbstractColumnFilterWriter.FILTER_SIZE;
-                normalizedValue =
-                        accessor.isMin() ? pageZero.getLong(filterOffset) : pageZero.getLong(filterOffset + Long.BYTES);
+            if (pageZeroReader.isValidColumn(columnIndex)) {
+                int filterOffset = pageZeroReader.getColumnFilterOffset(columnIndex);
+                normalizedValue = accessor.isMin() ? pageZeroReader.getLong(filterOffset)
+                        : pageZeroReader.getLong(filterOffset + Long.BYTES);
             } else {
                 // Column is missing
                 normalizedValue = accessor.isMin() ? Long.MAX_VALUE : Long.MIN_VALUE;
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/AbstractSchemaNestedNode.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/AbstractSchemaNestedNode.java
index 187e460..31dcaae 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/AbstractSchemaNestedNode.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/AbstractSchemaNestedNode.java
@@ -19,6 +19,17 @@
 package org.apache.asterix.column.metadata.schema;
 
 public abstract class AbstractSchemaNestedNode extends AbstractSchemaNode {
+    //A nestedNode can initially be empty, which contributes
+    //to a missing column.
+    protected boolean missingInitiallyInBatch;
+
+    public void setMissingInitiallyInBatch(boolean missingInitiallyInBatch) {
+        this.missingInitiallyInBatch = missingInitiallyInBatch;
+    }
+
+    public boolean isMissingInitiallyInBatch() {
+        return missingInitiallyInBatch;
+    }
 
     @Override
     public final boolean isNested() {
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/AbstractSchemaNode.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/AbstractSchemaNode.java
index 622705c..4cfa731 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/AbstractSchemaNode.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/AbstractSchemaNode.java
@@ -35,6 +35,16 @@
 
 public abstract class AbstractSchemaNode {
     private int counter;
+    //Indicates if all the columns of the children is to be included
+    private boolean needAllColumns;
+
+    //Needed for estimating the column count
+    protected int previousNumberOfColumns; // before transform
+    protected int numberOfColumns;
+    protected int numberOfVisitedColumnsInBatch;
+    private int newDiscoveredColumns;
+    private int visitedBatchVersion;
+    private int formerChildNullVersion;
 
     public abstract ATypeTag getTypeTag();
 
@@ -48,6 +58,14 @@
         counter++;
     }
 
+    public void needAllColumns(boolean needAllColumns) {
+        this.needAllColumns = needAllColumns;
+    }
+
+    public boolean needAllColumns() {
+        return needAllColumns;
+    }
+
     public final void setCounter(int counter) {
         this.counter = counter;
     }
@@ -90,4 +108,54 @@
                 throw new UnsupportedEncodingException(typeTag + " is not supported");
         }
     }
+
+    // Needed for estimating the number of columns.
+    public void setNumberOfVisitedColumnsInBatch(int numberOfVisitedColumnsInBatch) {
+        this.numberOfVisitedColumnsInBatch = numberOfVisitedColumnsInBatch;
+    }
+
+    public int getNumberOfVisitedColumnsInBatch() {
+        return numberOfVisitedColumnsInBatch;
+    }
+
+    public void setNewDiscoveredColumns(int newDiscoveredColumns) {
+        this.newDiscoveredColumns = newDiscoveredColumns;
+    }
+
+    public int getNewDiscoveredColumns() {
+        return newDiscoveredColumns;
+    }
+
+    public int getNumberOfColumns() {
+        return numberOfColumns;
+    }
+
+    public void incrementColumns(int deltaColumns) {
+        this.numberOfColumns += deltaColumns;
+    }
+
+    public int getDeltaColumnsChanged() {
+        if (previousNumberOfColumns != numberOfColumns) {
+            int diff = numberOfColumns - previousNumberOfColumns;
+            previousNumberOfColumns = numberOfColumns;
+            return diff;
+        }
+        return 0;
+    }
+
+    public void setFormerChildNull(int formerChildNullVersion) {
+        this.formerChildNullVersion = formerChildNullVersion;
+    }
+
+    public int formerChildNullVersion() {
+        return formerChildNullVersion;
+    }
+
+    public int getVisitedBatchVersion() {
+        return visitedBatchVersion;
+    }
+
+    public void setVisitedBatchVersion(int visitedBatchVersion) {
+        this.visitedBatchVersion = visitedBatchVersion;
+    }
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/ObjectSchemaNode.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/ObjectSchemaNode.java
index 4cf2d23..497654c 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/ObjectSchemaNode.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/ObjectSchemaNode.java
@@ -49,6 +49,7 @@
     private final Int2IntMap fieldNameIndexToChildIndexMap;
     private final List<AbstractSchemaNode> children;
     private IntUnaryOperator nextIndex;
+    private boolean isEmptyObject = false;
 
     public ObjectSchemaNode() {
         fieldNameIndexToChildIndexMap = new Int2IntOpenHashMap();
@@ -72,7 +73,8 @@
         }
 
         children = new ArrayList<>();
-        deserializeChildren(input, children, numberOfChildren, definitionLevels);
+        numberOfColumns = deserializeChildren(input, children, numberOfChildren, definitionLevels);
+        previousNumberOfColumns = numberOfColumns;
     }
 
     public AbstractSchemaNode getOrCreateChild(IValueReference fieldName, ATypeTag childTypeTag,
@@ -86,7 +88,13 @@
             children.add(childIndex, newChild);
             fieldNameIndexToChildIndexMap.put(fieldNameIndex, childIndex);
         } else if (currentChild != newChild) {
+            numberOfColumns -= currentChild.getNumberOfColumns();
             children.set(childIndex, newChild);
+            newChild.getDeltaColumnsChanged();
+            numberOfColumns += newChild.getNumberOfColumns();
+        } else {
+            // If the child is the same, we just update the number of columns
+            numberOfColumns += currentChild.getDeltaColumnsChanged();
         }
 
         return newChild;
@@ -98,13 +106,15 @@
         children.add(child);
     }
 
-    public void setEmptyObject(FlushColumnMetadata columnMetadata) throws HyracksDataException {
+    public AbstractSchemaNode setEmptyObject(FlushColumnMetadata columnMetadata) throws HyracksDataException {
         if (!children.isEmpty()) {
-            return;
+            return null;
         }
+        isEmptyObject = true;
         AbstractSchemaNode emptyChild = columnMetadata.getOrCreateChild(null, ATypeTag.MISSING);
         addChild(DUMMY_FIELD_NAME_INDEX, emptyChild);
         nextIndex = this::emptyColumnIndex;
+        return emptyChild;
     }
 
     public AbstractSchemaNode getChild(int fieldNameIndex) {
@@ -187,11 +197,15 @@
         }
     }
 
-    private static void deserializeChildren(DataInput input, List<AbstractSchemaNode> children, int numberOfChildren,
+    private static int deserializeChildren(DataInput input, List<AbstractSchemaNode> children, int numberOfChildren,
             Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels) throws IOException {
+        int numberOfColumns = 0;
         for (int i = 0; i < numberOfChildren; i++) {
-            children.add(AbstractSchemaNode.deserialize(input, definitionLevels));
+            AbstractSchemaNode child = AbstractSchemaNode.deserialize(input, definitionLevels);
+            numberOfColumns += child.getNumberOfColumns();
+            children.add(child);
         }
+        return numberOfColumns;
     }
 
     private int nextIndex(int fieldNameIndex) {
@@ -202,6 +216,11 @@
         nextIndex = this::nextIndex;
         fieldNameIndexToChildIndexMap.remove(DUMMY_FIELD_NAME_INDEX);
         fieldNameIndexToChildIndexMap.put(fieldNameIndex, 0);
+        isEmptyObject = false;
         return 0;
     }
+
+    public boolean isEmptyObject() {
+        return isEmptyObject;
+    }
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/UnionSchemaNode.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/UnionSchemaNode.java
index 2503143..8124780 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/UnionSchemaNode.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/UnionSchemaNode.java
@@ -42,6 +42,11 @@
     public UnionSchemaNode(AbstractSchemaNode child1, AbstractSchemaNode child2) {
         children = new EnumMap<>(ATypeTag.class);
         originalType = child1;
+        // this is a new node, update the number of columns
+        numberOfColumns = child1.getNumberOfColumns();
+        previousNumberOfColumns = numberOfColumns; // this is an older node
+        child2.getDeltaColumnsChanged();
+        numberOfColumns += child2.getNumberOfColumns();
         putChild(child1);
         putChild(child2);
     }
@@ -54,10 +59,14 @@
         ATypeTag originalTypeTag = ATypeTag.VALUE_TYPE_MAPPING[input.readByte()];
         int numberOfChildren = input.readInt();
         children = new EnumMap<>(ATypeTag.class);
+        int columnsCount = 0;
         for (int i = 0; i < numberOfChildren; i++) {
             AbstractSchemaNode child = AbstractSchemaNode.deserialize(input, definitionLevels);
+            columnsCount += child.getNumberOfColumns();
             children.put(child.getTypeTag(), child);
         }
+        numberOfColumns = columnsCount;
+        previousNumberOfColumns = numberOfColumns;
         originalType = children.get(originalTypeTag);
     }
 
@@ -65,6 +74,15 @@
         children.put(child.getTypeTag(), child);
     }
 
+    private void putChild(AbstractSchemaNode newChild, AbstractSchemaNode currentChild) {
+        if (currentChild != null && newChild.getTypeTag() == currentChild.getTypeTag()) {
+            numberOfColumns -= currentChild.getNumberOfColumns();
+        }
+        newChild.getDeltaColumnsChanged();
+        numberOfColumns += newChild.getNumberOfColumns();
+        children.put(newChild.getTypeTag(), newChild);
+    }
+
     public AbstractSchemaNode getOriginalType() {
         return originalType;
     }
@@ -76,7 +94,9 @@
         //The parent of a union child should be the actual parent
         AbstractSchemaNode newChild = columnMetadata.getOrCreateChild(currentChild, normalizedTypeTag);
         if (currentChild != newChild) {
-            putChild(newChild);
+            putChild(newChild, currentChild);
+        } else {
+            numberOfColumns += newChild.getDeltaColumnsChanged();
         }
         return newChild;
     }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/collection/AbstractCollectionSchemaNode.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/collection/AbstractCollectionSchemaNode.java
index 8455864..1bcf1e9 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/collection/AbstractCollectionSchemaNode.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/collection/AbstractCollectionSchemaNode.java
@@ -45,13 +45,24 @@
             definitionLevels.put(this, new RunLengthIntArray());
         }
         item = AbstractSchemaNode.deserialize(input, definitionLevels);
+        numberOfColumns = item.getNumberOfColumns();
+        previousNumberOfColumns = numberOfColumns;
     }
 
     public final AbstractSchemaNode getOrCreateItem(ATypeTag childTypeTag, FlushColumnMetadata columnMetadata)
             throws HyracksDataException {
         AbstractSchemaNode newItem = columnMetadata.getOrCreateChild(item, childTypeTag);
-        if (newItem != item) {
+        if (item == null) {
+            newItem.getDeltaColumnsChanged();
+            numberOfColumns += newItem.getNumberOfColumns();
             item = newItem;
+        } else if (newItem != item) {
+            numberOfColumns -= item.getNumberOfColumns();
+            item = newItem;
+            newItem.getDeltaColumnsChanged();
+            numberOfColumns += newItem.getNumberOfColumns();
+        } else {
+            numberOfColumns += item.getDeltaColumnsChanged();
         }
         return item;
     }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/primitive/PrimitiveSchemaNode.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/primitive/PrimitiveSchemaNode.java
index 28d379d..af3d84b 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/primitive/PrimitiveSchemaNode.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/primitive/PrimitiveSchemaNode.java
@@ -37,12 +37,16 @@
         this.columnIndex = columnIndex;
         this.typeTag = typeTag;
         this.primaryKey = primaryKey;
+        this.previousNumberOfColumns = 0;
+        this.numberOfColumns = 1;
     }
 
     public PrimitiveSchemaNode(ATypeTag typeTag, DataInput input) throws IOException {
         this.typeTag = typeTag;
         columnIndex = input.readInt();
         primaryKey = input.readBoolean();
+        this.previousNumberOfColumns = 1;
+        this.numberOfColumns = 1;
     }
 
     public final int getColumnIndex() {
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/visitor/SchemaBuilderFromIATypeVisitor.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/visitor/SchemaBuilderFromIATypeVisitor.java
index c7d3df1..cbe4101 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/visitor/SchemaBuilderFromIATypeVisitor.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/visitor/SchemaBuilderFromIATypeVisitor.java
@@ -76,6 +76,7 @@
         try {
             AbstractSchemaNode itemNode = collectionNode.getOrCreateItem(itemType.getTypeTag(), columnMetadata);
             itemType.accept(this, itemNode);
+            collectionNode.incrementColumns(itemNode.getDeltaColumnsChanged());
         } catch (HyracksDataException e) {
             throw new IllegalStateException(e);
         }
@@ -154,5 +155,6 @@
         AbstractSchemaNode child = objectNode.getOrCreateChild(fieldName, fieldType.getTypeTag(), columnMetadata);
 
         fieldType.accept(this, child);
+        objectNode.incrementColumns(child.getDeltaColumnsChanged());
     }
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/BatchFinalizerVisitor.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/BatchFinalizerVisitor.java
index e3407a1..bc8f184 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/BatchFinalizerVisitor.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/BatchFinalizerVisitor.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.column.operation.lsm.flush;
 
+import java.util.BitSet;
 import java.util.Comparator;
 import java.util.List;
 import java.util.PriorityQueue;
@@ -32,12 +33,16 @@
 import org.apache.asterix.column.values.IColumnBatchWriter;
 import org.apache.asterix.column.values.IColumnValuesWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroWriter;
 
 public final class BatchFinalizerVisitor implements ISchemaNodeVisitor<Void, AbstractSchemaNestedNode> {
     private final FlushColumnMetadata columnSchemaMetadata;
     private final IColumnValuesWriter[] primaryKeyWriters;
     private final PriorityQueue<IColumnValuesWriter> orderedColumns;
+    private BitSet presentColumnsIndex;
+    private IColumnPageZeroWriter columnPageZeroWriter;
     private int level;
+    private boolean needAllColumns;
 
     public BatchFinalizerVisitor(FlushColumnMetadata columnSchemaMetadata) {
         this.columnSchemaMetadata = columnSchemaMetadata;
@@ -50,15 +55,21 @@
         level = -1;
     }
 
-    public int finalizeBatch(IColumnBatchWriter batchWriter, FlushColumnMetadata columnMetadata)
-            throws HyracksDataException {
+    public void finalizeBatchColumns(FlushColumnMetadata columnMetadata, BitSet presentColumnsIndexes,
+            IColumnPageZeroWriter pageZeroWriter) throws HyracksDataException {
         orderedColumns.clear();
+        this.presentColumnsIndex = presentColumnsIndexes;
+        this.needAllColumns = false;
+        this.columnPageZeroWriter = pageZeroWriter;
 
+        // is this needed to parse the whole schema??
         columnMetadata.getRoot().accept(this, null);
         if (columnMetadata.getMetaRoot() != null) {
             columnMetadata.getMetaRoot().accept(this, null);
         }
+    }
 
+    public int finalizeBatch(IColumnBatchWriter batchWriter) throws HyracksDataException {
         batchWriter.writePrimaryKeyColumns(primaryKeyWriters);
         return batchWriter.writeColumns(orderedColumns);
     }
@@ -66,13 +77,18 @@
     @Override
     public Void visit(ObjectSchemaNode objectNode, AbstractSchemaNestedNode arg) throws HyracksDataException {
         level++;
+        boolean previousNeedAllColumns = needAllColumns;
+        needAllColumns = needAllColumns | objectNode.needAllColumns();
         columnSchemaMetadata.flushDefinitionLevels(level - 1, arg, objectNode);
         List<AbstractSchemaNode> children = objectNode.getChildren();
         for (int i = 0; i < children.size(); i++) {
             children.get(i).accept(this, objectNode);
         }
         objectNode.setCounter(0);
+        objectNode.setNumberOfVisitedColumnsInBatch(0);
         columnSchemaMetadata.clearDefinitionLevels(objectNode);
+        objectNode.needAllColumns(false);
+        needAllColumns = previousNeedAllColumns;
         level--;
         return null;
     }
@@ -81,34 +97,50 @@
     public Void visit(AbstractCollectionSchemaNode collectionNode, AbstractSchemaNestedNode arg)
             throws HyracksDataException {
         level++;
+        boolean previousNeedAllColumns = needAllColumns;
+        needAllColumns = needAllColumns | collectionNode.needAllColumns();
         columnSchemaMetadata.flushDefinitionLevels(level - 1, arg, collectionNode);
         collectionNode.getItemNode().accept(this, collectionNode);
         collectionNode.setCounter(0);
+        collectionNode.setNumberOfVisitedColumnsInBatch(0);
         columnSchemaMetadata.clearDefinitionLevels(collectionNode);
+        collectionNode.needAllColumns(false);
+        needAllColumns = previousNeedAllColumns;
         level--;
         return null;
     }
 
     @Override
     public Void visit(UnionSchemaNode unionNode, AbstractSchemaNestedNode arg) throws HyracksDataException {
+        boolean previousNeedAllColumns = needAllColumns;
+        needAllColumns = needAllColumns | unionNode.needAllColumns();
         columnSchemaMetadata.flushDefinitionLevels(level, arg, unionNode);
         for (AbstractSchemaNode node : unionNode.getChildren().values()) {
             node.accept(this, unionNode);
         }
         unionNode.setCounter(0);
+        unionNode.setNumberOfVisitedColumnsInBatch(0);
         columnSchemaMetadata.clearDefinitionLevels(unionNode);
+        unionNode.needAllColumns(false);
+        needAllColumns = previousNeedAllColumns;
         return null;
     }
 
     @Override
     public Void visit(PrimitiveSchemaNode primitiveNode, AbstractSchemaNestedNode arg) throws HyracksDataException {
         columnSchemaMetadata.flushDefinitionLevels(level, arg, primitiveNode);
-        if (!primitiveNode.isPrimaryKey()) {
+        if (needAllColumns) {
+            presentColumnsIndex.set(primitiveNode.getColumnIndex());
+        }
+        // in case of DefaultWriter all non-primary columns should be included
+        if (!primitiveNode.isPrimaryKey() && columnPageZeroWriter.includeOrderedColumn(presentColumnsIndex,
+                primitiveNode.getColumnIndex(), needAllColumns)) {
             orderedColumns.add(columnSchemaMetadata.getWriter(primitiveNode.getColumnIndex()));
         }
 
         //Prepare for the next batch
         primitiveNode.setCounter(0);
+        primitiveNode.setNumberOfVisitedColumnsInBatch(0);
         return null;
     }
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/ColumnTransformer.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/ColumnTransformer.java
index 71b561a..7b1a85b 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/ColumnTransformer.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/ColumnTransformer.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.column.operation.lsm.flush;
 
+import java.util.BitSet;
+
 import org.apache.asterix.column.metadata.schema.AbstractSchemaNestedNode;
 import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
 import org.apache.asterix.column.metadata.schema.ObjectSchemaNode;
@@ -41,6 +43,7 @@
     private final FlushColumnMetadata columnMetadata;
     private final VoidPointable nonTaggedValue;
     private final ObjectSchemaNode root;
+    private final BitSet presentColumnsIndexes;
     private AbstractSchemaNestedNode currentParent;
     private int primaryKeysLength;
     /**
@@ -49,12 +52,15 @@
      * leaf node to avoid having a single column that spans to megabytes of pages.
      */
     private int stringLengths;
+    private int currentBatchVersion;
 
-    public ColumnTransformer(FlushColumnMetadata columnMetadata, ObjectSchemaNode root) {
+    public ColumnTransformer(FlushColumnMetadata columnMetadata, ObjectSchemaNode root, BitSet presentColumnsIndexes) {
         this.columnMetadata = columnMetadata;
         this.root = root;
+        this.presentColumnsIndexes = presentColumnsIndexes;
         nonTaggedValue = new VoidPointable();
         stringLengths = 0;
+        currentBatchVersion = 1;
     }
 
     public int getStringLengths() {
@@ -63,6 +69,7 @@
 
     public void resetStringLengths() {
         stringLengths = 0;
+        currentBatchVersion++;
     }
 
     /**
@@ -72,6 +79,7 @@
      * @return the estimated size (possibly overestimated) of the primary key(s) columns
      */
     public int transform(RecordLazyVisitablePointable pointable) throws HyracksDataException {
+        // clear the last present column indexes.
         primaryKeysLength = 0;
         pointable.accept(this, root);
         return primaryKeysLength;
@@ -84,6 +92,8 @@
             int start = tuple.getFieldStart(i);
             ATypeTag tag = ATypeTag.VALUE_TYPE_MAPPING[bytes[start]];
             nonTaggedValue.set(bytes, start + 1, tuple.getFieldLength(i) - 1);
+            // include the primary key column
+            presentColumnsIndexes.set(i);
             IColumnValuesWriter writer = columnMetadata.getWriter(i);
             writer.writeAntiMatter(tag, nonTaggedValue);
             pkSize += writer.getEstimatedSize();
@@ -99,6 +109,13 @@
 
         ObjectSchemaNode objectNode = (ObjectSchemaNode) arg;
         currentParent = objectNode;
+
+        if (currentParent.getVisitedBatchVersion() != currentBatchVersion) {
+            currentParent.needAllColumns(false);
+            currentParent.setMissingInitiallyInBatch(false);
+            currentParent.setVisitedBatchVersion(currentBatchVersion);
+        }
+
         for (int i = 0; i < pointable.getNumberOfChildren(); i++) {
             pointable.nextChild();
             IValueReference fieldName = pointable.getFieldName();
@@ -113,6 +130,17 @@
         if (pointable.getNumberOfChildren() == 0) {
             // Set as empty object
             objectNode.setEmptyObject(columnMetadata);
+            if (!objectNode.isMissingInitiallyInBatch() && objectNode.isEmptyObject()) {
+                objectNode.needAllColumns(true);
+                objectNode.setMissingInitiallyInBatch(true);
+                PrimitiveSchemaNode missingNode = (PrimitiveSchemaNode) objectNode.getChildren().get(0);
+                presentColumnsIndexes.set(missingNode.getColumnIndex());
+            }
+        } else {
+            if (objectNode.isMissingInitiallyInBatch()) {
+                objectNode.setMissingInitiallyInBatch(false);
+                objectNode.needAllColumns(false);
+            }
         }
 
         columnMetadata.exitNode(arg);
@@ -132,19 +160,34 @@
         int missingLevel = columnMetadata.getLevel();
         currentParent = collectionNode;
 
+        if (currentParent.getVisitedBatchVersion() != currentBatchVersion) {
+            currentParent.needAllColumns(false);
+            currentParent.setMissingInitiallyInBatch(false);
+            currentParent.setVisitedBatchVersion(currentBatchVersion);
+        }
+
+        // If it's an array, all column will be needed as anyone of them, will be a delegate.
+        currentParent.needAllColumns(true);
+
         int numberOfChildren = pointable.getNumberOfChildren();
+        int newDiscoveredColumns = 0;
         for (int i = 0; i < numberOfChildren; i++) {
             pointable.nextChild();
             ATypeTag childTypeTag = pointable.getChildTypeTag();
             AbstractSchemaNode childNode = collectionNode.getOrCreateItem(childTypeTag, columnMetadata);
             acceptActualNode(pointable.getChildVisitablePointable(), childNode);
+            currentParent.incrementColumns(childNode.getDeltaColumnsChanged());
+            newDiscoveredColumns += childNode.getNewDiscoveredColumns();
+            childNode.setNewDiscoveredColumns(0);
             /*
              * The array item may change (e.g., BIGINT --> UNION). Thus, new items would be considered as missing
              */
             defLevels.add(missingLevel);
         }
 
-        // Add missing as a last element of the array to help indicate empty arrays
+        currentParent.setNewDiscoveredColumns(newDiscoveredColumns);
+        currentParent.setNumberOfVisitedColumnsInBatch(
+                currentParent.getNumberOfVisitedColumnsInBatch() + newDiscoveredColumns);
         collectionNode.getOrCreateItem(ATypeTag.MISSING, columnMetadata);
         defLevels.add(missingLevel);
 
@@ -159,6 +202,7 @@
         columnMetadata.enterNode(currentParent, arg);
         ATypeTag valueTypeTag = pointable.getTypeTag();
         PrimitiveSchemaNode node = (PrimitiveSchemaNode) arg;
+        presentColumnsIndexes.set(node.getColumnIndex());
         IColumnValuesWriter writer = columnMetadata.getWriter(node.getColumnIndex());
         if (valueTypeTag == ATypeTag.MISSING) {
             writer.writeLevel(columnMetadata.getLevel());
@@ -198,6 +242,7 @@
                  */
                 AbstractSchemaNode actualNode = unionNode.getOriginalType();
                 acceptActualNode(pointable, actualNode);
+                currentParent.incrementColumns(actualNode.getDeltaColumnsChanged());
             } else {
                 AbstractSchemaNode actualNode = unionNode.getOrCreateChild(pointable.getTypeTag(), columnMetadata);
                 pointable.accept(this, actualNode);
@@ -206,7 +251,8 @@
             currentParent = previousParent;
             columnMetadata.exitNode(node);
         } else if (pointable.getTypeTag() == ATypeTag.NULL && node.isNested()) {
-            columnMetadata.addNestedNull(currentParent, (AbstractSchemaNestedNode) node);
+            node.needAllColumns(true);
+            columnMetadata.addNestedNull(currentParent, (AbstractSchemaNestedNode) node, true);
         } else {
             pointable.accept(this, node);
         }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
index f987abe..b45d7d9 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
@@ -65,6 +65,7 @@
 import org.apache.logging.log4j.Logger;
 
 import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 
 /**
  * Flush column metadata belongs to a flushing {@link ILSMMemoryComponent}
@@ -72,6 +73,7 @@
  */
 public class FlushColumnMetadata extends AbstractColumnMetadata {
     private static final Logger LOGGER = LogManager.getLogger();
+    public final IntOpenHashSet orderedColumns;
     protected final Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels;
     private final Mutable<IColumnWriteMultiPageOp> multiPageOpRef;
     private final IFieldNamesDictionary fieldNamesDictionary;
@@ -93,6 +95,7 @@
         super(datasetType, metaType, primaryKeys.size());
         this.multiPageOpRef = multiPageOpRef;
         this.columnWriterFactory = columnWriterFactory;
+        this.orderedColumns = new IntOpenHashSet();
         definitionLevels = new HashMap<>();
         columnWriters = new ArrayList<>();
         level = -1;
@@ -133,6 +136,7 @@
         super(datasetType, metaType, numPrimaryKeys);
         this.multiPageOpRef = multiPageOpRef;
         this.columnWriterFactory = columnWriterFactory;
+        this.orderedColumns = new IntOpenHashSet();
         this.definitionLevels = definitionLevels;
         this.columnWriters = columnWriters;
         level = -1;
@@ -346,11 +350,18 @@
             throws HyracksDataException {
         AbstractSchemaNode currentChild = child;
         ATypeTag normalizedTypeTag = getNormalizedTypeTag(childTypeTag);
+        boolean newChild = currentChild == null;
         if (currentChild == null || normalizedTypeTag != ATypeTag.MISSING && normalizedTypeTag != ATypeTag.NULL
                 && currentChild.getTypeTag() != ATypeTag.UNION
                 && getNormalizedTypeTag(currentChild.getTypeTag()) != normalizedTypeTag) {
             //Create a new child or union type if required type is different from the current child type
+            int visitedBatchVersion = newChild ? -1 : currentChild.getVisitedBatchVersion();
             currentChild = createChild(child, childTypeTag);
+            //Missing will become UNION, hence only NULL and NULL will be replaced
+            if (!newChild && (child.getTypeTag() == ATypeTag.NULL)) {
+                //This will be a replaced child
+                currentChild.setFormerChildNull(visitedBatchVersion);
+            }
             //Flag that the schema has changed
             changed = true;
         }
@@ -421,25 +432,30 @@
 
     public void flushDefinitionLevels(int level, AbstractSchemaNestedNode parent, AbstractSchemaNode node)
             throws HyracksDataException {
+        flushDefinitionLevels(level, parent, node, false);
+    }
+
+    public void addNestedNull(AbstractSchemaNestedNode parent, AbstractSchemaNestedNode node,
+            boolean includeChildColumns) throws HyracksDataException {
+        //Flush all definition levels from parent to the current node
+        flushDefinitionLevels(level, parent, node, includeChildColumns);
+        //Add null value (+2) to say that both the parent and the child are present
+        definitionLevels.get(node).add(ColumnValuesUtil.getNullMask(level + 2) | level);
+        node.incrementCounter();
+    }
+
+    private void flushDefinitionLevels(int level, AbstractSchemaNestedNode parent, AbstractSchemaNode node,
+            boolean includeChildColumns) throws HyracksDataException {
         if (parent != null) {
             RunLengthIntArray parentDefLevels = definitionLevels.get(parent);
             if (node.getCounter() < parentDefLevels.getSize()) {
                 int parentMask = ColumnValuesUtil.getNullMask(level);
                 int childMask = ColumnValuesUtil.getNullMask(level + 1);
-                flushDefinitionLevels(parentMask, childMask, parentDefLevels, node);
+                flushDefinitionLevels(parentMask, childMask, parentDefLevels, node, includeChildColumns);
             }
         }
     }
 
-    public void addNestedNull(AbstractSchemaNestedNode parent, AbstractSchemaNestedNode node)
-            throws HyracksDataException {
-        //Flush all definition levels from parent to the current node
-        flushDefinitionLevels(level, parent, node);
-        //Add null value (+2) to say that both the parent and the child are present
-        definitionLevels.get(node).add(ColumnValuesUtil.getNullMask(level + 2) | level);
-        node.incrementCounter();
-    }
-
     public void close() {
         //Dereference multiPageOp
         multiPageOpRef.setValue(null);
@@ -449,7 +465,7 @@
     }
 
     protected void flushDefinitionLevels(int parentMask, int childMask, RunLengthIntArray parentDefLevels,
-            AbstractSchemaNode node) throws HyracksDataException {
+            AbstractSchemaNode node, boolean includeChildColumns) throws HyracksDataException {
         int startIndex = node.getCounter();
         if (node.isNested()) {
             RunLengthIntArray childDefLevels = definitionLevels.get((AbstractSchemaNestedNode) node);
@@ -519,7 +535,7 @@
                 nullWriterIndexes.add(columnIndex);
                 createdChild = createChild(normalizedTypeTag);
                 int mask = ColumnValuesUtil.getNullMask(level);
-                flushDefinitionLevels(mask, mask, defLevels, createdChild);
+                flushDefinitionLevels(mask, mask, defLevels, createdChild, false);
             } else {
                 //Different type. Make union
                 createdChild = addDefinitionLevelsAndGet(new UnionSchemaNode(child, createChild(normalizedTypeTag)));
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWithMetaWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWithMetaWriter.java
index a1abf5e..f892643 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWithMetaWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWithMetaWriter.java
@@ -31,7 +31,8 @@
     public FlushColumnTupleWithMetaWriter(FlushColumnMetadata columnMetadata, int pageSize, int maxNumberOfTuples,
             double tolerance, int maxLeafNodeSize, IColumnWriteContext writeContext) {
         super(columnMetadata, pageSize, maxNumberOfTuples, tolerance, maxLeafNodeSize, writeContext);
-        metaColumnTransformer = new ColumnTransformer(columnMetadata, columnMetadata.getMetaRoot());
+        metaColumnTransformer =
+                new ColumnTransformer(columnMetadata, columnMetadata.getMetaRoot(), presentColumnsIndexes);
         metaPointable = new TypedRecordLazyVisitablePointable(columnMetadata.getMetaType());
     }
 
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
index 36dd6bc..53a6375 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
@@ -20,12 +20,15 @@
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.BitSet;
 
 import org.apache.asterix.column.values.IColumnValuesWriter;
 import org.apache.asterix.column.values.IColumnValuesWriterFactory;
 import org.apache.asterix.column.values.writer.ColumnBatchWriter;
 import org.apache.asterix.column.values.writer.ColumnValuesWriterFactory;
-import org.apache.asterix.column.values.writer.filters.AbstractColumnFilterWriter;
+import org.apache.asterix.column.zero.PageZeroWriterFlavorSelector;
+import org.apache.asterix.column.zero.writers.DefaultColumnPageZeroWriter;
+import org.apache.asterix.column.zero.writers.SparseColumnPageZeroWriter;
 import org.apache.asterix.om.lazy.RecordLazyVisitablePointable;
 import org.apache.asterix.om.lazy.TypedRecordLazyVisitablePointable;
 import org.apache.commons.lang3.mutable.Mutable;
@@ -35,9 +38,14 @@
 import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
 import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnWriteContext;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroWriter;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroWriterFlavorSelector;
 import org.apache.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleReference;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 public class FlushColumnTupleWriter extends AbstractColumnTupleWriter {
+    private static final Logger LOGGER = LogManager.getLogger();
     protected final FlushColumnMetadata columnMetadata;
     protected final NoWriteFlushColumnMetadata columnMetadataWithCurrentTuple;
 
@@ -50,13 +58,17 @@
     private final int maxNumberOfTuples;
     private final IColumnValuesWriter[] primaryKeyWriters;
     private final int maxLeafNodeSize;
+    protected final BitSet presentColumnsIndexes;
 
     protected int primaryKeysEstimatedSize;
+    protected final IColumnPageZeroWriterFlavorSelector pageZeroWriterFlavorSelector;
 
     public FlushColumnTupleWriter(FlushColumnMetadata columnMetadata, int pageSize, int maxNumberOfTuples,
             double tolerance, int maxLeafNodeSize, IColumnWriteContext writeContext) {
         this.columnMetadata = columnMetadata;
-        transformer = new ColumnTransformer(columnMetadata, columnMetadata.getRoot());
+        this.pageZeroWriterFlavorSelector = new PageZeroWriterFlavorSelector();
+        this.presentColumnsIndexes = new BitSet();
+        transformer = new ColumnTransformer(columnMetadata, columnMetadata.getRoot(), presentColumnsIndexes);
         finalizer = new BatchFinalizerVisitor(columnMetadata);
         writer = new ColumnBatchWriter(columnMetadata.getMultiPageOpRef(), pageSize, tolerance, writeContext);
         this.maxNumberOfTuples = maxNumberOfTuples;
@@ -79,8 +91,8 @@
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
-        transformerForCurrentTuple =
-                new NoWriteColumnTransformer(columnMetadataWithCurrentTuple, columnMetadataWithCurrentTuple.getRoot());
+        transformerForCurrentTuple = new NoWriteColumnTransformer(columnMetadataWithCurrentTuple,
+                columnMetadataWithCurrentTuple.getRoot(), columnMetadataWithCurrentTuple.getMetaRoot());
     }
 
     @Override
@@ -89,7 +101,12 @@
     }
 
     @Override
-    public final int getNumberOfColumns(boolean includeCurrentTupleColumns) {
+    public IColumnPageZeroWriterFlavorSelector getColumnPageZeroWriterFlavorSelector() {
+        return pageZeroWriterFlavorSelector;
+    }
+
+    @Override
+    public final int getAbsoluteNumberOfColumns(boolean includeCurrentTupleColumns) {
         if (includeCurrentTupleColumns) {
             return columnMetadataWithCurrentTuple.getNumberOfColumns();
         } else {
@@ -109,10 +126,8 @@
     }
 
     @Override
-    public final int getOccupiedSpace() {
-        int numberOfColumns = getNumberOfColumns(true);
-        int filterSize = numberOfColumns * AbstractColumnFilterWriter.FILTER_SIZE;
-        return primaryKeysEstimatedSize + filterSize;
+    public final int getPrimaryKeysEstimatedSize() {
+        return primaryKeysEstimatedSize;
     }
 
     /**
@@ -129,6 +144,21 @@
     }
 
     @Override
+    public int getColumnOccupiedSpace(boolean includeCurrentTupleColumns) {
+        int presentColumns = transformerForCurrentTuple.getNumberOfVisitedColumnsInBatch();
+        int totalNumberOfColumns = getAbsoluteNumberOfColumns(includeCurrentTupleColumns);
+
+        // space occupied by the sparse writer
+        int spaceOccupiedBySparseWriter = presentColumns
+                * (SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE + DefaultColumnPageZeroWriter.FILTER_SIZE);
+        int spaceOccupiedByDefaultWriter = totalNumberOfColumns
+                * (DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE + DefaultColumnPageZeroWriter.FILTER_SIZE);
+        pageZeroWriterFlavorSelector.switchPageZeroWriterIfNeeded(spaceOccupiedByDefaultWriter,
+                spaceOccupiedBySparseWriter);
+        return Math.min(spaceOccupiedBySparseWriter, spaceOccupiedByDefaultWriter);
+    }
+
+    @Override
     public final void close() {
         columnMetadata.close();
         writer.close();
@@ -163,10 +193,37 @@
     }
 
     @Override
-    public final int flush(ByteBuffer pageZero) throws HyracksDataException {
-        writer.setPageZeroBuffer(pageZero, getNumberOfColumns(false), columnMetadata.getNumberOfPrimaryKeys());
+    public final int flush(ByteBuffer pageZero, IColumnPageZeroWriter pageZeroWriter) throws HyracksDataException {
+        int numberOfColumns = getAbsoluteNumberOfColumns(false);
+        finalizer.finalizeBatchColumns(columnMetadata, presentColumnsIndexes, pageZeroWriter);
+        writer.setPageZeroWriter(pageZero, pageZeroWriter, toIndexArray(presentColumnsIndexes), numberOfColumns);
+
+        //assertion logging
+        int presentColumnsCount = presentColumnsIndexes.cardinality();
+        int beforeTransformColumnCount = transformerForCurrentTuple.getBeforeTransformColumnsCount();
+        int currentTupleColumnsCount = transformerForCurrentTuple.getNumberOfVisitedColumnsInBatch();
+        if (beforeTransformColumnCount != presentColumnsCount || currentTupleColumnsCount != presentColumnsCount) {
+            LOGGER.debug("mismatch in column counts: beforeTransform={}, currentTuple={}, expected={}",
+                    beforeTransformColumnCount, currentTupleColumnsCount, presentColumnsCount);
+        }
+
+        return finalizer.finalizeBatch(writer);
+    }
+
+    @Override
+    public void reset() {
         transformer.resetStringLengths();
-        return finalizer.finalizeBatch(writer, columnMetadata);
+        transformerForCurrentTuple.reset();
+        presentColumnsIndexes.clear();
+    }
+
+    public static int[] toIndexArray(BitSet bitSet) {
+        int[] result = new int[bitSet.cardinality()];
+        int idx = 0;
+        for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 1)) {
+            result[idx++] = i;
+        }
+        return result;
     }
 
     protected void writeRecord(ITupleReference tuple) throws HyracksDataException {
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/NoWriteColumnTransformer.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/NoWriteColumnTransformer.java
index 5efab0e..89d812b 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/NoWriteColumnTransformer.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/NoWriteColumnTransformer.java
@@ -23,6 +23,7 @@
 import org.apache.asterix.column.metadata.schema.ObjectSchemaNode;
 import org.apache.asterix.column.metadata.schema.UnionSchemaNode;
 import org.apache.asterix.column.metadata.schema.collection.AbstractCollectionSchemaNode;
+import org.apache.asterix.column.metadata.schema.primitive.PrimitiveSchemaNode;
 import org.apache.asterix.om.lazy.AbstractLazyVisitablePointable;
 import org.apache.asterix.om.lazy.AbstractListLazyVisitablePointable;
 import org.apache.asterix.om.lazy.FlatLazyVisitablePointable;
@@ -37,11 +38,17 @@
 
     private final NoWriteFlushColumnMetadata columnMetadata;
     private final ObjectSchemaNode root;
+    private final int metaColumnCount;
+    private int currentBatchVersion;
+    private int beforeTransformColumnsCount = 0;
     private AbstractSchemaNestedNode currentParent;
 
-    public NoWriteColumnTransformer(NoWriteFlushColumnMetadata columnMetadata, ObjectSchemaNode root) {
+    public NoWriteColumnTransformer(NoWriteFlushColumnMetadata columnMetadata, ObjectSchemaNode root,
+            ObjectSchemaNode metaRoot) {
         this.columnMetadata = columnMetadata;
         this.root = root;
+        this.metaColumnCount = metaRoot != null ? metaRoot.getNumberOfColumns() : 0;
+        currentBatchVersion = 1;
     }
 
     /**
@@ -51,10 +58,19 @@
      * @return the estimated size (possibly overestimated) of the primary key(s) columns
      */
     public int transform(RecordLazyVisitablePointable pointable) throws HyracksDataException {
+        beforeTransformColumnsCount = getNumberOfVisitedColumnsInBatch();
         pointable.accept(this, root);
         return 0;
     }
 
+    public int getBeforeTransformColumnsCount() {
+        return beforeTransformColumnsCount;
+    }
+
+    public void reset() {
+        currentBatchVersion++;
+    }
+
     @Override
     public AbstractSchemaNode visit(RecordLazyVisitablePointable pointable, AbstractSchemaNode arg)
             throws HyracksDataException {
@@ -62,6 +78,14 @@
 
         ObjectSchemaNode objectNode = (ObjectSchemaNode) arg;
         currentParent = objectNode;
+        int newDiscoveredColumns = 0;
+        if (currentParent.getVisitedBatchVersion() != currentBatchVersion) {
+            objectNode.setNumberOfVisitedColumnsInBatch(0);
+            objectNode.setVisitedBatchVersion(currentBatchVersion);
+            objectNode.setMissingInitiallyInBatch(false);
+            objectNode.needAllColumns(false);
+        }
+
         for (int i = 0; i < pointable.getNumberOfChildren(); i++) {
             pointable.nextChild();
             IValueReference fieldName = pointable.getFieldName();
@@ -69,12 +93,48 @@
             if (childTypeTag != ATypeTag.MISSING) {
                 AbstractSchemaNode childNode = objectNode.getOrCreateChild(fieldName, childTypeTag, columnMetadata);
                 acceptActualNode(pointable.getChildVisitablePointable(), childNode);
+                int childDiscoveredColumns = childNode.getNewDiscoveredColumns();
+                if (childNode.formerChildNullVersion() == currentBatchVersion) {
+                    //Missing or NULL contributed to one of the columns
+                    childNode.setFormerChildNull(-1);
+                    childDiscoveredColumns -= 1;
+                } else {
+                    childNode.setFormerChildNull(-1);
+                }
+                newDiscoveredColumns += childDiscoveredColumns;
+                childNode.setNewDiscoveredColumns(0);
+                currentParent.incrementColumns(childNode.getDeltaColumnsChanged());
             }
         }
 
         if (pointable.getNumberOfChildren() == 0) {
             // Set as empty object
-            objectNode.setEmptyObject(columnMetadata);
+            AbstractSchemaNode missingChild = objectNode.setEmptyObject(columnMetadata);
+            if (!objectNode.isMissingInitiallyInBatch() && objectNode.isEmptyObject()) {
+                objectNode.needAllColumns(true); // to include the missing column, while finalizing the batch.
+                objectNode.setMissingInitiallyInBatch(true);
+                if (missingChild != null) {
+                    currentParent.incrementColumns(missingChild.getDeltaColumnsChanged());
+                }
+                newDiscoveredColumns += 1;
+            }
+        } else if (objectNode.isMissingInitiallyInBatch()) {
+            objectNode.setMissingInitiallyInBatch(false);
+            objectNode.needAllColumns(false);
+        }
+
+        if (objectNode.needAllColumns()) {
+            // parent is not array, but objectNode need all columns, because this node used to be null
+            int previousContribution = currentParent.getNumberOfVisitedColumnsInBatch();
+            newDiscoveredColumns = 0; // reset the new discovered columns
+            newDiscoveredColumns -= previousContribution;
+            newDiscoveredColumns += currentParent.getNumberOfColumns();
+            currentParent.setNewDiscoveredColumns(newDiscoveredColumns);
+            currentParent.setNumberOfVisitedColumnsInBatch(currentParent.getNumberOfColumns());
+        } else {
+            currentParent.setNewDiscoveredColumns(newDiscoveredColumns);
+            currentParent.setNumberOfVisitedColumnsInBatch(
+                    currentParent.getNumberOfVisitedColumnsInBatch() + newDiscoveredColumns);
         }
 
         currentParent = previousParent;
@@ -89,16 +149,44 @@
         AbstractCollectionSchemaNode collectionNode = (AbstractCollectionSchemaNode) arg;
         currentParent = collectionNode;
 
+        if (currentParent.getVisitedBatchVersion() != currentBatchVersion) {
+            currentParent.setVisitedBatchVersion(currentBatchVersion);
+            currentParent.setNumberOfVisitedColumnsInBatch(0);
+            currentParent.setMissingInitiallyInBatch(false);
+            currentParent.needAllColumns(false);
+        }
+
+        currentParent.needAllColumns(true);
+
+        int newDiscoveredColumns = 0;
         int numberOfChildren = pointable.getNumberOfChildren();
         for (int i = 0; i < numberOfChildren; i++) {
             pointable.nextChild();
             ATypeTag childTypeTag = pointable.getChildTypeTag();
             AbstractSchemaNode childNode = collectionNode.getOrCreateItem(childTypeTag, columnMetadata);
             acceptActualNode(pointable.getChildVisitablePointable(), childNode);
+            currentParent.incrementColumns(childNode.getDeltaColumnsChanged());
+            int childDiscoveredColumns = childNode.getNewDiscoveredColumns();
+            if (childNode.formerChildNullVersion() == currentBatchVersion) {
+                //Missing or NULL contributed to one of the columns
+                childNode.setFormerChildNull(-1);
+                childDiscoveredColumns -= 1;
+            } else {
+                childNode.setFormerChildNull(-1);
+            }
+            newDiscoveredColumns += childDiscoveredColumns;
+            childNode.setNewDiscoveredColumns(0);
         }
 
         // Add missing as a last element of the array to help indicate empty arrays
         collectionNode.getOrCreateItem(ATypeTag.MISSING, columnMetadata);
+
+        // need all the columns
+        newDiscoveredColumns = 0;
+        newDiscoveredColumns -= currentParent.getNumberOfVisitedColumnsInBatch();
+        newDiscoveredColumns += currentParent.getNumberOfColumns();
+        currentParent.setNewDiscoveredColumns(newDiscoveredColumns);
+        currentParent.setNumberOfVisitedColumnsInBatch(currentParent.getNumberOfColumns());
         currentParent = previousParent;
         return null;
     }
@@ -106,6 +194,15 @@
     @Override
     public AbstractSchemaNode visit(FlatLazyVisitablePointable pointable, AbstractSchemaNode arg)
             throws HyracksDataException {
+        PrimitiveSchemaNode node = (PrimitiveSchemaNode) arg;
+
+        if (node.getVisitedBatchVersion() != currentBatchVersion) {
+            //First time in this batch
+            node.setNumberOfVisitedColumnsInBatch(1);
+            node.setNewDiscoveredColumns(1);
+            node.setVisitedBatchVersion(currentBatchVersion);
+        }
+
         return null;
     }
 
@@ -119,23 +216,41 @@
 
             ATypeTag childTypeTag = pointable.getTypeTag();
 
+            AbstractSchemaNode actualNode;
             if (childTypeTag == ATypeTag.NULL || childTypeTag == ATypeTag.MISSING) {
                 /*
                  * NULL and MISSING are tracked since the start to be written in the originalType (i.e., the type
                  * before injecting a union between the parent and the original node).
                  */
-                AbstractSchemaNode actualNode = unionNode.getOriginalType();
+                actualNode = unionNode.getOriginalType();
                 acceptActualNode(pointable, actualNode);
             } else {
-                AbstractSchemaNode actualNode = unionNode.getOrCreateChild(pointable.getTypeTag(), columnMetadata);
+                actualNode = unionNode.getOrCreateChild(pointable.getTypeTag(), columnMetadata);
                 pointable.accept(this, actualNode);
             }
-
+            unionNode.setNewDiscoveredColumns(actualNode.getNewDiscoveredColumns());
+            unionNode.setNumberOfVisitedColumnsInBatch(
+                    unionNode.getNumberOfVisitedColumnsInBatch() + actualNode.getNewDiscoveredColumns());
+            actualNode.setNewDiscoveredColumns(0);
+            currentParent.incrementColumns(actualNode.getDeltaColumnsChanged());
             currentParent = previousParent;
         } else if (pointable.getTypeTag() == ATypeTag.NULL && node.isNested()) {
-            columnMetadata.addNestedNull(currentParent, (AbstractSchemaNestedNode) node);
+            node.needAllColumns(true);
+            int previousContribution = node.getNumberOfVisitedColumnsInBatch();
+            int netContribution = node.getNumberOfColumns() - previousContribution;
+            node.setNewDiscoveredColumns(netContribution);
+            node.setNumberOfVisitedColumnsInBatch(node.getNumberOfColumns());
+            columnMetadata.addNestedNull(currentParent, (AbstractSchemaNestedNode) node, false);
         } else {
             pointable.accept(this, node);
         }
     }
-}
\ No newline at end of file
+
+    public int getNumberOfVisitedColumnsInBatch() {
+        //In case of batch of anti-matters, the current batch version is not equal to the root's visited batch version.
+        if (currentBatchVersion != root.getVisitedBatchVersion()) {
+            return columnMetadata.getNumberOfPrimaryKeys();
+        }
+        return root.getNumberOfVisitedColumnsInBatch() + metaColumnCount;
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/NoWriteFlushColumnMetadata.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/NoWriteFlushColumnMetadata.java
index 88e6cc2..7d77a97 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/NoWriteFlushColumnMetadata.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/NoWriteFlushColumnMetadata.java
@@ -111,7 +111,7 @@
 
     @Override
     protected void flushDefinitionLevels(int parentMask, int childMask, RunLengthIntArray parentDefLevels,
-            AbstractSchemaNode node) throws HyracksDataException {
+            AbstractSchemaNode node, boolean includeChildColumns) throws HyracksDataException {
         //NoOp
     }
 
@@ -136,8 +136,8 @@
     }
 
     @Override
-    public void addNestedNull(AbstractSchemaNestedNode parent, AbstractSchemaNestedNode node)
-            throws HyracksDataException {
+    public void addNestedNull(AbstractSchemaNestedNode parent, AbstractSchemaNestedNode node,
+            boolean includeChildColumns) throws HyracksDataException {
         //NoOp
     }
 
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleReader.java
index 4114f10..261221b 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleReader.java
@@ -20,6 +20,7 @@
 
 import org.apache.asterix.column.metadata.AbstractColumnImmutableReadMetadata;
 import org.apache.asterix.column.tuple.MergeColumnTupleReference;
+import org.apache.asterix.column.zero.PageZeroWriterFlavorSelector;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleReader;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnReadMultiPageOp;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnTupleIterator;
@@ -29,6 +30,7 @@
     private final MergeColumnReadMetadata columnMetadata;
 
     public MergeColumnTupleReader(AbstractColumnImmutableReadMetadata columnMetadata) {
+        super(new PageZeroWriterFlavorSelector());
         this.columnMetadata = (MergeColumnReadMetadata) columnMetadata;
     }
 
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
index fc7b859..466a0cf 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.column.operation.lsm.merge;
 
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 import java.util.PriorityQueue;
@@ -28,7 +29,9 @@
 import org.apache.asterix.column.values.IColumnValuesReader;
 import org.apache.asterix.column.values.IColumnValuesWriter;
 import org.apache.asterix.column.values.writer.ColumnBatchWriter;
-import org.apache.asterix.column.values.writer.filters.AbstractColumnFilterWriter;
+import org.apache.asterix.column.zero.PageZeroWriterFlavorSelector;
+import org.apache.asterix.column.zero.writers.DefaultColumnPageZeroWriter;
+import org.apache.asterix.column.zero.writers.SparseColumnPageZeroWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
@@ -36,9 +39,13 @@
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
 import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnWriteContext;
 import org.apache.hyracks.storage.am.lsm.btree.column.error.ColumnarValueException;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroWriter;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroWriterFlavorSelector;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+
 public class MergeColumnTupleWriter extends AbstractColumnTupleWriter {
     private final MergeColumnWriteMetadata columnMetadata;
     private final int maxLeafNodeSize;
@@ -48,14 +55,19 @@
     private final IColumnValuesWriter[] primaryKeyWriters;
     private final PriorityQueue<IColumnValuesWriter> orderedColumns;
     private final ColumnBatchWriter writer;
+    private final IColumnPageZeroWriterFlavorSelector pageZeroWriterFlavorSelector;
+    protected final IntOpenHashSet presentColumnsIndexes;
     private final int maxNumberOfTuples;
     private int primaryKeysEstimatedSize;
     private int numberOfAntiMatter;
+    private int numberOfTuples;
 
     public MergeColumnTupleWriter(MergeColumnWriteMetadata columnMetadata, int pageSize, int maxNumberOfTuples,
             double tolerance, int maxLeafNodeSize, IColumnWriteContext writeContext) {
         this.columnMetadata = columnMetadata;
+        this.pageZeroWriterFlavorSelector = new PageZeroWriterFlavorSelector();
         this.maxLeafNodeSize = maxLeafNodeSize;
+        this.presentColumnsIndexes = new IntOpenHashSet();
         List<IColumnTupleIterator> componentsTuplesList = columnMetadata.getComponentsTuples();
         this.componentsTuples = new MergeColumnTupleReference[componentsTuplesList.size()];
         int totalLength = 0;
@@ -64,6 +76,7 @@
             MergeColumnTupleReference mergeTuple = (MergeColumnTupleReference) componentsTuplesList.get(i);
             this.componentsTuples[i] = mergeTuple;
             mergeTuple.registerEndOfPageCallBack(this::writeAllColumns);
+            mergeTuple.setColumnIndexes(presentColumnsIndexes);
             totalNumberOfTuples += mergeTuple.getTupleCount();
             totalLength += mergeTuple.getMergingLength();
         }
@@ -95,7 +108,7 @@
     }
 
     @Override
-    public int getNumberOfColumns(boolean includeCurrentTupleColumns) {
+    public int getAbsoluteNumberOfColumns(boolean includeCurrentTupleColumns) {
         return columnMetadata.getNumberOfColumns();
     }
 
@@ -105,15 +118,20 @@
     }
 
     @Override
-    public int getOccupiedSpace() {
-        int numberOfColumns = getNumberOfColumns(true);
-        int filterSize = numberOfColumns * AbstractColumnFilterWriter.FILTER_SIZE;
-        return primaryKeysEstimatedSize + filterSize;
+    public int getPrimaryKeysEstimatedSize() {
+        return primaryKeysEstimatedSize;
     }
 
     @Override
     public void writeTuple(ITupleReference tuple) throws HyracksDataException {
         MergeColumnTupleReference columnTuple = (MergeColumnTupleReference) tuple;
+        if (numberOfTuples == 0) {
+            // fill with the columnIndexes
+            for (MergeColumnTupleReference componentsTuple : componentsTuples) {
+                componentsTuple.fillColumnIndexes();
+            }
+        }
+        numberOfTuples++;
         int componentIndex = columnTuple.getComponentIndex();
         int skipCount = columnTuple.getAndResetSkipCount();
         if (skipCount > 0) {
@@ -132,17 +150,46 @@
     }
 
     @Override
-    public int flush(ByteBuffer pageZero) throws HyracksDataException {
+    public IColumnPageZeroWriterFlavorSelector getColumnPageZeroWriterFlavorSelector() {
+        return pageZeroWriterFlavorSelector;
+    }
+
+    @Override
+    public int getColumnOccupiedSpace(boolean includeCurrentTupleColumns) {
+        int presentColumns = presentColumnsIndexes.size();
+        int totalNumberOfColumns = getAbsoluteNumberOfColumns(includeCurrentTupleColumns);
+
+        // space occupied by the sparse writer
+        int spaceOccupiedBySparseWriter = presentColumns
+                * (SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE + DefaultColumnPageZeroWriter.FILTER_SIZE);
+        int spaceOccupiedByDefaultWriter = totalNumberOfColumns
+                * (DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE + DefaultColumnPageZeroWriter.FILTER_SIZE);
+        pageZeroWriterFlavorSelector.switchPageZeroWriterIfNeeded(spaceOccupiedByDefaultWriter,
+                spaceOccupiedBySparseWriter);
+        return Math.min(spaceOccupiedBySparseWriter, spaceOccupiedByDefaultWriter);
+    }
+
+    @Override
+    public int flush(ByteBuffer pageZero, IColumnPageZeroWriter pageZeroWriter) throws HyracksDataException {
+        // here the numberOfColumns is the total number of columns present in the LSM Index (across all disk components)
+        // Hence, a merge will fail if union(NumberOfColumns(D1) + NumberOfColumns(D2) + ... + NumberOfColumns(DN)) >
+        // pageZero space, and since the merged page contains this many number of columns, the first flush will fail.
         int numberOfColumns = columnMetadata.getNumberOfColumns();
         int numberOfPrimaryKeys = columnMetadata.getNumberOfPrimaryKeys();
         if (writtenComponents.getSize() > 0) {
             writeNonKeyColumns();
             writtenComponents.reset();
         }
-        for (int i = numberOfPrimaryKeys; i < numberOfColumns; i++) {
-            orderedColumns.add(columnMetadata.getWriter(i));
+
+        for (int columnIndex : presentColumnsIndexes) {
+            if (columnIndex < numberOfPrimaryKeys) {
+                continue;
+            }
+            orderedColumns.add(columnMetadata.getWriter(columnIndex));
         }
-        writer.setPageZeroBuffer(pageZero, numberOfColumns, numberOfPrimaryKeys);
+
+        // reset pageZeroWriter based on the writer
+        writer.setPageZeroWriter(pageZero, pageZeroWriter, getPresentColumnsIndexesArray(), numberOfColumns);
         writer.writePrimaryKeyColumns(primaryKeyWriters);
         int totalLength = writer.writeColumns(orderedColumns);
 
@@ -150,12 +197,24 @@
         return totalLength;
     }
 
+    public int[] getPresentColumnsIndexesArray() {
+        int[] sortedIndexes = presentColumnsIndexes.toIntArray();
+        Arrays.sort(sortedIndexes);
+        return sortedIndexes;
+    }
+
     @Override
     public void close() {
         columnMetadata.close();
         writer.close();
     }
 
+    @Override
+    public void reset() {
+        presentColumnsIndexes.clear();
+        numberOfTuples = 0;
+    }
+
     private void writePrimaryKeys(MergeColumnTupleReference columnTuple) throws HyracksDataException {
         int primaryKeySize = 0;
         for (int i = 0; i < columnMetadata.getNumberOfPrimaryKeys(); i++) {
@@ -178,9 +237,12 @@
             }
             MergeColumnTupleReference componentTuple = componentsTuples[componentIndex];
             int count = writtenComponents.getBlockSize(i);
-            for (int j = columnMetadata.getNumberOfPrimaryKeys(); j < columnMetadata.getNumberOfColumns(); j++) {
-                IColumnValuesReader columnReader = componentTuple.getReader(j);
-                IColumnValuesWriter columnWriter = columnMetadata.getWriter(j);
+            for (int columnIndex : presentColumnsIndexes) {
+                if (columnIndex < columnMetadata.getNumberOfPrimaryKeys()) {
+                    continue;
+                }
+                IColumnValuesReader columnReader = componentTuple.getReader(columnIndex);
+                IColumnValuesWriter columnWriter = columnMetadata.getWriter(columnIndex);
                 writeColumn(i, componentIndex, columnReader, columnWriter, count);
             }
         }
@@ -201,8 +263,11 @@
     private void skipReaders(int componentIndex, int count) throws HyracksDataException {
         MergeColumnTupleReference componentTuple = componentsTuples[componentIndex];
         try {
-            for (int j = columnMetadata.getNumberOfPrimaryKeys(); j < columnMetadata.getNumberOfColumns(); j++) {
-                IColumnValuesReader columnReader = componentTuple.getReader(j);
+            for (int columnIndex : presentColumnsIndexes) {
+                if (columnIndex < columnMetadata.getNumberOfPrimaryKeys()) {
+                    continue;
+                }
+                IColumnValuesReader columnReader = componentTuple.getReader(columnIndex);
                 columnReader.skip(count);
             }
         } catch (ColumnarValueException e) {
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnTupleReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnTupleReader.java
index 36e47ec..6afa274 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnTupleReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnTupleReader.java
@@ -21,6 +21,7 @@
 import org.apache.asterix.column.metadata.AbstractColumnImmutableReadMetadata;
 import org.apache.asterix.column.tuple.QueryColumnTupleReference;
 import org.apache.asterix.column.tuple.QueryColumnWithMetaTupleReference;
+import org.apache.asterix.column.zero.PageZeroWriterFlavorSelector;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleReader;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnReadMultiPageOp;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnTupleIterator;
@@ -30,6 +31,7 @@
     private final QueryColumnMetadata columnMetadata;
 
     public QueryColumnTupleReader(AbstractColumnImmutableReadMetadata columnMetadata) {
+        super(new PageZeroWriterFlavorSelector());
         this.columnMetadata = (QueryColumnMetadata) columnMetadata;
     }
 
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/MergeColumnTupleReference.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/MergeColumnTupleReference.java
index 6e13113..0c9c816 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/MergeColumnTupleReference.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/MergeColumnTupleReference.java
@@ -18,26 +18,26 @@
  */
 package org.apache.asterix.column.tuple;
 
-import java.nio.ByteBuffer;
-
 import org.apache.asterix.column.bytes.stream.in.MultiByteBufferInputStream;
 import org.apache.asterix.column.operation.lsm.merge.IEndOfPageCallBack;
 import org.apache.asterix.column.operation.lsm.merge.MergeColumnReadMetadata;
 import org.apache.asterix.column.values.IColumnValuesReader;
 import org.apache.asterix.column.values.reader.PrimitiveColumnValuesReader;
-import org.apache.asterix.column.values.writer.filters.AbstractColumnFilterWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnBufferProvider;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnReadMultiPageOp;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
 import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
 
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+
 public final class MergeColumnTupleReference extends AbstractAsterixColumnTupleReference {
     // NoOP callback is for empty pages only
     private static final IEndOfPageCallBack EMPTY_PAGE_CALLBACK = createNoOpCallBack();
     private final IColumnValuesReader[] columnReaders;
     private int skipCount;
     private IEndOfPageCallBack endOfPageCallBack;
+    private IntOpenHashSet presentColumnIndexes;
     private int mergingLength;
 
     public MergeColumnTupleReference(int componentIndex, ColumnBTreeReadLeafFrame frame,
@@ -65,9 +65,9 @@
     }
 
     @Override
-    protected boolean startNewPage(ByteBuffer pageZero, int numberOfColumns, int numberOfTuples) {
+    protected boolean startNewPage(int numberOfTuples) {
         //Skip filters
-        pageZero.position(pageZero.position() + numberOfColumns * AbstractColumnFilterWriter.FILTER_SIZE);
+        frame.skipFilters();
         // skip count is always start from zero as no "search" is conducted during a merge
         this.skipCount = 0;
         mergingLength = 0;
@@ -90,6 +90,15 @@
     }
 
     @Override
+    public void newPage() throws HyracksDataException {
+        super.newPage();
+        // the tuples are being read, meanwhile MegaLeaf changed
+        if (presentColumnIndexes != null) {
+            frame.getAllColumns(presentColumnIndexes);
+        }
+    }
+
+    @Override
     protected void startColumnFilter(IColumnBufferProvider buffersProvider, int ordinal, int numberOfTuples)
             throws HyracksDataException {
         // NoOp
@@ -145,4 +154,12 @@
             }
         };
     }
+
+    public void setColumnIndexes(IntOpenHashSet presentColumnsIndexes) {
+        this.presentColumnIndexes = presentColumnsIndexes;
+    }
+
+    public void fillColumnIndexes() {
+        frame.getAllColumns(presentColumnIndexes);
+    }
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/QueryColumnTupleReference.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/QueryColumnTupleReference.java
index 8113288..09539b9 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/QueryColumnTupleReference.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/QueryColumnTupleReference.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.column.tuple;
 
-import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.asterix.column.assembler.value.MissingValueGetter;
@@ -36,7 +35,6 @@
 import org.apache.asterix.column.operation.query.QueryColumnMetadata;
 import org.apache.asterix.column.values.IColumnValuesReader;
 import org.apache.asterix.column.values.reader.PrimitiveColumnValuesReader;
-import org.apache.asterix.column.values.writer.filters.AbstractColumnFilterWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnBufferProvider;
@@ -93,14 +91,13 @@
     }
 
     @Override
-    protected boolean startNewPage(ByteBuffer pageZero, int numberOfColumns, int numberOfTuples)
-            throws HyracksDataException {
+    protected boolean startNewPage(int numberOfTuples) throws HyracksDataException {
         //Skip to filters
-        pageZero.position(pageZero.position() + numberOfColumns * Integer.BYTES);
+        frame.skipColumnOffsets();
         //Set filters' values
-        FilterAccessorProvider.setFilterValues(filterValueAccessors, pageZero, numberOfColumns);
+        FilterAccessorProvider.setFilterValues(filterValueAccessors, frame);
         //Skip filters
-        pageZero.position(pageZero.position() + numberOfColumns * AbstractColumnFilterWriter.FILTER_SIZE);
+        frame.skipFilters();
         //Check if we should read all column pages
         boolean readColumns = rangeFilterEvaluator.evaluate();
         assembler.reset(readColumns ? numberOfTuples : 0);
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/QueryColumnWithMetaTupleReference.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/QueryColumnWithMetaTupleReference.java
index 2b8da6d..f3a4fcf 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/QueryColumnWithMetaTupleReference.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/QueryColumnWithMetaTupleReference.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.column.tuple;
 
-import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.asterix.column.assembler.value.MissingValueGetter;
@@ -37,7 +36,6 @@
 import org.apache.asterix.column.operation.query.QueryColumnWithMetaMetadata;
 import org.apache.asterix.column.values.IColumnValuesReader;
 import org.apache.asterix.column.values.reader.PrimitiveColumnValuesReader;
-import org.apache.asterix.column.values.writer.filters.AbstractColumnFilterWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnBufferProvider;
@@ -97,14 +95,13 @@
     }
 
     @Override
-    protected boolean startNewPage(ByteBuffer pageZero, int numberOfColumns, int numberOfTuples)
-            throws HyracksDataException {
+    protected boolean startNewPage(int numberOfTuples) throws HyracksDataException {
         //Skip to filters
-        pageZero.position(pageZero.position() + numberOfColumns * Integer.BYTES);
+        frame.skipColumnOffsets();
         //Set filters' values
-        FilterAccessorProvider.setFilterValues(filterValueAccessors, pageZero, numberOfColumns);
+        FilterAccessorProvider.setFilterValues(filterValueAccessors, frame);
         //Skip filters
-        pageZero.position(pageZero.position() + numberOfColumns * AbstractColumnFilterWriter.FILTER_SIZE);
+        frame.skipFilters();
         //Check if we should read all column pages
         boolean readColumns = rangeFilterEvaluator.evaluate();
         assembler.reset(readColumns ? numberOfTuples : 0);
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnBatchWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnBatchWriter.java
index 063743d..45d0008 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnBatchWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnBatchWriter.java
@@ -22,9 +22,32 @@
 import java.util.PriorityQueue;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroWriter;
 
+/**
+ * Interface for writing column batch data to storage pages.
+ * 
+ * This interface abstracts the process of writing columnar data, supporting both
+ * dense and sparse column layouts through the use of pluggable page zero writers.
+ * The writer handles page zero metadata, primary key storage, and column data placement
+ * across multiple pages with optimal space utilization.
+ */
 public interface IColumnBatchWriter {
-    void setPageZeroBuffer(ByteBuffer pageZeroBuffer, int numberOfColumns, int numberOfPrimaryKeys);
+
+    /**
+     * Configures the page zero writer for this batch.
+     * 
+     * This method replaces the previous direct buffer approach with a more flexible
+     * abstraction that supports different page zero layouts (default vs sparse).
+     * The writer will be used to manage column offsets, filters, and primary key storage.
+     * 
+     * @param pageZero The page zero buffer where metadata will be written
+     * @param pageZeroWriter The writer implementation for page zero operations
+     * @param presentColumnsIndexes Array of column indexes that contain data in this batch
+     * @param numberOfColumns Total number of columns in the schema
+     */
+    void setPageZeroWriter(ByteBuffer pageZero, IColumnPageZeroWriter pageZeroWriter, int[] presentColumnsIndexes,
+            int numberOfColumns);
 
     /**
      * Writes the primary keys' values to Page0
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesWriter.java
index 2e2aa9e..cb14d51 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesWriter.java
@@ -20,17 +20,17 @@
 
 import java.io.DataOutput;
 import java.io.IOException;
-import java.io.OutputStream;
 
 import org.apache.asterix.column.util.RunLengthIntArray;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IValuesWriter;
 
 /**
  * Column writer for values
  */
-public interface IColumnValuesWriter {
+public interface IColumnValuesWriter extends IValuesWriter {
 
     /**
      * Reset the writer
@@ -124,13 +124,6 @@
     long getNormalizedMaxValue();
 
     /**
-     * Flush the columns value to output stream
-     *
-     * @param out output stream
-     */
-    void flush(OutputStream out) throws HyracksDataException;
-
-    /**
      * Close the writer and release all allocated buffers
      */
     void close();
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/ColumnBatchWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/ColumnBatchWriter.java
index ee12140..1a767a9 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/ColumnBatchWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/ColumnBatchWriter.java
@@ -18,12 +18,9 @@
  */
 package org.apache.asterix.column.values.writer;
 
-import static org.apache.asterix.column.values.writer.filters.AbstractColumnFilterWriter.FILTER_SIZE;
-
 import java.nio.ByteBuffer;
 import java.util.PriorityQueue;
 
-import org.apache.asterix.column.bytes.stream.out.ByteBufferOutputStream;
 import org.apache.asterix.column.bytes.stream.out.MultiPersistentBufferBytesOutputStream;
 import org.apache.asterix.column.bytes.stream.out.pointer.IReservedPointer;
 import org.apache.asterix.column.values.IColumnBatchWriter;
@@ -32,21 +29,22 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
 import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnWriteContext;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroWriter;
 
 /**
- * A writer for a batch columns' values
+ * A writer for a batch columns' values.
+ * This implementation abstracts the page zero operations using IColumnPageZeroWriter,
+ * which allows for supporting different column formats including sparse columns.
  */
 public final class ColumnBatchWriter implements IColumnBatchWriter {
-    private final ByteBufferOutputStream primaryKeys;
     private final MultiPersistentBufferBytesOutputStream columns;
     private final int pageSize;
     private final double tolerance;
     private final IColumnWriteContext writeContext;
     private final IReservedPointer columnLengthPointer;
-    private ByteBuffer pageZero;
-    private int columnsOffset;
-    private int filtersOffset;
-    private int primaryKeysOffset;
+    // The writer for page zero, which handles all page zero operations including 
+    // column offsets and filters. This abstraction supports both default and sparse column formats.
+    private IColumnPageZeroWriter pageZeroWriter;
     private int nonKeyColumnStartOffset;
 
     public ColumnBatchWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef, int pageSize, double tolerance,
@@ -54,35 +52,32 @@
         this.pageSize = pageSize;
         this.tolerance = tolerance;
         this.writeContext = writeContext;
-        primaryKeys = new ByteBufferOutputStream();
         columns = new MultiPersistentBufferBytesOutputStream(multiPageOpRef);
         columnLengthPointer = columns.createPointer();
     }
 
-    @Override
-    public void setPageZeroBuffer(ByteBuffer pageZero, int numberOfColumns, int numberOfPrimaryKeys) {
-        this.pageZero = pageZero;
-        int offset = pageZero.position();
-
-        columnsOffset = offset;
-        offset += numberOfColumns * Integer.BYTES;
-
-        filtersOffset = offset;
-        offset += numberOfColumns * FILTER_SIZE;
-
-        pageZero.position(offset);
-        primaryKeysOffset = offset;
-        primaryKeys.reset(pageZero);
-        nonKeyColumnStartOffset = pageZero.capacity();
+    /**
+     * Sets the page zero writer implementation and initializes it.
+     * This method replaces the direct page zero buffer manipulation with a more abstracted approach,
+     * which allows for different page zero layouts (default or sparse).
+     *
+     * @param pageZero The page zero buffer to be used
+     * @param pageZeroWriter The writer implementation for page zero operations
+     * @param presentColumnsIndexes Array containing the indexes of columns present in this batch
+     * @param numberOfColumns Total number of columns in the schema
+     */
+    public void setPageZeroWriter(ByteBuffer pageZero, IColumnPageZeroWriter pageZeroWriter,
+            int[] presentColumnsIndexes, int numberOfColumns) {
+        this.pageZeroWriter = pageZeroWriter;
+        pageZeroWriter.reset(pageZero, presentColumnsIndexes, numberOfColumns);
+        pageZeroWriter.allocateColumns();
+        nonKeyColumnStartOffset = pageZeroWriter.getPageZeroBuffer().capacity();
     }
 
     @Override
     public void writePrimaryKeyColumns(IColumnValuesWriter[] primaryKeyWriters) throws HyracksDataException {
-        for (int i = 0; i < primaryKeyWriters.length; i++) {
-            IColumnValuesWriter writer = primaryKeyWriters[i];
-            setColumnOffset(i, primaryKeysOffset + primaryKeys.size());
-            writer.flush(primaryKeys);
-        }
+        // Delegate primary key column writing to the page zero writer
+        pageZeroWriter.writePrimaryKeyColumns(primaryKeyWriters);
     }
 
     @Override
@@ -102,6 +97,14 @@
         writeContext.close();
     }
 
+    /**
+     * Writes a column's data to the batch.
+     * This method handles column data placement, ensuring optimal space usage and minimizing page splits.
+     * It also records column offsets and filter values in page zero through the pageZeroWriter.
+     * 
+     * @param writer The column values writer containing data to be written
+     * @throws HyracksDataException If an error occurs during writing
+     */
     private void writeColumn(IColumnValuesWriter writer) throws HyracksDataException {
         boolean overlapping = true;
         if (!hasEnoughSpace(columns.getCurrentBufferPosition(), writer)) {
@@ -118,9 +121,21 @@
         writeContext.startWritingColumn(columnIndex, overlapping);
         int columnRelativeOffset = columns.size();
         columns.reserveInteger(columnLengthPointer);
-        setColumnOffset(writer.getColumnIndex(), nonKeyColumnStartOffset + columnRelativeOffset);
 
-        writeFilter(writer);
+        // Get the relative column index within the current page zero layout
+        // This mapping is particularly important for sparse columns where not all columns may be present
+        int relativeColumnIndex = pageZeroWriter.getRelativeColumnIndex(columnIndex);
+
+        // Record the column's absolute offset in page zero using the writer abstraction
+        pageZeroWriter.putColumnOffset(columnIndex, relativeColumnIndex,
+                nonKeyColumnStartOffset + columnRelativeOffset);
+
+        // Store column filter information (min/max values) in page zero
+        // This allows for faster filtering during query execution
+        pageZeroWriter.putColumnFilter(relativeColumnIndex, writer.getNormalizedMinValue(),
+                writer.getNormalizedMaxValue());
+
+        // Write the actual column data
         writer.flush(columns);
 
         int length = columns.size() - columnRelativeOffset;
@@ -128,6 +143,15 @@
         writeContext.endWritingColumn(columnIndex, length);
     }
 
+    /**
+     * Determines if there is enough space in the current buffer for a column.
+     * This method implements a space management strategy that balances between
+     * optimal buffer utilization and minimizing column splits across pages.
+     *
+     * @param bufferPosition Current position in the buffer
+     * @param columnWriter The column writer with data to be written
+     * @return true if there is enough space, false otherwise
+     */
     private boolean hasEnoughSpace(int bufferPosition, IColumnValuesWriter columnWriter) {
         if (bufferPosition == 0) {
             // if the current buffer is empty, then use it
@@ -159,14 +183,4 @@
          */
         return freeSpace > columnSize || remainingPercentage >= tolerance;
     }
-
-    private void setColumnOffset(int columnIndex, int offset) {
-        pageZero.putInt(columnsOffset + Integer.BYTES * columnIndex, offset);
-    }
-
-    private void writeFilter(IColumnValuesWriter writer) {
-        int offset = filtersOffset + writer.getColumnIndex() * FILTER_SIZE;
-        pageZero.putLong(offset, writer.getNormalizedMinValue());
-        pageZero.putLong(offset + Long.BYTES, writer.getNormalizedMaxValue());
-    }
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/filters/AbstractColumnFilterWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/filters/AbstractColumnFilterWriter.java
index abbe314..458e24c 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/filters/AbstractColumnFilterWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/filters/AbstractColumnFilterWriter.java
@@ -22,7 +22,6 @@
 import org.apache.hyracks.data.std.api.IValueReference;
 
 public abstract class AbstractColumnFilterWriter {
-    public static final int FILTER_SIZE = Long.BYTES * 2;
 
     public void addLong(long value) {
         throw new UnsupportedOperationException(getClass().getName());
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/PageZeroWriterFlavorSelector.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/PageZeroWriterFlavorSelector.java
new file mode 100644
index 0000000..da7c9f2
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/PageZeroWriterFlavorSelector.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.zero;
+
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroWriter.DEFAULT_WRITER_FLAG;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroWriter.SPARSE_WRITER_FLAG;
+
+import org.apache.asterix.column.zero.readers.DefaultColumnPageZeroReader;
+import org.apache.asterix.column.zero.readers.SparseColumnPageZeroReader;
+import org.apache.asterix.column.zero.writers.DefaultColumnPageZeroWriter;
+import org.apache.asterix.column.zero.writers.SparseColumnPageZeroWriter;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroReader;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroWriter;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroWriterFlavorSelector;
+
+import it.unimi.dsi.fastutil.bytes.Byte2ObjectArrayMap;
+
+/**
+ * Selector that chooses between different page zero writer implementations based on space efficiency.
+ *<p>
+ * This class implements an optimization strategy for sparse columns:
+ * - Default writer: Allocates space for all columns in the schema (suitable for dense data)
+ * - Sparse writer: Only allocates space for present columns (suitable for sparse data)
+ *</p>
+ * The selector automatically chooses the most space-efficient option based on the actual
+ * space requirements of each approach.
+ */
+public class PageZeroWriterFlavorSelector implements IColumnPageZeroWriterFlavorSelector {
+    // Flag indicating which writer type is currently selected (DEFAULT_WRITER_FLAG=default, SPARSE_WRITER_FLAG=sparse)
+    protected byte writerFlag = DEFAULT_WRITER_FLAG;
+
+    // Cache of writer instances to avoid repeated object creation
+    private final Byte2ObjectArrayMap<IColumnPageZeroWriter> writers;
+    private final Byte2ObjectArrayMap<IColumnPageZeroReader> readers;
+
+    public PageZeroWriterFlavorSelector() {
+        writers = new Byte2ObjectArrayMap<>();
+        readers = new Byte2ObjectArrayMap<>();
+    }
+
+    /**
+     * Selects the optimal page zero writer based on space efficiency.
+     * <p>
+     * This method compares the space requirements of both writer types and selects
+     * the one that uses less space. This optimization is particularly beneficial
+     * for sparse datasets where many columns may be null or missing.
+     * </p>
+     * @param spaceOccupiedByDefaultWriter Space in bytes required by the default writer
+     * @param spaceOccupiedBySparseWriter Space in bytes required by the sparse writer
+     */
+    @Override
+    public void switchPageZeroWriterIfNeeded(int spaceOccupiedByDefaultWriter, int spaceOccupiedBySparseWriter) {
+        if (spaceOccupiedByDefaultWriter <= spaceOccupiedBySparseWriter) {
+            // Default writer is more space-efficient (or equal), use it
+            writerFlag = DEFAULT_WRITER_FLAG;
+        } else {
+            // Sparse writer is more space-efficient, use it
+            writerFlag = SPARSE_WRITER_FLAG;
+        }
+    }
+
+    /**
+     * Returns the currently selected page zero writer instance.
+     * Writers are cached to avoid repeated object creation.
+     * 
+     * @return the selected writer instance
+     * @throws IllegalStateException if an unsupported writer flag is encountered
+     */
+    @Override
+    public IColumnPageZeroWriter getPageZeroWriter() {
+       return switch (writerFlag) {
+           case DEFAULT_WRITER_FLAG -> writers.computeIfAbsent(DEFAULT_WRITER_FLAG, k -> new DefaultColumnPageZeroWriter());
+           case SPARSE_WRITER_FLAG -> writers.computeIfAbsent(SPARSE_WRITER_FLAG, k -> new SparseColumnPageZeroWriter());
+           default -> throw new IllegalStateException("Unsupported writer flag: " + writerFlag);
+       };
+    }
+
+    /**
+     * Creates a page zero reader instance based on the provided flag.
+     * This method is used during deserialization to create the appropriate reader
+     * for the writer type that was used during serialization.
+     * 
+     * @param flag The flag code identifying the writer type (DEFAULT_WRITER_FLAG=default, SPARSE_WRITER_FLAG=sparse)
+     * @return the appropriate reader instance
+     * @throws IllegalStateException if an unsupported reader flag is encountered
+     */
+    @Override
+    public IColumnPageZeroReader createPageZeroReader(byte flag) {
+        return switch (flag) {
+            case DEFAULT_WRITER_FLAG -> readers.computeIfAbsent(DEFAULT_WRITER_FLAG, k -> new DefaultColumnPageZeroReader());
+            case SPARSE_WRITER_FLAG -> readers.computeIfAbsent(SPARSE_WRITER_FLAG, k -> new SparseColumnPageZeroReader());
+            default -> throw new IllegalStateException("Unsupported reader flag: " + flag);
+        };
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/DefaultColumnPageZeroReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/DefaultColumnPageZeroReader.java
new file mode 100644
index 0000000..b643fd6
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/DefaultColumnPageZeroReader.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.zero.readers;
+
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.HEADER_SIZE;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.LEFT_MOST_KEY_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.MEGA_LEAF_NODE_LENGTH;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.NEXT_LEAF_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.NUMBER_OF_COLUMNS_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.RIGHT_MOST_KEY_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.TUPLE_COUNT_OFFSET;
+
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.column.zero.writers.DefaultColumnPageZeroWriter;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.IntPairUtil;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroReader;
+
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+
+public class DefaultColumnPageZeroReader implements IColumnPageZeroReader {
+    protected ByteBuffer pageZeroBuf;
+
+    @Override
+    public void reset(ByteBuffer pageZeroBuf) {
+        this.pageZeroBuf = pageZeroBuf;
+    }
+
+    @Override
+    public int getColumnOffset(int columnIndex) {
+        return pageZeroBuf.getInt(HEADER_SIZE + columnIndex * DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE);
+    }
+
+    @Override
+    public int getColumnFilterOffset(int columnIndex) {
+        int columnsOffsetEnd =
+                HEADER_SIZE + getNumberOfPresentColumns() * DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+        return columnsOffsetEnd + columnIndex * DefaultColumnPageZeroWriter.FILTER_SIZE;
+    }
+
+    @Override
+    public long getLong(int offset) {
+        return pageZeroBuf.getLong(offset);
+    }
+
+    @Override
+    public void skipFilters() {
+        int filterEndOffset = getColumnFilterOffset(getNumberOfPresentColumns());
+        pageZeroBuf.position(filterEndOffset);
+    }
+
+    @Override
+    public void skipColumnOffsets() {
+        int columnEndOffset =
+                HEADER_SIZE + getNumberOfPresentColumns() * DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+        pageZeroBuf.position(columnEndOffset);
+    }
+
+    @Override
+    public int getTupleCount() {
+        return pageZeroBuf.getInt(TUPLE_COUNT_OFFSET);
+    }
+
+    @Override
+    public int getLeftMostKeyOffset() {
+        return pageZeroBuf.getInt(LEFT_MOST_KEY_OFFSET);
+    }
+
+    @Override
+    public int getRightMostKeyOffset() {
+        return pageZeroBuf.getInt(RIGHT_MOST_KEY_OFFSET);
+    }
+
+    @Override
+    public int getNumberOfPresentColumns() {
+        return pageZeroBuf.getInt(NUMBER_OF_COLUMNS_OFFSET);
+    }
+
+    @Override
+    public int getRelativeColumnIndex(int columnIndex) {
+        return columnIndex;
+    }
+
+    @Override
+    public int getNextLeaf() {
+        return pageZeroBuf.getInt(NEXT_LEAF_OFFSET);
+    }
+
+    @Override
+    public int getMegaLeafNodeLengthInBytes() {
+        return pageZeroBuf.getInt(MEGA_LEAF_NODE_LENGTH);
+    }
+
+    @Override
+    public int getPageZeroCapacity() {
+        return pageZeroBuf.capacity();
+    }
+
+    @Override
+    public boolean isValidColumn(int columnIndex) {
+        int relativeColumnIndex = getRelativeColumnIndex(columnIndex);
+        return relativeColumnIndex < getNumberOfPresentColumns();
+    }
+
+    @Override
+    public void getAllColumns(IntOpenHashSet presentColumns) {
+        int numberOfColumns = getNumberOfPresentColumns();
+        for (int i = 0; i < numberOfColumns; i++) {
+            presentColumns.add(i);
+        }
+    }
+
+    @Override
+    public ByteBuffer getPageZeroBuf() {
+        return pageZeroBuf;
+    }
+
+    @Override
+    public void populateOffsetColumnIndexPairs(long[] offsetColumnIndexPairs) {
+        int columnOffsetStart = HEADER_SIZE;
+        for (int i = 0; i < offsetColumnIndexPairs.length; i++) {
+            int offset = pageZeroBuf.getInt(columnOffsetStart);
+            offsetColumnIndexPairs[i] = IntPairUtil.of(offset, i);
+            columnOffsetStart += DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+        }
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/SparseColumnPageZeroReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/SparseColumnPageZeroReader.java
new file mode 100644
index 0000000..44d2369
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/SparseColumnPageZeroReader.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.zero.readers;
+
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.HEADER_SIZE;
+
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.column.zero.writers.DefaultColumnPageZeroWriter;
+import org.apache.asterix.column.zero.writers.SparseColumnPageZeroWriter;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.IntPairUtil;
+
+import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+
+public class SparseColumnPageZeroReader extends DefaultColumnPageZeroReader {
+    private final Int2IntOpenHashMap columnIndexToRelativeColumnIndex;
+
+    public SparseColumnPageZeroReader() {
+        columnIndexToRelativeColumnIndex = new Int2IntOpenHashMap();
+        columnIndexToRelativeColumnIndex.defaultReturnValue(-1);
+    }
+
+    @Override
+    public void reset(ByteBuffer pageZeroBuf) {
+        super.reset(pageZeroBuf);
+        columnIndexToRelativeColumnIndex.clear();
+    }
+
+    @Override
+    public int getColumnOffset(int columnIndex) {
+        int relativeColumnIndex = getRelativeColumnIndex(columnIndex);
+        return pageZeroBuf.getInt(
+                HEADER_SIZE + relativeColumnIndex * SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE + Integer.BYTES);
+    }
+
+    @Override
+    public int getColumnFilterOffset(int columnIndex) {
+        int relativeColumnIndex = getRelativeColumnIndex(columnIndex);
+        int columnsOffsetEnd =
+                HEADER_SIZE + getNumberOfPresentColumns() * SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+        return columnsOffsetEnd + relativeColumnIndex * DefaultColumnPageZeroWriter.FILTER_SIZE;
+    }
+
+    @Override
+    public void skipFilters() {
+        int filterEndOffset = HEADER_SIZE + getNumberOfPresentColumns()
+                * (SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE + DefaultColumnPageZeroWriter.FILTER_SIZE);
+        pageZeroBuf.position(filterEndOffset);
+    }
+
+    @Override
+    public void skipColumnOffsets() {
+        int columnsOffsetEnd =
+                HEADER_SIZE + getNumberOfPresentColumns() * SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+        pageZeroBuf.position(columnsOffsetEnd);
+    }
+
+    // x + 0, 8, 16, .... , 8*(n-1)
+    @Override
+    public int getRelativeColumnIndex(int columnIndex) {
+        // if the entry is in cache, return it
+        int cachedIndex = columnIndexToRelativeColumnIndex.get(columnIndex);
+        if (cachedIndex != -1) {
+            return cachedIndex;
+        }
+        int startColumnIndex = getColumnIndex(0);
+        int startColumn = pageZeroBuf.getInt(startColumnIndex);
+        if (startColumn == columnIndex) {
+            columnIndexToRelativeColumnIndex.put(columnIndex, 0);
+            return 0;
+        }
+
+        int totalColumns = getNumberOfPresentColumns();
+        int lastColumnIndex = getColumnIndex(totalColumns - 1);
+        int lastColumn = pageZeroBuf.getInt(lastColumnIndex);
+        if (lastColumn == columnIndex) {
+            columnIndexToRelativeColumnIndex.put(columnIndex, totalColumns - 1);
+            return totalColumns - 1;
+        }
+
+        int start = 0;
+        int end = totalColumns - 1;
+        while (start <= end) {
+            int mid = start + (end - start) / 2;
+            int midIndex = getColumnIndex(mid);
+            int midColumnIndex = pageZeroBuf.getInt(midIndex);
+            if (midColumnIndex == columnIndex) {
+                columnIndexToRelativeColumnIndex.put(columnIndex, mid);
+                return mid; // this is the relative index
+            } else if (midColumnIndex < columnIndex) {
+                start = mid + 1;
+            } else {
+                end = mid - 1;
+            }
+        }
+
+        return -1;
+    }
+
+    private int getColumnIndex(int index) {
+        return HEADER_SIZE + index * SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+    }
+
+    @Override
+    public boolean isValidColumn(int columnIndex) {
+        int relativeColumnIndex = getRelativeColumnIndex(columnIndex);
+        return relativeColumnIndex != -1;
+    }
+
+    @Override
+    public void getAllColumns(IntOpenHashSet presentColumns) {
+        int columnIndex = getColumnIndex(0);
+        for (int i = 0; i < getNumberOfPresentColumns(); i++) {
+            int column = pageZeroBuf.getInt(columnIndex);
+            presentColumns.add(column);
+            columnIndex += SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+        }
+    }
+
+    @Override
+    public void populateOffsetColumnIndexPairs(long[] offsetColumnIndexPairs) {
+        int columnIndex = getColumnIndex(0);
+        for (int i = 0; i < getNumberOfPresentColumns(); i++) {
+            int column = pageZeroBuf.getInt(columnIndex);
+            int offset = pageZeroBuf.getInt(columnIndex + SparseColumnPageZeroWriter.COLUMN_INDEX_SIZE);
+            offsetColumnIndexPairs[i] = IntPairUtil.of(offset, column);
+            columnIndex += SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+        }
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/DefaultColumnPageZeroWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/DefaultColumnPageZeroWriter.java
new file mode 100644
index 0000000..3d7aa93
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/DefaultColumnPageZeroWriter.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.zero.writers;
+
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+
+import org.apache.asterix.column.bytes.stream.out.ByteBufferOutputStream;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroWriter;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IValuesWriter;
+
+/**
+ * Default implementation of page zero writer that allocates space for all columns in the schema.
+ * 
+ * This writer uses a fixed layout where every column in the schema has a reserved slot,
+ * regardless of whether data is present for that column. This approach is optimal for
+ * dense datasets where most columns contain data.
+ * 
+ * Memory layout in page zero:
+ * 1. Column offsets: 4 bytes per column (numberOfColumns * 4 bytes)
+ * 2. Column filters: 16 bytes per column (numberOfColumns * 16 bytes) - min/max values
+ * 3. Primary key data: variable size, written sequentially
+ */
+public class DefaultColumnPageZeroWriter implements IColumnPageZeroWriter {
+    /** Size in bytes for storing a column offset */
+    public static final int COLUMN_OFFSET_SIZE = Integer.BYTES;
+    /** Size in bytes for storing column filter (min + max values) */
+    public static final int FILTER_SIZE = Long.BYTES * 2; // min and max
+
+    private final ByteBufferOutputStream primaryKeys;
+    private ByteBuffer pageZero;
+    private int numberOfColumns;
+
+    // Offset positions within page zero buffer
+    private int primaryKeysOffset; // Where primary key data starts
+    private int columnsOffset; // Where column offset array starts  
+    private int filtersOffset; // Where column filter array starts
+
+    public DefaultColumnPageZeroWriter() {
+        primaryKeys = new ByteBufferOutputStream();
+    }
+
+    @Override
+    public void reset(ByteBuffer pageZeroBuf, int[] presentColumns, int numberOfColumns) {
+        this.pageZero = pageZeroBuf;
+        this.numberOfColumns = numberOfColumns;
+        this.primaryKeysOffset = pageZeroBuf.position();
+    }
+
+    @Override
+    public byte flagCode() {
+        return DEFAULT_WRITER_FLAG;
+    }
+
+    /**
+     * Allocates space in page zero for all column metadata.
+     * 
+     * The allocation strategy reserves space for all columns in the schema:
+     * - Column offsets: Fixed array of 4-byte integers
+     * - Column filters: Fixed array of 16-byte min/max pairs
+     * - Primary keys: Variable-size data written after metadata
+     */
+    @Override
+    public void allocateColumns() {
+        // allocate space for columns' offset (4 * numberOfColumns)
+        columnsOffset = primaryKeysOffset;
+        primaryKeysOffset += COLUMN_OFFSET_SIZE * numberOfColumns;
+
+        // allocate space for columns' filter (8 + 8) * numberOfColumns
+        filtersOffset = primaryKeysOffset;
+        primaryKeysOffset += FILTER_SIZE * numberOfColumns;
+
+        // reset the position for pageZero,
+        // the primary keys will be written from this offset
+        pageZero.position(primaryKeysOffset);
+        primaryKeys.reset(pageZero);
+    }
+
+    /**
+     * Records a column's data offset using direct array indexing.
+     * In the default layout, the column index directly maps to the array position.
+     * 
+     * @param absoluteColumnIndex The absolute column index (unused in default layout)
+     * @param relativeColumnIndex The column index used for array positioning
+     * @param offset The byte offset where the column's data begins
+     */
+    @Override
+    public void putColumnOffset(int absoluteColumnIndex, int relativeColumnIndex, int offset) {
+        pageZero.putInt(columnsOffset + COLUMN_OFFSET_SIZE * relativeColumnIndex, offset);
+    }
+
+    /**
+     * Stores column filter information using direct array indexing.
+     * Filters enable efficient column pruning during query execution.
+     * 
+     * @param relativeColumnIndex The column index used for array positioning
+     * @param normalizedMinValue The normalized minimum value in the column
+     * @param normalizedMaxValue The normalized maximum value in the column
+     */
+    @Override
+    public void putColumnFilter(int relativeColumnIndex, long normalizedMinValue, long normalizedMaxValue) {
+        int offset = filtersOffset + relativeColumnIndex * FILTER_SIZE;
+        pageZero.putLong(offset, normalizedMinValue);
+        pageZero.putLong(offset + Long.BYTES, normalizedMaxValue);
+    }
+
+    /**
+     * Writes primary key columns directly to page zero.
+     * Primary keys are stored in page zero for fast access during operations.
+     * 
+     * @param primaryKeyWriters Array of writers containing primary key data
+     * @throws HyracksDataException If an error occurs during writing
+     */
+    @Override
+    public void writePrimaryKeyColumns(IValuesWriter[] primaryKeyWriters) throws HyracksDataException {
+        for (int i = 0; i < primaryKeyWriters.length; i++) {
+            IValuesWriter writer = primaryKeyWriters[i];
+            // Record the offset where this primary key column starts
+            putColumnOffset(i, i, primaryKeysOffset + primaryKeys.size());
+            // Write the actual primary key data
+            writer.flush(primaryKeys);
+        }
+    }
+
+    @Override
+    public int getNumberOfColumns() {
+        return numberOfColumns;
+    }
+
+    /**
+     * In the default layout, all columns are always included since space is pre-allocated.
+     * 
+     * @param presentColumns Set of columns present in this page (unused)
+     * @param columnIndex The column index to check (unused)
+     * @param includeChildrenColumns Whether to include child columns (unused)
+     * @return always true for default layout
+     */
+    @Override
+    public boolean includeOrderedColumn(BitSet presentColumns, int columnIndex, boolean includeChildrenColumns) {
+        return true;
+    }
+
+    @Override
+    public ByteBuffer getPageZeroBuffer() {
+        return pageZero;
+    }
+
+    /**
+     * In the default layout, the relative index is the same as the absolute index.
+     * 
+     * @param columnIndex The absolute column index
+     * @return the same column index (identity mapping)
+     */
+    @Override
+    public int getRelativeColumnIndex(int columnIndex) {
+        return columnIndex;
+    }
+
+    @Override
+    public int getColumnOffsetsSize() {
+        return numberOfColumns * COLUMN_OFFSET_SIZE;
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/SparseColumnPageZeroWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/SparseColumnPageZeroWriter.java
new file mode 100644
index 0000000..f74b575
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/SparseColumnPageZeroWriter.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.zero.writers;
+
+import static org.apache.asterix.column.zero.writers.DefaultColumnPageZeroWriter.FILTER_SIZE;
+
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+
+import org.apache.asterix.column.bytes.stream.out.ByteBufferOutputStream;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroWriter;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IValuesWriter;
+
+/**
+ * Sparse implementation of page zero writer that only allocates space for present columns.
+ * 
+ * This writer optimizes space usage for sparse datasets by storing only the columns
+ * that actually contain data. Each column entry includes both the column index and
+ * its data offset, allowing for efficient lookup while minimizing space overhead.
+ *
+ * Memory layout in page zero:
+ * 1. Column entries: 8 bytes per present column (4 bytes index + 4 bytes offset)
+ * 2. Column filters: 16 bytes per present column (min/max values)
+ * 3. Primary key data: variable size, written sequentially
+ *
+ * This layout is particularly beneficial when the number of present columns is
+ * significantly smaller than the total schema size.
+ */
+public class SparseColumnPageZeroWriter implements IColumnPageZeroWriter {
+    /** Size in bytes for storing a column index */
+    public static final int COLUMN_INDEX_SIZE = Integer.BYTES;
+    /** Size in bytes for storing a column entry (index + offset) */
+    public static final int COLUMN_OFFSET_SIZE = Integer.BYTES + COLUMN_INDEX_SIZE;
+
+    private final ByteBufferOutputStream primaryKeys;
+    private int[] presentColumns;
+    private int numberOfPresentColumns;
+    private ByteBuffer pageZero;
+
+    // Offset positions within page zero buffer
+    private int primaryKeysOffset; // Where primary key data starts
+    private int columnsOffset; // Where column entries array starts
+    private int filtersOffset; // Where column filter array starts
+
+    public SparseColumnPageZeroWriter() {
+        primaryKeys = new ByteBufferOutputStream();
+    }
+
+    @Override
+    public void reset(ByteBuffer pageZeroBuf, int[] presentColumns, int numberOfColumns /* not being used */) {
+        this.pageZero = pageZeroBuf;
+        this.presentColumns = presentColumns;
+        this.numberOfPresentColumns = presentColumns.length;
+        this.primaryKeysOffset = pageZeroBuf.position();
+    }
+
+    @Override
+    public byte flagCode() {
+        return SPARSE_WRITER_FLAG;
+    }
+
+    /**
+     * Allocates space in page zero for present column metadata only.
+     * 
+     * The allocation strategy reserves space only for columns that contain data:
+     * - Column entries: Array of (index, offset) pairs for present columns
+     * - Column filters: Array of min/max pairs for present columns only
+     * - Primary keys: Variable-size data written after metadata
+     */
+    @Override
+    public void allocateColumns() {
+        // allocate space for columns' offset (8 * numberOfPresentColumns)
+        columnsOffset = primaryKeysOffset;
+        primaryKeysOffset += COLUMN_OFFSET_SIZE * numberOfPresentColumns;
+
+        // allocate space for filters'
+        filtersOffset = primaryKeysOffset;
+        primaryKeysOffset += FILTER_SIZE * numberOfPresentColumns;
+
+        // reset the position for pageZero,
+        // the primary keys will be written from this offset
+        pageZero.position(primaryKeysOffset);
+        primaryKeys.reset(pageZero);
+    }
+
+    /**
+     * Records a column's data offset along with its absolute column index.
+     * 
+     * In the sparse layout, each entry stores both the original column index
+     * and the data offset, enabling lookup of sparse columns.
+     * 
+     * @param absoluteColumnIndex The absolute column index in the schema
+     * @param relativeColumnIndex The position within the present columns array
+     * @param offset The byte offset where the column's data begins
+     */
+    @Override
+    public void putColumnOffset(int absoluteColumnIndex, int relativeColumnIndex, int offset) {
+        int columnOffset = columnsOffset + COLUMN_OFFSET_SIZE * relativeColumnIndex;
+        // Store the absolute column index first
+        pageZero.putInt(columnOffset, absoluteColumnIndex);
+        // Then store the data offset
+        pageZero.putInt(columnOffset + Integer.BYTES, offset);
+    }
+
+    /**
+     * Stores column filter information for present columns only.
+     * Uses the relative column index to position within the sparse filter array.
+     * 
+     * @param relativeColumnIndex The position within the present columns array
+     * @param normalizedMinValue The normalized minimum value in the column
+     * @param normalizedMaxValue The normalized maximum value in the column
+     */
+    @Override
+    public void putColumnFilter(int relativeColumnIndex, long normalizedMinValue, long normalizedMaxValue) {
+        int offset = filtersOffset + relativeColumnIndex * FILTER_SIZE;
+        pageZero.putLong(offset, normalizedMinValue);
+        pageZero.putLong(offset + Long.BYTES, normalizedMaxValue);
+    }
+
+    /**
+     * Writes primary key columns directly to page zero.
+     * Primary keys are always present and stored similarly to the default layout.
+     * 
+     * @param primaryKeyWriters Array of writers containing primary key data
+     * @throws HyracksDataException If an error occurs during writing
+     */
+    @Override
+    public void writePrimaryKeyColumns(IValuesWriter[] primaryKeyWriters) throws HyracksDataException {
+        for (int i = 0; i < primaryKeyWriters.length; i++) {
+            IValuesWriter writer = primaryKeyWriters[i];
+            // Record the offset where this primary key column starts
+            putColumnOffset(i, i, primaryKeysOffset + primaryKeys.size());
+            // Write the actual primary key data
+            writer.flush(primaryKeys);
+        }
+    }
+
+    /**
+     * Performs binary search to find the relative position of a column index
+     * within the sorted present columns array.
+     * 
+     * @param columnIndex The absolute column index to find
+     * @return the relative position within present columns, or -1 if not found
+     */
+    private int findColumnIndex(int columnIndex) {
+        int low = 0;
+        int high = presentColumns.length - 1;
+        while (low <= high) {
+            int mid = (low + high) >>> 1;
+            int midVal = presentColumns[mid];
+            if (midVal == columnIndex) {
+                return mid;
+            } else if (midVal < columnIndex) {
+                low = mid + 1;
+            } else {
+                high = mid - 1;
+            }
+        }
+
+        return -1;
+    }
+
+    /**
+     * Determines whether a column should be included based on presence in the sparse set.
+     * 
+     * For sparse layouts, only explicitly present columns or child columns
+     * (when includeChildrenColumns is true) are included.
+     * 
+     * @param presentColumns Set of columns present in this page
+     * @param columnIndex The column index to check
+     * @param includeChildrenColumns Whether to include child columns for complex types
+     * @return true if the column should be included
+     */
+    @Override
+    public boolean includeOrderedColumn(BitSet presentColumns, int columnIndex, boolean includeChildrenColumns) {
+        return includeChildrenColumns || presentColumns.get(columnIndex);
+    }
+
+    @Override
+    public ByteBuffer getPageZeroBuffer() {
+        return pageZero;
+    }
+
+    @Override
+    public int getNumberOfColumns() {
+        return presentColumns.length;
+    }
+
+    /**
+     * Maps an absolute column index to its relative position within the present columns array.
+     * 
+     * This mapping is essential for sparse layouts where the storage position
+     * differs from the schema position.
+     * 
+     * @param columnIndex The absolute column index in the schema
+     * @return the relative position within the present columns array
+     * @throws IllegalStateException if the column index is not found in present columns
+     */
+    @Override
+    public int getRelativeColumnIndex(int columnIndex) {
+        int columnRelativeIndex = findColumnIndex(columnIndex);
+        if (columnRelativeIndex == -1) {
+            throw new IllegalStateException("Column index " + columnIndex + " does not exist in present columns.");
+        }
+        return columnRelativeIndex;
+    }
+
+    @Override
+    public int getColumnOffsetsSize() {
+        return numberOfPresentColumns * COLUMN_OFFSET_SIZE;
+    }
+}
diff --git a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
index 9d57dde..fe5ef54 100644
--- a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
+++ b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
@@ -48,6 +48,7 @@
 import org.apache.asterix.column.test.bytes.components.TestColumnBufferProvider;
 import org.apache.asterix.column.values.IColumnValuesWriterFactory;
 import org.apache.asterix.column.values.writer.ColumnValuesWriterFactory;
+import org.apache.asterix.column.zero.writers.DefaultColumnPageZeroWriter;
 import org.apache.asterix.om.pointables.ARecordVisitablePointable;
 import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
 import org.apache.asterix.om.pointables.printer.json.clean.APrintVisitor;
@@ -191,9 +192,9 @@
         pageZero.clear();
         //Reserve the header space
         pageZero.position(HEADER_SIZE);
-        pageZero.putInt(MEGA_LEAF_NODE_LENGTH, writer.flush(pageZero));
+        pageZero.putInt(MEGA_LEAF_NODE_LENGTH, writer.flush(pageZero, new DefaultColumnPageZeroWriter()));
         //Write page header
-        int numberOfColumn = writer.getNumberOfColumns(false);
+        int numberOfColumn = writer.getAbsoluteNumberOfColumns(false);
         pageZero.putInt(TUPLE_COUNT_OFFSET, tupleCount);
         pageZero.putInt(NUMBER_OF_COLUMNS_OFFSET, numberOfColumn);
 
@@ -209,9 +210,9 @@
         //Reserved for the number of pages
         int requiredFreeSpace = HEADER_SIZE;
         //Columns' Offsets
-        requiredFreeSpace += columnWriter.getColumnOffsetsSize(true);
+        requiredFreeSpace += columnWriter.getColumnOccupiedSpace(true);
         //Occupied space from previous writes
-        requiredFreeSpace += columnWriter.getOccupiedSpace();
+        requiredFreeSpace += columnWriter.getPrimaryKeysEstimatedSize();
         //New tuple required space
         requiredFreeSpace += columnWriter.bytesRequired(tuple);
         return PAGE_SIZE <= requiredFreeSpace;
diff --git a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/dummy/AbstractDummyTest.java b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/dummy/AbstractDummyTest.java
index 1fafcce..9d6e619 100644
--- a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/dummy/AbstractDummyTest.java
+++ b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/dummy/AbstractDummyTest.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.column.test.dummy;
 
 import java.io.IOException;
+import java.util.BitSet;
 import java.util.Collections;
 
 import org.apache.asterix.column.common.buffer.NoOpWriteMultiPageOp;
@@ -30,6 +31,7 @@
 import org.apache.asterix.column.operation.lsm.flush.FlushColumnMetadata;
 import org.apache.asterix.column.values.writer.DummyColumnValuesWriterFactory;
 import org.apache.asterix.column.values.writer.NoOpColumnBatchWriter;
+import org.apache.asterix.column.zero.writers.DefaultColumnPageZeroWriter;
 import org.apache.asterix.om.lazy.RecordLazyVisitablePointable;
 import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
 import org.apache.commons.lang3.mutable.MutableObject;
@@ -41,6 +43,7 @@
     protected final FlushColumnMetadata columnMetadata;
     protected final ColumnTransformer columnTransformer;
     protected final BatchFinalizerVisitor finalizer;
+    protected final BitSet presentColumnsIndexes;
     //Schema
     protected final ArrayBackedValueStorage storage;
     protected final RecordLazyVisitablePointable pointable;
@@ -48,9 +51,10 @@
 
     protected AbstractDummyTest(TestCase testCase) throws HyracksDataException {
         super(testCase);
+        presentColumnsIndexes = new BitSet();
         columnMetadata = new FlushColumnMetadata(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE, null,
                 Collections.emptyList(), null, WRITER_FACTORY, new MutableObject<>(NoOpWriteMultiPageOp.INSTANCE));
-        columnTransformer = new ColumnTransformer(columnMetadata, columnMetadata.getRoot());
+        columnTransformer = new ColumnTransformer(columnMetadata, columnMetadata.getRoot(), presentColumnsIndexes);
         finalizer = new BatchFinalizerVisitor(columnMetadata);
         storage = new ArrayBackedValueStorage();
         pointable = new RecordLazyVisitablePointable(true);
@@ -64,7 +68,8 @@
             storage.reset();
             numberOfTuples++;
         }
-        finalizer.finalizeBatch(NoOpColumnBatchWriter.INSTANCE, columnMetadata);
+        finalizer.finalizeBatchColumns(columnMetadata, presentColumnsIndexes, new DefaultColumnPageZeroWriter());
+        finalizer.finalizeBatch(NoOpColumnBatchWriter.INSTANCE);
         return columnMetadata.getRoot();
     }
 }
diff --git a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/writer/NoOpColumnBatchWriter.java b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/writer/NoOpColumnBatchWriter.java
index 234f804..ea4dbe6a 100644
--- a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/writer/NoOpColumnBatchWriter.java
+++ b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/writer/NoOpColumnBatchWriter.java
@@ -24,6 +24,7 @@
 import org.apache.asterix.column.values.IColumnBatchWriter;
 import org.apache.asterix.column.values.IColumnValuesWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroWriter;
 
 public class NoOpColumnBatchWriter implements IColumnBatchWriter {
     public static final IColumnBatchWriter INSTANCE = new NoOpColumnBatchWriter();
@@ -32,7 +33,8 @@
     }
 
     @Override
-    public void setPageZeroBuffer(ByteBuffer pageZeroBuffer, int numberOfColumns, int numberOfPrimaryKeys) {
+    public void setPageZeroWriter(ByteBuffer pageZero, IColumnPageZeroWriter pageZeroWriter,
+            int[] presentColumnsIndexes, int numberOfColumns) {
         // NoOp
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleReader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleReader.java
index 7db792b..433d00e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleReader.java
@@ -18,33 +18,29 @@
  */
 package org.apache.hyracks.storage.am.lsm.btree.column.api;
 
-import java.nio.ByteBuffer;
-
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame;
 import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroWriterFlavorSelector;
 
 /**
  * Provided for columnar read tuple reference
  */
 public abstract class AbstractColumnTupleReader extends AbstractTupleWriterDisabledMethods {
+    protected final IColumnPageZeroWriterFlavorSelector pageZeroWriterFlavorSelector;
+
+    protected AbstractColumnTupleReader(IColumnPageZeroWriterFlavorSelector pageZeroWriterFlavorSelector) {
+        this.pageZeroWriterFlavorSelector = pageZeroWriterFlavorSelector;
+    }
+
     public abstract IColumnTupleIterator createTupleIterator(ColumnBTreeReadLeafFrame frame, int componentIndex,
             IColumnReadMultiPageOp multiPageOp);
 
-    /**
-     * Currently fixed to 4-byte per offset
-     *
-     * @param buf         buffer of Page0
-     * @param columnIndex column index
-     * @return column offset
-     * @see AbstractColumnTupleWriter#getColumnOffsetsSize()
-     */
-    public final int getColumnOffset(ByteBuffer buf, int columnIndex) {
-        return buf.getInt(AbstractColumnBTreeLeafFrame.HEADER_SIZE + columnIndex * Integer.BYTES);
-    }
-
     @Override
     public final int bytesRequired(ITupleReference tuple) {
         throw new UnsupportedOperationException(UNSUPPORTED_OPERATION_MSG);
     }
+
+    public IColumnPageZeroWriterFlavorSelector getPageZeroWriterFlavorSelector() {
+        return pageZeroWriterFlavorSelector;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
index 14f2399..b52f59f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
@@ -22,6 +22,8 @@
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroWriter;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroWriterFlavorSelector;
 
 /**
  * Columnar Tuple Writer:
@@ -35,7 +37,7 @@
  * - Initially, the writer has to set multiPageOp by calling {@link #init(IColumnWriteMultiPageOp)}
  * - For each write, the caller should check if adding a tuple does not exceed the {@link #getMaxNumberOfTuples()} or
  * the on-disk page size (called stopping condition)
- * - If the stopping condition is reached, then {@link #flush(ByteBuffer)} needed to be called
+ * - If the stopping condition is reached, then {@link #flush(ByteBuffer, IColumnPageZeroWriter)} needed to be called
  * <p>
  * Hyracks visibility:
  * - Columns are written as blobs (i.e., not interpretable by Hyracks)
@@ -54,7 +56,7 @@
     /**
      * @return The current number of columns including the current tuple
      */
-    public abstract int getNumberOfColumns(boolean includeCurrentTupleColumns);
+    public abstract int getAbsoluteNumberOfColumns(boolean includeCurrentTupleColumns);
 
     /**
      * Currently, a column offset takes 4-byte (fixed). But in the future, we can reformat the offsets. For example,
@@ -62,9 +64,7 @@
      *
      * @return the size needed to store columns' offsets
      */
-    public final int getColumnOffsetsSize(boolean includeCurrentTupleColumns) {
-        return Integer.BYTES * getNumberOfColumns(includeCurrentTupleColumns);
-    }
+    public abstract int getColumnOccupiedSpace(boolean includeCurrentTupleColumns);
 
     /**
      * @return maximum number of tuples to be stored per page (i.e., page0)
@@ -74,7 +74,7 @@
     /**
      * @return page0 occupied space
      */
-    public abstract int getOccupiedSpace();
+    public abstract int getPrimaryKeysEstimatedSize();
 
     /**
      * Writes the tuple into a temporary internal buffers
@@ -88,10 +88,22 @@
      *
      * @return total flushed length (including page zero)
      */
-    public abstract int flush(ByteBuffer pageZero) throws HyracksDataException;
+    public abstract int flush(ByteBuffer pageZero, IColumnPageZeroWriter columnPageZeroWriter)
+            throws HyracksDataException;
 
     /**
      * Close the current writer and release all allocated temporary buffers
      */
     public abstract void close();
+
+    /**
+     * reset the state after flush
+     */
+    public abstract void reset();
+
+    /**
+     * get the pageZero writer selector
+     * @return
+     */
+    public abstract IColumnPageZeroWriterFlavorSelector getColumnPageZeroWriterFlavorSelector();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
index eacf4fd..98fa419 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
@@ -110,11 +110,8 @@
 
         // Get the number of columns in a page
         int numberOfColumns = leafFrame.getNumberOfColumns();
-        for (int i = 0; i < numberOfColumns; i++) {
-            int offset = leafFrame.getColumnOffset(i);
-            // Set the first 32-bits to the offset and the second 32-bits to columnIndex
-            offsetColumnIndexPairs[i] = IntPairUtil.of(offset, i);
-        }
+        // Set the first 32-bits to the offset and the second 32-bits to columnIndex
+        leafFrame.populateOffsetColumnIndexPairs(offsetColumnIndexPairs);
 
         // Set artificial offset to determine the last column's length
         int megaLeafLength = leafFrame.getMegaLeafNodeLengthInBytes();
@@ -131,6 +128,8 @@
 
             // Compute the column's length in bytes (set 0 for PKs)
             int length = columnIndex < numberOfPrimaryKeys ? 0 : nextOffset - offset;
+            // In case of sparse columns, few columnIndexes can be greater than the total sparse column count.
+            ensureCapacity(columnIndex);
             lengths[columnIndex] = length;
 
             // Get start page ID (given the computed length above)
@@ -278,6 +277,12 @@
         }
     }
 
+    private void ensureCapacity(int columnIndex) {
+        if (columnIndex >= lengths.length) {
+            lengths = IntArrays.grow(lengths, columnIndex + 1);
+        }
+    }
+
     @Override
     public String toString() {
         StringBuilder builder = new StringBuilder();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/AbstractColumnBTreeLeafFrame.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/AbstractColumnBTreeLeafFrame.java
index a069a1c..f3a7c4f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/AbstractColumnBTreeLeafFrame.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/AbstractColumnBTreeLeafFrame.java
@@ -67,13 +67,16 @@
     public static final int HEADER_SIZE = NEXT_LEAF_OFFSET + 4;
 
     protected final ITreeIndexTupleWriter rowTupleWriter;
+    protected final IColumnPageZeroWriterFlavorSelector pageZeroWriterFlavorSelector;
 
     protected MultiComparator cmp;
     protected ICachedPage page;
     protected ByteBuffer buf;
 
-    AbstractColumnBTreeLeafFrame(ITreeIndexTupleWriter rowTupleWriter) {
+    AbstractColumnBTreeLeafFrame(ITreeIndexTupleWriter rowTupleWriter,
+            IColumnPageZeroWriterFlavorSelector columnPageZeroWriterFlavorSelector) {
         this.rowTupleWriter = rowTupleWriter;
+        this.pageZeroWriterFlavorSelector = columnPageZeroWriterFlavorSelector;
     }
 
     /* ****************************************************************************
@@ -98,8 +101,13 @@
         this.buf = page.getBuffer().duplicate();
         buf.clear();
         buf.position(HEADER_SIZE);
+        resetPageZeroReader();
     }
 
+    protected void resetPageZeroReader() {
+
+    };
+
     @Override
     public final ICachedPage getPage() {
         return page;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
index 114ac05..8d1080a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
@@ -93,6 +93,10 @@
             writeFullLeafPage();
             confiscateNewLeafPage();
         }
+        if (tupleCount == 0) {
+            //Since we are writing the first tuple, we need to estimate the number of columns.
+            columnWriter.updateColumnMetadataForCurrentTuple(tuple);
+        }
         //Save the key of the last inserted tuple
         setMinMaxKeys(tuple);
         columnWriter.writeTuple(tuple);
@@ -106,17 +110,18 @@
 
     private boolean isFull(ITupleReference tuple) throws HyracksDataException {
         if (tupleCount == 0) {
+            columnWriter.updateColumnMetadataForCurrentTuple(tuple);
             return false;
         } else if (tupleCount >= columnWriter.getMaxNumberOfTuples()) {
             //We reached the maximum number of tuples
             return true;
         }
+        columnWriter.updateColumnMetadataForCurrentTuple(tuple);
         int requiredFreeSpace = AbstractColumnBTreeLeafFrame.HEADER_SIZE;
         //Columns' Offsets
-        columnWriter.updateColumnMetadataForCurrentTuple(tuple);
-        requiredFreeSpace += columnWriter.getColumnOffsetsSize(true);
+        requiredFreeSpace += columnWriter.getColumnOccupiedSpace(true);
         //Occupied space from previous writes
-        requiredFreeSpace += columnWriter.getOccupiedSpace();
+        requiredFreeSpace += columnWriter.getPrimaryKeysEstimatedSize();
         //min and max tuples' sizes
         requiredFreeSpace += lowKey.getTuple().getTupleSize() + getSplitKeySize(tuple);
         //New tuple required space
@@ -308,9 +313,9 @@
             // number of tuples processed for the current leaf
             state.put("currentLeafTupleCount", tupleCount);
             // number of columns
-            state.put("currentLeafColumnCount", columnWriter.getNumberOfColumns(false));
+            state.put("currentLeafColumnCount", columnWriter.getAbsoluteNumberOfColumns(false));
             // number of columns including current tuple
-            state.put("currentColumnCount", columnWriter.getNumberOfColumns(true));
+            state.put("currentColumnCount", columnWriter.getAbsoluteNumberOfColumns(true));
             state.put("lastRequiredFreeSpace", lastRequiredFreeSpace);
             state.put("splitKeyTupleSize", splitKey.getTuple().getTupleSize());
             state.put("splitKeyTupleSizeByTupleWriter", tupleWriter.bytesRequired(splitKey.getTuple()));
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeReadLeafFrame.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeReadLeafFrame.java
index 1b4d09d..58a62ba 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeReadLeafFrame.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeReadLeafFrame.java
@@ -27,27 +27,36 @@
 import org.apache.hyracks.storage.common.buffercache.CachedPage;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
 
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+
 public final class ColumnBTreeReadLeafFrame extends AbstractColumnBTreeLeafFrame {
     private final AbstractColumnTupleReader columnarTupleReader;
     private final ITreeIndexTupleReference leftMostTuple;
     private final ITreeIndexTupleReference rightMostTuple;
+    private IColumnPageZeroReader columnPageZeroReader;
 
     public ColumnBTreeReadLeafFrame(ITreeIndexTupleWriter rowTupleWriter,
             AbstractColumnTupleReader columnarTupleReader) {
-        super(rowTupleWriter);
+        super(rowTupleWriter, columnarTupleReader.getPageZeroWriterFlavorSelector());
         this.columnarTupleReader = columnarTupleReader;
         leftMostTuple = rowTupleWriter.createTupleReference();
         rightMostTuple = rowTupleWriter.createTupleReference();
     }
 
     @Override
+    protected void resetPageZeroReader() {
+        columnPageZeroReader = pageZeroWriterFlavorSelector.createPageZeroReader(getFlagByte());
+        columnPageZeroReader.reset(buf);
+    }
+
+    @Override
     public ITupleReference getLeftmostTuple() {
         if (getTupleCount() == 0) {
             return null;
         }
 
         leftMostTuple.setFieldCount(cmp.getKeyFieldCount());
-        leftMostTuple.resetByTupleOffset(buf.array(), buf.getInt(LEFT_MOST_KEY_OFFSET));
+        leftMostTuple.resetByTupleOffset(buf.array(), columnPageZeroReader.getLeftMostKeyOffset());
         return leftMostTuple;
     }
 
@@ -58,17 +67,21 @@
         }
 
         rightMostTuple.setFieldCount(cmp.getKeyFieldCount());
-        rightMostTuple.resetByTupleOffset(buf.array(), buf.getInt(RIGHT_MOST_KEY_OFFSET));
+        rightMostTuple.resetByTupleOffset(buf.array(), columnPageZeroReader.getRightMostKeyOffset());
         return rightMostTuple;
     }
 
+    public void getAllColumns(IntOpenHashSet presentColumns) {
+        columnPageZeroReader.getAllColumns(presentColumns);
+    }
+
     public IColumnTupleIterator createTupleReference(int index, IColumnReadMultiPageOp multiPageOp) {
         return columnarTupleReader.createTupleIterator(this, index, multiPageOp);
     }
 
     @Override
     public int getTupleCount() {
-        return buf.getInt(Constants.TUPLE_COUNT_OFFSET);
+        return columnPageZeroReader.getTupleCount();
     }
 
     public int getPageId() {
@@ -76,26 +89,51 @@
     }
 
     public int getNumberOfColumns() {
-        return buf.getInt(NUMBER_OF_COLUMNS_OFFSET);
+        return columnPageZeroReader.getNumberOfPresentColumns();
     }
 
     public int getColumnOffset(int columnIndex) {
-        if (columnIndex >= getNumberOfColumns()) {
+        // update the exception message.
+        if (!columnPageZeroReader.isValidColumn(columnIndex)) {
             throw new IndexOutOfBoundsException(columnIndex + " >= " + getNumberOfColumns());
         }
-        return columnarTupleReader.getColumnOffset(buf, columnIndex);
+        return columnPageZeroReader.getColumnOffset(columnIndex);
+    }
+
+    public boolean isValidColumn(int columnIndex) {
+        return columnPageZeroReader.isValidColumn(columnIndex);
     }
 
     public int getNextLeaf() {
-        return buf.getInt(NEXT_LEAF_OFFSET);
+        return columnPageZeroReader.getNextLeaf();
     }
 
     public int getMegaLeafNodeLengthInBytes() {
-        return buf.getInt(MEGA_LEAF_NODE_LENGTH);
+        return columnPageZeroReader.getMegaLeafNodeLengthInBytes();
+    }
+
+    // flag needs to be directly accessed from the buffer, as this will be used to choose the pageReader
+    public byte getFlagByte() {
+        return buf.get(FLAG_OFFSET);
+    }
+
+    public void skipFilters() {
+        columnPageZeroReader.skipFilters();
+    }
+
+    public void skipColumnOffsets() {
+        columnPageZeroReader.skipColumnOffsets();
+    }
+
+    public IColumnPageZeroReader getColumnPageZeroReader() {
+        return columnPageZeroReader;
     }
 
     public int getMegaLeafNodeNumberOfPages() {
-        return (int) Math.ceil((double) getMegaLeafNodeLengthInBytes() / buf.capacity());
+        // the denominator should ideally be the bufferCache pageSize, but
+        // in the current way, the pageZeroCapacity = bufferCache's pageSize.
+        // May be, needs to be changed in the future, to point to the bufferCache's pageSize.
+        return (int) Math.ceil((double) getMegaLeafNodeLengthInBytes() / columnPageZeroReader.getPageZeroCapacity());
     }
 
     public ColumnBTreeReadLeafFrame createCopy() {
@@ -106,4 +144,8 @@
     public ITreeIndexTupleReference createTupleReference() {
         throw new IllegalArgumentException("Use createTupleReference(int)");
     }
+
+    public void populateOffsetColumnIndexPairs(long[] offsetColumnIndexPairs) {
+        columnPageZeroReader.populateOffsetColumnIndexPairs(offsetColumnIndexPairs);
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeWriteLeafFrame.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeWriteLeafFrame.java
index 1fc5712..f5b7893 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeWriteLeafFrame.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeWriteLeafFrame.java
@@ -30,7 +30,7 @@
 
     public ColumnBTreeWriteLeafFrame(ITreeIndexTupleWriter rowTupleWriter,
             AbstractColumnTupleWriter columnTupleWriter) {
-        super(rowTupleWriter);
+        super(rowTupleWriter, columnTupleWriter.getColumnPageZeroWriterFlavorSelector());
         this.columnTupleWriter = columnTupleWriter;
     }
 
@@ -49,11 +49,16 @@
 
     void flush(AbstractColumnTupleWriter columnWriter, int numberOfTuples, ITupleReference minKey,
             ITupleReference maxKey) throws HyracksDataException {
+        IColumnPageZeroWriter pageZeroWriter = pageZeroWriterFlavorSelector.getPageZeroWriter();
+
+        // TODO(zero): Ideally, all these fields should be specific to the writer.
+        // However, some of the position constants are accessed directly elsewhere,
+        // so refactoring will require careful consideration to avoid breaking existing usage.
+
         // Prepare the space for writing the columns' information such as the primary keys
         buf.position(HEADER_SIZE);
         // Flush the columns to persistence pages and write the length of the mega leaf node in pageZero
-        buf.putInt(MEGA_LEAF_NODE_LENGTH, columnWriter.flush(buf));
-
+        buf.putInt(MEGA_LEAF_NODE_LENGTH, columnWriter.flush(buf, pageZeroWriter));
         // Write min and max keys
         int offset = buf.position();
         buf.putInt(LEFT_MOST_KEY_OFFSET, offset);
@@ -62,10 +67,14 @@
         rowTupleWriter.writeTuple(maxKey, buf.array(), offset);
 
         // Write page information
-        int numberOfColumns = columnWriter.getNumberOfColumns(false);
         buf.putInt(TUPLE_COUNT_OFFSET, numberOfTuples);
-        buf.putInt(NUMBER_OF_COLUMNS_OFFSET, numberOfColumns);
-        buf.putInt(SIZE_OF_COLUMNS_OFFSETS_OFFSET, columnWriter.getColumnOffsetsSize(false));
+        buf.put(FLAG_OFFSET, pageZeroWriter.flagCode());
+        buf.putInt(NUMBER_OF_COLUMNS_OFFSET, pageZeroWriter.getNumberOfColumns());
+        // correct the offset's, this all should be deferred to writer
+        buf.putInt(SIZE_OF_COLUMNS_OFFSETS_OFFSET, pageZeroWriter.getColumnOffsetsSize());
+
+        // reset the collected meta info
+        columnWriter.reset();
     }
 
     public AbstractColumnTupleWriter getColumnTupleWriter() {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroReader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroReader.java
new file mode 100644
index 0000000..5a625fc
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroReader.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.impls.btree;
+
+import java.nio.ByteBuffer;
+
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+
+public interface IColumnPageZeroReader {
+
+    void reset(ByteBuffer pageZeroBuf);
+
+    int getColumnOffset(int columnIndex);
+
+    int getColumnFilterOffset(int columnIndex);
+
+    long getLong(int offset);
+
+    void skipFilters();
+
+    void skipColumnOffsets();
+
+    int getTupleCount();
+
+    int getLeftMostKeyOffset();
+
+    int getRightMostKeyOffset();
+
+    int getNumberOfPresentColumns();
+
+    int getRelativeColumnIndex(int columnIndex);
+
+    int getNextLeaf();
+
+    int getMegaLeafNodeLengthInBytes();
+
+    int getPageZeroCapacity();
+
+    boolean isValidColumn(int columnIndex);
+
+    void getAllColumns(IntOpenHashSet presentColumns);
+
+    ByteBuffer getPageZeroBuf();
+
+    void populateOffsetColumnIndexPairs(long[] offsetColumnIndexPairs);
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroWriter.java
new file mode 100644
index 0000000..041415a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroWriter.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.impls.btree;
+
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Interface for writing column metadata to page zero of a column page.
+ * This abstraction supports different page zero layouts including:
+ * - Default layout: stores all columns with fixed offsets
+ * - Sparse layout: stores only present columns with variable offsets
+ * 
+ * The writer handles column offsets, filters (min/max values), and primary key data.
+ */
+public interface IColumnPageZeroWriter {
+
+    /** Flag code for default page zero writer */
+    byte DEFAULT_WRITER_FLAG = 0;
+
+    /** Flag code for sparse page zero writer */
+    byte SPARSE_WRITER_FLAG = 1;
+
+    /**
+     * Initializes the writer with page zero buffer and column information.
+     * 
+     * @param pageZeroBuf The page zero buffer to write to
+     * @param presentColumns Array of column indexes that are present in this page
+     * @param numberOfColumns Total number of columns in the schema (may be larger than presentColumns)
+     */
+    void reset(ByteBuffer pageZeroBuf, int[] presentColumns, int numberOfColumns);
+
+    /**
+     * Returns the flag code that identifies this writer type.
+     * Used for selecting the appropriate reader during deserialization.
+     * 
+     * @return flag code (DEFAULT_WRITER_FLAG for default, SPARSE_WRITER_FLAG for sparse)
+     */
+    byte flagCode();
+
+    /**
+     * Allocates space in page zero for column metadata.
+     * This includes space for column offsets, filters, and primary keys.
+     */
+    void allocateColumns();
+
+    /**
+     * Records the offset of a column's data within the page.
+     * 
+     * @param absoluteColumnIndex The absolute column index in the schema
+     * @param relativeColumnIndex The relative column index within this page (for sparse layouts)
+     * @param offset The byte offset where the column's data begins
+     */
+    void putColumnOffset(int absoluteColumnIndex, int relativeColumnIndex, int offset);
+
+    /**
+     * Stores filter information (min/max values) for a column.
+     * This enables efficient filtering during query execution.
+     * 
+     * @param relativeColumnIndex The relative column index within this page
+     * @param normalizedMinValue The normalized minimum value in the column
+     * @param normalizedMaxValue The normalized maximum value in the column
+     */
+    void putColumnFilter(int relativeColumnIndex, long normalizedMinValue, long normalizedMaxValue);
+
+    /**
+     * Writes primary key column data to page zero.
+     * Primary keys are stored directly in page zero for fast access.
+     * 
+     * @param primaryKeyWriters Array of writers containing primary key data
+     * @throws HyracksDataException If an error occurs during writing
+     */
+    void writePrimaryKeyColumns(IValuesWriter[] primaryKeyWriters) throws HyracksDataException;
+
+    /**
+     * Returns the number of columns handled by this writer.
+     * For default writers, this is the total schema size.
+     * For sparse writers, this is the number of present columns.
+     * 
+     * @return number of columns
+     */
+    int getNumberOfColumns();
+
+    /**
+     * Determines whether a column should be included in ordered processing.
+     * This is particularly important for sparse columns where not all columns may be present.
+     * 
+     * @param presentColumns Set of columns present in this page
+     * @param columnIndex The column index to check
+     * @param includeChildrenColumns Whether to include child columns for complex types
+     * @return true if the column should be included
+     */
+    boolean includeOrderedColumn(BitSet presentColumns, int columnIndex, boolean includeChildrenColumns);
+
+    /**
+     * Returns the page zero buffer being written to.
+     * 
+     * @return the page zero buffer
+     */
+    ByteBuffer getPageZeroBuffer();
+
+    /**
+     * Maps an absolute column index to a relative index within this page.
+     * For default layouts, this is typically an identity mapping.
+     * For sparse layouts, this maps to the position within the present columns array.
+     * 
+     * @param columnIndex The absolute column index in the schema
+     * @return the relative column index within this page
+     */
+    int getRelativeColumnIndex(int columnIndex);
+
+    /**
+     * Returns the total size in bytes used for storing column offsets.
+     * 
+     * @return size in bytes of column offset storage
+     */
+    int getColumnOffsetsSize();
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroWriterFlavorSelector.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroWriterFlavorSelector.java
new file mode 100644
index 0000000..fe2e24e
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroWriterFlavorSelector.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.impls.btree;
+
+/**
+ * Strategy interface for selecting the optimal page zero writer implementation.
+ * 
+ * This interface implements the Strategy pattern to choose between different
+ * page zero layouts based on space efficiency. The selector dynamically
+ * switches between default and sparse writers to minimize storage overhead.
+ * 
+ * The selection is made by comparing the space requirements of both approaches
+ * and choosing the more efficient one for each specific data pattern.
+ */
+public interface IColumnPageZeroWriterFlavorSelector {
+
+    /**
+     * Evaluates and switches the page zero writer based on space efficiency.
+     * 
+     * This method compares the space requirements of both writer implementations
+     * and selects the one that uses less space. The decision is made dynamically
+     * for each batch of data to optimize storage utilization.
+     * 
+     * @param spaceOccupiedByDefaultWriter Space in bytes required by the default writer
+     * @param spaceOccupiedBySparseWriter Space in bytes required by the sparse writer
+     */
+    void switchPageZeroWriterIfNeeded(int spaceOccupiedByDefaultWriter, int spaceOccupiedBySparseWriter);
+
+    /**
+     * Creates the appropriate page zero reader for the given writer type.
+     * 
+     * This method is used during deserialization to create a reader that matches
+     * the writer type used during serialization. The flag identifies which
+     * layout was used.
+     * 
+     * @param flag The flag code identifying the writer type (0=default, 1=sparse)
+     * @return the appropriate reader instance
+     */
+    IColumnPageZeroReader createPageZeroReader(byte flag);
+
+    /**
+     * Returns the currently selected page zero writer instance.
+     * 
+     * @return the writer instance selected by the most recent call to switchPageZeroWriterIfNeeded
+     */
+    IColumnPageZeroWriter getPageZeroWriter();
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IValuesWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IValuesWriter.java
new file mode 100644
index 0000000..191952c
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IValuesWriter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.impls.btree;
+
+import java.io.OutputStream;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Base interface for writing column values to output streams.
+ * 
+ * This interface provides a common abstraction for different types of column writers,
+ * enabling the page zero writer implementations to handle both primary key columns
+ * and regular column values uniformly. This abstraction is particularly important
+ * for the sparse column architecture where different writer types need to be
+ * handled consistently.
+ * 
+ * Implementations include:
+ * - IColumnValuesWriter: For regular column data with additional metadata
+ * - Primary key writers: For key columns stored directly in page zero
+ */
+public interface IValuesWriter {
+    /**
+     * Flushes the column values to the specified output stream.
+     * 
+     * This method writes the accumulated column data to the provided output stream.
+     *
+     * @param out The output stream to write the column data to
+     * @throws HyracksDataException If an error occurs during the write operation
+     */
+    void flush(OutputStream out) throws HyracksDataException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
index f079f51..a1c60a6 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
@@ -39,7 +39,7 @@
     private static final Logger LOGGER = LogManager.getLogger();
     private static final String UNSUPPORTED_OPERATION_MSG = "Operation is not supported for column tuples";
     private final int componentIndex;
-    private final ColumnBTreeReadLeafFrame frame;
+    protected final ColumnBTreeReadLeafFrame frame;
     private final IColumnBufferProvider[] primaryKeyBufferProviders;
     private final IColumnBufferProvider[] filterBufferProviders;
     private final IColumnBufferProvider[] buffersProviders;
@@ -100,7 +100,7 @@
     }
 
     @Override
-    public final void newPage() throws HyracksDataException {
+    public void newPage() throws HyracksDataException {
         tupleIndex = 0;
         ByteBuffer pageZero = frame.getBuffer();
         pageZero.clear();
@@ -120,10 +120,9 @@
     public final void reset(int startIndex, int endIndex) throws HyracksDataException {
         tupleIndex = startIndex;
         this.endIndex = endIndex;
-        ByteBuffer pageZero = frame.getBuffer();
         int numberOfTuples = frame.getTupleCount();
         //Start new page and check whether we should skip reading non-key columns or not
-        boolean readColumnPages = startNewPage(pageZero, frame.getNumberOfColumns(), numberOfTuples);
+        boolean readColumnPages = startNewPage(numberOfTuples);
         //Release previous pinned pages if any
         unpinColumnsPages();
         /*
@@ -196,8 +195,7 @@
 
     protected abstract int setPrimaryKeysAt(int index, int skipCount) throws HyracksDataException;
 
-    protected abstract boolean startNewPage(ByteBuffer pageZero, int numberOfColumns, int numberOfTuples)
-            throws HyracksDataException;
+    protected abstract boolean startNewPage(int numberOfTuples) throws HyracksDataException;
 
     protected abstract void startPrimaryKey(IColumnBufferProvider bufferProvider, int ordinal, int numberOfTuples)
             throws HyracksDataException;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java
index 22ef4f1..988d413 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java
@@ -54,7 +54,7 @@
 
     @Override
     public void reset(ColumnBTreeReadLeafFrame frame) throws HyracksDataException {
-        if (columnIndex >= frame.getNumberOfColumns()) {
+        if (!frame.isValidColumn(columnIndex)) {
             numberOfRemainingPages = 0;
             length = 0;
             return;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
index 88dfaef..6eff94f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
@@ -226,7 +226,8 @@
 
     @Override
     public String toString() {
-        return "CachedPage:[page:" + BufferedFileHandle.getPageId(dpid) + ", compressedPageOffset:" + compressedOffset
+        return "CachedPage:[dpid: " + dpid + ", fileId: " + BufferedFileHandle.getFileId(dpid) + ", page:"
+                + BufferedFileHandle.getPageId(dpid) + ", compressedPageOffset:" + compressedOffset
                 + ", compressedSize:" + compressedSize + "]";
     }
 }