Merge "Merge branch 'gerrit/goldfish' into 'master'"
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index 077b39b..d5c916f 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -1120,7 +1120,8 @@
             }
             if (!function.isExternal()) {
                 // all non-external UDFs should've been inlined by now
-                throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, sourceLoc, signature);
+                throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, sourceLoc,
+                        "UDF not inlined: " + signature);
             }
             IFunctionInfo finfo = ExternalFunctionCompilerUtil.getExternalFunctionInfo(metadataProvider, function);
             AbstractFunctionCallExpression f = new ScalarFunctionCallExpression(finfo, args);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/large-schema/large-schema.001.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/large-schema/large-schema.001.ddl.sqlpp
new file mode 100644
index 0000000..71a618b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/large-schema/large-schema.001.ddl.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+USE test;
+
+CREATE DATASET ColumnDataset
+PRIMARY KEY (id: int) WITH {
+    "storage-format": {"format" : "column"}
+};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/large-schema/large-schema.002.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/large-schema/large-schema.002.update.sqlpp
new file mode 100644
index 0000000..5adbda4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/large-schema/large-schema.002.update.sqlpp
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+-- This will add 5000 columns
+UPSERT INTO ColumnDataset (
+    SELECT VALUE object_add_fields({"id": x},
+                           [{"field-name": "myBadLongGeneratedFieldName" || to_string(x), "field-value":x}])
+    FROM RANGE(1, 5000) x
+)
+
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/large-schema/large-schema.003.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/large-schema/large-schema.003.query.sqlpp
new file mode 100644
index 0000000..c7fd8b5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/large-schema/large-schema.003.query.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+-- This will read a large schema (chunked into two pieces given that the page size is 32KB)
+SELECT VALUE COUNT(*)
+FROM ColumnDataset c
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/large-schema/large-schema.004.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/large-schema/large-schema.004.query.sqlpp
new file mode 100644
index 0000000..cda8ed1
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/large-schema/large-schema.004.query.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+-- Ensure values can be projected with large schemas
+SELECT VALUE c.myBadLongGeneratedFieldName751
+FROM ColumnDataset c
+WHERE c.myBadLongGeneratedFieldName751 IS NOT UNKNOWN
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/missing-null-values/007/006.001.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/missing-null-values/007/006.001.ddl.sqlpp
new file mode 100644
index 0000000..064d705
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/missing-null-values/007/006.001.ddl.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+
+USE test;
+
+CREATE DATASET ColumnDataset
+PRIMARY KEY (id: int) WITH {
+    "storage-format": {"format" : "column"}
+};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/missing-null-values/007/006.002.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/missing-null-values/007/006.002.update.sqlpp
new file mode 100644
index 0000000..205b6a2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/missing-null-values/007/006.002.update.sqlpp
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+UPSERT INTO ColumnDataset (
+    {"id":0, "a": [1]},
+    {"id":1, "a": [1]},
+    {"id":2, "a": [1]},
+    {"id":3, "a": [1]},
+    {"id":4, "a": [1]},
+    {"id":5, "a": [1]},
+    {"id":6, "a": [1]},
+    {"id":7, "a": [1]}
+);
+
+UPSERT INTO ColumnDataset (
+    {"id":8, "a": 5},
+    {"id":9, "a": 5},
+    {"id":10, "a": 5},
+    {"id":11, "a": 5},
+    {"id":12, "a": 5},
+    {"id":13, "a": 5},
+    {"id":14, "a": 5},
+    {"id":15, "a": 5}
+);
+
+UPSERT INTO ColumnDataset (
+    {"id":16, "a": null},
+    {"id":17, "a": null},
+    {"id":18, "a": null},
+    {"id":19, "a": null},
+    {"id":20, "a": null},
+    {"id":21, "a": null},
+    {"id":22, "a": null},
+    {"id":23, "a": null}
+);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/missing-null-values/007/006.003.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/missing-null-values/007/006.003.query.sqlpp
new file mode 100644
index 0000000..05d4c05
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/missing-null-values/007/006.003.query.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SELECT p.a, p.a IS NULL AS null_check
+FROM ColumnDataset p
+ORDER BY p.id
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/missing-null-values/008/006.001.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/missing-null-values/008/006.001.ddl.sqlpp
new file mode 100644
index 0000000..064d705
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/missing-null-values/008/006.001.ddl.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+
+USE test;
+
+CREATE DATASET ColumnDataset
+PRIMARY KEY (id: int) WITH {
+    "storage-format": {"format" : "column"}
+};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/missing-null-values/008/006.002.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/missing-null-values/008/006.002.update.sqlpp
new file mode 100644
index 0000000..e1cbb81
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/missing-null-values/008/006.002.update.sqlpp
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+UPSERT INTO ColumnDataset (
+    {"id":0, "a": [1]},
+    {"id":1, "a": [1]},
+    {"id":2, "a": [1]},
+    {"id":3, "a": [1]},
+    {"id":4, "a": [1]},
+    {"id":5, "a": [1]},
+    {"id":6, "a": [1]},
+    {"id":7, "a": [1]}
+);
+
+UPSERT INTO ColumnDataset (
+    {"id":8, "a": [5]},
+    {"id":9, "a": [5]},
+    {"id":10, "a": [5]},
+    {"id":11, "a": [5]},
+    {"id":12, "a": [5]},
+    {"id":13, "a": [5]},
+    {"id":14, "a": [5]},
+    {"id":15, "a": [5]}
+);
+
+UPSERT INTO ColumnDataset (
+    {"id":16, "a": null},
+    {"id":17, "a": null},
+    {"id":18, "a": null},
+    {"id":19, "a": null},
+    {"id":20, "a": null},
+    {"id":21, "a": null},
+    {"id":22, "a": null},
+    {"id":23, "a": null}
+);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/missing-null-values/008/006.003.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/missing-null-values/008/006.003.query.sqlpp
new file mode 100644
index 0000000..05d4c05
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/missing-null-values/008/006.003.query.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SELECT p.a, p.a IS NULL AS null_check
+FROM ColumnDataset p
+ORDER BY p.id
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/inline-in-expr/inline-in-expr.01.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/inline-in-expr/inline-in-expr.01.ddl.sqlpp
new file mode 100644
index 0000000..d930fc9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/inline-in-expr/inline-in-expr.01.ddl.sqlpp
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description  : Test inlining of UDFs
+ */
+
+drop dataverse experiments if exists;
+create dataverse experiments;
+use experiments;
+
+create function fun1(...) {
+  args[0] + args[1]
+};
+
+create function fun2(...) {
+  args[0] - args[1]
+};
+
+CREATE TYPE openType AS {id: int};
+CREATE DATASET ds(openType) primary key id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/inline-in-expr/inline-in-expr.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/inline-in-expr/inline-in-expr.02.update.sqlpp
new file mode 100644
index 0000000..22cfcc1
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/inline-in-expr/inline-in-expr.02.update.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use experiments;
+
+UPSERT INTO ds {"id": 1, "a": 1, "b": 2, "c": [10,20,30,40], "d": [100,200,300,400]};
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/inline-in-expr/inline-in-expr.03.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/inline-in-expr/inline-in-expr.03.query.sqlpp
new file mode 100644
index 0000000..5ae85ea
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/inline-in-expr/inline-in-expr.03.query.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE experiments;
+
+SELECT -fun1(ds.a, ds.b) AS x1,
+       ds.c[fun1(ds.a, ds.b)] AS x2,
+       ds.d[-fun2(ds.a, ds.b)] AS x3
+FROM ds;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/large-schema/large-schema.003.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/large-schema/large-schema.003.adm
new file mode 100644
index 0000000..e9c02da
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/large-schema/large-schema.003.adm
@@ -0,0 +1 @@
+5000
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/large-schema/large-schema.004.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/large-schema/large-schema.004.adm
new file mode 100644
index 0000000..29988c8
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/large-schema/large-schema.004.adm
@@ -0,0 +1 @@
+751
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/missing-null-values/007/007.003.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/missing-null-values/007/007.003.adm
new file mode 100644
index 0000000..4d2d643
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/missing-null-values/007/007.003.adm
@@ -0,0 +1,24 @@
+{ "null_check": false, "a": [ 1 ] }
+{ "null_check": false, "a": [ 1 ] }
+{ "null_check": false, "a": [ 1 ] }
+{ "null_check": false, "a": [ 1 ] }
+{ "null_check": false, "a": [ 1 ] }
+{ "null_check": false, "a": [ 1 ] }
+{ "null_check": false, "a": [ 1 ] }
+{ "null_check": false, "a": [ 1 ] }
+{ "null_check": false, "a": 5 }
+{ "null_check": false, "a": 5 }
+{ "null_check": false, "a": 5 }
+{ "null_check": false, "a": 5 }
+{ "null_check": false, "a": 5 }
+{ "null_check": false, "a": 5 }
+{ "null_check": false, "a": 5 }
+{ "null_check": false, "a": 5 }
+{ "null_check": true, "a": null }
+{ "null_check": true, "a": null }
+{ "null_check": true, "a": null }
+{ "null_check": true, "a": null }
+{ "null_check": true, "a": null }
+{ "null_check": true, "a": null }
+{ "null_check": true, "a": null }
+{ "null_check": true, "a": null }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/missing-null-values/008/008.003.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/missing-null-values/008/008.003.adm
new file mode 100644
index 0000000..a1c54ff
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/missing-null-values/008/008.003.adm
@@ -0,0 +1,24 @@
+{ "null_check": false, "a": [ 1 ] }
+{ "null_check": false, "a": [ 1 ] }
+{ "null_check": false, "a": [ 1 ] }
+{ "null_check": false, "a": [ 1 ] }
+{ "null_check": false, "a": [ 1 ] }
+{ "null_check": false, "a": [ 1 ] }
+{ "null_check": false, "a": [ 1 ] }
+{ "null_check": false, "a": [ 1 ] }
+{ "null_check": false, "a": [ 5 ] }
+{ "null_check": false, "a": [ 5 ] }
+{ "null_check": false, "a": [ 5 ] }
+{ "null_check": false, "a": [ 5 ] }
+{ "null_check": false, "a": [ 5 ] }
+{ "null_check": false, "a": [ 5 ] }
+{ "null_check": false, "a": [ 5 ] }
+{ "null_check": false, "a": [ 5 ] }
+{ "null_check": true, "a": null }
+{ "null_check": true, "a": null }
+{ "null_check": true, "a": null }
+{ "null_check": true, "a": null }
+{ "null_check": true, "a": null }
+{ "null_check": true, "a": null }
+{ "null_check": true, "a": null }
+{ "null_check": true, "a": null }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/user-defined-functions/inline-in-expr/inline-in-expr.03.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/user-defined-functions/inline-in-expr/inline-in-expr.03.adm
new file mode 100644
index 0000000..145a6e3
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/user-defined-functions/inline-in-expr/inline-in-expr.03.adm
@@ -0,0 +1 @@
+{ "x1": -3, "x2": 40, "x3": 200 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml b/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml
index 4928073..3ad9438 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml
@@ -13282,6 +13282,11 @@
   </test-group>
   <test-group name="user-defined-functions">
     <test-case FilePath="user-defined-functions">
+      <compilation-unit name="inline-in-expr">
+        <output-dir compare="Text">inline-in-expr</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="user-defined-functions">
       <compilation-unit name="bad-function-ddl-1">
         <output-dir compare="Text">bad-function-ddl-1</output-dir>
         <expected-error replacers="cloud:Default.experiments|def:experiments">Cannot find dataset TweetMessages in dataverse {0} nor an alias with name TweetMessages</expected-error>
@@ -16459,6 +16464,16 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="column">
+      <compilation-unit name="missing-null-values/007">
+        <output-dir compare="Text">missing-null-values/007</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="column">
+      <compilation-unit name="missing-null-values/008">
+        <output-dir compare="Text">missing-null-values/008</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="column">
       <compilation-unit name="empty-array/001">
         <output-dir compare="Text">empty-array/001</output-dir>
       </compilation-unit>
@@ -16679,6 +16694,11 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="column">
+      <compilation-unit name="large-schema">
+        <output-dir compare="Text">large-schema</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="column">
       <compilation-unit name="validation">
         <output-dir compare="Text">validation</output-dir>
         <expected-error>ASX1191: Merge policy 'correlated-prefix' is not supported with columnar storage format</expected-error>
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/ColumnTransformer.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/ColumnTransformer.java
index cccac50..71b561a 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/ColumnTransformer.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/ColumnTransformer.java
@@ -190,13 +190,18 @@
             currentParent = unionNode;
 
             ATypeTag childTypeTag = pointable.getTypeTag();
-            AbstractSchemaNode actualNode;
+
             if (childTypeTag == ATypeTag.NULL || childTypeTag == ATypeTag.MISSING) {
-                actualNode = unionNode.getOriginalType();
+                /*
+                 * NULL and MISSING have been tracked under the originalType from the start (i.e., the type that
+                 * existed before a union was injected between the parent and the original node), so write them there.
+                 */
+                AbstractSchemaNode actualNode = unionNode.getOriginalType();
+                acceptActualNode(pointable, actualNode);
             } else {
-                actualNode = unionNode.getOrCreateChild(pointable.getTypeTag(), columnMetadata);
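+                // Otherwise, descend into (or create) the union child matching the value's actual type tag.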
+                AbstractSchemaNode actualNode = unionNode.getOrCreateChild(pointable.getTypeTag(), columnMetadata);
+                pointable.accept(this, actualNode);
             }
-            pointable.accept(this, actualNode);
 
             currentParent = previousParent;
             columnMetadata.exitNode(node);
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/AbstractColumnValuesReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/AbstractColumnValuesReader.java
index b03da57..0942a23 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/AbstractColumnValuesReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/AbstractColumnValuesReader.java
@@ -202,6 +202,15 @@
         }
     }
 
+    protected final void writeLevel(IColumnValuesWriter writer) throws HyracksDataException {
+        if (isNull()) {
+            // This will prepend the nullBitMask
+            writer.writeNull(level);
+        } else {
+            writer.writeLevel(level);
+        }
+    }
+
     protected void appendCommon(ObjectNode node) {
         node.put("typeTag", getTypeTag().toString());
         node.put("columnIndex", columnIndex);
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/PrimitiveColumnValuesReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/PrimitiveColumnValuesReader.java
index f1c2929..7b02c70 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/PrimitiveColumnValuesReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/PrimitiveColumnValuesReader.java
@@ -100,7 +100,7 @@
             throw e;
         }
 
-        writer.writeLevel(level);
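+        // The base reader's writeLevel(writer) writes a NULL indicator (writeNull) when the current value is null.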
+        writeLevel(writer);
         if (primaryKey || isValue()) {
             try {
                 writer.writeValue(this);
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/RepeatedPrimitiveColumnValuesReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/RepeatedPrimitiveColumnValuesReader.java
index 3f90a4b..0f3b817 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/RepeatedPrimitiveColumnValuesReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/RepeatedPrimitiveColumnValuesReader.java
@@ -99,15 +99,16 @@
 
         if (isRepeatedValue()) {
             while (!isLastDelimiter()) {
-                writer.writeLevel(level);
+                writeLevel(writer);
                 if (isValue()) {
                     writer.writeValue(this);
                 }
                 doNextAndCheck();
             }
         }
+
         //Add last delimiter, or NULL/MISSING
-        writer.writeLevel(level);
+        writeLevel(writer);
     }
 
     @Override
diff --git a/asterixdb/asterix-column/src/test/resources/data/211-unionArrayPrimitiveNull.json b/asterixdb/asterix-column/src/test/resources/data/211-unionArrayPrimitiveNull.json
new file mode 100644
index 0000000..041afdc
--- /dev/null
+++ b/asterixdb/asterix-column/src/test/resources/data/211-unionArrayPrimitiveNull.json
@@ -0,0 +1,3 @@
+{"a": [0]}
+{"a": 1}
+{"a": null}
diff --git a/asterixdb/asterix-column/src/test/resources/result/assembler/211-unionArrayPrimitiveNull.json b/asterixdb/asterix-column/src/test/resources/result/assembler/211-unionArrayPrimitiveNull.json
new file mode 100644
index 0000000..1b928a2
--- /dev/null
+++ b/asterixdb/asterix-column/src/test/resources/result/assembler/211-unionArrayPrimitiveNull.json
@@ -0,0 +1,3 @@
+{"a":[0]}
+{"a":1}
+{"a":null}
diff --git a/asterixdb/asterix-column/src/test/resources/result/small/211-unionArrayPrimitiveNull.json b/asterixdb/asterix-column/src/test/resources/result/small/211-unionArrayPrimitiveNull.json
new file mode 100644
index 0000000..1b928a2
--- /dev/null
+++ b/asterixdb/asterix-column/src/test/resources/result/small/211-unionArrayPrimitiveNull.json
@@ -0,0 +1,3 @@
+{"a":[0]}
+{"a":1}
+{"a":null}
diff --git a/asterixdb/asterix-column/src/test/resources/result/transformer/211-unionArrayPrimitiveNull.schema b/asterixdb/asterix-column/src/test/resources/result/transformer/211-unionArrayPrimitiveNull.schema
new file mode 100644
index 0000000..8fc4a85
--- /dev/null
+++ b/asterixdb/asterix-column/src/test/resources/result/transformer/211-unionArrayPrimitiveNull.schema
@@ -0,0 +1,7 @@
+root
+|-- a: union <level: 1>
+|    |-- bigint: bigint <level: 1, index: 1>
+|    |    |-- Def size: 3 [(0,1),(1,1),(0,1)]
+|    |-- array: array <level: 1>
+|    |    |-- item: bigint <level: 2, index: 0>
+|    |    |    |-- Def size: 5 [(2,1),(1,1),(0,2),(4,1)]
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/AbstractInlineUdfsVisitor.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/AbstractInlineUdfsVisitor.java
index 3975425..629be84 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/AbstractInlineUdfsVisitor.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/AbstractInlineUdfsVisitor.java
@@ -153,7 +153,14 @@
     public Boolean visit(IndexAccessor fa, Void arg) throws CompilationException {
         Pair<Boolean, Expression> p = inlineUdfsAndViewsInExpr(fa.getExpr());
         fa.setExpr(p.second);
-        return p.first;
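+        // Inline UDFs/views in the index expression as well (e.g., ds.c[fun1(...)]), not just the target expression.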
+        boolean inlined = p.first;
+        Expression indexExpr = fa.getIndexExpr();
+        if (indexExpr != null) {
+            Pair<Boolean, Expression> p2 = inlineUdfsAndViewsInExpr(indexExpr);
+            fa.setIndexExpr(p2.second);
+            inlined |= p2.first;
+        }
+        return inlined;
     }
 
     @Override
@@ -250,7 +257,9 @@
 
     @Override
     public Boolean visit(UnaryExpr u, Void arg) throws CompilationException {
-        return u.getExpr().accept(this, arg);
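+        // Rewrite the operand so UDF calls under a unary operator (e.g., -fun1(...)) are inlined as well.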
+        Pair<Boolean, Expression> p = inlineUdfsAndViewsInExpr(u.getExpr());
+        u.setExpr(p.second);
+        return p.first;
     }
 
     @Override
@@ -275,7 +284,7 @@
         if (returnExpression != null) {
             Pair<Boolean, Expression> rewrittenReturnExpr = inlineUdfsAndViewsInExpr(returnExpression);
             insert.setReturnExpression(rewrittenReturnExpr.second);
-            changed |= rewrittenReturnExpr.first;
+            changed = rewrittenReturnExpr.first;
         }
         Pair<Boolean, Expression> rewrittenBodyExpression = inlineUdfsAndViewsInExpr(insert.getBody());
         insert.setBody(rewrittenBodyExpression.second);
@@ -284,10 +293,10 @@
 
     @Override
     public Boolean visit(CopyToStatement stmtCopy, Void arg) throws CompilationException {
-        boolean changed = false;
+        boolean changed;
 
         Pair<Boolean, Expression> queryBody = inlineUdfsAndViewsInExpr(stmtCopy.getBody());
-        changed |= queryBody.first;
+        changed = queryBody.first;
         stmtCopy.setBody(queryBody.second);
 
         Pair<Boolean, List<Expression>> path = inlineUdfsInExprList(stmtCopy.getPathExpressions());
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/compression/ICompressorDecompressor.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/compression/ICompressorDecompressor.java
index 3a7e901..7129897 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/compression/ICompressorDecompressor.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/compression/ICompressorDecompressor.java
@@ -24,41 +24,60 @@
 
 /**
  * An API for block compressor/decompressor.
- *
+ * <p>
  * Note: Should never allocate any buffer in compress/uncompress operations and it must be stateless to be thread safe.
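+ * <p>
+ * Typical usage of the array-based variants (illustrative): size the destination with
+ * {@link #computeCompressedBufferSize(int)}, call {@code compress(src, 0, srcLen, dest, 0)}, and treat the
+ * returned value as the compressed length.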
  */
 public interface ICompressorDecompressor {
     /**
      * Computes the required buffer size for <i>compress()</i>.
      *
-     * @param uBufferSize
-     *            The size of the uncompressed buffer.
+     * @param uBufferSize The size of the uncompressed buffer.
      * @return The required buffer size for compression
      */
     int computeCompressedBufferSize(int uBufferSize);
 
     /**
+     * Compress <i>src</i> into <i>dest</i>
+     *
+     * @param src        Uncompressed source buffer
+     * @param srcOffset  Source offset
+     * @param srcLen     Source length
+     * @param dest       Destination buffer
+     * @param destOffset Destination offset
+     * @return compressed length
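+     * @throws HyracksDataException If an error occurs during compression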
+     */
+    int compress(byte[] src, int srcOffset, int srcLen, byte[] dest, int destOffset) throws HyracksDataException;
+
+    /**
      * Compress <i>uBuffer</i> into <i>cBuffer</i>
      *
-     * @param uBuffer
-     *            Uncompressed source buffer
-     * @param cBuffer
-     *            Compressed destination buffer
+     * @param uBuffer Uncompressed source buffer
+     * @param cBuffer Compressed destination buffer
      * @return Buffer after compression. ({@link ByteBuffer#limit()} is set to the compressed size
-     * @throws HyracksDataException
      */
     ByteBuffer compress(ByteBuffer uBuffer, ByteBuffer cBuffer) throws HyracksDataException;
 
     /**
+     * Uncompress <i>src</i> into <i>dest</i>
+     *
+     * @param src        Compressed source
+     * @param srcOffset  Source offset
+     * @param srcLen     Source length
+     * @param dest       Destination buffer
+     * @param destOffset Destination offset
+     * @return uncompressed length
+     * @throws HyracksDataException An exception will be thrown if the <i>dest</i> buffer size is not sufficient.
+     */
+    int uncompress(byte[] src, int srcOffset, int srcLen, byte[] dest, int destOffset) throws HyracksDataException;
+
+    /**
      * Uncompress <i>cBuffer</i> into <i>uBuffer</i>
      *
-     * @param cBuffer
-     *            Compressed source buffer
-     * @param uBuffer
-     *            Uncompressed destination buffer
+     * @param cBuffer Compressed source buffer
+     * @param uBuffer Uncompressed destination buffer
      * @return Buffer after decompression. ({@link ByteBuffer#limit()} is set to the uncompressed size
-     * @throws HyracksDataException
-     *             An exception will be thrown if the <i>uBuffer</i> size is not sufficient.
+     * @throws HyracksDataException An exception will be thrown if the <i>uBuffer</i> size is not sufficient.
      */
     ByteBuffer uncompress(ByteBuffer cBuffer, ByteBuffer uBuffer) throws HyracksDataException;
+
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IMetadataPageManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IMetadataPageManager.java
index 58c837b..7909ed1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IMetadataPageManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IMetadataPageManager.java
@@ -29,6 +29,7 @@
 public interface IMetadataPageManager extends IPageManager {
     /**
      * put the key value pair in the metadata page using the passed frame
+     *
      * @param frame
      * @param key
      * @param value
@@ -38,19 +39,22 @@
 
     /**
      * get the value of the key from the metadata page using the passed frame
+     *
      * @param frame
      * @param key
      * @param value
+     * @return true if the key exists, false otherwise
      * @throws HyracksDataException
      */
-    void get(ITreeIndexMetadataFrame frame, IValueReference key, IPointable value) throws HyracksDataException;
+    boolean get(ITreeIndexMetadataFrame frame, IValueReference key, IPointable value) throws HyracksDataException;
 
     /**
-     * @param frame
-     * @param key
-     * @return The byte offset in the index file for the entry with the passed key if the index is valid and the key
-     *         exists, returns -1 otherwise. use the passed frame to read the metadata page
-     * @throws HyracksDataException
+     * @return the page size (in bytes) of the underlying buffer cache
      */
-    long getFileOffset(ITreeIndexMetadataFrame frame, IValueReference key) throws HyracksDataException;
+    int getPageSize();
+
+    /**
+     * @return the free space (in bytes) remaining in the current metadata page
+     */
+    int getFreeSpace() throws HyracksDataException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITreeIndexMetadataFrame.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITreeIndexMetadataFrame.java
index 7efc469..a898d59 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITreeIndexMetadataFrame.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITreeIndexMetadataFrame.java
@@ -36,6 +36,7 @@
 
     /**
      * Set the page in the frame
+     *
      * @param page
      */
     void setPage(ICachedPage page);
@@ -53,6 +54,7 @@
 
     /**
      * Set the page level
+     *
      * @param level
      */
     void setLevel(byte level);
@@ -60,12 +62,14 @@
     /**
      * Get the next metadata page if this page is linked to other metadata pages
      * Return a negative value otherwise
+     *
      * @return
      */
     int getNextMetadataPage();
 
     /**
      * Link this metadata page to another one
+     *
      * @param nextPage
      */
     void setNextMetadataPage(int nextPage);
@@ -77,37 +81,44 @@
 
     /**
      * Set the max page of the file
+     *
      * @param maxPage
      */
     void setMaxPage(int maxPage);
 
     /**
      * Get a free page from the page
+     *
      * @return
      */
     int getFreePage();
 
     /**
      * Get the remaining space in the metadata page
+     *
      * @return
      */
     int getSpace();
 
     /**
      * add a new free page to the metadata page
+     *
      * @param freePage
      */
     void addFreePage(int freePage);
 
     /**
      * get the value with the key = key
+     *
      * @param key
      * @param value
+     * @return true if the key exists, false otherwise
      */
-    void get(IValueReference key, IPointable value);
+    boolean get(IValueReference key, IPointable value);
 
     /**
      * set the value with the key = key
+     *
      * @param key
      * @param value
      * @throws HyracksDataException
@@ -121,18 +132,21 @@
 
     /**
      * Sets the index to be valid in the metadata page
+     *
      * @param valid
      */
     void setValid(boolean valid);
 
     /**
      * Get the storage version associated with this index
+     *
      * @return
      */
     int getVersion();
 
     /**
      * Set the index root page id
+     *
      * @param rootPage
      */
     void setRootPageId(int rootPage);
@@ -149,6 +163,7 @@
 
     /**
      * return the offset to the entry of the passed key, -1, otherwise
+     *
      * @param key
      */
     int getOffset(IValueReference key);
@@ -162,4 +177,9 @@
      * @return true if the inspected page is a free page, false otherwise
      */
     boolean isFreePage();
+
+    /**
+     * @return the overhead (in bytes) to store a key-value pair
+     */
+    int getKeyValueStorageOverhead();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/frames/LIFOMetaDataFrame.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/frames/LIFOMetaDataFrame.java
index d0757c8..62251d9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/frames/LIFOMetaDataFrame.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/frames/LIFOMetaDataFrame.java
@@ -36,7 +36,6 @@
  * ....
  * ....
  * [free page 5][free page 4][free page 3][free page 2][free page 1]
- *
  */
 public class LIFOMetaDataFrame implements ITreeIndexMetadataFrame {
 
@@ -167,17 +166,22 @@
     }
 
     @Override
-    public void get(IValueReference key, IPointable value) {
+    public int getKeyValueStorageOverhead() {
+        return Integer.BYTES * 2;
+    }
+
+    @Override
+    public boolean get(IValueReference key, IPointable value) {
         int tupleCount = getTupleCount();
         int tupleStart = getTupleStart(0);
         for (int i = 0; i < tupleCount; i++) {
             if (isInner(key, tupleStart)) {
                 get(tupleStart + key.getLength() + Integer.BYTES, value);
-                return;
+                return true;
             }
             tupleStart = getNextTupleStart(tupleStart);
         }
-        value.set(null, 0, 0);
+        return false;
     }
 
     private int find(IValueReference key) {
@@ -197,7 +201,7 @@
         value.set(buf.array(), offset + Integer.BYTES, valueLength);
     }
 
-    private static final int compare(byte[] b1, int s1, byte[] b2, int s2, int l) {
+    private static int compare(byte[] b1, int s1, byte[] b2, int s2, int l) {
         for (int i = 0; i < l; i++) {
             if (b1[s1 + i] != b2[s2 + i]) {
                 return b1[s1 + i] - b2[s2 + i];
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
index 852c8b5..dae01bf 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
@@ -18,6 +18,9 @@
  */
 package org.apache.hyracks.storage.am.common.freepage;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IPointable;
@@ -40,108 +43,36 @@
 
 public class AppendOnlyLinkedMetadataPageManager implements IMetadataPageManager {
     private final IBufferCache bufferCache;
+    private final ITreeIndexMetadataFrameFactory frameFactory;
+    private final List<ICachedPage> metadataPages;
     private int metadataPage = IBufferCache.INVALID_PAGEID;
     private int fileId = -1;
-    private final ITreeIndexMetadataFrameFactory frameFactory;
-    private ICachedPage confiscatedPage;
+    private ICachedPage currentPage;
+    private ICachedPage firstPage;
     private boolean ready = false;
 
     public AppendOnlyLinkedMetadataPageManager(IBufferCache bufferCache, ITreeIndexMetadataFrameFactory frameFactory) {
         this.bufferCache = bufferCache;
         this.frameFactory = frameFactory;
+        metadataPages = new ArrayList<>();
     }
 
     @Override
-    public void releasePage(ITreeIndexMetadataFrame metaFrame, int freePageNum) throws HyracksDataException {
-        ICachedPage metaPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, getMetadataPageId()));
-        metaPage.acquireWriteLatch();
-        try {
-            metaFrame.setPage(metaPage);
-            if (metaFrame.getSpace() > Integer.BYTES) {
-                metaFrame.addFreePage(freePageNum);
-            } else {
-                int newPageNum = metaFrame.getFreePage();
-                if (newPageNum < 0) {
-                    throw new HyracksDataException(
-                            "Inconsistent Meta Page State. It has no space, but it also has no entries.");
-                }
-                ICachedPage newNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, newPageNum));
-                newNode.acquireWriteLatch();
-                try {
-                    int metaMaxPage = metaFrame.getMaxPage();
-                    System.arraycopy(metaPage.getBuffer().array(), 0, newNode.getBuffer().array(), 0,
-                            metaPage.getBuffer().capacity());
-                    metaFrame.init();
-                    metaFrame.setNextMetadataPage(newPageNum);
-                    metaFrame.setMaxPage(metaMaxPage);
-                    metaFrame.addFreePage(freePageNum);
-                } finally {
-                    newNode.releaseWriteLatch(true);
-                    bufferCache.unpin(newNode);
-                }
-            }
-        } finally {
-            metaPage.releaseWriteLatch(true);
-            bufferCache.unpin(metaPage);
-        }
+    public void releasePage(ITreeIndexMetadataFrame metaFrame, int freePageNum) {
+        throw new IllegalAccessError("On-disk pages must be immutable");
     }
 
     @Override
-    public void releaseBlock(ITreeIndexMetadataFrame metaFrame, int startingPage, int count)
-            throws HyracksDataException {
-        for (int i = 0; i < count; i++) {
-            releasePage(metaFrame, startingPage + i);
-        }
+    public void releaseBlock(ITreeIndexMetadataFrame metaFrame, int startingPage, int count) {
+        throw new IllegalAccessError("On-disk pages must be immutable");
     }
 
     @Override
     public int takePage(ITreeIndexMetadataFrame metaFrame) throws HyracksDataException {
-        confiscatedPage.acquireWriteLatch();
-        int freePage = IBufferCache.INVALID_PAGEID;
-        try {
-            metaFrame.setPage(confiscatedPage);
-            freePage = metaFrame.getFreePage();
-            if (freePage < 0) { // no free page entry on this page
-                int nextPage = metaFrame.getNextMetadataPage();
-                if (nextPage > 0) { // sibling may have free pages
-                    ICachedPage nextNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, nextPage));
-                    nextNode.acquireWriteLatch();
-                    // we copy over the free space entries of nextpage into the
-                    // first meta page (metaDataPage)
-                    // we need to link the first page properly to the next page
-                    // of nextpage
-                    try {
-                        // remember entries that remain unchanged
-                        int maxPage = metaFrame.getMaxPage();
-                        // copy entire page (including sibling pointer, free
-                        // page entries, and all other info)
-                        // after this copy nextPage is considered a free page
-                        System.arraycopy(nextNode.getBuffer().array(), 0, confiscatedPage.getBuffer().array(), 0,
-                                nextNode.getBuffer().capacity());
-                        // reset unchanged entry
-                        metaFrame.setMaxPage(maxPage);
-                        freePage = metaFrame.getFreePage();
-                        // sibling also has no free pages, this "should" not
-                        // happen, but we deal with it anyway just to be safe
-                        if (freePage < 0) {
-                            freePage = nextPage;
-                        } else {
-                            metaFrame.addFreePage(nextPage);
-                        }
-                    } finally {
-                        nextNode.releaseWriteLatch(true);
-                        bufferCache.unpin(nextNode);
-                    }
-                } else {
-                    freePage = metaFrame.getMaxPage();
-                    freePage++;
-                    metaFrame.setMaxPage(freePage);
-                }
-            }
-        } finally {
-            confiscatedPage.releaseWriteLatch(false);
-        }
-        return freePage;
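+        // Pages are never recycled in this append-only manager (release* throws); just advance the max page id
+        // tracked on the first metadata page and hand it out.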
+        metaFrame.setPage(firstPage);
+        int maxPage = metaFrame.getMaxPage() + 1;
+        metaFrame.setMaxPage(maxPage);
+        return maxPage;
     }
 
     @Override
@@ -154,23 +85,22 @@
     @Override
     public int getMaxPageId(ITreeIndexMetadataFrame metaFrame) throws HyracksDataException {
         ICachedPage metaNode;
-        if (confiscatedPage == null) {
+        if (firstPage == null) {
             int mdPage = getMetadataPageId();
             if (mdPage < 0) {
                 return IBufferCache.INVALID_PAGEID;
             }
             metaNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, mdPage));
         } else {
-            metaNode = confiscatedPage;
+            metaNode = firstPage;
         }
-        metaNode.acquireReadLatch();
+
         int maxPage = -1;
         try {
             metaFrame.setPage(metaNode);
             maxPage = metaFrame.getMaxPage();
         } finally {
-            metaNode.releaseReadLatch();
-            if (confiscatedPage == null) {
+            if (firstPage == null) {
                 bufferCache.unpin(metaNode);
             }
         }
@@ -195,48 +125,31 @@
         int pages = bufferCache.getNumPagesOfFile(fileId);
         //if there are no pages in the file yet, we're just initializing
         if (pages == 0) {
-            if (confiscatedPage != null) {
+            if (firstPage != null) {
                 throw new HyracksDataException("Metadata Page Manager is already initialized");
             }
             ITreeIndexMetadataFrame metaFrame = createMetadataFrame();
-            ICachedPage metaNode = bufferCache.confiscatePage(BufferCache.INVALID_DPID);
-            try {
-                metaFrame.setPage(metaNode);
-                metaFrame.init();
-                metaFrame.setMaxPage(-1);
-            } finally {
-                confiscatedPage = metaNode;
-            }
+            // Confiscate the first metadata page and remember it
+            confiscateNext(metaFrame);
+            firstPage = currentPage;
+            metaFrame.setMaxPage(-1);
         }
     }
 
     @Override
     public void close(IPageWriteFailureCallback failureCallback) throws HyracksDataException {
         if (ready) {
-            IFIFOPageWriter pageWriter = bufferCache.createFIFOWriter(NoOpPageWriteCallback.INSTANCE, failureCallback,
-                    DefaultBufferCacheWriteContext.INSTANCE);
-            ITreeIndexMetadataFrame metaFrame = frameFactory.createFrame();
-            confiscatedPage.acquireWriteLatch();
-            try {
-                metaFrame.setPage(confiscatedPage);
-                metaFrame.setValid(true);
-            } finally {
-                confiscatedPage.releaseWriteLatch(false);
-            }
-            int finalMetaPage = getMaxPageId(metaFrame) + 1;
-            confiscatedPage.setDiskPageId(BufferedFileHandle.getDiskPageId(fileId, finalMetaPage));
-            final ICompressedPageWriter compressedPageWriter = bufferCache.getCompressedPageWriter(fileId);
-            compressedPageWriter.prepareWrite(confiscatedPage);
-            // WARNING: flushing the metadata page should be done after releasing the write latch; otherwise, the page
-            // won't be flushed to disk because it won't be dirty until the write latch has been released.
-            pageWriter.write(confiscatedPage);
-            compressedPageWriter.endWriting();
+            persist(failureCallback);
             metadataPage = getMetadataPageId();
             ready = false;
-        } else if (confiscatedPage != null) {
-            bufferCache.returnPage(confiscatedPage, false);
+        } else if (!metadataPages.isEmpty()) {
+            for (ICachedPage page : metadataPages) {
+                bufferCache.returnPage(page, false);
+            }
         }
-        confiscatedPage = null;
+        currentPage = null;
+        firstPage = null;
+        metadataPages.clear();
     }
 
     /**
@@ -270,32 +183,26 @@
     @Override
     public void setRootPageId(int rootPage) throws HyracksDataException {
         ITreeIndexMetadataFrame metaFrame = frameFactory.createFrame();
-        confiscatedPage.acquireWriteLatch();
-        try {
-            metaFrame.setPage(confiscatedPage);
-            metaFrame.setRootPageId(rootPage);
-        } finally {
-            confiscatedPage.releaseWriteLatch(false);
-        }
+        metaFrame.setPage(firstPage);
+        metaFrame.setRootPageId(rootPage);
         ready = true;
     }
 
     @Override
     public int getRootPageId() throws HyracksDataException {
         ICachedPage metaNode;
-        if (confiscatedPage == null) {
+        if (firstPage == null) {
             metaNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, getMetadataPageId()));
         } else {
-            metaNode = confiscatedPage;
+            metaNode = firstPage;
         }
         ITreeIndexMetadataFrame metaFrame = frameFactory.createFrame();
-        metaNode.acquireReadLatch();
+
         try {
             metaFrame.setPage(metaNode);
             return metaFrame.getRootPageId();
         } finally {
-            metaNode.releaseReadLatch();
-            if (confiscatedPage == null) {
+            if (firstPage == null) {
                 bufferCache.unpin(metaNode);
             }
         }
@@ -309,58 +216,122 @@
     @Override
     public void put(ITreeIndexMetadataFrame frame, IValueReference key, IValueReference value)
             throws HyracksDataException {
-        if (confiscatedPage == null) {
+        if (currentPage == null) {
             throw HyracksDataException.create(ErrorCode.ILLEGAL_WRITE_AFTER_FLUSH_ATTEMPT);
         }
-        confiscatedPage.acquireWriteLatch();
-        try {
-            frame.setPage(confiscatedPage);
-            frame.put(key, value);
-        } finally {
-            confiscatedPage.releaseWriteLatch(false);
+
+        frame.setPage(currentPage);
+
+        if (frame.getSpace() < key.getLength() + value.getLength() + frame.getKeyValueStorageOverhead()) {
+            // If there's not enough space, confiscate an extra page
+            confiscateNext(frame);
+        }
+
+        frame.put(key, value);
+        if (frame.getSpace() == 0) {
+            /*
+             * Most likely the caller is writing chunks; confiscate a new page so that the next call to
+             * getFreeSpace() will not return 0.
+             */
+            confiscateNext(frame);
         }
     }
 
-    private ICachedPage pinPage() throws HyracksDataException {
-        return confiscatedPage == null ? bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, getMetadataPageId()))
-                : confiscatedPage;
+    @Override
+    public boolean get(ITreeIndexMetadataFrame frame, IValueReference key, IPointable value)
+            throws HyracksDataException {
+        int nextPage = getNextPageId(frame, -1);
+        while (nextPage != -1) {
+            ICachedPage page = pinPage(nextPage);
+            try {
+                frame.setPage(page);
+                if (frame.get(key, value)) {
+                    return true;
+                }
+                nextPage = getNextPageId(frame, nextPage);
+            } finally {
+                unpinPage(page);
+            }
+        }
+
+        // Preserve the old behavior: an absent key yields a zero-length value
+        value.set(null, 0, 0);
+        return false;
+    }
+
+    @Override
+    public int getPageSize() {
+        return bufferCache.getPageSize();
+    }
+
+    @Override
+    public int getFreeSpace() throws HyracksDataException {
+        if (currentPage == null) {
+            throw HyracksDataException.create(ErrorCode.ILLEGAL_WRITE_AFTER_FLUSH_ATTEMPT);
+        }
+        ITreeIndexMetadataFrame frame = createMetadataFrame();
+        frame.setPage(currentPage);
+        return frame.getSpace() - frame.getKeyValueStorageOverhead();
+    }
+
+    private int getNextPageId(ITreeIndexMetadataFrame frame, int previousPageIdx) throws HyracksDataException {
+        if (metadataPages.isEmpty()) {
+            // Read-only (immutable): follow the on-disk metadata page chain
+            return previousPageIdx == -1 ? getMetadataPageId() : frame.getNextMetadataPage();
+        }
+
+        // Still being written (mutable): page ids are indices into the in-memory list
+        int nextPageIdx = previousPageIdx + 1;
+        return nextPageIdx < metadataPages.size() ? nextPageIdx : -1;
+    }
+
+    private ICachedPage pinPage(int pageId) throws HyracksDataException {
+        if (!metadataPages.isEmpty()) {
+            return metadataPages.get(pageId);
+        }
+
+        return bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId));
     }
 
     private void unpinPage(ICachedPage page) throws HyracksDataException {
-        if (confiscatedPage == null) {
+        if (metadataPages.isEmpty()) {
             bufferCache.unpin(page);
         }
     }
 
-    @Override
-    public void get(ITreeIndexMetadataFrame frame, IValueReference key, IPointable value) throws HyracksDataException {
-        ICachedPage page = pinPage();
-        page.acquireReadLatch();
+    private void confiscateNext(ITreeIndexMetadataFrame metaFrame) throws HyracksDataException {
+        ICachedPage metaNode = bufferCache.confiscatePage(BufferCache.INVALID_DPID);
         try {
-            frame.setPage(page);
-            frame.get(key, value);
+            metaFrame.setPage(metaNode);
+            metaFrame.init();
         } finally {
-            page.releaseReadLatch();
-            unpinPage(page);
+            metadataPages.add(metaNode);
+            currentPage = metaNode;
         }
     }
 
-    @Override
-    public long getFileOffset(ITreeIndexMetadataFrame frame, IValueReference key) throws HyracksDataException {
-        int pageId = getMetadataPageId();
-        if (pageId != IBufferCache.INVALID_PAGEID) {
-            ICachedPage page = pinPage();
-            page.acquireReadLatch();
-            try {
-                frame.setPage(page);
-                int inPageOffset = frame.getOffset(key);
-                return inPageOffset >= 0 ? ((long) pageId * bufferCache.getPageSizeWithHeader()) + frame.getOffset(key)
-                        + IBufferCache.RESERVED_HEADER_BYTES : -1L;
-            } finally {
-                page.releaseReadLatch();
-                unpinPage(page);
-            }
+    private void persist(IPageWriteFailureCallback failureCallback) throws HyracksDataException {
+        IFIFOPageWriter pageWriter = bufferCache.createFIFOWriter(NoOpPageWriteCallback.INSTANCE, failureCallback,
+                DefaultBufferCacheWriteContext.INSTANCE);
+        ITreeIndexMetadataFrame metaFrame = frameFactory.createFrame();
+        // The last page in the chain will have -1 as its next page
+        int nextPage = -1;
+        int pageId = getMaxPageId(metaFrame) + 1;
+        final ICompressedPageWriter compressedPageWriter = bufferCache.getCompressedPageWriter(fileId);
+
+        // Write pages in reverse order (first confiscated page will be the last one to be written)
+        for (int i = metadataPages.size() - 1; i >= 0; i--) {
+            ICachedPage page = metadataPages.get(i);
+            metaFrame.setPage(page);
+            metaFrame.setNextMetadataPage(nextPage);
+            // The validity bit matters only in the last written page; no harm in setting this flag for all pages.
+            metaFrame.setValid(true);
+
+            page.setDiskPageId(BufferedFileHandle.getDiskPageId(fileId, pageId));
+            compressedPageWriter.prepareWrite(page);
+            pageWriter.write(page);
+            nextPage = pageId++;
         }
-        return -1L;
+        compressedPageWriter.endWriting();
     }
 }
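
A minimal sketch (not part of the patch) of how persist() above links and places the confiscated metadata pages: the pages are written in reverse list order so that each page can record the disk page id of its successor before it is flushed, which means the first confiscated page (the one holding the root page id) is written last and gets the highest page id. The names startPageId (standing in for getMaxPageId(metaFrame) + 1) and pageCount (standing in for metadataPages.size()) are illustrative only.

    static void printMetadataChain(int startPageId, int pageCount) {
        int nextPage = -1; // the last page of the chain has no successor
        int pageId = startPageId;
        for (int i = pageCount - 1; i >= 0; i--) {
            System.out.println("list index " + i + " -> disk page " + pageId + ", next = " + nextPage);
            nextPage = pageId++;
        }
    }

    // With startPageId = 10 and pageCount = 3 this prints:
    //   list index 2 -> disk page 10, next = -1
    //   list index 1 -> disk page 11, next = 10
    //   list index 0 -> disk page 12, next = 11  (the first confiscated page is written last)
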
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java
index edc6d10..fbb6b5f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java
@@ -319,24 +319,18 @@
     }
 
     @Override
-    public void get(ITreeIndexMetadataFrame frame, IValueReference key, IPointable value) throws HyracksDataException {
+    public boolean get(ITreeIndexMetadataFrame frame, IValueReference key, IPointable value)
+            throws HyracksDataException {
         throw new HyracksDataException("Unsupported Operation");
     }
 
     @Override
-    public long getFileOffset(ITreeIndexMetadataFrame frame, IValueReference key) throws HyracksDataException {
-        int metadataPageNum = getMetadataPageId();
-        if (metadataPageNum != IBufferCache.INVALID_PAGEID) {
-            ICachedPage metaNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, getMetadataPageId()));
-            metaNode.acquireReadLatch();
-            try {
-                frame.setPage(metaNode);
-                return ((long) metadataPageNum * bufferCache.getPageSizeWithHeader()) + frame.getOffset(key);
-            } finally {
-                metaNode.releaseReadLatch();
-                bufferCache.unpin(metaNode);
-            }
-        }
-        return -1;
+    public int getPageSize() {
+        return bufferCache.getPageSize();
+    }
+
+    @Override
+    public int getFreeSpace() throws HyracksDataException {
+        throw new HyracksDataException("Unsupported Operation");
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTree.java
index d12fbe9..d95678e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTree.java
@@ -87,20 +87,17 @@
     @Override
     public synchronized void activate() throws HyracksDataException {
         super.activate();
-        postLoadingDiskComponents();
     }
 
     @Override
-    protected void postLoadingDiskComponents() throws HyracksDataException {
+    protected void completeActivation() throws HyracksDataException {
         if (diskComponents.isEmpty()) {
             columnMetadata = columnManager.activate();
         } else {
             IComponentMetadata componentMetadata = diskComponents.get(0).getMetadata();
             columnMetadata = columnManager.activate(ColumnUtil.getColumnMetadataCopy(componentMetadata));
         }
-
         diskCacheManager.activate(columnMetadata.getNumberOfColumns(), diskComponents, diskBufferCache);
-        super.postLoadingDiskComponents();
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/ColumnMetadataReaderWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/ColumnMetadataReaderWriter.java
new file mode 100644
index 0000000..39a6dd2
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/ColumnMetadataReaderWriter.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.utils;
+
+import org.apache.hyracks.api.compression.ICompressorDecompressor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnManager;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnMetadata;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.lsm.LSMColumnBTree;
+import org.apache.hyracks.storage.am.lsm.common.api.IComponentMetadata;
+import org.apache.hyracks.storage.common.compression.SnappyCompressorDecompressor;
+import org.apache.hyracks.util.annotations.ThreadSafe;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * A Reader/Writer for {@link IColumnMetadata}
+ */
+@ThreadSafe
+final class ColumnMetadataReaderWriter {
+    private static final Logger LOGGER = LogManager.getLogger();
+    /**
+     * The header consists of two integers: [originalLength | compressedLength]
+     */
+    private static final int CHUNK_HEADER_SIZE = Integer.BYTES * 2;
+    /**
+     * Used to get the columns info from {@link IComponentMetadata#get(IValueReference, ArrayBackedValueStorage)}
+     *
+     * @see LSMColumnBTree#activate()
+     * @see IColumnManager#activate(IValueReference)
+     */
+    private static final MutableArrayValueReference COLUMNS_METADATA_KEY =
+            new MutableArrayValueReference("COLUMNS_METADATA".getBytes());
+
+    /**
+     * The default (and only) compressor today is 'snappy'. This could change in the future: old indexes should
+     * still use snappy, but new indexes could take whatever {@link ICompressorDecompressor} is passed to them.
+     */
+    private final ICompressorDecompressor compressorDecompressor;
+
+    /**
+     * This class is currently {@link ThreadSafe} because {@link SnappyCompressorDecompressor#INSTANCE} is thread
+     * safe. If the {@link ICompressorDecompressor} is ever changed, the modifier should either ensure that the new
+     * {@link ICompressorDecompressor} is thread safe or require users of this class to create their own instances.
+     */
+    public ColumnMetadataReaderWriter() {
+        compressorDecompressor = SnappyCompressorDecompressor.INSTANCE;
+    }
+
+    /**
+     * Writes the metadata. If the metadata is 'large', it will be compressed and stored in chunks.
+     *
+     * @param metadata          to write
+     * @param componentMetadata where the metadata is stored
+     */
+    public void writeMetadata(IValueReference metadata, IComponentMetadata componentMetadata)
+            throws HyracksDataException {
+        int requiredLength = COLUMNS_METADATA_KEY.getLength() + metadata.getLength();
+        if (componentMetadata.getAvailableSpace() >= requiredLength) {
+            componentMetadata.put(COLUMNS_METADATA_KEY, metadata);
+        } else {
+            LOGGER.debug("Writing large column metadata of size {} bytes", requiredLength);
+            writeChunks(metadata, componentMetadata);
+        }
+    }
+
+    /**
+     * Reads the metadata. If the metadata is chunked, it is reassembled into its original form.
+     *
+     * @param componentMetadata source
+     * @return read metadata
+     */
+    public IValueReference readMetadata(IComponentMetadata componentMetadata) throws HyracksDataException {
+        ArrayBackedValueStorage storage = new ArrayBackedValueStorage();
+        storage.reset();
+
+        if (!componentMetadata.get(COLUMNS_METADATA_KEY, storage)) {
+            readChunks(componentMetadata, storage);
+        }
+
+        return storage;
+    }
+
+    private void writeChunks(IValueReference metadata, IComponentMetadata componentMetadata)
+            throws HyracksDataException {
+        ArrayBackedValueStorage key = new ArrayBackedValueStorage(COLUMNS_METADATA_KEY.getLength() + Integer.BYTES);
+        int originalLength = metadata.getLength();
+
+        int requiredSize = compressorDecompressor.computeCompressedBufferSize(originalLength);
+        ArrayBackedValueStorage compressed = new ArrayBackedValueStorage(requiredSize + CHUNK_HEADER_SIZE);
+
+        // Write the compressed content after CHUNK_HEADER_SIZE
+        int compressedLength = compressorDecompressor.compress(metadata.getByteArray(), 0, originalLength,
+                compressed.getByteArray(), CHUNK_HEADER_SIZE);
+        // Set the size to be the header size + compressedLength
+        compressed.setSize(CHUNK_HEADER_SIZE + compressedLength);
+        // Serialize the original length
+        IntegerPointable.setInteger(compressed.getByteArray(), 0, originalLength);
+        // Serialize the compressed length
+        IntegerPointable.setInteger(compressed.getByteArray(), Integer.BYTES, compressedLength);
+
+        // Write chunks
+        VoidPointable chunk = new VoidPointable();
+        int position = 0;
+        int chunkId = 0;
+        int keyLength = COLUMNS_METADATA_KEY.getLength() + Integer.BYTES;
+        int totalLength = compressed.getLength();
+        while (position < totalLength) {
+            int remaining = totalLength - position;
+            int freeSpace = componentMetadata.getAvailableSpace() - keyLength;
+            // Find the largest chunk size that can be written
+            int chunkLength = Math.min(remaining, freeSpace);
+            // Prepare a chunk
+            chunk.set(compressed.getByteArray(), position, chunkLength);
+            // Write a chunk
+            componentMetadata.put(getChunkKey(chunkId++, key), chunk);
+            position += chunkLength;
+        }
+    }
+
+    private void readChunks(IComponentMetadata componentMetadata, ArrayBackedValueStorage chunk)
+            throws HyracksDataException {
+        ArrayBackedValueStorage key = new ArrayBackedValueStorage(COLUMNS_METADATA_KEY.getLength() + Integer.BYTES);
+        ArrayBackedValueStorage compressed = new ArrayBackedValueStorage();
+        // Ensure a large buffer to avoid enlarging the storage multiple times
+        chunk.setSize(componentMetadata.getPageSize());
+
+        int chunkId = 0;
+        // Read the header + the first chunk
+        chunk.reset();
+        componentMetadata.get(getChunkKey(chunkId++, key), chunk);
+        int originalLength = IntegerPointable.getInteger(chunk.getByteArray(), 0);
+        int compressedLength = IntegerPointable.getInteger(chunk.getByteArray(), Integer.BYTES);
+        // Append the first chunk without the header
+        compressed.append(chunk.getByteArray(), CHUNK_HEADER_SIZE, chunk.getLength() - CHUNK_HEADER_SIZE);
+        // Read the remaining chunks
+        int remainingLength = compressedLength - compressed.getLength();
+        while (remainingLength > 0) {
+            chunk.reset();
+            // Get the next chunk
+            componentMetadata.get(getChunkKey(chunkId++, key), chunk);
+            // Append the next chunk
+            compressed.append(chunk);
+            remainingLength -= chunk.getLength();
+        }
+
+        // Decompress 'compressed'
+        int requiredSize = compressorDecompressor.computeCompressedBufferSize(originalLength);
+        // Ensure the buffer is large enough for the decompressed content
+        chunk.setSize(requiredSize);
+        int uncompressedLength = compressorDecompressor.uncompress(compressed.getByteArray(), 0, compressedLength,
+                chunk.getByteArray(), 0);
+        if (uncompressedLength != originalLength) {
+            throw new IllegalStateException("Uncompressed size mismatch (original: " + originalLength
+                    + ", uncompressed: " + uncompressedLength + ")");
+        }
+
+        // Set the size to the original (uncompressed) length
+        chunk.setSize(originalLength);
+    }
+
+    private static IValueReference getChunkKey(int chunkId, ArrayBackedValueStorage storage)
+            throws HyracksDataException {
+        if (chunkId == 0) {
+            // First chunk. Append the key prefix + set the size
+            storage.reset();
+            storage.append(COLUMNS_METADATA_KEY);
+            storage.setSize(COLUMNS_METADATA_KEY.getLength() + Integer.BYTES);
+        }
+
+        IntegerPointable.setInteger(storage.getByteArray(), COLUMNS_METADATA_KEY.getLength(), chunkId);
+        return storage;
+    }
+}
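
A self-contained sketch (illustrative only, not part of the patch) of the layout written by the chunked path above: a two-integer header [originalLength | compressedLength] followed by the compressed bytes, split into chunks whose keys are the COLUMNS_METADATA prefix followed by a 4-byte chunk id. The class and method names (ChunkLayoutSketch, buildChunkKey, split) are hypothetical.

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;

    final class ChunkLayoutSketch {
        static final byte[] KEY_PREFIX = "COLUMNS_METADATA".getBytes(StandardCharsets.UTF_8);
        static final int CHUNK_HEADER_SIZE = Integer.BYTES * 2; // [originalLength | compressedLength]

        // Chunk key = key prefix followed by the 4-byte chunk id (same shape as getChunkKey above)
        static byte[] buildChunkKey(int chunkId) {
            ByteBuffer key = ByteBuffer.allocate(KEY_PREFIX.length + Integer.BYTES);
            key.put(KEY_PREFIX).putInt(chunkId);
            return key.array();
        }

        // Split a header-prefixed compressed buffer into chunks of at most maxChunkLength bytes.
        // In the writer above the chunk length is bounded by the component's remaining free space instead.
        static List<byte[]> split(byte[] compressedWithHeader, int maxChunkLength) {
            List<byte[]> chunks = new ArrayList<>();
            int position = 0;
            while (position < compressedWithHeader.length) {
                int chunkLength = Math.min(compressedWithHeader.length - position, maxChunkLength);
                byte[] chunk = new byte[chunkLength];
                System.arraycopy(compressedWithHeader, position, chunk, 0, chunkLength);
                chunks.add(chunk);
                position += chunkLength;
            }
            return chunks;
        }
    }
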
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/ColumnUtil.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/ColumnUtil.java
index fc1e460..95edced 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/ColumnUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/ColumnUtil.java
@@ -23,36 +23,24 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.util.HyracksConstants;
 import org.apache.hyracks.data.std.api.IValueReference;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
-import org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference;
-import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnManager;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnTupleProjector;
-import org.apache.hyracks.storage.am.lsm.btree.column.impls.lsm.LSMColumnBTree;
 import org.apache.hyracks.storage.am.lsm.common.api.IComponentMetadata;
 import org.apache.hyracks.storage.common.IIndexAccessParameters;
 
 public class ColumnUtil {
-    /**
-     * Used to get the columns info from {@link IComponentMetadata#get(IValueReference, ArrayBackedValueStorage)}
-     *
-     * @see LSMColumnBTree#activate()
-     * @see IColumnManager#activate(IValueReference)
-     */
-    private static final MutableArrayValueReference COLUMNS_METADATA_KEY =
-            new MutableArrayValueReference("COLUMNS_METADATA".getBytes());
+    // Currently, ColumnMetadataReaderWriter is thread safe because the Snappy compressor/decompressor is thread safe
+    private static final ColumnMetadataReaderWriter READER_WRITER = new ColumnMetadataReaderWriter();
 
     private ColumnUtil() {
     }
 
     public static IValueReference getColumnMetadataCopy(IComponentMetadata src) throws HyracksDataException {
-        ArrayBackedValueStorage storage = new ArrayBackedValueStorage();
-        src.get(COLUMNS_METADATA_KEY, storage);
-        return storage;
+        return READER_WRITER.readMetadata(src);
     }
 
     public static void putColumnsMetadataValue(IValueReference columnsMetadataValue, IComponentMetadata dest)
             throws HyracksDataException {
-        dest.put(COLUMNS_METADATA_KEY, columnsMetadataValue);
+        READER_WRITER.writeMetadata(columnsMetadataValue, dest);
     }
 
     public static int getColumnPageIndex(int columnOffset, int pageSize) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IComponentMetadata.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IComponentMetadata.java
index fa69d7a..8cdc064 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IComponentMetadata.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IComponentMetadata.java
@@ -23,23 +23,31 @@
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 
 public interface IComponentMetadata {
+    /**
+     * @return page size
+     */
+    int getPageSize();
+
+    /**
+     * @return the available space to store a value
+     */
+    int getAvailableSpace() throws HyracksDataException;
 
     /**
      * Put the key value pair in this metadata, overwrite if it exists
      *
      * @param key
      * @param value
-     * @throws HyracksDataException
-     *             if the component is immutable
+     * @throws HyracksDataException if the component is immutable
      */
     void put(IValueReference key, IValueReference value) throws HyracksDataException;
 
     /**
      * Get the value of the key from the metadata, 0 length value if not exists
      *
-     * @param key
-     * @param value
-     * @throws HyracksDataException
+     * @param key     of the value
+     * @param storage storage used to store the retrieved value
+     * @return true if the key exists, false otherwise
      */
-    void get(IValueReference key, ArrayBackedValueStorage value) throws HyracksDataException;
+    boolean get(IValueReference key, ArrayBackedValueStorage storage) throws HyracksDataException;
 }
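
A short usage sketch (hypothetical caller, not part of the patch) of the new boolean contract of get: the return value lets callers distinguish an absent key from a stored zero-length value, which is what the chunked metadata reader above relies on before falling back to chunked reads. The variables componentMetadata and key are assumed to exist in scope.

    ArrayBackedValueStorage storage = new ArrayBackedValueStorage();
    if (componentMetadata.get(key, storage)) {
        // the key exists; 'storage' now holds its value (possibly zero-length)
    } else {
        // the key was never written; with the old void signature this case was
        // indistinguishable from a zero-length value
    }
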
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index e9a3abe..4f7c624 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -200,11 +200,11 @@
             throw HyracksDataException.create(ErrorCode.CANNOT_ACTIVATE_ACTIVE_INDEX);
         }
         loadDiskComponents();
-        postLoadingDiskComponents();
+        completeActivation();
+        isActive = true;
     }
 
-    protected void postLoadingDiskComponents() throws HyracksDataException {
-        isActive = true;
+    protected void completeActivation() throws HyracksDataException {
     }
 
     private void loadDiskComponents() throws HyracksDataException {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/DiskComponentMetadata.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/DiskComponentMetadata.java
index 649989c..bed72a5 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/DiskComponentMetadata.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/DiskComponentMetadata.java
@@ -33,13 +33,23 @@
     }
 
     @Override
+    public int getPageSize() {
+        return mdpManager.getPageSize();
+    }
+
+    @Override
+    public int getAvailableSpace() throws HyracksDataException {
+        return mdpManager.getFreeSpace();
+    }
+
+    @Override
     public void put(IValueReference key, IValueReference value) throws HyracksDataException {
         mdpManager.put(mdpManager.createMetadataFrame(), key, value);
     }
 
     @Override
-    public void get(IValueReference key, ArrayBackedValueStorage value) throws HyracksDataException {
-        mdpManager.get(mdpManager.createMetadataFrame(), key, value);
+    public boolean get(IValueReference key, ArrayBackedValueStorage storage) throws HyracksDataException {
+        return mdpManager.get(mdpManager.createMetadataFrame(), key, storage);
     }
 
     public void put(MemoryComponentMetadata metadata) throws HyracksDataException {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyDiskComponentMetadata.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyDiskComponentMetadata.java
index d0fe8a9..5314658 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyDiskComponentMetadata.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyDiskComponentMetadata.java
@@ -35,7 +35,7 @@
     }
 
     @Override
-    public void get(IValueReference key, ArrayBackedValueStorage value) throws HyracksDataException {
+    public boolean get(IValueReference key, ArrayBackedValueStorage storage) throws HyracksDataException {
         throw new IllegalStateException("Attempt to read metadata of empty component");
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MemoryComponentMetadata.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MemoryComponentMetadata.java
index 0c2167f..b90b37c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MemoryComponentMetadata.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MemoryComponentMetadata.java
@@ -38,6 +38,16 @@
     private final List<org.apache.commons.lang3.tuple.Pair<IValueReference, ArrayBackedValueStorage>> store =
             new ArrayList<>();
 
+    @Override
+    public int getPageSize() {
+        return -1;
+    }
+
+    @Override
+    public int getAvailableSpace() throws HyracksDataException {
+        return Integer.MAX_VALUE;
+    }
+
     /**
      * Note: for memory metadata, it is expected that the key will be constant
      *
@@ -64,14 +74,18 @@
      * @throws HyracksDataException
      */
     @Override
-    public void get(IValueReference key, ArrayBackedValueStorage value) throws HyracksDataException {
+    public boolean get(IValueReference key, ArrayBackedValueStorage storage) throws HyracksDataException {
         lock.readLock().lock();
         try {
-            value.reset();
+            storage.reset();
             ArrayBackedValueStorage stored = get(key);
             if (stored != null) {
-                value.append(stored);
+                storage.append(stored);
+                return true;
             }
+
+            // Key does not exist
+            return false;
         } finally {
             lock.readLock().unlock();
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressor.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressor.java
index c4855bd..943b2b7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressor.java
@@ -21,7 +21,6 @@
 import java.nio.ByteBuffer;
 
 import org.apache.hyracks.api.compression.ICompressorDecompressor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class NoOpCompressorDecompressor implements ICompressorDecompressor {
     public static final NoOpCompressorDecompressor INSTANCE = new NoOpCompressorDecompressor();
@@ -35,12 +34,24 @@
     }
 
     @Override
-    public ByteBuffer compress(ByteBuffer uBuffer, ByteBuffer cBuffer) throws HyracksDataException {
+    public int compress(byte[] src, int srcOffset, int srcLen, byte[] dest, int destOffset) {
+        System.arraycopy(src, srcOffset, dest, destOffset, srcLen);
+        return srcLen;
+    }
+
+    @Override
+    public ByteBuffer compress(ByteBuffer uBuffer, ByteBuffer cBuffer) {
         return uBuffer;
     }
 
     @Override
-    public ByteBuffer uncompress(ByteBuffer cBuffer, ByteBuffer uBuffer) throws HyracksDataException {
+    public int uncompress(byte[] src, int srcOffset, int srcLen, byte[] dest, int destOffset) {
+        System.arraycopy(src, srcOffset, dest, destOffset, srcLen);
+        return srcLen;
+    }
+
+    @Override
+    public ByteBuffer uncompress(ByteBuffer cBuffer, ByteBuffer uBuffer) {
         return cBuffer;
     }
 }
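
A small round-trip sketch (illustrative only, not part of the patch) of the new byte[]-based compress/uncompress methods added to ICompressorDecompressor, following the same pattern the column metadata writer uses: computeCompressedBufferSize sizes the destination buffer, and both methods return the number of bytes produced. The names codec and original are assumptions; both calls may throw HyracksDataException.

    // 'codec' could be NoOpCompressorDecompressor.INSTANCE or SnappyCompressorDecompressor.INSTANCE
    byte[] compressed = new byte[codec.computeCompressedBufferSize(original.length)];
    int compressedLength = codec.compress(original, 0, original.length, compressed, 0);

    byte[] restored = new byte[original.length];
    int restoredLength = codec.uncompress(compressed, 0, compressedLength, restored, 0);
    // for a correct codec, restoredLength == original.length and 'restored' equals 'original'
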
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/SnappyCompressorDecompressor.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/SnappyCompressorDecompressor.java
index 16c9a2d..e3274ad 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/SnappyCompressorDecompressor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/SnappyCompressorDecompressor.java
@@ -29,10 +29,9 @@
  * Built-in Snappy compressor/decompressor wrapper
  */
 public class SnappyCompressorDecompressor implements ICompressorDecompressor {
-    protected static final SnappyCompressorDecompressor INSTANCE = new SnappyCompressorDecompressor();
+    public static final SnappyCompressorDecompressor INSTANCE = new SnappyCompressorDecompressor();
 
     private SnappyCompressorDecompressor() {
-
     }
 
     @Override
@@ -41,6 +40,16 @@
     }
 
     @Override
+    public int compress(byte[] src, int srcOffset, int srcLen, byte[] dest, int destOffset)
+            throws HyracksDataException {
+        try {
+            return Snappy.compress(src, srcOffset, srcLen, dest, destOffset);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
     public ByteBuffer compress(ByteBuffer uBuffer, ByteBuffer cBuffer) throws HyracksDataException {
         try {
             final int cLength = Snappy.compress(uBuffer.array(), uBuffer.position(), uBuffer.remaining(),
@@ -53,6 +62,16 @@
     }
 
     @Override
+    public int uncompress(byte[] src, int srcOffset, int srcLen, byte[] dest, int destOffset)
+            throws HyracksDataException {
+        try {
+            return Snappy.uncompress(src, srcOffset, srcLen, dest, destOffset);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
     public ByteBuffer uncompress(ByteBuffer cBuffer, ByteBuffer uBuffer) throws HyracksDataException {
         try {
             final int uLength = Snappy.uncompress(cBuffer.array(), cBuffer.position(), cBuffer.remaining(),